チェックポイントと先書きログが連携して、構造化ストリーミング ワークロードの処理が保証されます。 チェックポイントは、状態情報や処理されたレコードなど、クエリを識別する情報を追跡します。 チェックポイント ディレクトリ内のファイルを削除するか、新しいチェックポイントの場所に変更すると、クエリの次の実行が新たに開始されます。
クエリごとに異なるチェックポイントの場所が必要です。 複数のクエリに同じ場所を共有しないでください。
構造化ストリーミング クエリに対してチェックポイントを有効にする
次の例のように、ストリーミング クエリを実行する前に、checkpointLocation オプションを指定する必要があります。
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Note
ノートブック内の display() の出力や memory シンクなど、一部のシンクでは、このオプションを省略すると、一時的なチェックポイントの場所が自動的に生成されます。 これらの一時的なチェックポイントの場所によってフォールト トレランスやデータ整合性の保証が確証されることはなく、適切にクリーンアップされない場合があります。 Databricks では、これらのシンクのチェックポイントの場所を常に指定することをお勧めします。
構造化ストリーミング クエリの変更後に復旧する
同じチェックポイントの場所からの再起動の間に、ストリーミング クエリで許可される変更には制限があります。 許可されていない変更、または変更の影響が十分に定義されていない変更の一部を次に示します。 それらすべてについて、次のとおりです。
- 許可される用語は、指定された変更を実行できるが、その効果のセマンティクスが十分に定義されているかどうかを意味します。これは、クエリと変更によって異なります。
- "許可されない" という用語は、再起動されたクエリが予期しないエラーで失敗する可能性が高いため、指定された変更を行うべきでないことを意味します。
-
sdfは、sparkSession.readStreamで生成されたストリーミング DataFrame/Dataset を表します。
構造化ストリーミング クエリでの変更の種類
入力ソースの数または種類 (つまり、異なるソース) の変更: これは許可されません。
入力ソースのパラメーターの変更: これが許可されているかどうか、および変更のセマンティクスが適切に定義されているかどうかは、ソースとクエリ (
maxFilesPerTriggerやmaxOffsetsPerTriggerなどのアドミッション制御を含む) によって異なります。 次に例をいくつか示します。レート制限の追加、削除、変更は許可されます。
spark.readStream.format("kafka").option("subscribe", "article")to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)サブスクライブされた記事とファイルに対する変更は、結果が予測不可能であるために、一般に許可されません。
spark.readStream.format("kafka").option("subscribe", "article")からspark.readStream.format("kafka").option("subscribe", "newarticle")へ
トリガー間隔の変更: 増分バッチと時間間隔の間でトリガーを変更できます。 「実行間のトリガー間隔の変更」を参照してください。
出力シンクの種類の変更: シンクのいくつかの特定の組み合わせ間の変更が許可されます。 これは、ケースバイケースで検証する必要があります。 次に例をいくつか示します。
- Kafka シンクへのファイル シンクが許可されます。 Kafka には新しいデータだけが表示されます。
- ファイル シンクへの Kafka シンクは許可されません。
- Kafka シンクを foreach に変更するか、またはその逆を許可します。
出力シンクのパラメーターの変更: これが許可されるかどうかと、変更のセマンティクスが十分に定義されているかどうかは、シンクとクエリによって異なります。 次に例をいくつか示します。
- ファイル シンクの出力ディレクトリに対する変更は許可されません。
sdf.writeStream.format("parquet").option("path", "/somePath")からsdf.writeStream.format("parquet").option("path", "/anotherPath")へ - 出力トピックに対する変更は許可されます。
sdf.writeStream.format("kafka").option("topic", "topic1")からsdf.writeStream.format("kafka").option("topic", "topic2")へ - ユーザー定義の foreach シンク (つまり
ForeachWriterコード) に対する変更は許可されますが、変更のセマンティクスはコードによって異なります。
- ファイル シンクの出力ディレクトリに対する変更は許可されません。
プロジェクション/フィルター/マップのような操作での変更: 一部の場合が許可されます。 例えば次が挙げられます。
- フィルターの追加または削除は、
sdf.selectExpr("a")からsdf.where(...).selectExpr("a").filter(...)に許可されます。 - 同じ出力スキーマを持つプロジェクションの変更は、
sdf.selectExpr("stringColumn AS json").writeStreamからsdf.select(to_json(...).as("json")).writeStreamに許可されます。 - 異なる出力スキーマを持つプロジェクションの変更は、条件付きで
sdf.selectExpr("a").writeStreamからsdf.selectExpr("b").writeStreamに許可されます。これは、出力シンクでスキーマを"a"から"b"に変更できる場合にのみです。
- フィルターの追加または削除は、
ステートフル操作の変更: ストリーミング クエリの一部の操作では、結果を継続的に更新するために状態データを維持する必要があります。 構造化ストリームは、状態データをフォールト トレラント ストレージ (DBFS、Azure Blob Storage など) に自動的にチェックポイント処理し、再起動後に復元します。 ただし、これは、状態データのスキーマが再起動間で同じままである必要があります。 つまり、ストリーミング クエリのステートフル操作に対する変更 (追加、削除、またはスキーマの変更) は、再起動の間は許可されません。 状態の回復を確保するために、再起動の間にスキーマを変更しないステートフル操作の一覧を次に示します。
-
ストリーミング集計: たとえば、
sdf.groupBy("a").agg(...)です。 グループ化キーまたは集計の数または種類の変更は許可されません。 -
ストリーミング重複排除: たとえば、
sdf.dropDuplicates("a")です。 グループ化キーまたは集計の数または種類の変更は許可されません。 -
ストリーム同士の結合: たとえば
sdf1.join(sdf2, ...)(つまり、両方の入力がsparkSession.readStreamと一緒に生成される) です。 スキーマ列または等結合列の変更は許可されません。 結合の種類 (外部または内部) の変更は許可されません。 結合条件のその他の変更は、不正に定義されています。 -
任意のステートフル操作: たとえば、
sdf.groupByKey(...).mapGroupsWithState(...)やsdf.groupByKey(...).flatMapGroupsWithState(...)です。 ユーザー定義状態のスキーマとタイムアウトの種類に対する変更は許可されません。 ユーザー定義の状態マッピング関数内での変更は許可されますが、変更のセマンティック効果はユーザー定義ロジックによって異なります。 状態スキーマの変更を本当にサポートする場合は、スキーマの移行をサポートするエンコード/デコード スキームを使用して、複雑な状態データ構造を明示的にバイトにエンコード/デコードできます。 たとえば、Avro でエンコードされたバイトとして状態を保存した場合、これによってバイナリ状態が復元されるため、クエリの再起動間で Avro-state-schema を自由に変更できます。
-
ストリーミング集計: たとえば、
Important
ステートフル演算子 dropDuplicates() と dropDuplicatesWithinWatermark() は、コンピューティング アクセス モード間で変更するときに状態スキーマの互換性チェックが原因で再起動に失敗する可能性があります。
専用アクセス モードと分離なしアクセス モードを切り替えることができます。 標準アクセス モードとサーバーレス アクセス モードの間での変更は許可されます。 他のアクセス モードの組み合わせを変更しないでください。
このエラーを回避するには、これらの演算子を含むストリーミング クエリのコンピューティング アクセス モードを変更しないでください。