適用於:SQL Server 2019 (15.x)
本文提供如何使用命令行工具來在 SQL Server 巨量數據叢集上執行 Spark 作業的指引。
Important
MICROSOFT SQL Server 2019 巨量數據叢集已淘汰。 SQL Server 2019 巨量數據叢集的支援已於 2025 年 2 月 28 日結束。 如需詳細資訊,請參閱 Microsoft SQL Server 平臺上的公告部落格文章和巨量數據選項。
Prerequisites
- 已設定並登入叢集的 SQL Server 2019 巨量資料工具:
azdata- 對
curlLivy 執行 REST API 呼叫的應用程式
使用 azdata 或 Livy 的 Spark 作業
本文提供如何使用命令行模式將 Spark 應用程式提交至 SQL Server 巨量數據叢集的範例。
Azure Data CLI azdata bdc spark 命令會在 命令行上呈現 SQL Server 巨量數據叢集 Spark 的所有功能。 本文著重於作業提交。 但也 azdata bdc spark 透過 命令支援 Python、Scala、SQL 和 R 的 azdata bdc spark session 互動式模式。
如果您需要與 REST API 的直接整合,請使用標準 Livy 呼叫來提交作業。 本文使用 curl Livy 範例中的命令行工具來執行 REST API 呼叫。 如需示範如何使用 Python 程式代碼與 Spark Livy 端點互動的詳細範例,請參閱從 GitHub 上的 Livy 端點使用 Spark 。
使用巨量數據叢集 Spark 的簡單 ETL
此擷取、轉換和載入 (ETL) 應用程式遵循常見的數據工程模式。 它會從 Apache Hadoop 分散式文件系統 (HDFS) 登陸區域路徑載入表格式數據。 然後,它會使用數據表格式寫入 HDFS 處理的區域路徑。
下載 範例應用程式的數據集。 然後使用 PySpark、Spark Scala 或 Spark SQL 建立 PySpark 應用程式。
在下列各節中,您將找到每個解決方案的範例練習。 選取您平臺的索引標籤。 您將使用 azdata 或 curl執行應用程式。
此範例使用下列 PySpark 應用程式。 它會儲存為本機計算機上名為 parquet_etl_sample.py 的 Python 檔案。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t',
header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
"feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
"catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
"catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
"catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")
# Print the data frame inferred schema
df.printSchema()
tot_rows = df.count()
print("Number of rows:", tot_rows)
# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")
# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")
print("Sample ETL pipeline completed")
將 PySpark 應用程式複製到 HDFS
將應用程式儲存在 HDFS 中,讓叢集可以存取它以供執行。 最佳做法是標準化和管理叢集中的應用程式位置,以簡化系統管理。
在此範例使用案例中,所有 ETL 管線應用程式都會儲存在 hdfs:/apps/ETL-Pipelines 路徑上。 範例應用程式會儲存在 hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py。
執行下列命令,將 parquet_etl_sample.py 從本機開發或預備計算機上傳至 HDFS 叢集。
azdata bdc hdfs cp --from-path parquet_etl_sample.py --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"
執行Spark應用程式
使用下列命令將應用程式提交至 SQL Server 巨量資料叢集 Spark 以執行。
命令 azdata 會使用常用的參數來執行應用程式。 如需 的完整參數選項 azdata bdc spark batch create,請參閱 azdata bdc spark。
此應用程式需要組 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 態參數。 因此,命令會使用 --config 選項。 此設定示範如何將組態傳遞至Spark會話。
您可以使用 --config 選項來指定多個組態參數。 您也可以藉由在 物件中設定組態,在應用程式會話內 SparkSession 指定它們。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
Warning
每次建立新批次時,批次名稱的 「name」 或 「n」 參數應該是唯一的。
監視 Spark 作業
azdata bdc spark batch命令會提供Spark批次作業的管理動作。
若要 列出所有執行中的作業,請執行下列命令。
azdata命令:azdata bdc spark batch list -o table使用 Livy 的
curl指令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
若要 取得 具有指定標識碼的Spark批次資訊,請執行下列命令。
batch id從spark batch create傳回 。
azdata命令:azdata bdc spark batch info --batch-id 0使用 Livy 的
curl指令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
若要取得具有指定識別碼的Spark批次 狀態資訊 ,請執行下列命令。
azdata命令:azdata bdc spark batch state --batch-id 0使用 Livy 的
curl指令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
若要 取得 具有指定標識碼的Spark批次記錄,請執行下列命令。
azdata命令:azdata bdc spark batch log --batch-id 0使用 Livy 的
curl指令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
Next steps
如需針對 Spark 程式代碼進行疑難解答的資訊,請參閱 針對 PySpark 筆記本進行疑難解答。
GitHub 上的 SQL Server 巨量數據叢集 Spark 範例 提供完整的 Spark 範例程式代碼。
如需 SQL Server 巨量數據叢集和相關案例的詳細資訊,請參閱 SQL Server 巨量數據叢集。