Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die Datenschutz-Grundverordnung (DSGVO) und das California Consumer Privacy Act (CCPA) sind Datenschutz- und Datenschutzbestimmungen, die unternehmen auffordern, alle personenbezogenen Informationen (PII), die über einen Kunden auf ausdrücklicher Anfrage gesammelt werden, dauerhaft und vollständig zu löschen. Auch bekannt als "Recht auf Vergessenwerden" (RTBF) oder "Recht auf Datenlöschung", müssen Löschanforderungen während eines bestimmten Zeitraums (z. B. innerhalb eines Kalendermonats) ausgeführt werden.
Dieser Artikel führt Sie durch die Implementierung von RTBF für daten, die in Databricks gespeichert sind. Das in diesem Artikel enthaltene Beispiel modelliert Datasets für ein E-Commerce-Unternehmen und zeigt, wie Sie Daten in Quelltabellen löschen und diese Änderungen an nachgelagerte Tabellen weitergeben.
Blaupause für die Umsetzung des "Rechts auf Vergessenwerden"
Das folgende Diagramm veranschaulicht, wie das "Recht auf Vergessenwerden" implementiert wird.
-Punktlöschungen mit Delta Lake
Delta Lake beschleunigt punktuelle Löschungen in großen Data Lakes mit ACID-Transaktionen. Dadurch können Sie personenbezogene Informationen (personally identifiable information, PII) finden und löschen, um auf DSGVO- oder CCPA-bezogene Anfragen zu reagieren.
Delta Lake behält den Tabellenverlauf bei und stellt ihn für Zeitpunktabfragen und Rollbacks zur Verfügung. Die VACUUM-Funktion entfernt Datendateien, auf die nicht mehr von einer Delta-Tabelle verwiesen wird und älter als ein angegebener Aufbewahrungsschwellenwert ist und die Daten dauerhaft löscht. Weitere Informationen zu Standardeinstellungen und Empfehlungen finden Sie unter Arbeiten mit Delta Lake-Tabellenverlauf.
Sicherstellen, dass Daten beim Verwenden von Löschvektoren gelöscht werden
Für Tabellen mit aktivierten Löschvektoren müssen Sie nach dem Löschen von Datensätzen auch den Befehl REORG TABLE ... APPLY (PURGE) ausführen, um zugrunde liegende Datensätze dauerhaft zu löschen. Dazu gehören Delta Lake-Tabellen, materialisierte Ansichten und Streamingtabellen. Siehe Anwenden von Änderungen auf Parquet-Datendateien.
Löschen von Daten in Upstreamquellen
DSGVO und CCPA gelten für alle Daten, einschließlich Daten in Quellen außerhalb des Delta Lake, wie z. B. Kafka, Dateien und Datenbanken. Zusätzlich zum Löschen von Daten in Databricks müssen Sie auch daran denken, Daten in Upstreamquellen wie Warteschlangen und Cloudspeicher zu löschen.
Hinweis
Vor der Implementierung von Datenlöschworkflows müssen Sie möglicherweise Arbeitsbereichsdaten für Compliance- oder Sicherungszwecke exportieren. Siehe "Arbeitsbereichsdaten exportieren".
Vollständige Löschung ist der Verschleierung vorzuziehen.
Sie müssen zwischen dem Löschen von Daten und dem Verschleiern der Daten wählen. Obfuscation kann durch Pseudonymisierung, Datenmaskierung usw. implementiert werden. Die sicherste Option ist jedoch die vollständige Löschung, da in der Praxis zur Beseitigung des Reidentifizierungsrisikos häufig eine vollständige Löschung von personenbezogenen Daten erforderlich ist.
Löschen von Daten in der Bronze-Schicht und dann Weitergeben der Löschungen an die Silber- und Gold-Schichten
Es wird empfohlen, die DSGVO- und CCPA-Compliance zu starten, indem Sie zuerst Daten in der Bronzeschicht löschen, die durch einen geplanten Auftrag gesteuert wird, der eine Tabelle mit Löschanforderungen abfragt. Nachdem Daten aus der Bronzeschicht gelöscht wurden, können Änderungen an Silber- und Goldschichten weitergegeben werden.
Es werden regelmäßig Tabellen verwaltet, um Daten aus historischen Dateien zu entfernen.
Standardmäßig behält Delta Lake den Tabellenverlauf, einschließlich gelöschter Datensätze, für 30 Tage bei und stellt sie für Zeitreisen und Rollbacks zur Verfügung. Auch wenn frühere Versionen der Daten entfernt werden, werden die Daten weiterhin im Cloudspeicher aufbewahrt. Daher sollten Sie Datasets regelmäßig verwalten, um frühere Versionen von Daten zu entfernen. Die empfohlene Methode ist die Predictive-Optimierung für verwaltete Unity-Katalog-Tabellen, die sowohl Streamingtabellen als auch materialisierte Ansichten intelligent verwaltet.
- Für Tabellen, die durch eine predictive Optimierung verwaltet werden, verwaltet Lakeflow Spark Declarative Pipelines basierend auf Verwendungsmustern intelligent sowohl Streamingtabellen als auch materialisierte Ansichten.
- Für Tabellen ohne prädiktive Optimierung führt Lakeflow Spark Declarative Pipelines Wartungsaufgaben automatisch innerhalb von 24 Stunden durch, nachdem Streaming-Tabellen und materialisierte Ansichten aktualisiert wurden.
Wenn Sie keine Predictive Optimierung oder Lakeflow Spark Declarative Pipelines verwenden, sollten Sie einen VACUUM Befehl auf Delta-Tabellen ausführen, um frühere Versionen von Daten dauerhaft zu entfernen. Dies reduziert standardmäßig die Zeitreisefunktionen auf 7 Tage, was eine konfigurierbare Einstellung ist, und entfernt auch historische Versionen der betreffenden Daten aus dem Cloudspeicher.
Löschen von PII-Daten aus der Bronzeschicht
Je nach Design Ihres Lakehouses können Sie möglicherweise die Verbindung zwischen personenbezogenen Informationen (PII) und nicht-personenbezogenen Benutzerdaten abtrennen. Wenn Sie z. B. einen nicht natürlichen Schlüssel wie user_id anstelle eines natürlichen Schlüssels wie E-Mail verwenden, können Sie PII-Daten löschen und nicht-PII-Daten bleiben erhalten.
Im restlichen Artikel wird RTBF durch vollständiges Löschen von Benutzerdatensätzen aus allen Bronzetabellen beschrieben. Sie können Daten löschen, indem Sie einen DELETE Befehl ausführen, wie im folgenden Code gezeigt:
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")
Beim gleichzeitigen Löschen einer großen Anzahl von Datensätzen empfehlen wir die Verwendung des MERGE Befehls. Im folgenden Code wird davon ausgegangen, dass Sie über eine Steuerelementtabelle verfügen gdpr_control_table , die eine user_id Spalte enthält. Sie fügen einen Datensatz in diese Tabelle für jeden Benutzer ein, der das "Recht auf Vergessenwerden" in dieser Tabelle angefordert hat.
Der MERGE Befehl gibt die Bedingung für übereinstimmende Zeilen an. In diesem Beispiel werden Datensätze aus target_table mit den Datensätzen in gdpr_control_table basierend auf dem user_id abgeglichen. Wenn eine Übereinstimmung vorhanden ist (z. B. eine user_id sowohl in der target_table als auch der gdpr_control_table), wird die Zeile in der target_table gelöscht. Aktualisieren Sie nach erfolgreicher Ausführung dieses MERGE Befehls die Steuerelementtabelle, um zu bestätigen, dass die Anforderung verarbeitet wurde.
spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")
Verteilen von Änderungen von der Bronzeschicht auf die Silber- und Goldschichten
Nachdem Daten in der Bronzeschicht gelöscht wurden, müssen Sie die Änderungen an Tabellen in den Silber- und Goldschichten weitergeben.
Materialisierte Ansichten: Automatische Behandlung von Löschungen
Materialisierte Ansichten behandeln automatisch Löschungen in Quellen. Daher müssen Sie nichts Besonderes tun, um sicherzustellen, dass eine materialisierte Ansicht keine Daten enthält, die aus einer Quelle gelöscht wurden. Sie müssen eine materialisierte Ansicht aktualisieren und Wartung ausführen, um sicherzustellen, dass Löschungen vollständig verarbeitet werden.
Eine materialisierte Ansicht gibt immer das richtige Ergebnis zurück, da sie die inkrementelle Berechnung verwendet, wenn sie billiger als die vollständige Neuberechnung ist, aber nie zu den Kosten der Korrektheit. Mit anderen Worten, das Löschen von Daten aus einer Quelle könnte dazu führen, dass eine materialisierte Ansicht vollständig neu komputet wird.
Streamingtabellen: Löschen von Daten und Lesen der Streamingquelle mithilfe von skipChangeCommits
Streamingtabellen verarbeiten nur Anfügedaten, wenn sie aus Delta-Tabellenquellen streamen. Alle anderen Vorgänge, z. B. das Aktualisieren oder Löschen eines Datensatzes aus einer Streamingquelle, werden nicht unterstützt und brechen den Datenstrom auf.
Hinweis
Für eine robustere Streamingimplementierung streamen Sie stattdessen aus den Änderungsfeeds von Delta-Tabellen, und behandeln Sie Updates und Löschvorgänge in Ihrem Verarbeitungscode. Siehe Option 1: Streamen von einem Change Data Capture (CDC)-Feed.
Da das Streamen von Delta-Tabellen nur neue Daten verarbeitet, müssen Sie Änderungen an Daten selbst behandeln. Die empfohlene Methode ist: (1) Löschen von Daten in den Quell-Delta-Tabellen mithilfe von DML, (2) Löschen von Daten aus der Streamingtabelle mit DML und dann (3) Aktualisieren des Streaming-Lesevorgangs, um skipChangeCommits zu verwenden. Dieses Kennzeichen gibt an, dass die Streamingtabelle alles andere als Einfügungen überspringen soll, z. B. Aktualisierungen oder Löschungen.
Alternativ können Sie (1) Daten aus der Quelle löschen und dann (2) die Streamingtabelle vollständig aktualisieren. Wenn Sie eine Streamingtabelle vollständig aktualisieren, wird der Streamingstatus der Tabelle gelöscht und alle Daten erneut verarbeitet. Jede upstream-Datenquelle, die über ihren Aufbewahrungszeitraum hinausgeht (z. B. ein Kafka-Thema, das Daten nach 7 Tagen abaltert), wird nicht mehr verarbeitet, was zu Datenverlust führen kann. Wir empfehlen diese Option für Streamingtabellen nur in dem Szenario, in dem historische Daten verfügbar sind und die Verarbeitung erneut nicht kostspielig ist.
Beispiel: DSGVO- und CCPA-Compliance für ein E-Commerce-Unternehmen
Das folgende Diagramm zeigt eine Medallion-Architektur für ein E-Commerce-Unternehmen, bei dem die DSGVO & CCPA-Compliance implementiert werden muss. Obwohl die Daten eines Benutzers gelöscht werden, können Sie deren Aktivitäten in nachgeschalteten Aggregationen zählen.
-
Quelltabellen
-
source_users- Eine Streaming-Quelltabelle von Benutzern (für dieses Beispiel hier erstellt). Produktionsumgebungen verwenden in der Regel Kafka, Kinesis oder ähnliche Streamingplattformen. -
source_clicks- Eine Streaming-Quelltabelle mit Klicks (hier für das Beispiel erstellt). Produktionsumgebungen verwenden in der Regel Kafka, Kinesis oder ähnliche Streamingplattformen.
-
-
Kontrolltabelle
-
gdpr_requests- Steuerelementtabelle mit Benutzer-IDs, die dem "Recht auf Vergessenwerden" unterliegen. Wenn ein Benutzer die Entfernung anfordert, fügen Sie sie hier hinzu.
-
-
Bronzeschicht
-
users_bronze– Dimensionen der benutzenden Person. Enthält PII (z. B. E-Mail-Adresse). -
clicks_bronze– Klickereignisse. Enthält PII (z. B. IP-Adresse).
-
-
Silberschicht
-
clicks_silver- Bereinigte und standardisierte Klickdaten. -
users_silver– Bereinigte und standardisierte Benutzerdaten. -
user_clicks_silver- Verbindetclicks_silverStreaming mit einem Schnappschuss vonusers_silver.
-
-
Goldschicht
-
user_behavior_gold– Aggregierte Benutzerverhaltensmetriken. -
marketing_insights_gold- Benutzersegment für Markterkenntnisse.
-
Schritt 1: Auffüllen von Tabellen mit Beispieldaten
Der folgende Code erstellt diese beiden Tabellen für dieses Beispiel und füllt sie mit Beispieldaten auf:
-
source_usersenthält dimensionale Daten zu Benutzern. Diese Tabelle enthält eine PII-Spalte namensemail. -
source_clicksenthält Ereignisdaten zu Aktivitäten, die von Benutzern ausgeführt werden. Sie enthält eine PII-Spalte namensip_address.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType
catalog = "users"
schema = "name"
# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])
users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]
users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")
# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType
clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])
clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]
clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")
Schritt 2: Erstellen einer Pipeline, die PII-Daten verarbeitet
Der folgende Code erstellt Bronze-, Silber- und Goldschichten der oben gezeigten Medallion-Architektur.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr
catalog = "users"
schema = "name"
# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------
@dp.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)
@dp.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)
# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------
@dp.create_streaming_table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)
@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)
@dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.users_silver",
source="users_bronze_view",
keys=["user_id"],
sequence_by="registration_date",
)
@dp.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)
@dp.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame - each refresh
# will use a snapshot of the users_silver table.
users = spark.read.table(f"{catalog}.{schema}.users_silver")
# Read clicks_silver as a streaming DataFrame.
clicks = spark.readStream \
.table('clicks_silver')
# Perform the join - join of a static dataset with a
# streaming dataset creates a streaming table.
joined_df = clicks.join(users, on='user_id', how='inner')
return joined_df
# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------
@dp.materialized_view(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)
@dp.materialized_view(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)
Schritt 3: Löschen von Daten in Quelltabellen
In diesem Schritt löschen Sie Daten in allen Tabellen, in denen PII gefunden wird. Mit der folgenden Funktion werden alle Instanzen der PII eines Benutzers aus Tabellen mit PII entfernt.
catalog = "users"
schema = "name"
def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]
for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")
Schritt 4: Hinzufügen von skipChangeCommits zu Definitionen betroffener Streamingtabellen
In diesem Schritt müssen Sie Lakeflow Spark Declarative Pipelines anweisen, zeilen ohne Anfüge zu überspringen. Fügen Sie die Option skipChangeCommits zu den folgenden Methoden hinzu. Sie müssen die Definitionen von materialisierten Ansichten nicht aktualisieren, da sie automatisch Aktualisierungen und Löschungen verarbeiten.
users_bronzeusers_silverclicks_bronzeclicks_silveruser_clicks_silver
Der folgende Code zeigt, wie die users_bronze Methode aktualisiert wird:
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)
Wenn Sie die Pipeline erneut ausführen, wird sie erfolgreich aktualisiert.