Compartir a través de


Envío de trabajos de Spark mediante herramientas de línea de comandos

Se aplica a: SQL Server 2019 (15.x)

En este artículo se proporcionan instrucciones sobre cómo usar herramientas de línea de comandos para ejecutar trabajos de Spark en clústeres de macrodatos de SQL Server.

Important

Los clústeres de macrodatos de Microsoft SQL Server 2019 se retiran. La compatibilidad con clústeres de macrodatos de SQL Server 2019 finalizó a partir del 28 de febrero de 2025. Para obtener más información, consulte la entrada de blog del anuncio y las opciones de macrodatos en la plataforma de Microsoft SQL Server.

Prerequisites

Trabajos de Spark que usan azdata o Livy

En este artículo se proporcionan ejemplos de cómo usar patrones de línea de comandos para enviar aplicaciones spark a clústeres de macrodatos de SQL Server.

Los comandos de la CLI azdata bdc spark de datos de Azure exponen todas las funcionalidades de Clústeres de macrodatos de SQL Server en la línea de comandos. Este artículo se centra en el envío de trabajos. Pero azdata bdc spark también admite modos interactivos para Python, Scala, SQL y R mediante el azdata bdc spark session comando .

Si necesita integración directa con una API REST, use las llamadas estándar de Livy para enviar trabajos. En este artículo se usa la curl herramienta de línea de comandos de los ejemplos de Livy para ejecutar la llamada a la API rest. Para obtener un ejemplo detallado que muestra cómo interactuar con el punto de conexión de Spark Livy mediante código de Python, consulte Uso de Spark desde el punto de conexión de Livy en GitHub.

ETL simple que usa Clústeres de macrodatos Spark

Esta aplicación de extracción, transformación y carga (ETL) sigue un patrón de ingeniería de datos común. Carga datos tabulares desde una ruta de acceso de la zona de aterrizaje del sistema de archivos distribuido (HDFS) de Apache Hadoop. A continuación, usa un formato de tabla para escribir en una ruta de acceso de zona procesada por HDFS.

Descargue el conjunto de datos de la aplicación de ejemplo. A continuación, cree aplicaciones de PySpark mediante PySpark, Spark Scala o Spark SQL.

En las secciones siguientes, encontrará ejercicios de ejemplo para cada solución. Seleccione la pestaña de la plataforma. Ejecutará la aplicación mediante azdata o curl.

En este ejemplo se usa la siguiente aplicación PySpark. Se guarda como un archivo de Python denominado parquet_etl_sample.py en el equipo local.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

Copia de la aplicación PySpark en HDFS

Almacene la aplicación en HDFS para que el clúster pueda acceder a ella para su ejecución. Como procedimiento recomendado, normalice y controle las ubicaciones de aplicación dentro del clúster para simplificar la administración.

En este caso de uso de ejemplo, todas las aplicaciones de canalización ETL se almacenan en la ruta de acceso hdfs:/apps/ETL-Pipelines . La aplicación de ejemplo se almacena en hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py.

Ejecute el comando siguiente para cargar parquet_etl_sample.py desde el equipo de desarrollo local o de ensayo en el clúster de HDFS.

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Ejecución de la aplicación Spark

Use el comando siguiente para enviar la aplicación a Clústeres de macrodatos de SQL Server Spark para su ejecución.

El azdata comando ejecuta la aplicación mediante parámetros especificados habitualmente. Para obtener opciones de parámetros completas para azdata bdc spark batch create, vea azdata bdc spark.

Esta aplicación requiere el spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation parámetro de configuración. Por lo tanto, el comando usa la --config opción . Esta configuración muestra cómo pasar configuraciones a la sesión de Spark.

Puede usar la --config opción para especificar varios parámetros de configuración. También puede especificarlos dentro de la sesión de la aplicación estableciendo la configuración en el SparkSession objeto .

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Warning

El parámetro "name" o "n" para el nombre del lote debe ser único cada vez que se crea un nuevo lote.

Supervisión de trabajos de Spark

Los azdata bdc spark batch comandos proporcionan acciones de administración para trabajos por lotes de Spark.

Para enumerar todos los trabajos en ejecución, ejecute el siguiente comando.

  • El comando azdata:

    azdata bdc spark batch list -o table
    
  • El curl comando, mediante Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

Para obtener información de un lote de Spark con el identificador especificado, ejecute el siguiente comando. batch id Se devuelve de spark batch create.

  • El comando azdata:

    azdata bdc spark batch info --batch-id 0
    
  • El curl comando, mediante Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

Para obtener información de estado de un lote de Spark con el identificador especificado, ejecute el siguiente comando.

  • El comando azdata:

    azdata bdc spark batch state --batch-id 0
    
  • El curl comando, mediante Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

Para obtener los registros de un lote de Spark con el identificador especificado, ejecute el siguiente comando.

  • El comando azdata:

    azdata bdc spark batch log --batch-id 0
    
  • El curl comando, mediante Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

Next steps

Para obtener información sobre cómo solucionar problemas de código spark, consulte Solución de problemas de un cuaderno de PySpark.

El código de ejemplo completo de Spark está disponible en ejemplos de Spark de clústeres de macrodatos de SQL Server en GitHub.

Para obtener más información sobre los clústeres de macrodatos de SQL Server y los escenarios relacionados, vea Clústeres de macrodatos de SQL Server.