Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Introdução
O Azure Synapse Dedicated SQL Pool Connector for Apache Spark no Azure Synapse Analytics permite a transferência eficiente de grandes conjuntos de dados entre o tempo de execução do Apache Spark e o pool SQL dedicado. O conector é enviado como uma biblioteca predefinida com a Área de Trabalho do Azure Synapse. O conector é implementado usando a linguagem Scala. O conector suporta Scala e Python. Para usar o Conector com outras opções de idioma do bloco de anotações, use o comando mágico do Spark - %%spark.
Em um alto nível, o conector fornece os seguintes recursos:
- Leia a partir do Azure Synapse Dedicated SQL Pool:
- Leia grandes conjuntos de dados a partir de tabelas e visões do Synapse Dedicated SQL Pool (Internas e Externas).
- Suporte abrangente à propagação de predicados, onde os filtros em DataFrame são mapeados para a correspondente propagação de predicados em SQL.
- Suporte para poda de colunas.
- Suporte para otimização de consultas.
- Escreva no pool de SQL dedicado do Azure Synapse:
- Ingerir dados em grande volume em tipos de tabelas internas e externas.
- Suporta as seguintes preferências do modo de salvamento DataFrame:
AppendErrorIfExistsIgnoreOverwrite
- O tipo Write to External Table suporta o formato de arquivo Parquet e Delimited Text (exemplo - CSV).
- Para gravar dados em tabelas internas, o conector agora usa instrução COPY em vez da abordagem CETAS/CTAS.
- Melhorias para otimizar o desempenho da taxa de transferência de gravação de ponta a ponta.
- Introduz um identificador de retorno de chamada opcional (um argumento de função Scala) que os clientes podem usar para receber métricas pós-gravação.
- Alguns exemplos incluem - número de registros, duração para concluir determinada ação e motivo da falha.
Abordagem de orquestração
Leia
Escrever
Pré-requisitos
Os pré-requisitos, como a configuração dos recursos necessários do Azure e as etapas para configurá-los, são discutidos nesta seção.
Recursos do Azure
Analise e configure os seguintes Recursos do Azure dependentes:
- Azure Data Lake Storage - usado como a conta de armazenamento principal para o Azure Synapse Workspace.
- Azure Synapse Workspace - crie blocos de anotações, crie e implante fluxos de trabalho de entrada e saída baseados em DataFrame.
- Pool SQL dedicado (anteriormente SQL DW) - fornece recursos de armazenamento de dados corporativos.
- Azure Synapse Serverless Spark Pool - Tempo de execução do Spark onde os trabalhos são executados como Aplicativos Spark.
Preparar a base de dados
Conecte-se ao banco de dados Synapse Dedicated SQL Pool e execute as seguintes instruções de instalação:
Crie um utilizador de base de dados mapeado para a Identidade de utilizador do Microsoft Entra usada para iniciar sessão no Azure Synapse Workspace.
CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;Crie um esquema no qual as tabelas serão definidas, de modo que o Connector possa gravar e ler com êxito as respetivas tabelas.
CREATE SCHEMA [<schema_name>];
Autenticação
Autenticação baseada no Microsoft Entra ID
A autenticação baseada no Microsoft Entra ID é uma abordagem de autenticação integrada. O usuário deve entrar com êxito no Espaço de Trabalho do Azure Synapse Analytics.
Autenticação básica
Uma abordagem de autenticação básica requer que o usuário configure username e password opções. Consulte a seção - Opções de configuração para saber mais sobre parâmetros de configuração relevantes para leitura e gravação em tabelas no Azure Synapse Dedicated SQL Pool.
Autorização
Azure Data Lake Storage Gen2 (Armazenamento Azure Data Lake Gen2)
Há duas maneiras de conceder permissões de acesso ao Azure Data Lake Storage Gen2 - Conta de Armazenamento:
- Controle de Acesso Baseado em Funções - Função de Colaborador de Dados de Blob de Armazenamento
- A atribuição de
Storage Blob Data Contributor Roleconcede ao Utilizador permissões para ler, gravar e eliminar nos Blobs de Armazenamento do Azure. - O RBAC oferece uma abordagem de controle geral ao nível do contêiner.
- A atribuição de
-
Listas de controle de acesso (ACL)
- A abordagem ACL permite controles refinados sobre caminhos e/ou arquivos específicos em uma determinada pasta.
- As verificações de ACL não serão aplicadas se o Usuário já tiver recebido permissões usando a abordagem RBAC.
- Existem dois tipos amplos de permissões de ACL:
- Permissões de acesso (aplicadas em um nível ou objeto específico).
- Permissões padrão (aplicadas automaticamente a todos os objetos filho quando são criados).
- Os tipos de permissões incluem:
-
ExecutePermite a capacidade de percorrer ou navegar pelas hierarquias de pastas. -
Readpermite a leitura. -
Writepermite escrever.
-
- É importante configurar ACLs para que o conector possa gravar e ler com êxito nos locais de armazenamento.
Nota
Se quiser executar blocos de anotações usando pipelines do Synapse Workspace, você também deve conceder permissões de acesso listadas acima para a identidade gerenciada padrão do Synapse Workspace. O nome de identidade gerenciado padrão do espaço de trabalho é igual ao nome do espaço de trabalho.
Para usar o espaço de trabalho Synapse com contas de armazenamento seguras, um ponto de extremidade privado gerido deve ser configurado a partir do notebook. O ponto de extremidade privado gerenciado deve ser aprovado na seção da conta
Private endpoint connectionsde armazenamento ADLS Gen2 noNetworkingpainel.
Azure Synapse Dedicated SQL Pool
Para permitir uma interação eficaz com o SQL Pool Dedicado do Azure Synapse, é necessário obter a seguinte autorização, a menos que se seja um utilizador também configurado como um Active Directory Admin no Endpoint SQL Dedicado.
Ler cenário
Conceda ao usuário
db_exporterusando o procedimentosp_addrolememberarmazenado do sistema .EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
Escrever cenário
- O Connector usa o comando COPY para gravar dados da zona de preparação no destino gerido da tabela interna.
Configure as permissões necessárias descritas aqui.
A seguir está um trecho de acesso rápido do mesmo:
--Make sure your user has the permissions to CREATE tables in the [dbo] schema GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com]; GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has INSERT permissions on the target table GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
- O Connector usa o comando COPY para gravar dados da zona de preparação no destino gerido da tabela interna.
Documentação da API
Azure Synapse Dedicated SQL Pool Connector for Apache Spark - Documentação da API.
Opções de configuração
Para inicializar e orquestrar com êxito a operação de leitura ou gravação, o Conector espera determinados parâmetros de configuração. A definição de objeto - com.microsoft.spark.sqlanalytics.utils.Constants fornece uma lista de constantes padronizadas para cada chave de parâmetro.
A seguir está a lista de opções de configuração com base no cenário de uso:
-
Ler usando a autenticação baseada no Microsoft Entra ID
- As credenciais são mapeadas automaticamente e o usuário não precisa fornecer opções de configuração específicas.
- O argumento de nome de tabela de três partes no
synapsesqlmétodo é necessário para ler a respetiva tabela no Pool SQL Dedicado do Azure Synapse.
-
Ler usando autenticação básica
- Ponto de extremidade SQL dedicado do Azure Synapse
-
Constants.SERVER- Synapse Dedicated SQL Pool End Point (FQDN do servidor) -
Constants.USER- Nome de usuário SQL. -
Constants.PASSWORD- Senha de usuário SQL.
-
- Ponto Final do Armazenamento Azure Data Lake (Gen 2) - Pastas de Preparo
-
Constants.DATA_SOURCE- O caminho de armazenamento definido no parâmetro de localização da fonte de dados é usado para o preparo de dados.
-
- Ponto de extremidade SQL dedicado do Azure Synapse
-
Escrever usando a autenticação baseada no Microsoft Entra ID
- Ponto de extremidade SQL dedicado do Azure Synapse
- Por padrão, o Conector infere o endpoint Synapse Dedicated SQL pelo nome do banco de dados definido no parâmetro de nome de tabela de três partes do método
synapsesql. - Como alternativa, os utilizadores podem usar a opção
Constants.SERVERpara especificar o ponto de extremidade SQL. Verifique se o ponto final hospeda o banco de dados correspondente com o respetivo esquema.
- Por padrão, o Conector infere o endpoint Synapse Dedicated SQL pelo nome do banco de dados definido no parâmetro de nome de tabela de três partes do método
- Ponto Final do Armazenamento Azure Data Lake (Gen 2) - Pastas de Preparo
- Para o tipo de tabela interna:
- Configure a opção
Constants.TEMP_FOLDERou a opçãoConstants.DATA_SOURCE. - Se o usuário optar por dar a opção
Constants.DATA_SOURCE, a pasta de configuração será derivada usando o valorlocationda fonte de dados. - Se ambos forem fornecidos, o valor da
Constants.TEMP_FOLDERopção será usado. - Na ausência de uma opção de pasta de preparação, o Conector criará uma com base na configuração do tempo de execução -
spark.sqlanalyticsconnector.stagingdir.prefix.
- Configure a opção
- Para o tipo de tabela externa:
-
Constants.DATA_SOURCEé uma opção de configuração necessária. - O conector usa o caminho de armazenamento definido no parâmetro location da fonte de dados em combinação com o argumento
locationdo métodosynapsesqle deriva o caminho absoluto para guardar os dados da tabela externa. - Se o argumento
locationpara o métodosynapsesqlnão for especificado, o conector derivará o valor do local como<base_path>/dbName/schemaName/tableName.
-
- Para o tipo de tabela interna:
- Ponto de extremidade SQL dedicado do Azure Synapse
-
Escrever usando autenticação básica
- Ponto de extremidade SQL dedicado do Azure Synapse
-
Constants.SERVER- - Synapse Dedicated SQL Pool End Point (FQDN do servidor). -
Constants.USER- Nome de usuário SQL. -
Constants.PASSWORD- Senha de usuário SQL. -
Constants.STAGING_STORAGE_ACCOUNT_KEYassociada à Conta de Armazenamento que hospedaConstants.TEMP_FOLDERS(apenas tipos de tabelas internas) ouConstants.DATA_SOURCE.
-
- Ponto Final do Armazenamento Azure Data Lake (Gen 2) - Pastas de Preparo
- As credenciais de autenticação básica do SQL não se aplicam aos pontos finais de armazenamento de acesso.
- Portanto, certifique-se de atribuir permissões de acesso ao armazenamento relevantes, conforme descrito na seção Azure Data Lake Storage Gen2.
- Ponto de extremidade SQL dedicado do Azure Synapse
Modelos de código
Esta seção apresenta modelos de código de referência para descrever como usar e invocar o Azure Synapse Dedicated SQL Pool Connector for Apache Spark.
Nota
Usando o conector em Python-
- O conector é suportado em Python apenas para Spark 3. Consulte a Secção - Utilização de dados materializados entre células.
- O identificador de retorno de chamada não está disponível em Python.
Leia a partir do Azure Synapse Dedicated SQL Pool
Solicitação de leitura - synapsesql assinatura do método
Ler a partir de uma tabela usando a autenticação baseada no Microsoft Entra ID
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Ler a partir de uma consulta usando a autenticação baseada no Microsoft Entra ID
Nota
Restrições durante a leitura da consulta:
- O nome da tabela e a consulta não podem ser especificados ao mesmo tempo.
- Só são permitidas consultas selecionadas. DDL e DML SQLs não são permitidos.
- As opções de seleção e filtro no dataframe não são enviadas para o pool dedicado SQL quando uma consulta é especificada.
- A leitura a partir de uma query só está disponível no Apache Spark 3.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>")
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Ler a partir de uma tabela usando autenticação básica
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the table will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Ler a partir de uma consulta usando autenticação básica
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Escrever no Azure Synapse Dedicated SQL Pool
Write Request - synapsesql assinatura do método
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None,
callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit
Escrever usando a autenticação baseada no Microsoft Entra ID
A seguir está um modelo de código abrangente que descreve como usar o conector para cenários de gravação:
//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"
//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")
//Initialize DataFrame that reads CSV data from a given source
val readDF:DataFrame=spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(1000) //Reads first 1000 rows from the source CSV input.
//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
// 1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
// 2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab.
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
(feedback: Map[String, Any], errorState: Option[Throwable]) => {
println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
errorDuringWrite = errorState
}
//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
write.
//Configure required configurations.
options(writeOptionsWithAADAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
location = None,
//Optional parameter to receive a callback.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get
Escrever usando autenticação básica
O trecho de código a seguir substitui a definição de gravação descrita na seção Gravar usando autenticação baseada em ID do Microsoft Entra, para enviar uma solicitação de gravação usando a abordagem de autenticação básica do SQL.
//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
//Set database user name
Constants.USER -> "<user_name>",
//Set database user's password
Constants.PASSWORD -> "<user_password>",
//Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
//To be used only when writing to internal tables. Storage path will be used for data staging.
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Configure and submit the request to write to Synapse Dedicated SQL Pool.
readDF.
write.
options(writeOptionsWithBasicAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Not required for writing to an internal table
location = None,
//Optional parameter.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
Em uma abordagem de autenticação básica, para ler dados de um caminho de armazenamento de origem, outras opções de configuração são necessárias. O trecho de código a seguir fornece um exemplo de como ler de uma fonte de dados do Azure Data Lake Storage Gen2 usando credenciais de um Principal de Serviço:
//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
"delimiter"->",",
"fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" ->
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id" -> s"$spnClientId",
"fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
"fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
"fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
"fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(100)
Modos de salvamento DataFrame suportados
Os seguintes modos de salvamento são suportados ao gravar dados de origem em uma tabela de destino no Pool SQL Dedicado do Azure Synapse:
- ErrorIfExists (modo de gravação padrão)
- Se a tabela de destino existir, a gravação será anulada e uma exceção será retornada ao chamador. Caso contrário, cria-se uma nova tabela com dados das pastas de preparação.
- Ignorar
- Se a tabela de destino existir, a solicitação de gravação será ignorada sem retornar um erro. Caso contrário, cria-se uma nova tabela com dados das pastas de preparação.
- Substituir
- Se a tabela de destino existir, os dados existentes no destino serão substituídos por dados das pastas de preparo. Caso contrário, cria-se uma nova tabela com dados das pastas de preparação.
- Anexar
- Se a tabela de destino existir, os novos dados serão anexados a ela. Caso contrário, cria-se uma nova tabela com dados das pastas de preparação.
Identificador de retorno de chamada de pedido de escrita
As novas alterações na API de caminho de gravação introduziram um recurso experimental para fornecer ao cliente um mapa chave-valor> de métricas pós-gravação. As chaves para as métricas são definidas na nova definição de objeto - Constants.FeedbackConstants. As métricas podem ser recuperadas como uma string JSON ao fornecer o identificador de retorno de chamada (a Scala Function). Segue-se a assinatura da função:
//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit
A seguir estão algumas métricas notáveis (apresentadas no caso do camelo):
WriteFailureCauseDataStagingSparkJobDurationInMillisecondsNumberOfRecordsStagedForSQLCommitSQLStatementExecutionDurationInMillisecondsrows_processed
A seguir está um exemplo de cadeia de caracteres JSON com métricas pós-gravação:
{
SparkApplicationId -> <spark_yarn_application_id>,
SQLStatementExecutionDurationInMilliseconds -> 10113,
WriteRequestReceivedAtEPOCH -> 1647523790633,
WriteRequestProcessedAtEPOCH -> 1647523808379,
StagingDataFileSystemCheckDurationInMilliseconds -> 60,
command -> "COPY INTO [schema_name].[table_name] ...",
NumberOfRecordsStagedForSQLCommit -> 100,
DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
DataStagingSparkJobDurationInMilliseconds -> 5252,
rows_processed -> 100,
SaveModeApplied -> TRUNCATE_COPY,
DurationInMillisecondsToValidateFileFormat -> 75,
status -> Completed,
SparkApplicationName -> <spark_application_name>,
ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
JDBCConfigurationsSetupAtEPOCH -> 193,
StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
SchemaInferenceCheckDurationInMilliseconds -> 91,
SaveModeRequested -> Overwrite,
DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}
Mais exemplos de código
Utilizando dados materializados através de células
O Spark DataFrame pode createOrReplaceTempView ser usado para acessar dados obtidos em outra célula, registrando uma exibição temporária.
- Célula onde os dados são obtidos (digamos com a preferência de idioma do Bloco de Anotações como
Scala)
//Necessary imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Configure options and read from Synapse Dedicated SQL Pool.
val readDF = spark.read.
//Set Synapse Dedicated SQL End Point name.
option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
//Set database user name.
option(Constants.USER, "<user_name>").
//Set database user's password.
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
option(Constants.DATA_SOURCE,"<data_source_name>").
//Set the three-part table name from which the read must be performed.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Optional - specify number of records the DataFrame would read.
limit(10)
//Register the temporary view (scope - current active Spark Session)
readDF.createOrReplaceTempView("<temporary_view_name>")
- Agora, altere a preferência de idioma no Notebook
PySpark (Python)e busque dados da vista registada<temporary_view_name>
spark.sql("select * from <temporary_view_name>").show()
Tratamento das respostas
Invocar synapsesql tem dois estados finais possíveis - Sucesso ou um Estado de Falha. Esta seção descreve como lidar com a resposta de solicitação para cada cenário.
Ler resposta de solicitação
Após a conclusão, o fragmento de resposta de leitura é exibido na saída da célula. Uma falha na célula atual também cancelará as execuções das células subsequentes. Informações detalhadas sobre erros estão disponíveis nos logs do aplicativo Spark.
Escrever resposta de solicitação
Por padrão, uma resposta de gravação é impressa na saída da célula. Em caso de falha, a célula atual é marcada como falhada e as execuções de células subsequentes serão canceladas. A outra abordagem é passar a opção handle de retorno de chamada para o método synapsesql. O identificador de retorno de chamada fornecerá acesso programático à resposta de gravação.
Outras considerações
- Ao ler as tabelas do Pool SQL Dedicado do Azure Synapse:
- Considere aplicar os filtros necessários no DataFrame para aproveitar a funcionalidade de poda de colunas do Connector.
- O cenário de leitura não dá suporte à cláusula
TOP(n-rows), ao formular as instruções de consultaSELECT. A opção para limitar dados é usar a cláusula limit(.) do DataFrame.- Consulte o exemplo na seção Usando dados materializados entre células.
- Ao escrever nas tabelas do Pool SQL Dedicado do Azure Synapse:
- Para tipos de tabelas internas:
- As tabelas são criadas com a distribuição de dados ROUND_ROBIN.
- Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem. As colunas de cadeia de caracteres são mapeadas para
NVARCHAR(4000).
- Para tipos de tabelas externas:
- O paralelismo inicial do DataFrame orienta a organização de dados para a tabela externa.
- Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem.
- Uma melhor distribuição de dados entre executores pode ser alcançada ajustando o parâmetro
spark.sql.files.maxPartitionBytese o parâmetrorepartitiondo DataFrame. - Ao escrever grandes conjuntos de dados, é importante levar em consideração o impacto da configuração de Nível de Desempenho DWU que limita o tamanho da transação.
- Para tipos de tabelas internas:
- Monitore as tendências de utilização do Azure Data Lake Storage Gen2 para identificar comportamentos de limitação que podem afetar o desempenho de leitura e gravação.