Azure Databricks を使用して、Amazon Redshift からテーブルの読み取りと書き込みを行うことができます。
重要
従来のクエリフェデレーションドキュメントは廃止され、更新されない可能性があります。 このコンテンツに記載されている構成は、Databricks によって公式に承認またはテストされていません。 Lakehouse Federation がソース データベースをサポートしている場合、Databricks では代わりにこれを使用することをお勧めします。
Databricks Redshift データ ソースは、Amazon S3 を使用して Redshift との間でデータを効率的に転送し、JDBC を使用して Redshift で適切な COPY コマンドおよび UNLOAD コマンドを自動的にトリガーします。
注
Databricks Runtime 11.3 LTS 以降、Databricks Runtime には Redshift JDBC ドライバーが含まれており、形式オプションの redshift キーワードを使用してアクセスできます。 各 Databricks Runtime に含まれるドライバー バージョンについては、Databricks Runtime リリース ノートのバージョンと互換性 を参照してください。 ユーザー指定のドライバーは引き続きサポートされており、バンドルされている JDBC ドライバーよりも優先されます。
Databricks Runtime 10.4 LTS 以下では、Redshift JDBC ドライバーを手動でインストールする必要があり、クエリではその形式にドライバー (com.databricks.spark.redshift) を使用する必要があります。
「Redshift ドライバーのインストール」を参照してください。
使用方法
次の例は、Redshift ドライバーとの接続を示しています。 PostgreSQL JDBC ドライバーを使用している場合は、 url パラメーターの値を置き換えます。
AWS の資格情報を構成すると、Python、SQL、R、または Scala の Spark データ ソース API でデータ ソースを使用できます。
重要
Unity Catalog で定義されている外部の場所は、tempdir の場所としてサポートされていません。
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Databricks Runtime 10.4 LTS 以下での SQL を使用したデータの読み取り:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Databricks Runtime 11.3 LTS 以降での SQL を使用したデータの読み取り:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
SQL を使用するデータの書き込み:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL API では、新しいテーブルの作成のみがサポートされ、上書きや追加はサポートされません。
R
Databricks Runtime 10.4 LTS 以下での R を使用したデータの読み取り:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Databricks Runtime 11.3 LTS 以降での R を使用したデータの読み取り:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
スカラ (プログラミング言語)
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Redshift の使用方法に関する推奨事項
クエリを実行すると、大量のデータが S3 に抽出される場合があります。 Redshift で同じデータに対して複数のクエリを実行する場合、Databricks では Delta Lake を使用して抽出されたデータを保存することをお勧めします。
コンフィギュレーション
S3 と Redshift への認証
次の図で示されているように、データ ソースにはいくつかのネットワーク接続が含まれます。
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
データ ソースは、Redshift との間でデータを転送するときに、S3 にデータの読み取りと書き込みを行います。 その結果、S3 バケット (tempdir 構成パラメーターを使用して指定) への読み取りと書き込みのアクセス権を持つ AWS 資格情報が必要になります。
注
データ ソースは、S3 で作成される一時ファイルをクリーンアップしません。 そのため、 オブジェクト ライフサイクル構成 で専用の一時 S3 バケットを使用して、指定した有効期限後に一時ファイルが自動的に削除されるようにすることをお勧めします。 これらのファイルを暗号化する方法については、このドキュメントの「暗号化」セクションを参照してください。
Unity Catalog で定義されている 外部の場所を tempdir の場所として使用することはできません。
次のセクションでは、各接続の認証構成オプションについて説明します。
Redshift への Spark ドライバー
Spark ドライバーは、ユーザー名とパスワードを使用して JDBC 経由で Redshift に接続します。 Redshift では、この接続を認証するための IAM ロールの使用はサポートされていません。 既定では、この接続では SSL 暗号化が使用されます。詳細については、「暗号化」を参照してください。
Spark から S3
S3 は、Redshift に対して読み取りまたは書き込みを行う際に、一括データを格納するための中継として機能します。 Spark は、Hadoop FileSystem インターフェイスと Amazon Java SDK の S3 クライアントの両方を使用して S3 に接続します。
注
DBFS マウントを使用して、Redshift の S3 へのアクセスを構成することはできません。
Hadoop conf でキーを設定する:Hadoop 構成プロパティで AWS キーを指定します。
tempdir構成がs3a://ファイルシステムを指している場合は、Hadoop XML 構成ファイルでfs.s3a.access.keyプロパティとfs.s3a.secret.keyプロパティを設定するか、sc.hadoopConfiguration.set()を呼び出して Spark のグローバル Hadoop 構成を構成できます。s3n://ファイルシステムを使用する場合は、次の例に示すように、レガシ構成キーを指定できます。スカラ (プログラミング言語)
たとえば、
s3aファイルシステムを使用している場合は、次のように追加します:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")レガシ
s3nファイルシステムの場合は、次を追加します:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")Python
次のコマンドはいくつかの Spark の内部構造に依存していますが、PySpark のすべてのバージョンで動作し、今後問題が発生する、または変更される可能性は低くなります:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift から S3
forward_spark_s3_credentials オプションを true に設定して、Spark が JDBC 経由で S3 に接続するために使用している AWS キーの資格情報を Redshift に自動的に転送します。 JDBC クエリではこれらの資格情報が埋め込まれるため、Databricks では JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。
暗号化
JDBC のセキュリティ保護: SSL 関連の設定が JDBC URL に存在しない限り、データ ソースは既定で SSL 暗号化を有効にし、Redshift サーバーが信頼できるかどうかを確認します (つまり、
sslmode=verify-fullです)。 そのため、サーバー証明書は、初めて必要になったときに Amazon サーバーから自動的にダウンロードされます。 失敗した場合は、事前バンドルされた証明書ファイルがフォールバックとして使用されます。 これは、Redshift ドライバーと PostgreSQL JDBC ドライバーの両方に適用されます。この機能に問題がある場合、または単にSSLを無効にしたい場合は、
.option("autoenablessl", "false")またはDataFrameReaderでDataFrameWriterを呼び出すことができます。カスタム SSL 関連の設定を指定する場合は、Redshift ドキュメントの手順に従うことができます。Java での SSL 証明書とサーバー証明書の使用およびJDBC ドライバー構成オプション データ ソースで使用される JDBC
urlに存在するすべての SSL 関連オプションが優先されます (つまり、自動構成はトリガーされません)。S3 に格納されている UNLOAD データの暗号化 (Redshift からの読み取り時に格納されるデータ): S3 へのデータのUNLOAD に関する Redshift のドキュメントによると、「UNLOAD は、Amazon S3 サーバー側の暗号化 (SSE-S3) を使用してデータ ファイルを自動的に暗号化します」とあります。
Redshift では、カスタム キーを使用したクライアント側暗号化もサポートされていますが (暗号化されたデータ ファイルのアンロードを参照)、データ ソースには必要な対称キーを指定する機能がありません。
S3 に格納されている COPY データの暗号化 (Redshift への書き込み時に格納されるデータ): Amazon S3 からの暗号化されたデータ ファイルの読み込みに関する Redshift ドキュメントによる:
COPY コマンドを使用して、AWS マネージド暗号化キー (SSE-S3 または SSE-KMS)、クライアント側暗号化、またはその両方を使用して、サーバー側暗号化を使用して Amazon S3 にアップロードされたデータ ファイルを読み込むことができます。 COPY では、顧客が指定したキー (SSE-C) を使用した Amazon S3 サーバー側暗号化はサポートされていません。
パラメーター
Spark SQL に用意されているパラメーターマップまたは OPTIONS は、次の設定をサポートしています。
| パラメーター | 必須 | 既定値 | 説明 |
|---|---|---|---|
| dbtable | はい (クエリが指定されていない場合)。 | なし | Redshift で作成または読み取りを行うテーブル。 このパラメーターは、データを Redshift に保存するときに必要です。 |
| クエリ | はい (dbtable が指定されていない場合)。 | なし | Redshift から読み取るためのクエリ。 |
| ユーザー | いいえ | なし | Redshift ユーザー名。 パスワード オプションと共に使用する必要があります。 ユーザーとパスワードの両方が URL に渡されない場合のみ使用可能で、両方を渡すとエラーになります。 エスケープする必要がある特殊文字がユーザー名に含まれている場合は、このパラメーターを使用します。 |
| パスワード | いいえ | なし | Redshift パスワード。
user オプションと共に使用する必要があります。 ユーザーとパスワードの両方がURLに渡されない場合のみ使用可能で、両方を渡すとエラーになります。 エスケープする必要がある特殊文字がパスワードに含まれている場合は、このパラメーターを使用します。 |
| URL | イエス | なし | 次の形式の JDBC URLjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>subprotocol は、読み込んだ JDBC ドライバーに応じて、postgresql または redshift できます。 1 つの Redshift 互換ドライバーがクラスパス上にあり、この URL と一致している必要があります。
host と port は Redshift マスター ノードを指す必要があるため、ドライバー アプリケーションからのアクセスを許可するようにセキュリティ グループや VPC を構成する必要があります。
database は Redshift データベース名 user を識別し、password はデータベースにアクセスするための認証情報であり、JDBC のこの URL に埋め込む必要があり、ユーザー アカウントは参照されるテーブルに必要な権限を持っている必要があります。 |
| 検索パス | いいえ | なし | Redshift でスキーマ検索パスを設定します。
SET search_path to コマンドを使用して設定されます。 テーブルを検索するスキーマ名のコンマ区切りの一覧にする必要があります。
search_path の Redshift ドキュメント を参照してください。 |
| aws_iam_role | IAM ロールを使用して承認する場合のみ。 | なし | Redshift クラスターにアタッチされた IAM Redshift COPY/UNLOAD 操作ロールの完全に指定された ARN。例: arn:aws:iam::123456789000:role/<redshift-iam-role>。 |
| フォワードスパークS3クレデンシャル | いいえ | false |
true の場合、データ ソースは、Spark が S3 への接続に使用している資格情報を自動的に検出し、これらの資格情報を JDBC 経由で Redshift に転送します。 これらの資格情報は JDBC クエリの一部として送信されるため、このオプションを使用する場合は、JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。 |
| 一時的なAWSアクセスキーID | いいえ | なし | AWS アクセス キーには、S3 バケットへの書き込みアクセス許可が必要です。 |
| 一時的なAWS秘密アクセスキー | いいえ | なし | 提供されたアクセス キーに対応する AWS シークレット アクセス キー。 |
| temporary_aws_session_token | いいえ | なし | 指定されたアクセス キーに対応する AWS セッション トークン。 |
| tempdir | イエス | なし | Amazon S3 の書き込み可能な場所、読み取り時にアンロードされたデータに使用され、書き込み時に Avro データが Redshift に読み込まれます。 Spark の Redshift データ ソースを通常の ETL パイプラインの一部として使用している場合は、バケットに ライフサイクル ポリシー を設定し、それをこのデータの一時場所として使用すると便利です。 Unity Catalog で定義されている外部の場所 を tempdir の場所として使用することはできません。 |
| JDBCドライバー | いいえ | JDBC URL のサブプロトコルによって決定されます。 | 使用する JDBC ドライバーのクラス名。 このクラスは、クラスパス上にある必要があります。 JDBC URL のサブプロトコルによって適切なドライバー クラス名が自動的に決定されるため、ほとんどの場合、このオプションを指定する必要はありません。 |
| ディストスタイル | いいえ | EVEN |
テーブルの作成時に使用する Redshift ディストリビューション スタイル。
EVEN、KEY、または ALL のいずれかを指定できます (Redshift のドキュメントを参照)。
KEY を使用する場合は、ディストリビューション キーを distkey キー オプションで設定する必要もあります。 |
| distkey | いいえ (DISTSTYLE KEY を使用する場合以外) |
なし | テーブルの作成時にディストリビューション キーとして使用するテーブル内の列の名前。 |
| ソートキー仕様 | いいえ | なし | 完全な Redshift のソートキー定義。 例を次に示します。
|
| usestagingtable (非推奨) | いいえ | true |
この非推奨のオプションを false に設定すると、書き込みの開始時に上書き操作の宛先テーブルが直ちに削除され、上書き操作がアトミックでなくなり、コピー先テーブルの可用性が低下します。 これにより、上書きの一時的なディスク領域の要件が削減される可能性があります。usestagingtable=false 操作設定すると、データが失われたり使用できなくなったりするリスクがあるため、移行先テーブルを手動で削除する必要があるため、非推奨とされます。 |
| 説明 | いいえ | なし | テーブルの説明です。 SQL COMMENT コマンドを使用して設定され、ほとんどのクエリ ツールに表示されるはずです。 個々の列に説明を設定するには、description メタデータも参照してください。 |
| 事前行動 | いいえ | なし |
; コマンドを読み込む前に実行する SQL コマンドの COPY 区切られたリストです。 新しいデータを読み込む前に、DELETE コマンドや同様のコマンドをここで実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に書式設定されます (ステージング テーブルを使用している場合)。これらのコマンドが失敗した場合、それがエラーとみなされ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、変更は元に戻され、事前アクションが失敗した場合はバックアップ テーブルが復元されます。 |
| postActions | いいえ | なし |
; 区切りのSQLコマンドリストは、データのロードが成功した後に実行されます。 新しいデータを読み込むときに、GRANT コマンドや同様のコマンドをいくつかここで実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に書式設定されます (ステージング テーブルを使用している場合)。これらのコマンドが失敗した場合、それがエラーとみなされ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、変更は元に戻され、ポスト アクションが失敗した場合はバックアップ テーブルが復元されます。 |
| 追加コピーオプション | いいえ | なし | データを読み込むときに Redshift COPY コマンドに追加するその他のオプション一覧 (TRUNCATECOLUMNS や MAXERROR n など) (他のオプションについては、Redshift ドキュメントを参照してください)。これらのオプションは COPY コマンドの末尾にアペンドされるため、コマンドの最後に意味のあるオプションのみを使用できますが、可能な限り多くのユース ケースに対応する必要があります。 |
| tempformat | いいえ | AVRO |
Redshift に書き込み時に、S3 で一時ファイルを保存する形式。 既定値は AVROです。他に使用できる値として、CSV には CSV、gzipped CSV には CSV GZIP がそれぞれあります。Redshift は、Avro ファイルを読み込む場合よりも CSV の読み込み時に大幅に高速であるため、その tempformat を使用すると、Redshift への書き込み時にパフォーマンスが大幅に向上する可能性があります。 |
| csvnullstring | いいえ | @NULL@ |
CSV tempformat を使用するときに null に対して書き込む文字列値。 これは、実際のデータに表示されない値である必要があります。 |
| CSVセパレーター | いいえ | , |
tempformat が CSV または CSV GZIP に設定された一時ファイルを書き込むときに使用する区切り記号。 有効な ASCII 文字 ("," や "\|" など) である必要があります。 |
| csvignoreleadingwhitespace | いいえ | true |
true に設定すると、tempformat が CSV または CSV GZIP に設定されている場合、書き込み時に先頭の空白が値から削除されます。 それ以外の場合、空白は保持されます。 |
| csvignoretrailingwhitespace | いいえ | true |
true に設定すると、tempformat が CSV または CSV GZIP に設定されている場合、書き込み時に末尾の空白が値から削除されます。 それ以外の場合、空白は保持されます。 |
| infer_timestamp_ntz_type | いいえ | false |
true の場合、タイプ Redshift TIMESTAMP の値は、読み取り中に TimestampNTZType (タイム ゾーンのないタイムスタンプ) として解釈されます。 それ以外の場合、基になる Redshift テーブルでの型に関係なく、すべてのタイムスタンプが TimestampType として解釈されます。 |
その他の構成オプション
文字列列の最大サイズの構成
Redshift テーブルを作成する場合、既定の動作は文字列に対して TEXT 列を作成することです。 Redshift は TEXT 列を VARCHAR(256) として格納するため、これらの列の最大サイズは 256 文字です (ソース)。
より大きな列をサポートするには、maxlength 列メタデータ フィールドを使用して、個々の文字列の最大長を指定できます。 これは、既定値よりも長さが小さい列を宣言することで、省スペースのパフォーマンス最適化を実装する場合にも役立ちます。
注
Spark の制限により、SQL および R 言語 API は列メタデータの変更をサポートしていません。
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
スカラ (プログラミング言語)
Spark の Scala API を使用して複数の列のメタデータ フィールドを更新する例を次に示します。
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
カスタム列の型を設定する
列の種類を手動で設定する必要がある場合は、列のメタデータを redshift_type 使用できます。 たとえば、Spark SQL Schema -> Redshift SQL 型マッチャーをオーバーライドしてユーザー定義の列型を割り当てる場合は、次の操作を実行できます。
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
スカラ (プログラミング言語)
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
列暗号化の構成
テーブルを作成するときは、encoding 列メタデータ フィールドを使用して各列の圧縮エンコードを指定します (使用可能なエンコードについては、「Amazon のドキュメント」 を参照してください)。
列に説明を設定する
Redshift では、(COMMENT コマンドを使用して) ほとんどのクエリ ツールに表示する必要がある説明を列に添付できます。
description 列メタデータ フィールドを設定して、個々の列の説明を指定できます。
Redshift へのプッシュダウンのクエリ
Spark オプティマイザーは、次の演算子を Redshift にプッシュダウンします。
FilterProjectSortLimitAggregationJoin
Project および Filter 内では、次の式がサポートされます。
- ほとんどのブール型ロジック演算子
- 比較
- 基本的な算術演算
- 数値と文字列のキャスト
- ほとんどの文字列関数
- スカラー サブクエリ (完全に Redshift にプッシュダウンできる場合)。
注
このプッシュダウンでは、日付とタイムスタンプで動作する式はサポートされていません。
Aggregation 内では、次の集計関数がサポートされます。
AVGCOUNTMAXMINSUMSTDDEV_SAMPSTDDEV_POPVAR_SAMPVAR_POP
必要に応じて、DISTINCT 句と組み合わせます。
Join 内では、次の種類の結合がサポートされます。
INNER JOINLEFT OUTER JOINRIGHT OUTER JOINLEFT SEMI JOINLEFT ANTI JOIN- オプティマイザーによって
Joinへ書き換えられたサブクエリ (例:WHERE EXISTS、WHERE NOT EXISTS)
注
結合プッシュダウンは FULL OUTER JOIN をサポートしていません。
プッシュダウンは、LIMIT のクエリで最も役立つ可能性があります。 テーブル全体が最初に中間結果として S3 に読み込まれるため、SELECT * FROM large_redshift_table LIMIT 10 のようなクエリには非常に長い時間がかかる可能性があります。 プッシュダウンでは、 LIMIT は Redshift で実行されます。 集計を含むクエリでは、Redshift に集計をプッシュすると、転送する必要があるデータの量を減らすのにも役立ちます。
既定では、Redshift へのクエリ プッシュダウンは有効になっています。 この機能を無効にするには、spark.databricks.redshift.pushdown を false に設定します。 無効にした場合でも、Spark はフィルターを押し下げ、列の削除を Redshift に実行します。
Redshift ドライバーのインストール
Redshift データ ソースには、Redshift 互換の JDBC ドライバーも必要です。 Redshift は PostgreSQL データベース システムに基づいているため、Databricks Runtime に含まれている PostgreSQL JDBC ドライバーまたは Amazon が推奨する Redshift JDBC ドライバーを使用できます。 PostgreSQL JDBC ドライバーを使用するには、インストールは必要ありません。 各 Databricks Runtime リリースに含まれる PostgreSQL JDBC ドライバーのバージョンは、Databricks Runtime リリース ノートに記載されています。
Redshift JDBC ドライバーを手動でインストールするには:
- Amazon からドライバーを [ダウンロード] します。
- ドライバーを Azure Databricks ワークスペースに [アップロード] します。 「 ライブラリのインストール」を参照してください。
- ライブラリをクラスターにインストールします。
注
Databricks では、Redshift JDBC ドライバーの最新バージョンを使用することをお勧めします。 1.2.41 より前の Redshift JDBC ドライバーのバージョンには、次の制限があります。
- バージョン 1.2.16 のドライバーは、SQL クエリで
where句を使用すると空のデータを返します。 - 1.2.41 より前のバージョンのドライバーでは、列の null 許容が "不明" ではなく "Not Nullable" と誤って報告されるため、無効な結果が返されることがあります。
トランザクション保証
このセクションでは、Spark の Redshift データ ソースのトランザクション保証について説明します。
Redshift プロパティと S3 プロパティの一般的な背景
Redshift トランザクション保証の一般的な情報については、Redshift ドキュメントの 同時実行書き込み操作の管理 に関する章を参照してください。 簡単に言うと、Redshift は Redshift BEGIN コマンドのドキュメントに従ってシリアル化可能な分離を提供します。
[ただし] 4 つのトランザクション分離レベルのいずれかを使用できますが、Amazon Redshift はすべての分離レベルをシリアル化可能として処理します。
次の Redshift ドキュメントを参照してください。
Amazon Redshift では、個別に実行される各 SQL コマンドが個別にコミットされる既定の 自動 コミット動作がサポートされています。
したがって、COPY および UNLOADのような個々のコマンドはアトミックで トランザクション的ですが、明示的な BEGIN および END は複数のコマンドまたはクエリのアトミック性を強制するためにのみ必要です。
Redshift からの読み取りと Redshift への書き込み時に、データ ソースは S3 でデータの読み取りと書き込みを行います。 Spark と Redshift の両方で、パーティション分割された出力が生成され、S3 の複数のファイルに格納されます。 Amazon S3 データ整合性モデルのドキュメントによると、S3 バケット登録操作は最終的に一貫性があるため、この結果整合性のソースによるデータの欠落または不完全を回避するために、ファイルは特別な長さに移動する必要があります。
Spark の Redshift データ ソースの保証
既存のテーブルに追加する
Redshift に行を挿入する場合、データ ソースは COPY コマンドを使用し、最終的に一貫性のある特定の S3 操作から保護する マニフェスト を指定します。 その結果、既存のテーブルへの追加は、 spark-redshift は通常の Redshift COPY コマンドと同じアトミック プロパティとトランザクション プロパティを持ちます。
新しいテーブルを作成する (SaveMode.CreateIfNotExists)
新しいテーブルの作成は2段階であり、CREATE TABLE コマンドを使用して、その後にCOPYコマンドで最初の行セットを追加します。 どちらの操作も同じトランザクションで実行されます。
既存のテーブルを上書きする
既定では、データ ソースはトランザクションを使用して上書きを実行します。これは、コピー先テーブルの削除、新しい空のテーブルの作成、および行の追加によって実装されます。
非推奨の usestagingtable 設定が false に設定されている場合、データ ソースは、新しいテーブルに行を追加する前に DELETE TABLE コマンドをコミットし、上書き操作のアトミック性を犠牲にしながら、上書き時に Redshift が必要とするステージング領域の量を減らします。
Redshift テーブルのクエリ
クエリでは、Redshift UNLOAD コマンドを使用してクエリを実行し、その結果を S3 に保存し、 マニフェスト を使用して、最終的に一貫性のある特定の S3 操作から保護します。 その結果、Spark の Redshift データ ソースからのクエリは、通常の Redshift クエリと同じ整合性プロパティを持つ必要があります。
一般的な問題と解決策
S3 バケットと Redshift クラスターは異なる AWS リージョンにあります
既定では、S3 バケットと Redshift クラスターが異なる AWS リージョンにある場合、S3 <-> Redshift コピーは機能しません。
S3 バケットが別のリージョンにあるときに Redshift テーブルを読み取ろうとすると、次のようなエラーが表示されることがあります。
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
同様に、別のリージョンの S3 バケットを使用して Redshift に書き込もうとすると、次のエラーが発生する可能性があります。
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
書き込み: Redshift COPY コマンドは、S3 バケット領域の明示的な指定をサポートしているため、
region 'the-region-name'設定にextracopyoptionsを追加することで、このような場合に Redshift への書き込みを適切に行うことができます。 たとえば、米国東部 (バージニア) リージョンと Scala API のバケットでは、次の値を使用します。.option("extracopyoptions", "region 'us-east-1'")または、次の
awsregion設定を使用できます。.option("awsregion", "us-east-1")読み取り: Redshift UNLOAD コマンドでは、S3 バケットリージョンの明示的な指定もサポートされています。 リージョンを次の
awsregion設定に追加することで、読み取りを適切に動作させることができます。.option("awsregion", "us-east-1")
JDBC URL に特殊文字を含むパスワードを使用する場合の認証エラー
JDBC URL の一部としてユーザー名とパスワードを指定し、パスワードに特殊文字 (;、または ?&) が含まれている場合は、次の例外が発生する可能性があります。
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
これは、ユーザー名またはパスワードの特殊文字が JDBC ドライバーによって正しくエスケープされていないことが原因で発生します。 対応する DataFrame オプション user および password を使用して、ユーザー名とパスワードを必ず指定してください。 詳しくは、「パラメーター」をご覧ください。
対応する Redshift 操作が実行されていても、実行時間の長い Spark クエリが無期限に停止する
Redshift との間で大量のデータの読み取りまたは書き込みを行っている場合は、AWS Redshift 監視ページに対応する LOAD または UNLOAD 操作が完了し、クラスターがアイドル状態であることが示されていても、Spark クエリが無期限に停止する可能性があります。 これは、Redshift と Spark の間の接続がタイムアウトした場合に発生します。これを回避するには、tcpKeepAlive JDBC フラグが有効になっていて、TCPKeepAliveMinutes は低い値 (1 など) に設定されていることを確認します。
詳細については、「Amazon Redshift JDBC Driver の構成」を参照してください。
タイムゾーン セマンティクスがあるタイムスタンプ
データを読み取るとき、Redshift TIMESTAMP 型と TIMESTAMPTZ データ型の両方が Spark TimestampTypeにマップされ、値が協定世界時 (UTC) に変換され、UTC タイムスタンプとして格納されます。 Redshift TIMESTAMP の場合、ローカルのタイムゾーンは、値にタイムゾーン情報がないためと見なされます。 Redshift テーブルにデータを書き込む場合、Spark TimestampType は Redshift TIMESTAMP データ型にマップされます。
移行ガイド
データ ソースでは、Spark S3 資格情報が Redshift に転送される前に明示的に forward_spark_s3_credentials を設定する必要があります。 この変更は、aws_iam_role または temporary_aws_* 認証メカニズムを使用する場合には影響しません。 ただし、以前の既定の動作に依存していた場合は、以前の Redshift から S3 認証メカニズムを引き続き使用するように明示的に forward_spark_s3_credentials を true 設定する必要があります。 3 つの認証メカニズムとそのセキュリティのトレードオフについては、このドキュメントの 「S3 と Redshift への認証」を参照してください。