Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Você pode ler e gravar tabelas do Amazon Redshift com o Azure Databricks.
Importante
A documentação da federação de consulta herdada foi desativada e pode não ser atualizada. As configurações mencionadas neste conteúdo não são oficialmente endossadas ou testadas pelo Databricks. Se Federação Lakehouse der suporte ao seu banco de dados de origem, Databricks recomenda usá-lo.
A fonte de dados do Databricks Redshift usa Amazon S3 para transferir dados de forma eficiente para dentro e fora do Redshift e utiliza JDBC para disparar automaticamente os comandos apropriados COPY e UNLOAD no Redshift.
Observação
No Databricks Runtime 11.3 LTS e posteriores, o Databricks Runtime inclui o driver JDBC do Redshift, acessível usando a redshift palavra-chave para a opção de formato. Consulte as notas de lançamento das versões do Databricks Runtime e a compatibilidade para as versões de driver incluídas em cada Databricks Runtime. Os drivers fornecidos pelo usuário ainda têm suporte e têm precedência sobre o driver JDBC empacotado.
No Databricks Runtime 10.4 LTS e abaixo, a instalação manual do driver JDBC do Redshift é necessária e as consultas devem usar o driver (com.databricks.spark.redshift) para o formato. Consulte a instalação do driver do Redshift.
Uso
Os exemplos a seguir demonstram a conexão com o driver do Redshift. Substitua os valores do parâmetro url se estiver usando o driver JDBC do PostgreSQL.
Depois de configurar suas credenciais do AWS, você pode usar a fonte de dados com a API de fonte de dados do Spark em Python, SQL, R ou Scala.
Importante
Locais externos definidos no Unity Catalog não são compatíveis como tempdir locais.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Leia dados usando o SQL no Databricks Runtime 10.4 LTS e abaixo:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Leia dados usando o SQL no Databricks Runtime 11.3 LTS e acima:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Gravar dados usando SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
A API do SQL dá suporte apenas à criação de novas tabelas e não à substituição ou acréscimo.
R
Leia os dados usando o R no Databricks Runtime 10.4 LTS e abaixo:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Leia dados usando o R no Databricks Runtime 11.3 LTS e superior:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala (linguagem de programação)
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Recomendações para trabalhar com o Redshift
A execução da consulta pode extrair grandes quantidades de dados para S3. Se você planeja executar várias consultas em relação aos mesmos dados no Redshift, o Databricks recomenda salvar os dados extraídos usando o Delta Lake.
Configuração
Autenticação no S3 e no Redshift
A fonte de dados envolve várias conexões de rede, ilustradas no diagrama a seguir:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
A fonte de dados lê e grava dados no S3 ao transferir dados de/para o Redshift. Como resultado, ele requer credenciais do AWS com acesso de leitura e gravação a um bucket S3 (especificado usando o tempdir parâmetro de configuração).
Observação
A fonte de dados não limpa os arquivos temporários que ele cria no S3. Como resultado, recomendamos que você use um bucket S3 temporário dedicado com uma configuração de ciclo de vida de objeto para garantir que os arquivos temporários sejam excluídos automaticamente após um período de expiração especificado. Consulte a seção Criptografia deste documento para obter uma discussão sobre como criptografar esses arquivos. Você não pode usar uma localidade externa definida no Catálogo do Unity como uma tempdir localidade.
As seções a seguir descrevem as opções de configuração de autenticação de cada conexão:
Driver do Spark para Redshift
O driver spark se conecta ao Redshift via JDBC usando um nome de usuário e senha. O Redshift não dá suporte ao uso de funções IAM para autenticar essa conexão. Por padrão, essa conexão usa criptografia SSL; para obter mais detalhes, consulte Criptografia.
Spark para S3
O S3 atua como um intermediário para armazenar dados em massa ao ler ou gravar no Redshift. O Spark conecta-se ao S3 usando as interfaces do Hadoop FileSystem e usando diretamente o cliente S3 do SDK do Amazon Java.
Observação
Você não pode usar montagens DBFS para configurar o acesso ao S3 para Redshift.
Definir chaves na configuração do Hadoop: Você pode especificar chaves AWS usando propriedades de configuração do Hadoop. Se a configuração
tempdirapontar para um sistema de arquivoss3a://, você pode definir as propriedadesfs.s3a.access.keyefs.s3a.secret.keyem um arquivo de configuração XML do Hadoop ou chamarsc.hadoopConfiguration.set()para configurar a configuração global do Hadoop no Spark. Se você usar ums3n://sistema de arquivos, poderá fornecer as chaves de configuração herdadas, conforme mostrado no exemplo a seguir.Scala (linguagem de programação)
Por exemplo, se você estiver usando o
s3asistema de arquivos, adicione:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")Para o sistema de arquivos herdado
s3n, adicione:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")Python
O comando a seguir depende de alguns internos do Spark, mas deve funcionar com todas as versões do PySpark e é improvável que mude no futuro:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift para S3
Defina a opção forward_spark_s3_credentials para true encaminhar automaticamente as credenciais de chave do AWS que o Spark está usando para se conectar ao S3 via JDBC para o Redshift. A consulta JDBC insere essas credenciais, portanto, o Databricks recomenda fortemente que você habilite a criptografia SSL da conexão JDBC.
Encriptação
Protegendo jdbc: a menos que quaisquer configurações relacionadas a SSL estejam presentes na URL JDBC, a fonte de dados por padrão habilita a criptografia SSL e também verifica se o servidor Redshift é confiável (ou seja,
sslmode=verify-full). Para isso, um certificado de servidor é baixado automaticamente dos servidores Amazon na primeira vez que é necessário. Caso isso falhe, um arquivo de certificado pré-empacotado é usado como um fallback. Isso vale tanto para o Redshift quanto para os drivers JDBC do PostgreSQL.Caso haja algum problema com esse recurso ou você simplesmente queira desabilitar o SSL, você pode chamar
.option("autoenablessl", "false")seuDataFrameReaderouDataFrameWriter.Se você quiser especificar configurações personalizadas relacionadas a SSL, siga as instruções na documentação do Redshift: Usando SSL e Certificados de Servidor em Java e Opções de Configuração do Driver JDBC. Quaisquer opções relacionadas a SSL presentes no JDBC
urlusado com a fonte de dados terão precedência (ou seja, a configuração automática não será ativada).Criptografar dados UNLOAD armazenados em S3 (dados armazenados ao ler do Redshift): de acordo com a documentação do Redshift sobre o Descarregamento de Dados para S3, "UNLOAD criptografa automaticamente arquivos de dados usando a criptografia do lado do servidor do Amazon S3 (SSE-S3)."
O Redshift também dá suporte à criptografia do lado do cliente com uma chave personalizada (veja: Descarregando arquivos de dados criptografados), mas a fonte de dados não tem a capacidade de especificar a chave simétrica necessária.
Criptografando dados COPY armazenados em S3 (dados armazenados ao gravar no Redshift): de acordo com a documentação do Redshift sobre Carregar Arquivos de Dados Criptografados do Amazon S3:
Você pode usar o COPY comando para carregar arquivos de dados que foram carregados no Amazon S3 usando criptografia do lado do servidor com chaves de criptografia gerenciadas pelo AWS (SSE-S3 ou SSE-KMS), criptografia do lado do cliente ou ambas. O COPY não dá suporte à criptografia do lado do servidor do Amazon S3 com uma chave fornecida pelo cliente (SSE-C).
Parâmetros
O mapa de parâmetros ou OPÇÕES fornecido no SQL do Spark dá suporte às seguintes configurações:
| Parâmetro | Obrigatório | Padrão | Descrição |
|---|---|---|---|
| dbtable | Sim, a menos que a consulta seja especificada. | Nenhum | A tabela a ser criada ou lida no Redshift. Esse parâmetro é necessário ao salvar dados de volta no Redshift. |
| consulta | Sim, a menos que dbtable seja especificado. | Nenhum | A consulta que será usada para leitura no Redshift. |
| usuário | Não | Nenhum | O nome de usuário do Redshift. Deve ser usado em conjunto com a opção de senha. Só poderá ser usado se o usuário e a senha não forem passados na URL, passar ambos resultará em um erro. Use esse parâmetro quando o nome de usuário contiver caracteres especiais que precisam ser escapados. |
| senha | Não | Nenhum | A senha do Redshift. Precisa ser usado em conjunto com a opção user. Só poderá ser usado se o usuário e a senha não forem passados na URL; passar ambos resultará em um erro. Use esse parâmetro quando a senha contiver caracteres especiais que precisam ser escapados. |
| URL | Sim | Nenhum | Uma URL JDBC, do formatojdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>subprotocol pode ser postgresql ou redshift, dependendo de qual driver JDBC você carregou. Um driver compatível com o Redshift deve estar no classpath e corresponder a essa URL.
host e port devem apontar para o nó mestre do Redshift, portanto, grupos de segurança e/ou VPC devem ser configurados para permitir o acesso do seu aplicativo do driver.
database identifica um nome user de banco de dados do Redshift e password são credenciais para acessar o banco de dados, que deve ser inserido nessa URL para JDBC, e sua conta de usuário deve ter privilégios necessários para a tabela que está sendo referenciada. |
| caminho_de_busca | Não | Nenhum | Defina o caminho de pesquisa de esquema no Redshift. Será definido usando o SET search_path to comando. Deve ser uma lista separada por vírgulas de nomes de esquema onde procurar pelas tabelas. Consulte a documentação do Redshift sobre search_path. |
| aws_iam_role (função IAM da AWS) | Somente se estiver usando funções IAM para autorizar. | Nenhum | ARN totalmente especificado da função para operações COPY/UNLOAD do IAM Redshift anexada ao cluster do Redshift, por exemplo, . |
| forward_spark_s3_credentials | Não | false |
Se true, a fonte de dados descobrirá automaticamente as credenciais que o Spark está usando para se conectar ao S3 e encaminha essas credenciais para o Redshift via JDBC. Essas credenciais são enviadas como parte da consulta JDBC, portanto, é altamente recomendável habilitar a criptografia SSL da conexão JDBC ao usar essa opção. |
| temporary_aws_access_key_id (ID de chave de acesso temporário do AWS) | Não | Nenhum | A chave de acesso AWS deve ter permissões de gravação no bucket S3. |
| temporary_aws_secret_access_key | Não | Nenhum | Chave de acesso secreta do AWS correspondente à chave de acesso fornecida. |
| token_de_sessão_temporária_aws | Não | Nenhum | Token de sessão do AWS correspondente à chave de acesso fornecida. |
| tempdir | Sim | Nenhum | Um diretório gravável no Amazon S3, a ser utilizado para dados descarregados durante a leitura e para dados Avro a serem carregados no Redshift durante a gravação. Se você estiver usando a fonte de dados do Redshift para Spark como parte de um pipeline de ETL regular, poderá ser útil definir uma Política de Ciclo de Vida em um bucket e usá-la como um local temporário para esses dados. Você não pode usar locais externos definidos no Catálogo do Unity como tempdir locais. |
| jdbcdriver | Não | Determinado pelo subprotocol da URL JDBC. | O nome de classe do driver JDBC a ser usado. Essa classe precisa estar no caminho de classe. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome da classe de driver apropriado deve ser determinado automaticamente pelo subprotocol da URL JDBC. |
| diststyle | Não | EVEN |
O Estilo de Distribuição do Redshift a ser usado ao criar uma tabela. Pode ser um de EVEN, KEY ou ALL (consulte a documentação do Redshift). Ao usar KEY, você também deve definir uma chave de distribuição com a opção distkey. |
| distkey | Não, a menos que use DISTSTYLE KEY |
Nenhum | O nome de uma coluna na tabela a ser usada como a chave de distribuição ao criar uma tabela. |
| especificação de chave de ordenação | Não | Nenhum | Uma definição completa da Chave de Classificação do Redshift. Os exemplos incluem:
|
| usestagingtable (obsoleto) | Não | true |
Definir essa opção obsoleta para false faz com que a tabela de destino de uma operação de sobrescrita seja descartada imediatamente no início do processo de escrita, tornando a operação de sobrescrita não atômica e reduzindo a disponibilidade da tabela de destino. Isso pode reduzir os requisitos temporários de espaço em disco para substituições.Como a configuração usestagingtable=false da operação tem risco de perda de dados ou indisponibilidade, ela foi descontinuada em favor da exigência de excluir manualmente a tabela de destino. |
| descrição | Não | Nenhum | Uma descrição da tabela. Será definido usando o comando SQL COMMENT e deverá aparecer na maioria das ferramentas de consulta. Veja também os description metadados para definir descrições em colunas individuais. |
| pré-ações | Não | Nenhum | Uma lista separada de comandos SQL a serem executados antes do carregamento do comando ;. Pode ser útil executar alguns comandos DELETE ou algo semelhante aqui antes de carregar novos dados. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso você esteja usando uma tabela de preparo).Lembre-se de que, se esses comandos falharem, ele será tratado como um erro e uma exceção será gerada. Se estiver usando uma tabela de preparo, as alterações serão revertidas e a tabela de backup será restaurada se as ações prévias falharem. |
| ações posteriores | Não | Nenhum | Uma ; lista separada de comandos SQL a serem executados após um COPY bem-sucedido ao carregar dados. Pode ser útil, ao carregar novos dados, executar alguns comandos GRANT ou execuções semelhantes aqui. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso você esteja usando uma tabela de preparo).Lembre-se de que, se esses comandos falharem, ele será tratado como um erro e uma exceção será gerada. Se estiver usando uma tabela de estágio, as alterações serão revertidas, e a tabela de backup será restaurada caso as ações posteriores falhem. |
| opções extras de cópia | Não | Nenhum | Uma lista de opções extras a serem acrescentadas ao comando Redshift COPY ao carregar dados, por exemplo, TRUNCATECOLUMNS ou MAXERROR n (consulte os documentos do Redshift para outras opções).Como essas opções são acrescentadas ao final do comando, somente opções que fazem sentido no final do comando podem ser usadas, mas que devem abranger a maioria dos COPY casos de uso possíveis. |
| tempformat | Não | AVRO |
O formato no qual salvar arquivos temporários no S3 ao gravar no Redshift. O padrão é AVRO; os outros valores permitidos são CSV e CSV GZIP para CSV e CSV compactado, respectivamente.O Redshift é significativamente mais rápido ao carregar CSV do que ao carregar arquivos Avro, portanto, usar esse tempformat pode fornecer um grande aumento de desempenho ao gravar no Redshift. |
| csvnullstring | Não | @NULL@ |
O valor de cadeia de caracteres a ser gravado para nulos ao usar o tempformat CSV. Esse deve ser um valor que não aparece em seus dados reais. |
| separadorCSV | Não | , |
Separador a ser usado ao gravar arquivos temporários com tempformat definido como CSV ou CSV GZIP. Esse deve ser um caractere ASCII válido, por exemplo, "," ou "\|". |
| csvignorarbrancoàesquerda (ignora espaços em branco iniciais) | Não | true |
Quando definido como true, remove o espaço em branco à esquerda dos valores durante gravações quando tempformat é definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido. |
| csvignoretrailingwhitespace | Não | true |
Quando definido como true, remove o espaço em branco à direita dos valores durante gravações quando tempformat é definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido. |
| inferir_tipo_de_timestamp_ntz | Não | false |
Se true, os valores do tipo Redshift TIMESTAMP serão interpretados como TimestampNTZType (timestamp sem fuso horário) durante a leitura. Caso contrário, todos os carimbos de data/hora são interpretados como sendo TimestampType, independentemente do tipo na tabela do Redshift subjacente. |
Opções de configuração adicionais
Configurando o tamanho máximo das colunas de cadeia de caracteres
Ao criar tabelas do Redshift, o comportamento padrão é criar TEXT colunas para colunas de cadeia de caracteres. O Redshift armazena TEXT colunas como VARCHAR(256), portanto, essas colunas têm um tamanho máximo de 256 caracteres (origem).
Para dar suporte a colunas maiores, você pode usar o maxlength campo de metadados de coluna para especificar o comprimento máximo de colunas de cadeia de caracteres individuais. Isso também é útil para implementar otimizações de desempenho de economia de espaço declarando colunas com um comprimento máximo menor do que o padrão.
Observação
Devido a limitações no Spark, as APIs de linguagem SQL e R não dão suporte à modificação de metadados de coluna.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala (linguagem de programação)
Aqui está um exemplo de atualização dos campos de metadados de várias colunas usando a API Scala do Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Definir um tipo de coluna personalizado
Se você precisar definir manualmente um tipo de coluna, poderá usar os metadados de redshift_type coluna. Por exemplo, se você deseja substituir o Spark SQL Schema -> Redshift SQL correspondente de tipo para atribuir um tipo de coluna definido pelo usuário, faça o seguinte:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala (linguagem de programação)
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configurar a codificação de coluna
Ao criar uma tabela, use o encoding campo de metadados de coluna para especificar uma codificação de compactação para cada coluna (consulte documentos da Amazon para codificações disponíveis).
Definindo descrições em colunas
O Redshift permite que as colunas tenham descrições anexadas que devem aparecer na maioria das ferramentas de consulta (usando o COMMENT comando). Você pode definir o description campo de metadados de coluna para especificar uma descrição para colunas individuais.
Rebaixamento de consultas no Redshift
O otimizador do Spark envia os seguintes operadores para o Redshift:
FilterProjectSortLimitAggregationJoin
Dentro Project e Filter, ele dá suporte às seguintes expressões:
- A maioria dos operadores lógicos boolianos
- Comparações
- Operadores aritméticos básicos
- Conversões numéricas e de cadeia de caracteres
- A maioria das funções de cadeia de caracteres
- Subconsultas escalares, se puderem ser enviadas inteiramente para o Redshift.
Observação
Este pushdown não dá suporte a expressões que operam em datas e timestamps.
No interior Aggregation, ele dá suporte às seguintes funções de agregação:
AVGCOUNTMAXMINSUMSTDDEV_SAMPSTDDEV_POPVAR_SAMPVAR_POP
combinado com a cláusula DISTINCT, quando aplicável.
No interior Join, ele dá suporte aos seguintes tipos de junções:
INNER JOINLEFT OUTER JOINRIGHT OUTER JOINLEFT SEMI JOINLEFT ANTI JOIN- Subconsultas que são reescritas em
Joinpelo otimizador, sendo por exemploWHERE EXISTS,WHERE NOT EXISTS
Observação
O pushdown de junção não dá suporte FULL OUTER JOIN.
A técnica de pushdown é mais benéfica em consultas com LIMIT. Uma consulta como SELECT * FROM large_redshift_table LIMIT 10 poderia levar muito tempo, pois a tabela inteira seria primeiro UNLOADed para S3 como um resultado intermediário. Com o recurso de pushdown, o LIMIT é executado no Redshift. Em consultas com agregações, empurrar a agregação para baixo no Redshift também ajuda a reduzir a quantidade de dados que precisam ser transferidos.
O pushdown de consulta para o Redshift é habilitado por padrão. Ele pode ser desabilitado definindo spark.databricks.redshift.pushdown como false. Mesmo quando desabilitado, o Spark ainda envia filtros para baixo e executa a eliminação de coluna no Redshift.
Instalação do driver do Redshift
A fonte de dados do Redshift também requer um driver JDBC compatível com o Redshift. Como o Redshift é baseado no sistema de banco de dados PostgreSQL, você pode usar o driver JDBC do PostgreSQL incluído no Databricks Runtime ou no driver JDBC do Redshift recomendado pela Amazon. Nenhuma instalação é necessária para usar o driver JDBC do PostgreSQL. A versão do driver JDBC do PostgreSQL incluída em cada versão do Databricks Runtime está listada nas notas de versão do Databricks Runtime.
Para instalar manualmente o driver JDBC do Redshift:
- Baixe o driver da Amazon.
- Carregue o driver no espaço de trabalho do Azure Databricks. Consulte Instalar bibliotecas.
- Instale a biblioteca no cluster.
Observação
O Databricks recomenda usar a versão mais recente do driver JDBC do Redshift. As versões do driver JDBC do Redshift abaixo da 1.2.41 têm as seguintes limitações:
- A versão 1.2.16 do driver retorna dados vazios ao usar uma
wherecláusula em uma consulta SQL. - As versões do driver anteriores à 1.2.41 podem retornar resultados inválidos, pois a nulabilidade de uma coluna está sendo relatada incorretamente como "Não anulável" em vez de "Desconhecido".
Garantias transacionais
Esta seção descreve as garantias transacionais da fonte de dados do Redshift para Spark.
Visão geral sobre as propriedades do Redshift e do S3
Para obter informações gerais sobre garantias transacionais do Redshift, consulte o capítulo Gerenciando operações de gravação simultâneas na documentação do Redshift. Em poucas palavras, o Redshift fornece isolamento serializável de acordo com a documentação do comando BEGIN do Redshift:
embora você possa usar qualquer um dos quatro níveis de isolamento de transação, Amazon Redshift processa todos os níveis de isolamento de forma serializável.
De acordo com a documentação do Redshift:
O Amazon Redshift dá suporte a um comportamento de confirmação automática padrão no qual cada comando SQL executado separadamente é confirmado individualmente.
Assim, comandos individuais como COPY e UNLOAD são atômicos e transacionais, enquanto os comandos explícitos BEGIN e END só devem ser necessários para impor a atomicidade de vários comandos ou consultas.
Ao ler e gravar no Redshift, a fonte de dados lê e grava dados no S3. O Spark e o Redshift produzem saída particionada e a armazenam em vários arquivos no S3. De acordo com a documentação do Modelo de Consistência de Dados do Amazon S3, as operações de listagem de bucket S3 são eventualmente consistentes, portanto, os arquivos devem tomar medidas especiais para evitar dados ausentes ou incompletos devido a essa característica de consistência eventual.
Garantias da fonte de dados do Redshift para Spark
Acrescentar a uma tabela existente
Ao inserir linhas no Redshift, a fonte de dados usa o comando COPY e especifica manifestos para se proteger contra determinadas operações S3 eventualmente consistentes. Como resultado, spark-redshift adições às tabelas existentes possuem as mesmas propriedades atômicas e transacionais que os comandos regulares do Redshift COPY.
Criar uma nova tabela (SaveMode.CreateIfNotExists)
A criação de uma nova tabela é um processo de duas etapas, consistindo em um CREATE TABLE comando seguido por um comando COPY para acrescentar o conjunto inicial de linhas. Ambas as operações são executadas na mesma transação.
Sobrescrever uma tabela existente
Por padrão, a fonte de dados usa transações para executar substituições, que são implementadas excluindo a tabela de destino, criando uma nova tabela vazia e acrescentando linhas a ela.
Se a configuração obsoleta usestagingtable estiver definida como false, a fonte de dados executa o comando DELETE TABLE antes de acrescentar linhas à nova tabela, o que sacrifica a atomicidade da operação de substituição, ao mesmo tempo que reduz a quantidade de espaço de staging que o Redshift precisa durante a substituição.
Consultar tabela do Redshift
As consultas usam o comando UNLOAD do Redshift para executar uma consulta e salvar seus resultados em S3 e usar manifestos para se proteger contra determinadas operações S3 eventualmente consistentes. Como resultado, as consultas da fonte de dados do Redshift para Spark devem ter as mesmas propriedades de consistência que as consultas regulares do Redshift.
Problemas comuns e soluções
O bucket S3 e o cluster redshift estão em diferentes regiões do AWS
Por padrão, as cópias do S3 <–> Redshift não funcionarão se o bucket S3 e o cluster redshift estiverem em diferentes regiões do AWS.
Se você tentar ler uma tabela do Redshift quando o bucket S3 estiver em uma região diferente, poderá ver um erro como:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Da mesma forma, tentar gravar no Redshift usando um bucket S3 em uma região diferente pode causar o seguinte erro:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Escreve: O comando Redshift COPY dá suporte à indicação explícita da região do bucket S3, para que você possa fazer as gravações no Redshift funcionarem corretamente nesses casos adicionando
region 'the-region-name'à configuraçãoextracopyoptions. Por exemplo, com um bucket na região Leste dos EUA (Virgínia) e a API Scala, use:.option("extracopyoptions", "region 'us-east-1'")Como alternativa, você pode usar a
awsregionconfiguração:.option("awsregion", "us-east-1")Lê: O comando UNLOAD do Redshift também dá suporte à especificação explícita da região do bucket S3. Você pode fazer com que as leituras funcionem corretamente ao adicionar a região à configuração
awsregion:.option("awsregion", "us-east-1")
Erro de autenticação ao usar uma senha com caracteres especiais na URL JDBC
Se você estiver fornecendo o nome de usuário e a senha como parte da url JDBC e a senha contiver caracteres especiais, como ;, ?ou &, você poderá ver a seguinte exceção:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Isso é causado por caracteres especiais no nome de usuário ou senha que não são tratados corretamente pelo driver JDBC. Certifique-se de especificar o nome de usuário e a senha usando as opções de DataFrame correspondentes user e password. Para obter mais informações, confira Parâmetros.
A consulta Spark de longa execução fica eternamente pendente, mesmo após a conclusão da operação correspondente no Redshift.
Se você estiver lendo ou gravando grandes quantidades de dados de e para o Redshift, sua consulta Spark pode ficar pendente indefinidamente, mesmo que a página de monitoramento do Redshift da AWS mostre que a operação correspondente LOAD ou UNLOAD foi concluída e que o cluster está ocioso. Isso é causado pela conexão entre o redshift e o tempo limite do Spark. Para evitar isso, verifique se o tcpKeepAlive sinalizador JDBC está habilitado e TCPKeepAliveMinutes está definido como um valor baixo (por exemplo, 1).
Para obter informações adicionais, consulte Configuração do JDBC Driver do Amazon Redshift.
Marca temporal com semântica de fuso horário
Ao ler dados, os tipos de dados do Redshift TIMESTAMP e TIMESTAMPTZ são mapeados para o Spark TimestampType, e um valor é convertido para Tempo Universal Coordenado (UTC) e armazenado como o carimbo de data/hora UTC. Para um Redshift TIMESTAMP, o fuso horário local é assumido, pois o valor não tem nenhuma informação de fuso horário. Ao gravar dados em uma tabela do Redshift, um Spark TimestampType é mapeado para o tipo de dados do Redshift TIMESTAMP .
Guia de migração
A fonte de dados agora exige que você defina forward_spark_s3_credentials explicitamente antes que as credenciais do Spark S3 sejam encaminhadas para o Redshift. Essa mudança não terá impacto se você usar os mecanismos de autenticação aws_iam_role ou temporary_aws_*. No entanto, se você se baseou no comportamento padrão antigo, agora deve definir forward_spark_s3_credentials explicitamente como true para continuar usando o mecanismo de autenticação anterior do Redshift para S3. Para obter uma discussão sobre os três mecanismos de autenticação e suas compensações de segurança, consulte a seção Autenticação para S3 e Redshift deste documento.