Partilhar via


Consultar o Amazon Redshift usando o Azure Databricks

Você pode ler e escrever tabelas do Amazon Redshift com o Azure Databricks.

Importante

A documentação de federação de consultas herdada foi desativada e pode não ser atualizada no futuro. As configurações mencionadas neste conteúdo não são oficialmente endossadas ou testadas pela Databricks. Se a Lakehouse Federation oferecer suporte ao seu banco de dados de origem, o Databricks recomenda usá-lo.

A fonte de dados Databricks Redshift utiliza o Amazon S3 para transferir dados eficientemente para dentro e fora do Redshift e usa JDBC para acionar automaticamente os COPY e UNLOAD comandos apropriados no Redshift.

Observação

No Databricks Runtime 11.3 LTS e superior, o Databricks Runtime inclui o driver JDBC Redshift, acessível usando a redshift palavra-chave para a opção de formato. Consulte as notas de versão e compatibilidade do Databricks Runtime para saber as versões de driver incluídas em cada Databricks Runtime. Os drivers fornecidos pelo usuário ainda são suportados e têm precedência sobre o driver JDBC incluído.

No Databricks Runtime 10.4 LTS ou versões anteriores, é necessária a instalação manual do driver JDBC Redshift, e as consultas devem usar o driver (com.databricks.spark.redshift) para o formato. Consulte instalação do driver Redshift.

Utilização

Os exemplos a seguir demonstram a conexão com o driver Redshift. Substitua os valores do parâmetro url se estiver a usar o driver JDBC PostgreSQL.

Depois de configurar suas credenciais da AWS, você pode usar a fonte de dados com a API de fonte de dados do Spark em Python, SQL, R ou Scala.

Importante

Locais externos definidos no Catálogo Unity não são suportados como locais tempdir.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Leia os dados usando SQL no Databricks Runtime 10.4 LTS e abaixo:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Leia os dados usando SQL no Databricks Runtime 11.3 LTS e superior:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Escreva dados usando SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

A API SQL oferece suporte apenas à criação de novas tabelas e não à substituição ou acréscimo.

R

Leia os dados usando R no Databricks Runtime 10.4 LTS e abaixo:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Leia os dados usando R no Databricks Runtime 11.3 LTS e superior:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Escala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Recomendações para trabalhar com Redshift

A execução da consulta pode extrair grandes quantidades de dados para o S3. Se você planeja executar várias consultas nos mesmos dados no Redshift, o Databricks recomenda salvar os dados extraídos usando o Delta Lake.

Configuração

Autenticação no S3 e no Redshift

A fonte de dados envolve várias conexões de rede, ilustradas no diagrama a seguir:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

A fonte de dados lê e grava dados no S3 ao transferir dados de/para o Redshift. Como resultado, requer credenciais da AWS com acesso de leitura e gravação para um bucket S3 (especificado usando o parâmetro de configuração tempdir).

Observação

A fonte de dados não limpa os arquivos temporários que cria no S3. Como resultado, recomendamos que você use um bucket temporário dedicado do S3 com uma configuração do ciclo de vida do objeto para garantir que os arquivos temporários sejam excluídos automaticamente após um período de expiração especificado. Consulte a seção Criptografia deste documento para obter uma discussão sobre como criptografar esses arquivos. Não é possível usar um local externo definido no Unity Catalog como um local tempdir.

As seções a seguir descrevem as opções de configuração de autenticação de cada conexão:

Motorista de faísca para Redshift

O driver Spark se conecta ao Redshift via JDBC usando um nome de usuário e senha. O Redshift não suporta o uso de funções do IAM para autenticar essa conexão. Por padrão, essa conexão usa criptografia SSL; para obter mais detalhes, consulte Criptografia.

Spark para S3

O S3 atua como intermediário para armazenar dados em massa ao ler ou gravar no Redshift. O Spark se conecta ao S3 usando as interfaces do Hadoop FileSystem e diretamente usando o cliente S3 do Amazon Java SDK.

Observação

Não é possível usar montagens DBFS para configurar o acesso ao S3 para Redshift.

  • Definir chaves no Hadoop conf: você pode especificar chaves da AWS usando as propriedades de configuração do Hadoop. Se a sua tempdir configuração apontar para um s3a:// sistema de ficheiros, poderá definir as fs.s3a.access.key e fs.s3a.secret.key propriedades num arquivo de configuração XML do Hadoop ou chamar sc.hadoopConfiguration.set() para configurar as fs.s3a.access.key configurações globais do Hadoop do Spark. Se você usar um s3n:// sistema de arquivos, poderá fornecer as chaves de configuração herdadas, conforme mostrado no exemplo a seguir.

    Escala

    Por exemplo, se você estiver usando o sistema de s3a arquivos, adicione:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Para o sistema de arquivos herdado s3n , adicione:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    O comando a seguir depende de alguns internos do Spark, mas deve funcionar com todas as versões do PySpark e é improvável que mude no futuro:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift para S3

Defina a opção forward_spark_s3_credentials para true para encaminhar automaticamente para o Redshift, via JDBC, as credenciais de chave AWS que o Spark está a usar para se conectar ao S3. A consulta JDBC incorpora essas credenciais, portanto, o Databricks recomenda que você habilite a criptografia SSL da conexão JDBC.

Encriptação

  • Protegendo o JDBC: a menos que qualquer configuração relacionada ao SSL esteja presente na URL do JDBC, a fonte de dados por padrão habilita a criptografia SSL e também verifica se o servidor Redshift é confiável (ou seja, sslmode=verify-full). Para isso, um certificado de servidor é baixado automaticamente dos servidores da Amazon na primeira vez que é necessário. Caso isso falhe, um arquivo de certificado pré-empacotado é usado como um fallback. Isso vale para os drivers JDBC Redshift e PostgreSQL.

    No caso de haver algum problema com esta funcionalidade, ou se simplesmente queres desativar o SSL, podes chamar .option("autoenablessl", "false") no teu DataFrameReader ou DataFrameWriter.

    Se desejar especificar configurações personalizadas relacionadas a SSL, siga as instruções na documentação do Redshift: Usando SSL e certificados de servidor em Java e opções de configuração do driver JDBC Todas as opções relacionadas a SSL presentes no JDBC url usado com a fonte de dados têm precedência (ou seja, a configuração automática não será acionada).

  • Criptografando dados UNLOAD armazenados no S3 (dados armazenados ao ler do Redshift): De acordo com a documentação do Redshift sobre Descarregamento de dados para o S3, "UNLOAD criptografa automaticamente arquivos de dados usando a criptografia do lado do servidor do Amazon S3 (SSE-S3)."

    O Redshift também suporta criptografia do lado do cliente com uma chave personalizada (consulte: Descarregando arquivos de dados criptografados), mas a fonte de dados não tem a capacidade de especificar a chave simétrica necessária.

  • Criptografando dados COPY armazenados no S3 (dados armazenados ao gravar no Redshift): De acordo com a documentação do Redshift sobre como carregar arquivos de dados criptografados do Amazon S3:

Você pode usar o comando para carregar arquivos de dados que foram carregados no Amazon S3 usando criptografia do lado do servidor com chaves de criptografia gerenciadas pela AWS (SSE-S3 ou SSE-KMS), criptografia do lado do COPY cliente ou ambas. O COPY não oferece suporte à criptografia do lado do servidor do Amazon S3 com uma chave fornecida pelo cliente (SSE-C).

Parâmetros

O mapa de parâmetros ou OPTIONS fornecidos no Spark SQL suportam as seguintes configurações:

Parâmetro Obrigatório Predefinido Descrição
dbtable Sim, a menos que a consulta seja especificada. Nenhum A tabela a partir da qual criar ou ler no Redshift. Este parâmetro é necessário ao salvar dados de volta no Redshift.
consulta Sim, a menos que dbtable seja especificado. Nenhum A consulta a partir da qual ler no Redshift.
utilizador Não Nenhum O nome de usuário Redshift. Deve ser usado em conjunto com a opção de senha. Pode ser usado somente se o usuário e a senha não forem passados na URL, passar ambos resultará em um erro. Use esse parâmetro quando o nome de usuário contiver caracteres especiais que precisam ser escapados.
palavra-passe Não Nenhum A senha do Redshift. Deve ser usado em conjunto com a opção user. Só pode ser utilizado se o utilizador e a palavra-passe não forem passados no URL; passar ambos resultará em um erro. Use esse parâmetro quando a senha contiver caracteres especiais que precisam ser escapados.
URL Sim Nenhum URL do JDBC, do formato
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>
subprotocol pode ser postgresql ou redshift, dependendo de qual driver JDBC você carregou. Um driver compatível com Redshift deve estar no classpath e corresponder a essa URL. host e port devem apontar para o nó principal do Redshift, portanto, os grupos de segurança e/ou VPC devem ser configurados para permitir o acesso a partir da sua aplicação do driver. database identifica um nome user de banco de dados Redshift e password são credenciais para acessar o banco de dados, que devem ser incorporadas nessa URL para JDBC, e sua conta de usuário deve ter privilégios necessários para a tabela que está sendo referenciada.
caminho de pesquisa Não Nenhum Defina o caminho de pesquisa do esquema no Redshift. Será definido usando o SET search_path to comando. Deve ser uma lista de nomes de esquemas, separada por vírgulas, para procurar tabelas. Consulte a documentação sobre search_path no Redshift.
aws_iam_role (função IAM) Somente se estiver usando funções do IAM para autorizar. Nenhum ARN completo da Função de Operações COPY/UNLOAD do IAM Redshift anexada ao cluster Redshift, por exemplo, arn:aws:iam::123456789000:role/<redshift-iam-role>.
forward_spark_s3_credentials Não false Se true, a fonte de dados descobre automaticamente as credenciais que o Spark está a usar para se conectar ao S3 e encaminha essas credenciais para o Redshift via JDBC. Essas credenciais são enviadas como parte da consulta JDBC, portanto, é altamente recomendável habilitar a criptografia SSL da conexão JDBC ao usar essa opção.
chave_de_acesso_aws_temporária_id Não Nenhum A chave de acesso da AWS deve ter permissões de gravação para o bucket do S3.
chave_de_acesso_secreta_temporária_aws Não Nenhum Chave de acesso secreta da AWS correspondente à chave de acesso fornecida.
token_temporário_da_sessão_aws Não Nenhum Token de sessão da AWS correspondente à chave de acesso fornecida.
tempdir Sim Nenhum Um local gravável no Amazon S3, para ser usado para dados descarregados durante a leitura e dados Avro para serem carregados no Redshift durante a gravação. Se você estiver usando a fonte de dados Redshift para o Spark como parte de um pipeline ETL regular, pode ser útil definir uma Política de Ciclo de Vida em um bucket e usá-la como um local temporário para esses dados.
Não é possível usar locais externos definidos no Unity Catalog como tempdir locais.
jdbcdriver Não Determinado pelo subprotocolo da URL JDBC. O nome da classe do driver JDBC a ser usado. Essa classe deve estar no classpath. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome da classe de driver apropriada deve ser determinado automaticamente pelo subprotocolo da URL JDBC.
diststyle Não EVEN O estilo de distribuição do Redshift a ser usado ao criar uma tabela. Pode ser um de EVEN, KEY ou ALL (consulte os documentos do Redshift). Ao usar KEY, deve também definir uma chave de distribuição com a opção distkey.
distkey Não, a menos que se utilize DISTSTYLE KEY Nenhum O nome de uma coluna na tabela a ser usada como chave de distribuição ao criar uma tabela.
especificação de chave de ordenação Não Nenhum Uma definição completa da Chave de Ordenação do Redshift. Os exemplos incluem:
  • SORTKEY(my_sort_column)
  • COMPOUND SORTKEY(sort_col_1, sort_col_2)
  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (Preterido) Não true Definir esta opção obsoleta para false faz com que a tabela de destino de uma operação de sobrescrita seja descartada imediatamente no início da gravação, tornando a operação de sobrescrita não atômica e reduzindo a disponibilidade da tabela de destino. Isso pode reduzir os requisitos temporários de espaço em disco para substituições.
Como a configuração usestagingtable=false da operação corre o risco de perda ou indisponibilidade de dados, ela é preterida em favor da exigência de que se elimine manualmente a tabela de destino.
descrição Não Nenhum Uma descrição para a tabela. Será definido usando o comando SQL COMMENT e deve aparecer na maioria das ferramentas de consulta. Consulte também os description metadados para definir descrições em colunas individuais.
Pré-ações Não Nenhum Uma lista de ; comandos SQL separados a serem executados antes de carregar o COPY comando. Pode ser útil executar alguns DELETE comandos ou similares aqui antes de carregar novos dados. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso você esteja usando uma tabela de preparo).
Esteja avisado que, se esses comandos falharem, isso será tratado como um erro e uma exceção será lançada. Se estiver usando uma tabela de preparo, as alterações serão revertidas e a tabela de backup restaurada se as ações anteriores falharem.
Ações Posteriores Não Nenhum Uma lista separada ; de comandos SQL a serem executados após um COPY carregamento de dados bem-sucedido. Pode ser útil executar alguns GRANT comandos ou similares aqui ao carregar novos dados. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso você esteja usando uma tabela de preparo).
Esteja avisado que, se esses comandos falharem, isso será tratado como um erro e uma exceção será lançada. Se usar uma tabela de ensaio, as alterações serão revertidas e a tabela de backup será restaurada se as ações de pós-processamento falharem.
opções de cópia extra Não Nenhum Uma lista de opções extras para anexar ao comando Redshift COPY ao carregar dados, por exemplo, TRUNCATECOLUMNS ou MAXERROR n (consulte os documentos do Redshift para obter outras opções).
Como essas opções são anexadas ao final do comando, apenas as opções que fazem sentido no final do comando podem ser usadas, mas isso deve abranger a maioria dos casos de COPY uso possíveis.
tempformat Não AVRO O formato no qual salvar arquivos temporários no S3 ao gravar no Redshift. O padrão é AVRO; os outros valores permitidos são CSV e CSV GZIP para CSV e CSV compactado, respetivamente.
O Redshift é significativamente mais rápido ao carregar CSV do que ao carregar arquivos Avro, portanto, usar esse tempformat pode fornecer um grande aumento de desempenho ao gravar no Redshift.
csvnullstring Não @NULL@ O valor de String a utilizar para nulos ao usar o formato temporário CSV. Este deve ser um valor que não aparece em seus dados reais.
separador CSV Não , Separador a ser usado ao gravar arquivos temporários com tempformat definido como CSV ou CSV GZIP. Este deve ser um caractere ASCII válido, por exemplo, "," ou "\|".
csvIgnoreLeadingWhitespace Não true Quando definido como true, remove o espaço em branco à esquerda dos valores durante as gravações quando tempformat está definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido.
csvignoretrailingwhitespace Não true Quando definido como true, remove o espaço em branco à direita dos valores durante as gravações quando tempformat está definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido.
infer_timestamp_ntz_type Não false Caso true, os valores do tipo Redshift TIMESTAMP são interpretados como TimestampNTZType (timestamp sem fuso horário) ao realizar leituras. Caso contrário, todas as marcas temporais são interpretadas como TimestampType independentemente do tipo na tabela Redshift de base.

Opções de configuração adicionais

Configurando o tamanho máximo de colunas de cadeia de caracteres

Ao criar tabelas Redshift, o comportamento padrão é criar TEXT colunas para colunas de cadeia de caracteres. O Redshift armazena TEXT colunas como VARCHAR(256), portanto, essas colunas têm um tamanho máximo de 256 caracteres (fonte).

Para dar suporte a colunas maiores, você pode usar o campo de metadados de maxlength coluna para especificar o comprimento máximo de colunas de cadeia de caracteres individuais. Isso também é útil para implementar otimizações de desempenho que economizam espaço, declarando colunas com um comprimento máximo menor do que o padrão.

Observação

Devido a limitações no Spark, as APIs das linguagens SQL e R não oferecem suporte à modificação de metadados de coluna.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Escala

Aqui está um exemplo de atualização de campos de metadados de várias colunas usando a API Scala do Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Definir um tipo de coluna personalizada

Se você precisar definir manualmente um tipo de coluna, poderá usar os metadados da redshift_type coluna. Por exemplo, se desejar substituir o mecanismo de correspondência do tipo Spark SQL Schema -> Redshift SQL para atribuir um tipo de coluna definido pelo usuário, pode fazê-lo da seguinte maneira:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Escala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configurar a codificação de colunas

Ao criar uma tabela, use o campo de metadados de encoding coluna para especificar uma codificação de compactação para cada coluna (consulte Amazon docs para codificações disponíveis).

Definir descrições nas colunas

O Redshift permite que as colunas tenham descrições anexadas que devem aparecer na maioria das ferramentas de consulta (usando o COMMENT comando). Você pode definir o campo de metadados da description coluna para especificar uma descrição para colunas individuais.

Consulta por pushdown no Redshift

O otimizador Spark empurra os seguintes operadores para o Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Dentro Project e Filter, suporta as seguintes expressões:

  • A maioria dos operadores lógicos booleanos
  • Comparações
  • Operações aritméticas básicas
  • Moldes numéricos e de cordas
  • A maioria das funções de cadeia de caracteres
  • Subconsultas escalares, caso possam ser integradas completamente no Redshift.

Observação

Esta limitação não suporta expressões que operam em datas e timestamps.

Dentro de Aggregation, ele suporta as seguintes funções de agregação:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

combinada com a cláusula DISTINCT, se for caso disso.

Dentro de Join, suporta os seguintes tipos de joins:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Subconsultas que são reescritas Join pelo otimizador, por exemplo WHERE EXISTS, WHERE NOT EXISTS

Observação

O pushdown de junção não suporta FULL OUTER JOIN.

A funcionalidade pushdown pode ser mais benéfica em consultas com LIMIT. Uma consulta como essa SELECT * FROM large_redshift_table LIMIT 10 poderia levar muito tempo, pois toda a tabela seria primeiro descarregada para o S3 como um resultado intermediário. Com pushdown, o LIMIT é executado em Redshift. Em consultas com agregações, empurrar a agregação para baixo no Redshift também ajuda a reduzir a quantidade de dados que precisam ser transferidos.

O pushdown de consulta no Redshift está ativado por padrão. Ele pode ser desativado definindo spark.databricks.redshift.pushdown como false. Mesmo quando desativado, o Spark ainda aplica filtros e executa a eliminação de colunas no Redshift.

Instalação do driver Redshift

A fonte de dados Redshift também requer um driver JDBC compatível com Redshift. Como o Redshift é baseado no sistema de banco de dados PostgreSQL, você pode usar o driver JDBC PostgreSQL incluído no Databricks Runtime ou o driver JDBC Redshift recomendado pela Amazon. Nenhuma instalação é necessária para usar o driver JDBC PostgreSQL. A versão do driver JDBC PostgreSQL incluída em cada versão do Databricks Runtime está listada nas notas de versão do Databricks Runtime.

Para instalar manualmente o driver JDBC Redshift:

  1. Baixe o driver da Amazon.
  2. Carregue o driver em seu espaço de trabalho do Azure Databricks. Consulte Instalar bibliotecas.
  3. Instale a biblioteca no cluster.

Observação

O Databricks recomenda o uso da versão mais recente do driver JDBC Redshift. As versões do driver JDBC Redshift abaixo de 1.2.41 têm as seguintes limitações:

  • A versão 1.2.16 do driver retorna dados vazios ao usar uma where cláusula em uma consulta SQL.
  • Versões do driver abaixo de 1.2.41 podem retornar resultados inválidos porque a anulabilidade de uma coluna é relatada incorretamente como "Não anulável" em vez de "Desconhecido".

Garantias transacionais

Esta seção descreve as garantias transacionais da fonte de dados Redshift para o Spark.

Informações gerais sobre as propriedades Redshift e S3

Para obter informações gerais sobre garantias transacionais do Redshift, consulte o capítulo Gerenciando operações de gravação simultâneas na documentação do Redshift. Em resumo, o Redshift fornece isolamento serializável de acordo com a documentação para o comando Redshift BEGIN:

[embora] você possa usar qualquer um dos quatro níveis de isolamento de transação, o Amazon Redshift processa todos os níveis de isolamento como serializáveis.

De acordo com a documentação do Redshift:

O Amazon Redshift oferece suporte a um comportamento de confirmação automática padrão no qual cada comando SQL executado separadamente é confirmado individualmente.

Assim, comandos individuais como COPY e UNLOAD são atómicos e transacionais, enquanto BEGIN e END explícitos só são necessários para garantir a atomicidade de múltiplos comandos ou consultas.

Ao ler e gravar no Redshift, a fonte de dados lê e grava dados no S3. Tanto o Spark quanto o Redshift produzem saída particionada e a armazenam em vários arquivos no S3. De acordo com a documentação do modelo de consistência de dados do Amazon S3, as operações de listagem de bucket do S3 são eventualmente consistentes, portanto, os arquivos devem ir para comprimentos especiais para evitar dados ausentes ou incompletos devido a essa fonte de consistência eventual.

Garantias da fonte de dados Redshift para o Spark

Acrescentar a uma tabela existente

Ao inserir linhas no Redshift, a fonte de dados usa o comando COPY e especifica manifestos para proteger contra determinadas operações do S3 eventualmente consistentes. Como resultado, spark-redshift os acréscimos às tabelas existentes têm as mesmas propriedades atômicas e transacionais que os comandos regulares do Redshift COPY .

Criar uma nova tabela (SaveMode.CreateIfNotExists)

A criação de uma nova tabela é um processo de duas etapas, que consiste em um CREATE TABLE comando seguido por um comando COPY para acrescentar o conjunto inicial de linhas. Ambas as operações são realizadas na mesma transação.

Sobrescrever uma tabela existente

Por padrão, a fonte de dados usa transações para executar substituições, que são implementadas excluindo a tabela de destino, criando uma nova tabela vazia e anexando linhas a ela.

Se a configuração preterida usestagingtable for definida como false, a fonte de dados confirmará o comando DELETE TABLE antes de acrescentar linhas à nova tabela, sacrificando a atomicidade da operação de sobrescrição, mas reduzindo a quantidade de espaço temporário que o Redshift precisa durante a sobrescrição.

Consultar tabela Redshift

As consultas usam o comando Redshift UNLOAD para executar uma consulta e guardar os seus resultados no S3, e utilizam manifestos para se precaver contra certas operações do S3 em modelo eventualmente consistente. Como resultado, as consultas da fonte de dados Redshift para o Spark devem ter as mesmas propriedades de consistência que as consultas regulares do Redshift.

Problemas e soluções comuns

O bucket do S3 e o cluster do Redshift estão em regiões diferentes da AWS

Por padrão, as cópias do S3 <-> Redshift não funcionam se o bucket do S3 e o cluster do Redshift estiverem em regiões diferentes da AWS.

Se você tentar ler uma tabela Redshift quando o bucket do S3 estiver em uma região diferente, poderá ver um erro como:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Da mesma forma, tentar gravar no Redshift usando um bucket do S3 em uma região diferente pode causar o seguinte erro:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Gravações: O comando Redshift COPY suporta a especificação explícita da região do bucket do S3, para que você possa fazer com que as gravações no Redshift funcionem corretamente nesses casos, adicionando region 'the-region-name' à extracopyoptions configuração. Por exemplo, com um bucket na região Leste dos EUA (Virgínia) e a API Scala, use:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Como alternativa, você pode usar a awsregion configuração:

    .option("awsregion", "us-east-1")
    
  • Leia: O comando Redshift UNLOAD também suporta especificação explícita da região do bucket do S3. Você pode fazer com que os processos de leitura funcionem corretamente adicionando a região à configuração awsregion.

    .option("awsregion", "us-east-1")
    

Erro de autenticação ao usar uma senha com caracteres especiais na URL JDBC

Se você estiver fornecendo o nome de usuário e a senha como parte da URL JDBC e a senha contiver caracteres especiais, como ;, ?ou &, você poderá ver a seguinte exceção:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Isso é causado por caracteres especiais no nome de utilizador ou na senha que não estão a ser tratados corretamente pelo driver JDBC. Certifique-se de especificar o nome de utilizador e a senha usando as opções correspondentes do DataFrame user e password. Para obter mais informações, consulte Parâmetros.

A consulta Spark de longa duração fica indefinidamente suspensa mesmo que a operação correspondente no Redshift esteja concluída.

Se estiver a ler ou a escrever grandes quantidades de dados para e do Redshift, a consulta do Spark poderá ficar indefinidamente pendente, mesmo que a página AWS Redshift Monitoring mostre que a operação correspondente foi concluída e que o cluster está ocioso. Isso é causado pela expiração da conexão entre Redshift e Spark. Para evitar isso, assegure-se de que o sinalizador JDBC tcpKeepAlive esteja ativado e que TCPKeepAliveMinutes esteja definido como um valor baixo (por exemplo, 1).

Para obter informações adicionais, consulte Configuração do driver JDBC do Amazon Redshift.

Marca temporal com semântica de fuso horário

Ao ler dados, tanto os tipos de dados do Redshift TIMESTAMP como TIMESTAMPTZ são mapeados para o Spark TimestampType, e um valor é convertido para Tempo Universal Coordenado (UTC) e armazenado como o carimbo de data/hora UTC. Para um Redshift TIMESTAMP, o fuso horário local é assumido, pois o valor não tem nenhuma informação de fuso horário. Ao gravar dados numa tabela do Redshift, um Spark TimestampType é mapeado para o tipo de dados do Redshift TIMESTAMP.

Guia de migração

A fonte de dados agora exige que você defina forward_spark_s3_credentials explicitamente antes que as credenciais do Spark S3 sejam encaminhadas para o Redshift. Esta alteração não terá impacto se utilizar os mecanismos de autenticação aws_iam_role ou temporary_aws_*. No entanto, se costumava confiar no comportamento padrão antigo, agora deve definir forward_spark_s3_credentials explicitamente para true para continuar a usar o seu anterior mecanismo de autenticação Redshift para S3. Para obter uma discussão sobre os três mecanismos de autenticação e suas compensações de segurança, consulte a seção Autenticação no S3 e Redshift deste documento.