次の方法で共有


GDPR コンプライアンスのためにデータを準備する

一般データ保護規則 (GDPR) とカリフォルニア消費者プライバシー法 (CCPA) は、企業が明示的な要求に応じて顧客に関して収集したすべての個人を特定できる情報 (PII) を永続的かつ完全に削除することを要求するプライバシーおよびデータ セキュリティ規制です。 "忘れられる権利" (RTBF) または "データ消去の権利" とも呼ばれます。削除要求は、指定された期間内 (たとえば、1 暦月内) に実行する必要があります。

この記事では、Databricks に格納されているデータに RTBF を実装する方法について説明します。 この記事に含まれる例では、eコマース企業のデータセットをモデル化し、ソース テーブル内のデータを削除し、これらの変更をダウンストリーム テーブルに反映する方法を示します。

"忘れられる権利" を実装するためのブループリント

次の図は、"忘れられる権利" を実装する方法を示しています。

GDPR コンプライアンスを実装する方法を示す図。

Delta Lake でのポイントの削除

Delta Lake は、ACID トランザクションを使用して大規模なデータ レイクでのポイント削除を高速化し、コンシューマーの GDPR または CCPA 要求に応じて個人の情報 (PII) を見つけて削除できるようにします。

Delta Lake はテーブル履歴を保持し、ポイントインタイム クエリとロールバックに使用できるようにします。 VACUUM関数は、Delta テーブルによって参照されなくなり、指定された保持しきい値より古いデータ ファイルを削除し、データを完全に削除します。 既定値と推奨事項の詳細については、「 テーブル履歴の操作」を参照してください。

削除ベクトルを使用するときにデータが削除されるようにする

削除ベクトルが有効になっているテーブルの場合、レコードを削除した後、 REORG TABLE ... APPLY (PURGE) を実行して基になるレコードを完全に削除する必要もあります。 これには、Delta Lake テーブル、具体化されたビュー、ストリーミング テーブルが含まれます。 「Parquet データ ファイルに変更を適用する」をご覧ください。

アップストリーム ソースのデータを削除する

GDPR と CCPA は、Kafka、ファイル、データベースなど、Delta Lake の外部のソース内のデータを含むすべてのデータに適用されます。 Databricks でデータを削除するだけでなく、キューやクラウド ストレージなどのアップストリーム ソースのデータも忘れずに削除する必要があります。

データ削除ワークフローを実装する前に、コンプライアンスまたはバックアップの目的でワークスペース データをエクスポートすることが必要になる場合があります。 「 ワークスペース データのエクスポート」を参照してください。

完全な削除は難読化よりも望ましいです

データの削除と難読化のどちらかを選択する必要があります。 難読化は、仮名化、データ マスクなどを使用して実装できます。ただし、最も安全なオプションは完全な消去です。実際には、再識別のリスクを排除するには、多くの場合、PII データを完全に削除する必要があるためです。

ブロンズ レイヤー内のデータを削除し、削除をシルバーレイヤーとゴールドレイヤーに反映する

GDPR と CCPA のコンプライアンスを開始するには、削除要求のテーブルを照会する定期的なジョブによって、まずブロンズレイヤー内のデータを削除することをお勧めします。 ブロンズ レイヤーからデータを削除した後、変更をシルバーレイヤーとゴールドレイヤーに反映できます。

履歴ファイルからデータを削除するためにテーブルを定期的に管理する

既定では、Delta Lake は削除されたレコードを含むテーブル履歴を 30 日間保持し、タイム トラベルとロールバックに使用できるようにします。 ただし、以前のバージョンのデータが削除された場合でも、データはクラウド ストレージに保持されます。 そのため、以前のバージョンのデータを削除するには、データセットを定期的に管理する必要があります。 推奨される方法は、ストリーミング テーブルと具体化されたビューの両方をインテリジェントに維持する Unity カタログ マネージド テーブルの予測最適化です。

  • 予測最適化によって管理されるテーブルの場合、Lakeflow Spark 宣言パイプラインは、使用パターンに基づいて、ストリーミング テーブルと具体化されたビューの両方をインテリジェントに維持します。
  • 予測最適化が有効になっていないテーブルの場合、Lakeflow Spark 宣言パイプラインは、ストリーミング テーブルと具体化されたビューが更新されてから 24 時間以内に自動的にメンテナンス タスクを実行します。

予測最適化または Lakeflow Spark 宣言パイプラインを使用していない場合は、デルタ テーブルで VACUUM コマンドを実行して、以前のバージョンのデータを完全に削除する必要があります。 既定では、タイム トラベル機能は 7 日に短縮されます。これは 構成可能な設定であり、対象のデータの履歴バージョンもクラウド ストレージから削除されます。

ブロンズ レイヤーから PII データを削除する

レイクハウスの設計によっては、PII と PII 以外のユーザー データの間のリンクを切断できる場合があります。 たとえば、電子メールなどの自然なキーではなく、 user_id などの非自然キーを使用している場合は、PII 以外のデータを残す PII データを削除できます。

この記事の残りの部分では、すべてのブロンズ テーブルからユーザー レコードを完全に削除することで RTBF を処理します。 次のコードに示すように、 DELETE コマンドを実行してデータを削除できます。

spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

一度に多数のレコードを一緒に削除する場合は、 MERGE コマンドを使用することをお勧めします。 次のコードでは、gdpr_control_table列を含む user_id というコントロール テーブルがあることを前提としています。 このテーブルに "忘れられる権利" を要求したすべてのユーザーに対して、このテーブルにレコードを挿入します。

MERGE コマンドは、一致する行の条件を指定します。 この例では、target_tableのレコードを、gdpr_control_tableに基づいてuser_id内のレコードと照合します。 一致がある場合 (たとえば、user_idtarget_tableの両方のgdpr_control_table)、target_tableの行が削除されます。 この MERGE コマンドが成功したら、制御テーブルを更新して、要求が処理されたことを確認します。

spark.sql("""
  MERGE INTO target
  USING (
    SELECT user_id
    FROM gdpr_control_table
  ) AS source
  ON target.user_id = source.user_id
  WHEN MATCHED THEN DELETE
""")

ブロンズ層から銀層および金層への変化を伝達する

ブロンズ レイヤーでデータが削除されたら、変更をシルバーレイヤーとゴールドレイヤーのテーブルに反映する必要があります。

具体化されたビュー: 削除を自動的に処理する

具体化されたビューは、ソース内の削除を自動的に処理します。 そのため、具体化されたビューにソースから削除されたデータが含まれていないことを確認するために特別な操作を行う必要はありません。 マテリアライズド ビューを更新し、メンテナンスを実行して、削除が完全に処理されるようにする必要があります。

具体化されたビューは、完全な再計算よりも低い場合は増分計算を使用しますが、正確性を犠牲にすることはありませんので、常に正しい結果を返します。 つまり、ソースからデータを削除すると、具体化されたビューが完全に再計算される可能性があります。

削除を自動的に処理する方法を示す図。

ストリーミング テーブル: skipChangeCommits を使用してデータを削除し、ストリーミング ソースを読み取ります

ストリーミング テーブルは、Delta テーブル ソースからストリーム配信するときに追加専用データを処理します。 ストリーミング ソースからのレコードの更新や削除など、その他の操作はサポートされておらず、ストリームが中断されます。

より堅牢なストリーミング実装を実現するには、代わりに Delta テーブルの変更フィードからストリーミングし、処理コードで更新と削除を処理します。 オプション 1: 変更データ キャプチャ (CDC) フィードからのストリームを参照してください。

ストリーミング テーブルの削除を処理する方法を示す図。

Delta テーブルからのストリーミングでは新しいデータのみが処理されるため、データの変更は自分で処理する必要があります。 推奨される方法は、(1) DML を使用してソース Delta テーブルのデータを削除し、(2) DML を使用してストリーミング テーブルからデータを削除してから、(3) skipChangeCommitsを使用するようにストリーミング読み取りを更新することです。 このフラグは、ストリーミング テーブルが更新や削除など、挿入以外のものをスキップする必要があることを示します。

skipChangeCommits を使用する GDPR コンプライアンス方法を示す図。

または、(1) ソースからデータを削除してから、(2) ストリーミング テーブルを完全に更新することもできます。 ストリーミング テーブルを完全に更新すると、テーブルのストリーミング状態がクリアされ、すべてのデータが再処理されます。 保持期間を超えるアップストリーム データ ソース (たとえば、7 日後にデータを古くする Kafka トピック) は再び処理されないため、データが失われる可能性があります。 このオプションは、履歴データが使用可能で、再び処理してもコストが発生しないシナリオでのみ、テーブルをストリーミングする場合に推奨されます。

ストリーミング テーブルに対して完全な更新を実行する GDPR コンプライアンス方法を示す図。

例: eコマース企業の GDPR と CCPA コンプライアンス

次の図は、GDPR と CCPA コンプライアンスを実装する必要がある e コマース企業の medallion アーキテクチャを示しています。 ユーザーのデータが削除された場合でも、ダウンストリーム集計でアクティビティをカウントすることが必要になる場合があります。

eコマース企業の GDPR と CCPA コンプライアンスの例を示す図。

  • ソース テーブル
    • source_users - ユーザーのストリーミング ソース テーブル (この例では、ここで作成)。 運用環境では、通常、Kafka、Kinesis、または同様のストリーミング プラットフォームが使用されます。
    • source_clicks クリックのストリーミングソーステーブル(例として、ここで作成されたもの) 運用環境では、通常、Kafka、Kinesis、または同様のストリーミング プラットフォームが使用されます。
  • コントロール テーブル
    • gdpr_requests - "忘れられる権利" の対象となるユーザー ID を含む制御テーブル。 ユーザーが削除を要求したら、ここに追加します。
  • ブロンズレイヤー
    • users_bronze - ユーザー ディメンション。 PII (電子メール アドレスなど) が含まれます。
    • clicks_bronze - イベントをクリックします。 PII (IP アドレスなど) が含まれます。
  • シルバー レイヤー
    • clicks_silver - クリーンかつ標準化されたクリックス データ。
    • users_silver - クリーンアップおよび標準化されたユーザー データ。
    • user_clicks_silver - clicks_silver (ストリーミング) と users_silverのスナップショットを結合します。
  • ゴールド レイヤー
    • user_behavior_gold - 集計されたユーザー動作メトリック。
    • marketing_insights_gold - 市場分析情報のユーザー のセグメント化。

手順 1: サンプル データをテーブルに入力する

次のコードでは、この例用にこれら 2 つのテーブルを作成し、サンプル データを設定します。

  • source_users には、ユーザーに関するディメンション データが含まれています。 このテーブルには、 emailという PII 列が含まれています。
  • source_clicks には、ユーザーが実行したアクティビティに関するイベント データが含まれています。 ip_addressと呼ばれる PII 列が含まれています。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType

catalog = "users"
schema = "name"

# Create table containing sample users
users_schema = StructType([
   StructField('user_id', IntegerType(), False),
   StructField('username', StringType(), True),
   StructField('email', StringType(), True),
   StructField('registration_date', StringType(), True),
   StructField('user_preferences', MapType(StringType(), StringType()), True)
])

users_data = [
   (1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
   (2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
   (3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
   (4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
   (5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]

users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")

# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType

clicks_schema = StructType([
   StructField('click_id', IntegerType(), False),
   StructField('user_id', IntegerType(), True),
   StructField('url_clicked', StringType(), True),
   StructField('click_timestamp', StringType(), True),
   StructField('device_type', StringType(), True),
   StructField('ip_address', StringType(), True)
])

clicks_data = [
   (1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
   (1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
   (1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
   (1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
   (1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
   (1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]

clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

手順 2: PII データを処理するパイプラインを作成する

次のコードは、上に示した medallion アーキテクチャのブロンズ、シルバー、ゴールドのレイヤーを作成します。

from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr

catalog = "users"
schema = "name"

# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------

@dp.table(
   name=f"{catalog}.{schema}.users_bronze",
   comment='Raw users data loaded from source'
)
def users_bronze():
   return (
     spark.readStream.table(f"{catalog}.{schema}.source_users")
   )

@dp.table(
   name=f"{catalog}.{schema}.clicks_bronze",
   comment='Raw clicks data loaded from source'
)
def clicks_bronze():
   return (
       spark.readStream.table(f"{catalog}.{schema}.source_clicks")
   )

# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------

@dp.create_streaming_table(
   name=f"{catalog}.{schema}.users_silver",
   comment='Cleaned and standardized users data'
)

@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.users_bronze")
           .withColumn('registration_date', col('registration_date').cast('timestamp'))
           .dropDuplicates(['user_id', 'registration_date'])
           .select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
   )

@dp.create_auto_cdc_flow(
   target=f"{catalog}.{schema}.users_silver",
   source="users_bronze_view",
   keys=["user_id"],
   sequence_by="registration_date",
)

@dp.table(
   name=f"{catalog}.{schema}.clicks_silver",
   comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.clicks_bronze")
           .withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
           .withWatermark('click_timestamp', '10 minutes')
           .dropDuplicates(['click_id'])
           .select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
   )

@dp.table(
   name=f"{catalog}.{schema}.user_clicks_silver",
   comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
   # Read users_silver as a static DataFrame - each refresh
   # will use a snapshot of the users_silver table.
   users = spark.read.table(f"{catalog}.{schema}.users_silver")

   # Read clicks_silver as a streaming DataFrame.
   clicks = spark.readStream \
       .table('clicks_silver')

   # Perform the join - join of a static dataset with a
   # streaming dataset creates a streaming table.
   joined_df = clicks.join(users, on='user_id', how='inner')

   return joined_df

# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------

@dp.materialized_view(
   name=f"{catalog}.{schema}.user_behavior_gold",
   comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
   return (
       df.groupBy('user_id')
         .agg(
             count('click_id').alias('total_clicks'),
             countDistinct('url_clicked').alias('unique_urls')
         )
   )

@dp.materialized_view(
   name=f"{catalog}.{schema}.marketing_insights_gold",
   comment='User segments for marketing insights'
)
def marketing_insights_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
   return (
       df.withColumn(
           'engagement_segment',
           when(col('total_clicks') >= 100, 'High Engagement')
           .when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
           .otherwise('Low Engagement')
       )
   )

手順 3: ソース テーブルのデータを削除する

この手順では、PII が見つかったすべてのテーブルのデータを削除します。 次の関数は、PII を持つテーブルからユーザーの PII のすべてのインスタンスを削除します。

catalog = "users"
schema = "name"

def apply_gdpr_delete(user_id):
 tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]

 for table in tables_with_pii:
   print(f"Deleting user_id {user_id} from table {table}")
   spark.sql(f"""
     DELETE FROM {catalog}.{schema}.{table}
     WHERE user_id = {user_id}
   """)

手順 4: 影響を受けるストリーミング テーブルの定義に skipChangeCommits を追加する

この手順では、追加されていない行をスキップするように Lakeflow Spark 宣言パイプラインに指示する必要があります。 skipChangeCommits オプションを次のメソッドに追加します。 マテリアライズド ビューは自動的に更新と削除を処理するため、マテリアライズド ビューの定義を更新する必要はありません。

  • users_bronze
  • users_silver
  • clicks_bronze
  • clicks_silver
  • user_clicks_silver

次のコードは、 users_bronze メソッドを更新する方法を示しています。

def users_bronze():
   return (
     spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
   )

パイプラインをもう一度実行すると、正常に更新されます。