共用方式為


準備您的資料以符合GDPR規範

一般數據保護條例(GDPR)和加州消費者隱私法(CCPA)是隱私權和數據安全性法規,要求公司在其明確要求后永久和完全刪除所有關於客戶的個人標識資訊(PII)。 也稱為「被遺忘的權利」(RTBF)或「數據清除權」,刪除要求必須在指定的期間內執行(例如,在一個日曆月份內)。

本文會引導您瞭解如何在 Databricks 中儲存的數據上實作 RTBF。 本文中包含的範例會建立電子商務公司的數據集模型,並示範如何刪除源數據表中的數據,並將這些變更傳播至下游數據表。

實作「被遺忘權」的藍圖

下圖說明如何實作「被遺忘的權利」。

說明如何實作GDPR合規性的圖表。

使用 Delta Lake 刪除

Delta Lake 使用 ACID 交易加快大型數據湖中的點刪除速度,讓您找出並移除個人可識別的資訊(PII),以回應消費者 GDPR 或 CCPA 要求。

Delta Lake 會保留數據表歷程記錄,並讓它可供時間點查詢和復原使用。 VACUUM 函式會移除 Delta 資料表中不再參考且早於指定保留閾值的數據檔案,並永久刪除這些數據。 想了解更多關於預設值和建議,請參見 「與資料表歷史合作」。

使用刪除向量時,請確定資料已刪除

針對已啟用刪除向量的數據表,刪除記錄之後,您也必須執行 REORG TABLE ... APPLY (PURGE) 以永久刪除基礎記錄。 這包括 Delta Lake 數據表、具體化檢視和串流數據表。 請參閱 將變更套用至 Parquet 資料檔

刪除上游來源中的數據

GDPR 和 CCPA 適用於所有數據,包括 Delta Lake 外部來源中的數據,例如 Kafka、檔案和資料庫。 除了刪除 Databricks 中的數據之外,您也必須記得刪除上游來源中的數據,例如佇列和雲端記憶體。

備註

在實施資料刪除工作流程之前,你可能需要匯出工作區資料以符合法規或備份。 請參見 匯出工作區資料

相較於混淆,完整刪除更為可取

您必須選擇刪除資料並將其模糊化。 模糊化可以使用化名化、數據遮罩等方式來實作。不過,最安全的選項是完全清除,因為實際上,消除重新識別的風險通常需要完整刪除 PII 數據。

刪除銅層中的數據,然後將刪除傳播至銀層和金層

我們建議您先在 bronz 層刪除資料,開始符合 GDPR 和 CCPA 合規,這是由一個排程工作查詢刪除請求表所驅動。 從銅層刪除數據之後,變更可以傳播到銀層和金層。

定期維護數據表,以從歷程記錄檔中移除數據

根據預設,Delta Lake 會保留數據表的歷程記錄(包括已刪除的記錄)長達 30 天,並使其可用於時間旅行和復原操作。 但是,即使移除舊版的數據,數據仍會保留在雲端記憶體中。 因此,你應該定期維護資料集,以移除先前版本的資料。 建議的方式是 Unity 目錄受控數據表的預測性優化,以智慧方式維護串流數據表和具體化檢視。

  • 對於由預測最佳化管理的資料表,Lakeflow Spark 宣告式管線會根據使用模式,智慧地維護串流資料表和具體化檢視。
  • 對於未啟用預測最佳化的資料表,Lakeflow Spark 宣告式管線會在串流資料表和具體化檢視更新後的 24 小時內自動執行維護工作。

如果您未使用預測優化或 Lakeflow Spark 宣告式工作流,您應該在 Delta 表上執行 VACUUM 命令,以永久移除舊版的資料。 根據預設,這會將時間移動功能減少到7天,這是 可設定的設定,並且也會從雲端記憶體移除有問題的數據歷程記錄版本。

從銅層刪除 PII 資料

根據湖倉的設計,您可能能夠切斷 PII 與非 PII 使用者資料之間的連結。 例如,如果您使用非自然密鑰,例如 user_id,而不是電子郵件之類的自然密鑰,您可以刪除 PII 資料,這會保留非 PII 資料。

本文的其餘部分將透過從所有基礎資料表中完全刪除用戶記錄來處理 RTBF。 您可以執行 DELETE 命令來刪除資料,如下列程式代碼所示:

spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

一次一次刪除大量記錄時,建議您使用 MERGE 命令。 下列程式代碼假設您有一個名為 gdpr_control_table 的控件數據表,其中包含 user_id 數據行。 您需要為每位要求行使「被遺忘權利」的使用者在此數據表中插入一條記錄。

MERGE 命令會指定比對數據列的條件。 在此範例中,它會根據 target_table,比對來自 gdpr_control_table 的記錄與 user_id 中的記錄。 如果有匹配的項目(例如,user_idtarget_table中的 gdpr_control_table),則會刪除 target_table 中的行。 此 MERGE 命令成功之後,請更新控制數據表以確認已處理要求。

spark.sql("""
  MERGE INTO target
  USING (
    SELECT user_id
    FROM gdpr_control_table
  ) AS source
  ON target.user_id = source.user_id
  WHEN MATCHED THEN DELETE
""")

將更改從青銅層傳播到銀層和金層

在銅層中刪除資料之後,您必須將這些變更同步到銀層和金層的資料表。

具體化檢視:自動處理刪除

具象化檢視會自動處理來源中的刪除。 因此,您不需要執行任何特殊動作,以確保具體化檢視不包含已從來源刪除的數據。 您必須重新整理具象化檢視表並執行維護,以確保刪除操作能被完全處理。

具體化檢視總是傳回正確的結果,因為如果增量計算比完整重新計算更便宜,它就會使用增量計算,但絕不以正確性為代價。 換句話說,從來源刪除數據可能會導致具現化檢視完全重新計算。

圖表,說明如何自動處理刪除。

串流表:使用`skipChangeCommits`刪除資料並讀取串流來源

串流資料表在從 Delta 資料表來源串流時,會處理僅附加的資料。 其他操作,例如從串流來源更新或刪除紀錄,都不支援,且會破壞串流。

備註

若要實作更穩健的串流,建議從 Delta 表格的變更資料流中串流,並在處理程式碼中處理更新與刪除。 請參見 選項一:從變更資料擷取(CDC)串流

圖表,說明如何處理串流數據表中的刪除。

由於從 Delta 資料表串流只處理新資料,你必須自己處理資料變更。 推薦的方法是:(1) 使用 DML 刪除來源 Delta 資料表中的資料,(2) 使用 DML 刪除串流資料表的資料,然後 (3) 更新串流讀取以使用 skipChangeCommits。 此旗標表示串流表應跳過除插入外的所有內容,例如更新或刪除。

圖表,說明使用 skipChangeCommits 的 GDPR 合規性方法。

或者,你可以(1)從來源刪除資料,然後(2)完全刷新串流表。 當您完全重新整理串流數據表時,它會清除數據表的串流狀態,並重新處理所有數據。 任何超過保留期限的上游資料來源(例如,Kafka 主題在 7 天後過期的資料)將不再被處理,這可能會導致資料遺失。 我們建議此選項僅用於有歷程數據可用且重新處理不會產生高成本的流式資料表情境。

圖表顯示一種在串流資料表上執行完整重新整理以達到GDPR合規的方法。

範例:電子商務公司的GDPR和CCPA合規性

下圖顯示需要實作GDPR & CCPA合規性的電子商務公司的獎章架構。 即使刪除用戶的數據,您可能想要在下游匯總中計算其活動。

圖表,說明電子商務公司的GDPR和CCPA合規性範例。

  • 來源表
    • source_users - 使用者資料串流來源表(在此處建立,作為範例用)。 生產環境通常使用 Kafka、Kinesis 或類似的串流平台。
    • source_clicks - 點擊的串流來源表(此處建立,作為範例)。 生產環境通常使用 Kafka、Kinesis 或類似的串流平台。
  • 控制表
    • gdpr_requests - 包含使用者識別碼的控制數據表,受限於「被遺忘的權利」。 當用戶要求移除時,請在這裡加入他們。
  • 銅層
    • users_bronze - 使用者維度。 包含 PII(例如電子郵件位址)。
    • clicks_bronze - 按兩下事件。 包含 PII(例如 IP 位址)。
  • 銀層
    • clicks_silver - 清潔且標準化的點擊數據。
    • users_silver - 清理並標準化使用者資料。
    • user_clicks_silver - 將 clicks_silver(流媒體)與 users_silver 的快照結合。
  • 金層
    • user_behavior_gold - 彙整的用戶行為指標。
    • marketing_insights_gold - 用戶細分以進行市場洞察。

步驟 1:使用範例數據填入數據表

以下程式碼為此範例建立這兩個表格,並填充範例資料:

  • source_users 包含使用者的相關維度數據。 此數據表包含稱為 email的 PII 資料行。
  • source_clicks 包含使用者所執行活動的相關事件數據。 其中包含名為 ip_address的 PII 資料行。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType

catalog = "users"
schema = "name"

# Create table containing sample users
users_schema = StructType([
   StructField('user_id', IntegerType(), False),
   StructField('username', StringType(), True),
   StructField('email', StringType(), True),
   StructField('registration_date', StringType(), True),
   StructField('user_preferences', MapType(StringType(), StringType()), True)
])

users_data = [
   (1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
   (2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
   (3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
   (4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
   (5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]

users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")

# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType

clicks_schema = StructType([
   StructField('click_id', IntegerType(), False),
   StructField('user_id', IntegerType(), True),
   StructField('url_clicked', StringType(), True),
   StructField('click_timestamp', StringType(), True),
   StructField('device_type', StringType(), True),
   StructField('ip_address', StringType(), True)
])

clicks_data = [
   (1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
   (1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
   (1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
   (1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
   (1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
   (1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]

clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

步驟 2:建立處理 PII 數據的管線

下列程式代碼會建立上述獎章架構的銅級、銀級和金層。

from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr

catalog = "users"
schema = "name"

# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------

@dp.table(
   name=f"{catalog}.{schema}.users_bronze",
   comment='Raw users data loaded from source'
)
def users_bronze():
   return (
     spark.readStream.table(f"{catalog}.{schema}.source_users")
   )

@dp.table(
   name=f"{catalog}.{schema}.clicks_bronze",
   comment='Raw clicks data loaded from source'
)
def clicks_bronze():
   return (
       spark.readStream.table(f"{catalog}.{schema}.source_clicks")
   )

# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------

@dp.create_streaming_table(
   name=f"{catalog}.{schema}.users_silver",
   comment='Cleaned and standardized users data'
)

@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.users_bronze")
           .withColumn('registration_date', col('registration_date').cast('timestamp'))
           .dropDuplicates(['user_id', 'registration_date'])
           .select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
   )

@dp.create_auto_cdc_flow(
   target=f"{catalog}.{schema}.users_silver",
   source="users_bronze_view",
   keys=["user_id"],
   sequence_by="registration_date",
)

@dp.table(
   name=f"{catalog}.{schema}.clicks_silver",
   comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.clicks_bronze")
           .withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
           .withWatermark('click_timestamp', '10 minutes')
           .dropDuplicates(['click_id'])
           .select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
   )

@dp.table(
   name=f"{catalog}.{schema}.user_clicks_silver",
   comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
   # Read users_silver as a static DataFrame - each refresh
   # will use a snapshot of the users_silver table.
   users = spark.read.table(f"{catalog}.{schema}.users_silver")

   # Read clicks_silver as a streaming DataFrame.
   clicks = spark.readStream \
       .table('clicks_silver')

   # Perform the join - join of a static dataset with a
   # streaming dataset creates a streaming table.
   joined_df = clicks.join(users, on='user_id', how='inner')

   return joined_df

# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------

@dp.materialized_view(
   name=f"{catalog}.{schema}.user_behavior_gold",
   comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
   return (
       df.groupBy('user_id')
         .agg(
             count('click_id').alias('total_clicks'),
             countDistinct('url_clicked').alias('unique_urls')
         )
   )

@dp.materialized_view(
   name=f"{catalog}.{schema}.marketing_insights_gold",
   comment='User segments for marketing insights'
)
def marketing_insights_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
   return (
       df.withColumn(
           'engagement_segment',
           when(col('total_clicks') >= 100, 'High Engagement')
           .when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
           .otherwise('Low Engagement')
       )
   )

步驟 3:刪除源數據表中的數據

在這個步驟中,你會刪除所有有個人識別資料的資料表。 以下函式會移除所有包含 PII 的資料表中的使用者 PII 實例。

catalog = "users"
schema = "name"

def apply_gdpr_delete(user_id):
 tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]

 for table in tables_with_pii:
   print(f"Deleting user_id {user_id} from table {table}")
   spark.sql(f"""
     DELETE FROM {catalog}.{schema}.{table}
     WHERE user_id = {user_id}
   """)

步驟 4:將 skipChangeCommits 新增至受影響串流數據表的定義

在此步驟中,您必須告訴 Lakeflow Spark 宣告式資料管線略過未追加的資料列。 將skipChangeCommits選項新增至下列方法。 你不需要更新實體化視圖的定義,因為它們會自動處理更新和刪除。

  • users_bronze
  • users_silver
  • clicks_bronze
  • clicks_silver
  • user_clicks_silver

下列程式代碼示範如何更新 users_bronze 方法:

def users_bronze():
   return (
     spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
   )

當您再次執行管道時,它會成功更新。