Partager via


Connecteur Apache Spark : SQL Server et Azure SQL

Le connecteur Apache Spark pour SQL Server et Azure SQL est un connecteur hautes performances que vous pouvez utiliser pour inclure des données transactionnelles dans l’analytique Big Data et conserver les résultats pour les requêtes ad hoc ou la création de rapports. À l’aide du connecteur, vous pouvez utiliser n’importe quelle base de données SQL, locale ou dans le cloud, comme source de données d’entrée ou récepteur de données de sortie pour les travaux Spark.

Remarque

Ce connecteur n'est pas maintenu activement. Cet article est conservé uniquement à des fins d’archivage.

Cette bibliothèque contient le code source pour le connecteur Apache Spark pour SQL Server et les plateformes Azure SQL.

Apache Spark est un moteur d’analytique unifié pour le traitement des données à grande échelle.

Deux versions du connecteur sont disponibles via Maven : une version compatible 2.4.x et une version compatible 3.0.x. Téléchargez les connecteurs à partir de maven.org et importez-les à l’aide de coordonnées :

Connecteur Coordonnée Maven
Connecteur compatible Spark 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Connecteur compatible Spark 3.0.x com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Connecteur compatible Spark 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Vous pouvez également générer le connecteur à partir de la source ou télécharger le fichier JAR à partir de la section Release dans GitHub. Pour obtenir les informations les plus récentes sur le connecteur, consultez le référentiel GitHub du connecteur SQL Spark.

Fonctionnalités prises en charge

  • Prise en charge de toutes les liaisons Spark (Scala, Python, R)
  • Prise en charge de l’authentification de base et de l’onglet Clé Active Directory (AD)
  • Prise en charge de l’écriture réorganisée dataframe
  • Prise en charge de l’écriture dans une instance unique ou un pool de données SQL Server dans des clusters Big Data SQL Server
  • Prise en charge du connecteur fiable pour Sql Server Single Instance
Composant Versions prises en charge
Apache Spark 2.4.x, 3.0.x, 3.1.x
Langage de programmation Scala 2.11, 2.12
Pilote Microsoft JDBC pour SQL Server 8,4
Microsoft SQL Server SQL Server 2008 ou version ultérieure
Bases de données SQL Azure Soutenu

Options prises en charge

Le connecteur Apache Spark pour SQL Server et Azure SQL prend en charge les options définies dans l’article JDBC SQL DataSource .

En outre, le connecteur prend en charge les options suivantes :

Choix Par défaut Descriptif
reliabilityLevel BEST_EFFORT BEST_EFFORT ou NO_DUPLICATES. NO_DUPLICATES implémente une insertion fiable dans les scénarios de redémarrage de l’exécuteur
dataPoolDataSource none none implique que la valeur n’est pas définie et que le connecteur doit écrire dans une instance unique SQL Server. Définissez cette valeur sur le nom de la source de données pour écrire une table de pool de données dans des clusters Big Data.
isolationLevel READ_COMMITTED Spécifier le niveau d’isolation
tableLock false Implémente une insertion avec l'option TABLOCK pour améliorer les performances d’écriture
schemaCheckEnabled true Désactive le cadre de données strict et la vérification du schéma de table SQL lors de la définition de false

Définissez d’autres options de copie en bloc en tant qu’options sur le dataframe. Le connecteur transmet ces options aux bulkcopy APIs en écriture.

Comparaison entre les performances

Apache Spark Connector pour SQL Server et Azure SQL est jusqu’à 15 fois plus rapide que le connecteur JDBC générique pour l’écriture dans SQL Server. Les caractéristiques de performances varient selon le type, le volume de données, les options utilisées et peuvent afficher des variations entre chaque exécution. Les résultats de performances suivants sont le temps nécessaire pour remplacer une table SQL avec 143,9M lignes dans un spark dataframe. Spark dataframe est configuré en lisant store_sales la table HDFS générée à l’aide du banc d'essai TPCDS Spark. Le temps de lecture de store_sales à dataframe est exclu. Les résultats sont moyennés sur trois itérations.

Type de connecteur Paramètres Descriptif Temps d’écriture
JDBCConnector Par défaut Connecteur JDBC générique avec les options par défaut 1 385 secondes
sql-spark-connector BEST_EFFORT Meilleur effort sql-spark-connector avec les options par défaut 580 secondes
sql-spark-connector NO_DUPLICATES Fiable sql-spark-connector 709 secondes
sql-spark-connector BEST_EFFORT + tabLock=true Effort maximal sql-spark-connector avec le verrou de table activé 72 secondes
sql-spark-connector NO_DUPLICATES + tabLock=true Fiable sql-spark-connector avec verrouillage de table activé 198 secondes

Configuration

  • Configuration Spark : num_executors = 20, executor_memory = '1664 Mo', executor_cores = 2
  • Configuration de Data Gen : facteur_d'échelle=50, tables_partitionnées=true
  • Fichier store_sales de données avec le nombre de lignes 143 997 590

Environnement

  • SQL Server Cluster Big Data CU5
  • master + 6 nœuds
  • Chaque nœud un serveur Gen-5, avec 512 Go de RAM, 4 To NVM par nœud et 10 Gbits/s de carte réseau

Problèmes couramment rencontrés

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Cette erreur se produit lorsque vous utilisez une version antérieure du mssql pilote dans votre environnement Hadoop. Le connecteur inclut désormais ce pilote. Si vous avez précédemment utilisé le connecteur Azure SQL et installé manuellement des pilotes sur votre cluster pour la compatibilité de l’authentification Microsoft Entra, supprimez ces pilotes.

Pour corriger l’erreur :

  1. Si vous utilisez un environnement Hadoop générique, vérifiez et supprimez le mssql fichier JAR avec la commande suivante : rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Si vous utilisez Databricks, ajoutez un script d’init global ou de cluster pour supprimer les anciennes versions du mssql pilote du /databricks/jars dossier ou ajoutez cette ligne à un script existant : rm /databricks/jars/*mssql*

  2. Ajoutez les packages adal4j et mssql. Par exemple, vous pouvez utiliser Maven, mais n’importe quel moyen devrait fonctionner.

    Avertissement

    N’installez pas le connecteur SQL Spark de cette façon.

  3. Ajoutez la classe de pilote à votre configuration de connexion. Par exemple:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Pour plus d’informations, consultez la résolution sur https://github.com/microsoft/sql-spark-connector/issues/26.

Démarrer

Le connecteur Apache Spark pour SQL Server et Azure SQL est basé sur l’API Spark DataSourceV1 et l’API en bloc SQL Server. Il utilise la même interface que le connecteur Spark-SQL JDBC intégré. À l’aide de cette intégration, vous pouvez facilement intégrer le connecteur et migrer vos travaux Spark existants en mettant à jour le paramètre de format avec com.microsoft.sqlserver.jdbc.spark.

Pour inclure le connecteur dans vos projets, téléchargez ce référentiel et générez le fichier JAR à l’aide de SBT.

Écrire dans une nouvelle table SQL

Avertissement

Le overwrite mode supprime d’abord la table s’il existe déjà dans la base de données. Utilisez cette option avec soin pour éviter une perte de données inattendue.

Si vous utilisez le mode overwrite sans option truncate lors de la recréation de la table, l’opération supprime les index. En outre, une table columnstore passe à une table de tas. Pour conserver les index existants, définissez l’option truncate sur true. Par exemple : .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Ajouter à la table SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Spécifier le niveau d’isolation

Ce connecteur utilise le READ_COMMITTED niveau d’isolation par défaut lorsqu’il insère en bloc des données dans la base de données. Pour remplacer le niveau d’isolation, utilisez l’option mssqlIsolationLevel suivante :

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Lecture à partir d’une table SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Authentification de Microsoft Entra

Exemple Python avec le principal de service

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Exemple Python avec mot de passe Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Pour vous authentifier à l’aide d’Active Directory, installez la dépendance requise.

Lorsque vous utilisez ActiveDirectoryPassword, la valeur user doit être au format UPN, par exemple username@domainname.com.

Pour Scala, installez l’artefact com.microsoft.aad.adal4j .

Pour Python, installez la adal bibliothèque. Cette bibliothèque est disponible via pip.

Pour voir des exemples, consultez les cahiers d'exemples.