Freigeben über


Abfragen von Amazon Redshift mit Azure Databricks

Sie können Tabellen aus Amazon Redshift mit Azure Databricks lesen und schreiben.

Wichtig

Die Legacy-Abfrageverbunddokumentation wurde eingestellt und kann nicht aktualisiert werden. Die in diesem Inhalt genannten Konfigurationen werden nicht offiziell von Databricks unterstützt oder getestet. Wenn Lakehouse Federation Ihre Quelldatenbank unterstützt, empfiehlt Databricks stattdessen die Verwendung.

Die Databricks Redshift-Datenquelle verwendet Amazon S3, um Daten effizient in und aus Redshift zu übertragen und nutzt JDBC, um automatisch die entsprechenden COPY- und UNLOAD-Befehle auf Redshift auszulösen.

Hinweis

In Databricks Runtime 11.3 LTS und höher enthält Databricks Runtime den Redshift ODBC-Treiber, auf den über das redshift Schlüsselwort für die Formatoption zugegriffen werden kann. siehe Releasenotizen zu den Versionen und zur Kompatibilität der Databricks Runtime für die Treiberversionen, die in jeder Databricks Runtime enthalten sind. Vom Benutzer bereitgestellte Treiber werden weiterhin unterstützt und haben vor dem gebündelten JDBC-Treiber Vorrang.

In Databricks Runtime 10.4 LTS und darunter ist eine manuelle Installation des Redshift JDBC-Treibers erforderlich, und Abfragen sollten den Treiber (com.databricks.spark.redshift) für das Format verwenden. Siehe Redshift-Treiberinstallation.

Verwendung

Die folgenden Beispiele veranschaulichen die Verbindung mit dem Redshift-Treiber. Ersetzen Sie die url Parameterwerte, falls Sie den PostgreSQL JDBC-Treiber verwenden.

Nachdem Sie Ihre AWS-Anmeldeinformationen konfiguriert haben, können Sie die Datenquelle mit der Spark-Datenquellen-API in Python, SQL, R oder Scala verwenden.

Wichtig

Externe Speicherorte, die im Unity-Katalog definiert sind, werden nicht als tempdir Speicherorte unterstützt.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Lesen von Daten mithilfe von SQL für Databricks Runtime 10.4 LTS und darunter:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Lesen von Daten mithilfe von SQL für Databricks Runtime 11.3 LTS und höher:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Schreiben von Daten mit SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

Die SQL-API unterstützt nur die Erstellung neuer Tabellen und nicht das Überschreiben oder Anfügen.

R

Lesen von Daten mithilfe von R in Databricks Runtime 10.4 LTS und früher:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Lesen von Daten mit R on Databricks Runtime 11.3 LTS und höher:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Skala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Empfehlungen für das Arbeiten mit Redshift

Die Abfrageausführung kann große Datenmengen in S3 extrahieren. Wenn Sie mehrere Abfragen für dieselben Daten in Redshift durchführen möchten, empfiehlt Databricks, die extrahierten Daten mithilfe von Delta Lake zu speichern.

Konfiguration

Authentifizieren bei S3 und Redshift

Die Datenquelle umfasst mehrere Netzwerkverbindungen, die im folgenden Diagramm veranschaulicht werden:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Die Datenquelle liest und schreibt Daten in S3, wenn Daten an/aus Redshift übertragen werden. Daher erfordert es AWS-Anmeldeinformationen mit Lese- und Schreibzugriff auf einen S3-Bucket (angegeben mithilfe des tempdir Konfigurationsparameters).

Hinweis

Die Datenquelle bereinigt nicht die temporären Dateien, die sie in S3 erstellt. Daher wird empfohlen, einen dedizierten temporären S3-Bucket mit einer Objektlebenszykluskonfiguration zu verwenden, um sicherzustellen, dass temporäre Dateien nach einem bestimmten Ablaufzeitraum automatisch gelöscht werden. Weitere Informationen zum Verschlüsseln dieser Dateien finden Sie im Abschnitt "Verschlüsselung " in diesem Dokument. Sie können keinen externen Speicherort verwenden, der in Unity Catalog als tempdir-Speicherort definiert ist.

In den folgenden Abschnitten werden die Konfigurationsoptionen für die Authentifizierung der einzelnen Verbindungen beschrieben:

Spark-Treiber zu Redshift

Der Spark-Treiber verbindet sich über JDBC mit Redshift und verwendet dabei einen Benutzernamen und ein Passwort. Redshift unterstützt nicht die Verwendung von IAM-Rollen zum Authentifizieren dieser Verbindung. Standardmäßig verwendet diese Verbindung SSL-Verschlüsselung; weitere Informationen finden Sie unter Verschlüsselung.

Spark zu S3

S3 fungiert als Vermittler zum Speichern von Massendaten beim Lesen oder Schreiben in Redshift. Spark stellt eine Verbindung zu S3 sowohl über die Hadoop FileSystem-Schnittstellen als auch direkt über den S3-Client des Amazon Java SDK her.

Hinweis

Sie können DBFS-Einbindungen nicht verwenden, um den Zugriff auf S3 für Redshift zu konfigurieren.

  • Festlegen von Schlüsseln in Hadoop conf: Sie können AWS-Schlüssel mithilfe von Hadoop-Konfigurationseigenschaften angeben. Wenn Ihre tempdir Konfiguration auf ein s3a:// Dateisystem verweist, können Sie die fs.s3a.access.key und fs.s3a.secret.key Eigenschaften in einer Hadoop-XML-Konfigurationsdatei festlegen oder sc.hadoopConfiguration.set() aufrufen, um die globale Hadoop-Konfiguration von Spark zu konfigurieren. Wenn Sie ein s3n:// Dateisystem verwenden, können Sie die Legacykonfigurationsschlüssel bereitstellen, wie im folgenden Beispiel gezeigt.

    Skala

    Wenn Sie beispielsweise das s3a Dateisystem verwenden, fügen Sie Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Fügen Sie für das Legacydateisystem s3n Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Der folgende Befehl basiert auf einigen Spark-Internen, sollte jedoch mit allen PySpark-Versionen arbeiten und ist in Zukunft unwahrscheinlich, dass sich dies ändert:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift zu S3

Legen Sie die forward_spark_s3_credentials Option auf true fest, um die von Spark zum Herstellen einer Verbindung mit S3 über JDBC verwendeten AWS-Schlüsselanmeldeinformationen automatisch an Redshift weiterzuleiten. Die JDBC-Abfrage bettet diese Anmeldeinformationen ein, sodass Databricks dringend empfiehlt, dass Sie die SSL-Verschlüsselung der JDBC-Verbindung aktivieren.

Verschlüsselung

  • Absicherung von JDBC: Sofern keine SSL-bezogenen Einstellungen in der JDBC-URL vorhanden sind, aktiviert die Datenquelle standardmäßig SSL-Verschlüsselung und überprüft außerdem, ob der Redshift-Server vertrauenswürdig ist (d. h.sslmode=verify-full). Dazu wird automatisch ein Serverzertifikat von den Amazon-Servern heruntergeladen, wenn es zum ersten Mal benötigt wird. Falls dies fehlschlägt, wird eine vorab gebündelte Zertifikatdatei als Fallback verwendet. Dies gilt sowohl für die Redshift- als auch für die PostgreSQL JDBC-Treiber.

    Falls Probleme mit diesem Feature auftreten, oder falls Sie SSL einfach deaktivieren möchten, können Sie .option("autoenablessl", "false") auf Ihrem DataFrameReader oder Ihrem DataFrameWriter aufrufen.

    Wenn Sie benutzerdefinierte SSL-bezogene Einstellungen angeben möchten, können Sie die Anweisungen in der Redshift-Dokumentation befolgen: Verwenden von SSL- und Serverzertifikaten in Java und JDBC-Treiberkonfigurationsoptionen. Alle SSL-bezogenen Optionen, die im JDBC url mit der Datenquelle verwendet werden, haben Vorrang (d. h. die automatische Konfiguration wird nicht ausgelöst).

  • Verschlüsseln von UNLOAD-Daten, die in S3 gespeichert sind (Beim Lesen von Redshift gespeicherte Daten): Laut redshift-Dokumentation zum Entladen von Daten nach S3 verschlüsselt UNLOAD automatisch Datendateien mithilfe der serverseitigen Amazon S3-Verschlüsselung (SSE-S3)."

    Redshift unterstützt auch die clientseitige Verschlüsselung mit einem benutzerdefinierten Schlüssel (siehe: Entladen verschlüsselter Datendateien), die Datenquelle verfügt jedoch nicht über die Möglichkeit, den erforderlichen symmetrischen Schlüssel anzugeben.

  • Verschlüsseln von COPY-Daten, die in S3 gespeichert sind (Beim Schreiben in Redshift gespeicherte Daten): Laut redshift-Dokumentation zum Laden verschlüsselter Datendateien von Amazon S3:

Sie können den COPY Befehl verwenden, um Datendateien zu laden, die mit serverseitiger Verschlüsselung mit von AWS verwalteten Verschlüsselungsschlüsseln (SSE-S3 oder SSE-KMS), clientseitiger Verschlüsselung oder beides in Amazon S3 hochgeladen wurden. COPY unterstützt keine serverseitige Amazon S3-Verschlüsselung mit einem vom Kunden bereitgestellten Schlüssel (SSE-C).

Parameter

Die in Spark SQL bereitgestellte Parameterzuordnung oder -optionen unterstützen die folgenden Einstellungen:

Parameter Erforderlich Standard BESCHREIBUNG
dbtable Ja, es sei denn, Abfrage ist angegeben. Nichts Die Tabelle, aus der in Redshift erstellt oder gelesen werden soll. Dieser Parameter ist erforderlich, wenn Daten wieder in Redshift gespeichert werden.
Abfrage Ja, es sei denn, dbtable ist angegeben. Nichts Die Abfrage, aus der in Redshift gelesen werden soll.
Benutzer Nein Nichts Der Benutzername "Redshift". Muss zusammen mit der Kennwortoption verwendet werden. Kann nur verwendet werden, wenn der Benutzer und das Kennwort nicht in der URL übergeben werden. Das Übergeben beider führt zu einem Fehler. Verwenden Sie diesen Parameter, wenn der Benutzername Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen.
Passwort Nein Nichts Das Kennwort "Redshift". Muss zusammen mit der Option user verwendet werden. Kann nur verwendet werden, wenn Benutzer und Kennwort nicht in der URL übergeben werden; das Übergeben beider wird zu einem Fehler führen. Verwenden Sie diesen Parameter, wenn das Kennwort Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen.
URL Ja Nichts Eine JDBC-URL im Format
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>
subprotocol kann postgresql oder redshift sein, abhängig davon, welchen JDBC-Treiber Sie geladen haben. Ein Redshift-kompatibler Treiber muss sich auf dem Klassenpfad befinden und dieser URL entsprechen. host und port sollten auf den Redshift-Masterknoten verweisen, sodass Sicherheitsgruppen und/oder VPC konfiguriert werden müssen, um den Zugriff von Ihrer Treiberanwendung zu ermöglichen. database gibt einen Redshift-Datenbanknamen an, user und password sind Anmeldeinformationen für den Zugriff auf die Datenbank, die in diese URL für JDBC eingebettet werden müssen, und Ihr Benutzerkonto sollte über erforderliche Berechtigungen für die Tabelle verfügen, auf die verwiesen wird.
Suchpfad Nein Nichts Legen Sie den Schemasuchpfad in Redshift fest. Wird mit dem SET search_path to Befehl festgelegt. Sollte eine durch Trennzeichen getrennte Liste von Schemanamen sein, in der nach Tabellen gesucht werden soll. Siehe Redshift-Dokumentation von search_path.
aws_iam_role (AWS IAM Rolle) Nur bei Verwendung von IAM-Rollen zum Autorisieren. Nichts Vollständig angegebener ARN der IAM-Rolle für COPY/UNLOAD-Vorgänge in Redshift, die an den Redshift-Cluster angehängt ist, z. B. arn:aws:iam::123456789000:role/<redshift-iam-role>.
Weiterleiten von Spark S3-Zugangsdaten Nein false Wenn true, erkannt die Datenquelle automatisch die Anmeldeinformationen, die Spark zum Herstellen einer Verbindung mit S3 verwendet, und leitet diese Anmeldeinformationen über JDBC an Redshift weiter. Diese Anmeldeinformationen werden als Teil der JDBC-Abfrage gesendet, sodass dringend empfohlen wird, die SSL-Verschlüsselung der JDBC-Verbindung bei Verwendung dieser Option zu aktivieren.
Temporärer_AWS-Zugriffsschlüssel_ID Nein Nichts AWS-Zugriffsschlüssel muss über Schreibberechtigungen für den S3-Bucket verfügen.
Temporärer_AWS_Geheimer_Access_Key Nein Nichts AWS Geheimer Zugriffsschlüssel, der dem bereitgestellten Zugriffsschlüssel entspricht.
temporäre_AWS_Sitzungstoken Nein Nichts AWS-Sitzungstoken, das dem bereitgestellten Zugriffsschlüssel entspricht.
tempdir Ja Nichts Ein beschreibbarer Speicherort in Amazon S3, der für entladene Daten beim Lesen und zum Laden von Avro-Daten in Redshift beim Schreiben verwendet wird. Wenn Sie die Redshift-Datenquelle für Spark als Teil einer regulären ETL-Pipeline verwenden, kann es hilfreich sein, eine Lifecycle-Richtlinie für einen Bucket festzulegen und diese als temporärer Speicherort für diese Daten zu verwenden.
Externe Speicherorte, die im Unity-Katalog definiert sind , können nicht als tempdir Speicherorte verwendet werden.
JDBC-Treiber Nein Bestimmt durch das Unterprotokoll der JDBC-URL. Der Klassenname des zu verwendenden JDBC-Treibers. Diese Klasse muss im Klassenpfad vorhanden sein. In den meisten Fällen sollte es nicht erforderlich sein, diese Option anzugeben, da der entsprechende Treiberklassenname automatisch durch das Unterprotokoll der JDBC-URL bestimmt werden sollte.
diststyle Nein EVEN Die Redshift-Verteilungsart, die beim Erstellen einer Tabelle verwendet werden soll. Kann entweder EVEN, KEY oder ALL sein (siehe Redshift-Dokumentation). Bei Verwendung von KEY, müssen Sie auch einen Verteilungsschlüssel mit der distkey-Option festlegen.
distkey Nein, es sei denn, sie verwenden DISTSTYLE KEY Nichts Der Name einer Spalte in der Tabelle, die beim Erstellen einer Tabelle als Verteilungsschlüssel verwendet werden soll.
Sortierschlüsselspezifikation Nein Nichts Eine vollständige Redshift Sortierungsschlüssel-Definition. Beispiele sind:
  • SORTKEY(my_sort_column)
  • COMPOUND SORTKEY(sort_col_1, sort_col_2)
  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (veraltet) Nein true Durch das Festlegen dieser veralteten Option auf false wird die Zieltabelle eines Überschreibungsvorgangs sofort zu Beginn des Schreibvorgangs gelöscht. Dies macht den Überschreibungsvorgang nicht atomar und reduziert die Verfügbarkeit der Zieltabelle. Dies kann die temporären Anforderungen an den Speicherplatz bei Überschreibungen verringern.
Da das Festlegen des usestagingtable=false-Vorgangs zu Datenverlust oder Nichtverfügbarkeit führen kann, wird es als veraltet markert und stattdessen verlangt, dass Sie die Zieltabelle manuell trennen.
Beschreibung Nein Nichts Eine Beschreibung für die Tabelle. Wird mit dem SQL COMMENT-Befehl festgelegt und sollte in den meisten Abfragetools angezeigt werden. Siehe auch die description Metadaten zum Festlegen von Beschreibungen für einzelne Spalten.
Voraktionen Nein Nichts Eine durch ; getrennte Liste von SQL-Befehlen, die vor dem Laden der COPY-Anweisung ausgeführt werden soll. Es kann hilfreich sein, einige DELETE Befehle oder ähnliche Befehle hier auszuführen, bevor neue Daten geladen werden. Wenn der Befehl enthält %s, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).
Beachten Sie, dass, wenn diese Anweisungen fehlschlagen, dies als Fehler behandelt wird und eine Ausnahme ausgelöst wird. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen rückgängig gemacht und die Sicherungstabelle wiederhergestellt, wenn Vorabaktionen fehlschlagen.
nachträgliche Maßnahmen Nein Nichts Eine ;-getrennte Liste von SQL-Befehlen, die beim Laden von Daten nach einem erfolgreichen COPY ausgeführt werden sollen. Es kann hilfreich sein, einige GRANT Befehle oder ähnliche Befehle hier auszuführen, wenn neue Daten geladen werden. Wenn der Befehl enthält %s, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).
Beachten Sie, dass, wenn diese Anweisungen fehlschlagen, dies als Fehler behandelt wird und eine Ausnahme ausgelöst wird. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen rückgängig gemacht und die Sicherungstabelle wiederhergestellt, wenn nachfolgende Aktionen fehlschlagen.
Extra-Kopieroptionen Nein Nichts Eine Liste mit zusätzlichen Optionen, die beim Laden von Daten an den Befehl "Redshift COPY " angefügt werden sollen, TRUNCATECOLUMNS z. B. oder MAXERROR n (weitere Optionen finden Sie in den Redshift-Dokumenten ).
Da diese Optionen am Ende des COPY Befehls angefügt werden, können nur Optionen verwendet werden, die am Ende des Befehls sinnvoll sind, aber dies sollte die meisten möglichen Anwendungsfälle abdecken.
tempformat Nein AVRO Das Format, in dem temporäre Dateien in S3 beim Schreiben in Redshift gespeichert werden sollen. Standardwert ist AVRO; die anderen zulässigen Werte sind CSV und CSV GZIP für CSV und gzipped CSV.
Redshift ist beim Laden von CSV wesentlich schneller als beim Laden von Avro-Dateien, sodass die Verwendung dieses Tempformats beim Schreiben in Redshift eine große Leistungssteigerung bieten kann.
csvnullstring Nein @NULL@ Der String-Wert, der bei Verwendung des CSV-Tempformats für Nullwerte geschrieben werden soll. Dies sollte ein Wert sein, der nicht in Den tatsächlichen Daten angezeigt wird.
CSV-Trennzeichen Nein , Trennzeichen, das beim Schreiben temporärer Dateien verwendet werden soll, bei Festlegung von tempformat auf CSV oder CSV GZIP. Dies muss ein gültiges ASCII-Zeichen sein, z. B. "," oder "\|".
csvIgnoriereLeerzeichenvorDemText Nein true Wenn dieser Wert auf "true" festgelegt ist, werden führende Leerzeichen während des Schreibens aus den Werten entfernt, wenn tempformat auf CSV oder CSV GZIP gesetzt ist. Andernfalls wird der Leerraum beibehalten.
csvignoretrailingwhitespace Nein true Wenn dies auf „true“ festgelegt ist, werden nachgestellte Leerzeichen von Werten während Schreibvorgängen entfernt, wenn tempformat auf CSV oder CSV GZIP festgelegt ist. Andernfalls wird das Leerzeichen beibehalten.
infer_timestamp_ntz_type Nein false Bei einer Festlegung auf true werden Werte mit dem Redshift-Typ TIMESTAMP bei Lesevorgängen als TimestampNTZType (Zeitstempel ohne Zeitzone) interpretiert. Andernfalls werden alle Zeitstempel als TimestampType interpretiert, unabhängig vom Typ in der zugrunde liegenden Redshift-Tabelle.

Zusätzliche Konfigurationsoptionen

Konfigurieren der maximalen Größe von Zeichenfolgenspalten

Beim Erstellen von Redshift-Tabellen besteht das Standardverhalten darin, TEXT Spalten für Zeichenfolgenspalten zu erstellen. Rotshift speichert TEXT Spalten als VARCHAR(256), sodass diese Spalten eine maximale Größe von 256 Zeichen (Quelle) aufweisen.

Um größere Spalten zu unterstützen, können Sie das maxlength Spaltenmetadatenfeld verwenden, um die maximale Länge einzelner Zeichenfolgenspalten anzugeben. Dies ist auch hilfreich für die Implementierung von leistungssparenden Leistungsoptimierungen, indem Spalten mit einer geringeren maximalen Länge als der Standard deklariert werden.

Hinweis

Aufgrund von Einschränkungen in Spark unterstützen die SQL- und R-Sprach-APIs keine Änderung der Spaltenmetadaten.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Skala

Hier ist ein Beispiel für das Aktualisieren der Metadatenfelder mehrerer Spalten mithilfe der Scala-API von Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Festlegen eines benutzerdefinierten Spaltentyps

Wenn Sie einen Spaltentyp manuell festlegen müssen, können Sie die redshift_type Spaltenmetadaten verwenden. Wenn Sie z. B. den Spark SQL Schema -> Redshift SQL-Typabgleicher überschreiben möchten, um einen benutzerdefinierten Spaltentyp zuzuweisen, können Sie wie folgt vorgehen:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Skala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Konfigurieren der Spaltencodierung

Verwenden Sie beim Erstellen einer Tabelle das encoding Spaltenmetadatenfeld, um eine Komprimierungscodierung für jede Spalte anzugeben (siehe Amazon-Dokumente für verfügbare Codierungen).

Festlegen von Beschreibungen für Spalten

Mit "Redshift" können Spalten Beschreibungen angefügt werden, die in den meisten Abfragetools angezeigt werden sollen (mithilfe des COMMENT Befehls). Sie können das description Spaltenmetadatenfeld so festlegen, dass eine Beschreibung für einzelne Spalten angegeben wird.

Abfrage-Pushdown in Redshift

Der Spark-Optimierer verschiebt die folgenden Operatoren nach unten in Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Innerhalb Project und Filter, es unterstützt die folgenden Ausdrücke:

  • Die meisten booleschen Logikoperatoren
  • Vergleiche
  • Grundlegende arithmetische Operationen
  • Numerische und Zeichenfolgenumwandlungen
  • Die meisten Zeichenfolgenfunktionen
  • Skalare Unterabfragen, wenn sie vollständig in Redshift gepusht werden können.

Hinweis

Dieser Pushdown unterstützt keine Ausdrücke, die auf Datums- und Zeitstempeln ausgeführt werden.

Innerhalb von Aggregation unterstützt er die folgenden Aggregationsfunktionen:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

in Kombination mit der DISTINCT Klausel, sofern zutreffend.

Innerhalb von Join unterstützt er die folgenden Arten von Verknüpfungen:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Unterabfragen, die der Optimierer neu in Join schreibt, z. B. WHERE EXISTS, WHERE NOT EXISTS

Hinweis

Join-Pushdown unterstützt FULL OUTER JOIN nicht.

Der Pushdown ist u. U. bei Abfragen mit LIMIT am nützlichsten. Eine Abfrage, wie z. B. SELECT * FROM large_redshift_table LIMIT 10 könnte sehr lange dauern, da die gesamte Tabelle zunächst per UNLOAD als Zwischenergebnis in S3 entladen würde. Mit dem Pushdown wird LIMIT in Redshift ausgeführt. Bei Abfragen mit Aggregationen hilft auch der Pushdown der Aggregation in Redshift bei der Reduzierung der Datenmenge, die übertragen werden muss.

Der Abfrage-Pushdown in Redshift ist standardmäßig aktiviert. Die Funktion kann deaktiviert werden, indem spark.databricks.redshift.pushdown auf false gesetzt wird. Auch wenn sie deaktiviert sind, pusht Spark weiterhin Filter nach unten und führt die Spaltenauslöschung in Redshift durch.

Installation des Redshift-Treibers

Die Redshift-Datenquelle erfordert auch einen Redshift-kompatiblen JDBC-Treiber. Da Redshift auf dem PostgreSQL-Datenbanksystem basiert, können Sie den PostgreSQL-JDBC-Treiber verwenden, der im Lieferumfang der Databricks Runtime enthalten ist, oder den von Amazon empfohlenen Redshift JDBC-Treiber. Es ist keine Installation erforderlich, um den PostgreSQL JDBC-Treiber zu verwenden. Die Version des in jeder Databricks Runtime-Version enthaltenen PostgreSQL JDBC-Treibers ist in den Databricks Runtime-Versionshinweisen aufgeführt.

So installieren Sie den Redshift JDBC-Treiber manuell:

  1. Laden Sie den Treiber von Amazon herunter.
  2. Laden Sie den Treiber in Ihren Azure Databricks-Arbeitsbereich hoch. Siehe Installieren von Bibliotheken.
  3. Installieren Sie die Bibliothek auf Ihrem Cluster.

Hinweis

Databricks empfiehlt die Verwendung der neuesten Version des Redshift JDBC-Treibers. Versionen des Redshift JDBC Treibers unter 1.2.41 haben die folgenden Einschränkungen:

  • Version 1.2.16 des Treibers gibt leere Daten zurück, wenn eine Klausel in einer where SQL-Abfrage verwendet wird.
  • Versionen des Treibers älter als 1.2.41 geben möglicherweise ungültige Ergebnisse zurück, da die Nullfähigkeit einer Spalte fälschlicherweise als "Nicht-nullfähig" anstelle von "Unbekannt" angegeben wird.

Transaktionsgarantien

In diesem Abschnitt werden die Transaktionsgarantien der Redshift-Datenquelle für Spark beschrieben.

Allgemeiner Hintergrund für Redshift- und S3-Eigenschaften

Allgemeine Informationen zu Redshift-Transaktionsgarantien finden Sie im Kapitel "Managing Concurrent Write Operations " in der Redshift-Dokumentation. Kurz gesagt, Redshift bietet serialisierbare Isolation gemäß der Dokumentation für den Befehl Redshift BEGIN :

[obwohl] Sie alle vier Transaktionsisolationsstufen verwenden können, verarbeitet Amazon Redshift alle Isolationsstufen als serialisierbar.

Gemäß der Redshift-Dokumentation:

Amazon Redshift unterstützt ein standardmäßiges automatisches Commit-Verhalten , bei dem jeder separat ausgeführte SQL-Befehl einzeln committ.

Daher sind einzelne Befehle wie COPY und UNLOAD atomar und transaktional, während explizite BEGIN und END nur erforderlich sein sollten, um die Atomarität mehrerer Befehle oder Abfragen zu erzwingen.

Beim Lesen von und Schreiben in Redshift liest und schreibt die Datenquelle Daten in S3. Sowohl Spark als auch Redshift erzeugen partitionierte Ausgabe und speichern sie in mehreren Dateien in S3. Gemäß der Dokumentation zum Amazon S3-Datenkonsistenzmodell sind S3-Bucketauflistungsvorgänge letztendlich konsistent, sodass bei den Dateien besondere Maßnahmen ergriffen werden müssen, um fehlende oder unvollständige Daten aufgrund dieser Quelle letztendlicher Konsistenz zu vermeiden.

Garantien der Redshift-Datenquelle für Spark

Anfügen an eine vorhandene Tabelle

Beim Einfügen von Zeilen in Redshift verwendet die Datenquelle den BEFEHL COPY und gibt Manifeste an, die vor bestimmten letztendlich konsistenten S3-Vorgängen schützen. spark-redshift Daher weisen Anfüge an vorhandene Tabellen dieselben atomischen und transaktionsalen Eigenschaften wie normale Redshift-Befehle COPY auf.

Erstellen einer neuen Tabelle (SaveMode.CreateIfNotExists)

Das Erstellen einer neuen Tabelle ist ein zweistufiger Prozess, der aus einem CREATE TABLE Befehl gefolgt von einem COPY-Befehl besteht, um den anfänglichen Satz von Zeilen anzufügen. Beide Vorgänge werden in derselben Transaktion ausgeführt.

Überschreiben einer vorhandenen Tabelle

Standardmäßig verwendet die Datenquelle Transaktionen, um Überschreibungen auszuführen, die durch Löschen der Zieltabelle, das Erstellen einer neuen leeren Tabelle und das Anfügen von Zeilen an sie implementiert werden.

Wenn die veraltete usestagingtable Einstellung auf false festgelegt ist, führt die Datenquelle den DELETE TABLE Befehl aus, bevor sie Zeilen an die neue Tabelle anhängt. Dies opfert die Atomizität des Überschreibvorgangs, verringert jedoch den benötigten Stagingbereich, den Redshift während des Überschreibens benötigt.

Redshift-Tabellenabfrage

Abfragen verwenden den Redshift-Befehl UNLOAD, um eine Abfrage auszuführen und die Ergebnisse in S3 zu speichern. Manifeste werden verwendet, um gegen bestimmte eventuell konsistente S3-Vorgänge zu schützen. Daher sollten Abfragen aus der Redshift-Datenquelle für Spark dieselben Konsistenzeigenschaften wie normale Redshift-Abfragen aufweisen.

Häufige Probleme und Lösungen

S3 Bucket- und Redshift-Cluster befinden sich in verschiedenen AWS-Regionen

Standardmäßig funktionieren S3 <-> Redshift-Kopien nicht, wenn sich der S3-Bucket und der Redshift-Cluster in verschiedenen AWS-Regionen befinden.

Wenn Sie versuchen, eine Redshift-Tabelle zu lesen, wenn sich der S3-Bucket in einem anderen Bereich befindet, wird möglicherweise ein Fehler angezeigt, z. B.:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Ebenso kann der Versuch, mithilfe eines S3-Buckets in einem anderen Bereich in Redshift zu schreiben, den folgenden Fehler verursachen:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Schreibt: Der Befehl "Redshift COPY" unterstützt die explizite Spezifikation des S3-Bucketbereichs, sodass Schreibvorgänge in Redshift in diesen Fällen ordnungsgemäß durchgeführt werden können, indem Sie region 'the-region-name' zur extracopyoptions Einstellung hinzufügen. Verwenden Sie beispielsweise mit einem Bucket in der Region USA, Osten (Virginia) und der Skala-API:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Alternativ können Sie die awsregion Einstellung verwenden:

    .option("awsregion", "us-east-1")
    
  • Liest: Der Befehl Redshift UNLOAD unterstützt auch die explizite Spezifikation der S3-Bucket-Region. Sie können Lesevorgänge ordnungsgemäß vornehmen, indem Sie der awsregion-Einstellung die Region hinzufügen:

    .option("awsregion", "us-east-1")
    

Authentifizierungsfehler beim Verwenden eines Kennworts mit Sonderzeichen in der JDBC-URL

Wenn Sie den Benutzernamen und das Kennwort als Teil der JDBC-URL angeben und das Kennwort Sonderzeichen wie ;, ?oder & enthält, wird möglicherweise die folgende Ausnahme angezeigt:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Dies wird durch Sonderzeichen im Benutzernamen oder Kennwort verursacht, die vom JDBC-Treiber nicht ordnungsgemäß entfernt werden. Stellen Sie sicher, dass Sie den Benutzernamen und das Kennwort mithilfe der entsprechenden DataFrame-Optionen user und password angeben. Weitere Informationen finden Sie unter Parameter.

Lange ausgeführte Spark-Abfrage hängt unbegrenzt, obwohl der entsprechende Redshift-Vorgang abgeschlossen ist.

Wenn Sie große Datenmengen aus und in Redshift lesen oder schreiben, hängt Ihre Spark-Abfrage möglicherweise unbegrenzt, auch wenn auf der AWS Redshift Monitoring-Seite angezeigt wird, dass der entsprechende LOAD- oder UNLOAD-Vorgang abgeschlossen ist und dass Cluster im Leerlauf ist. Dies wird durch die Verbindung zwischen Redshift und Spark verursacht. Um dies zu vermeiden, vergewissern Sie sich, dass das tcpKeepAlive JDBC-Flag aktiviert ist und TCPKeepAliveMinutes auf einen niedrigen Wert festgelegt ist (z. B. 1).

Weitere Informationen finden Sie unter Amazon Redshift JDBC Driver Configuration.

Zeitstempel mit Zeitzonensemantik

Beim Lesen von Daten werden Redshift TIMESTAMP- und TIMESTAMPTZ-Datentypen dem Spark-TimestampType zugeordnet, und ein Wert wird in koordinierte Weltzeit (UTC) konvertiert und als UTC-Zeitstempel gespeichert. Bei einer Redshift TIMESTAMPwird die lokale Zeitzone angenommen, da der Wert keine Zeitzoneninformationen enthält. Beim Schreiben von Daten in eine Redshift-Tabelle wird ein Spark TimestampType dem Redshift-Datentyp TIMESTAMP zugeordnet.

Migrationsleitfaden

Die Datenquelle erfordert jetzt, dass Sie explizit forward_spark_s3_credentials festlegen, bevor Spark S3-Anmeldeinformationen an Redshift weitergeleitet werden. Diese Änderung hat keine Auswirkungen, wenn Sie die aws_iam_role- oder temporary_aws_*-Authentifizierungsmechanismen verwenden. Wenn Sie sich jedoch auf das alte Standardverhalten verlassen haben, müssen Sie jetzt explizit forward_spark_s3_credentials auf true setzen, um Ihren vorherigen Redshift-zu-S3-Authentifizierungsmechanismus weiterhin verwenden zu können. Eine Diskussion der drei Authentifizierungsmechanismen und ihrer Sicherheitskompromisse finden Sie im Abschnitt "Authentifizierung zu S3 und Redshift" dieses Dokuments.