Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
O Regulamento Geral de Proteção de Dados (GDPR) e a Lei de Privacidade do Consumidor da Califórnia (CCPA) são regulamentos de privacidade e segurança de dados que exigem que as empresas excluam permanente e completamente todas as informações de identificação pessoal (PII) coletadas sobre um cliente mediante solicitação explícita. Também conhecido como "direito a ser esquecido" (RTBF) ou "direito ao apagamento de dados", os pedidos de exclusão devem ser executados durante um período especificado (por exemplo, dentro de um mês civil).
Este artigo orienta você sobre como implementar o RTBF em dados armazenados no Databricks. O exemplo incluído neste artigo modela conjuntos de dados para uma empresa de comércio eletrônico e mostra como excluir dados em tabelas de origem e propagar essas alterações para tabelas downstream.
Plano pormenorizado para a aplicação do «direito a ser esquecido»
O diagrama a seguir ilustra como implementar o "direito a ser esquecido".
Exclusões de pontos com Delta Lake
O Delta Lake acelera as exclusões de pontos em grandes data lakes com transações ACID, permitindo que você localize e remova informações pessoalmente identificáveis (PII) em resposta a solicitações de GDPR ou CCPA do consumidor.
O Delta Lake retém o histórico da tabela e o disponibiliza para consultas point-in-time e reversões. A função VACUUM remove arquivos de dados que não são mais referenciados por uma tabela Delta e são mais antigos do que um limite de retenção especificado, excluindo permanentemente os dados. Para saber mais sobre padrões e recomendações, consulte Trabalhar com o histórico da tabela Delta Lake.
Garantir que os dados sejam excluídos ao usar vetores de exclusão
Para tabelas com vetores de exclusão habilitados, depois de excluir registros, você também deve executar REORG TABLE ... APPLY (PURGE) para excluir permanentemente os registros subjacentes. Isso inclui tabelas Delta Lake, visualizações materializadas e tabelas de streaming. Consulte Aplicar alterações aos arquivos de dados do Parquet.
Excluir dados em fontes upstream
O GDPR e a CCPA aplicam-se a todos os dados, incluindo dados em fontes fora do Delta Lake, como Kafka, arquivos e bancos de dados. Além de excluir dados no Databricks, você também deve se lembrar de excluir dados em fontes upstream, como filas e armazenamento em nuvem.
Observação
Antes de implementar fluxos de trabalho de eliminação de dados, pode ser necessário exportar os dados do espaço de trabalho para fins de conformidade ou backup. Ver Exportar dados do espaço de trabalho.
A eliminação completa é preferível à ofuscação
Você tem que escolher entre excluir dados ou ofuscá-los. A ofuscação pode ser implementada usando pseudonimização, mascaramento de dados, etc. No entanto, a opção mais segura é o apagamento total porque, na prática, a eliminação do risco de reidentificação exige frequentemente uma eliminação completa dos dados de PII.
Exclua dados na camada bronze e, em seguida, propague exclusões para as camadas prata e ouro
Recomendamos que comece a cumprir o RGPD e CCPA eliminando primeiro os dados da camada bronze, impulsionado por um trabalho agendado que consulta uma tabela de pedidos de eliminação. Depois que os dados são excluídos da camada de bronze, as alterações podem ser propagadas para as camadas de prata e ouro.
Manter regularmente tabelas para remover dados de arquivos históricos
Por padrão, o Delta Lake retém o histórico da tabela, incluindo registros excluídos, por 30 dias e o disponibiliza para viagens no tempo e reversões. Mas mesmo que as versões anteriores dos dados sejam removidas, os dados ainda são retidos no armazenamento em nuvem. Por isso, deve manter regularmente conjuntos de dados para remover versões anteriores dos dados. A maneira recomendada é otimização preditiva para tabelas gerenciadas do Unity Catalog, que mantém de forma inteligente as tabelas de streaming e as visualizações materializadas.
- Para tabelas gerenciadas por otimização preditiva, o Lakeflow Spark Declarative Pipelines mantém de forma inteligente tabelas de streaming e exibições materializadas, com base em padrões de uso.
- Para tabelas sem otimização preditiva habilitada, o Lakeflow Spark Declarative Pipelines executa automaticamente tarefas de manutenção dentro de 24 horas após a atualização de tabelas de streaming e visualizações materializadas.
Se você não estiver usando otimização preditiva ou Lakeflow Spark Declarative Pipelines, deverá executar um VACUUM comando em tabelas Delta para remover permanentemente versões anteriores de dados. Por padrão, isso reduzirá os recursos de viagem no tempo para 7 dias, que é uma configuração configurável, e removerá versões históricas dos dados em questão do armazenamento em nuvem também.
Excluir dados de PII da camada bronze
Dependendo do design do seu lakehouse, poderá cortar o vínculo entre dados de utilizador com informação pessoal identificável (PII) e não identificável. Por exemplo, se você estiver usando uma chave não natural, como user_id, em vez de uma chave natural, como e-mail, poderá excluir dados de PII, o que deixa dados não PII no lugar.
O restante deste artigo lida com a RTBF, eliminando completamente os registos de utilizador de todas as tabelas bronze. Você pode excluir dados executando um comando DELETE, conforme mostrado no código a seguir:
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")
Ao excluir um grande número de registros juntos ao mesmo tempo, recomendamos usar o comando MERGE. O código abaixo pressupõe que você tenha uma tabela de controle chamada gdpr_control_table que contém uma coluna user_id. Você insere um registro nesta tabela para cada usuário que solicitou o "direito a ser esquecido" nesta tabela.
O comando MERGE especifica a condição para linhas correspondentes. Neste exemplo, ele faz a correspondência entre registros de target_table e registros em gdpr_control_table com base no user_id. Se houver uma correspondência (por exemplo, uma user_id no target_table e no gdpr_control_table), a linha no target_table será excluída. Depois que esse comando MERGE for bem-sucedido, atualize a tabela de controle para confirmar que a solicitação foi processada.
spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")
Propagar mudanças das camadas de bronze para as camadas de prata e ouro
Depois que os dados forem excluídos na camada bronze, você deverá propagar as alterações para tabelas nas camadas prata e ouro.
Visualizações materializadas: manipule exclusões automaticamente
As visualizações materializadas lidam automaticamente com exclusões em fontes de dados. Assim, você não precisa fazer nada de especial para garantir que uma exibição materializada não contenha dados que foram excluídos de uma fonte. Você deve atualizar uma exibição materializada e executar a manutenção para garantir que as exclusões sejam completamente processadas.
Uma visão materializada sempre retorna o resultado correto porque usa computação incremental se for mais barata do que a recomputação completa, mas nunca ao custo da correção. Em outras palavras, excluir dados de uma fonte poderá levar a que uma visão materializada seja totalmente recomputada.
Tabelas de streaming: exclua dados e leia a fonte de streaming usando skipChangeCommits
As tabelas de streaming processam dados de adição única ao serem transmitidos de fontes de tabelas Delta. Qualquer outra operação, como atualizar ou eliminar um registo de uma fonte de streaming, não é suportada e interrompe o fluxo.
Observação
Para uma implementação de streaming mais robusta, transmita a partir dos fluxos de alterações das tabelas Delta e trate das atualizações e eliminações no seu código de processamento. Ver Opção 1: Transmitir a partir de um feed de captura de dados de alteração (CDC).
Como o streaming a partir das tabelas Delta trata apenas dos dados novos, deve gerir as alterações aos dados por si próprio. O método recomendado é: (1) eliminar dados nas tabelas Delta de origem usando DML, (2) eliminar dados da tabela de streaming usando DML, e depois (3) atualizar a leitura de streaming para utilizar skipChangeCommits. Esta indicação indica que a tabela de streaming deve ignorar qualquer coisa que não sejam inserções, como atualizações ou remoções.
Alternativamente, pode (1) eliminar dados da fonte e depois (2) atualizar completamente a tabela de streaming. Quando você atualiza totalmente uma tabela de streaming, ela limpa o estado de streaming da tabela e reprocessa todos os dados novamente. Qualquer fonte de dados upstream que esteja além de seu período de retenção (por exemplo, um tópico Kafka que envelhece os dados após 7 dias) não será processada novamente, o que pode causar perda de dados. Recomendamos esta opção para streaming de tabelas apenas no cenário em que os dados históricos estão disponíveis e processá-los novamente não será caro.
Exemplo: conformidade com RGPD e CCPA para uma empresa de comércio eletrónico
O diagrama a seguir mostra uma arquitetura medalhão para uma empresa de comércio eletrônico onde a conformidade com o GDPR & a CCPA precisa ser implementada. Mesmo que os dados de um usuário sejam excluídos, convém contar suas atividades em agregações downstream.
-
Tabelas de origem
-
source_users- Uma tabela de fontes de streaming dos utilizadores (criada aqui, para o exemplo). Os ambientes de produção normalmente utilizam Kafka, Kineses ou plataformas de streaming semelhantes. -
source_clicks- Uma tabela de fontes de streaming de cliques (criada aqui, para o exemplo). Os ambientes de produção normalmente utilizam Kafka, Kineses ou plataformas de streaming semelhantes.
-
-
Tabela de controlo
-
gdpr_requests- Tabela de controle contendo IDs de usuário sujeitos ao "direito a ser esquecido". Quando um utilizador pedir para ser removido, adicione-o aqui.
-
-
Camada de bronze
-
users_bronze- Dimensões do utilizador. Contém PII (por exemplo, endereço de e-mail). -
clicks_bronze- Clique em eventos. Contém PII (por exemplo, endereço IP).
-
-
Camada de prata
-
clicks_silver- Dados de cliques limpos e padronizados. -
users_silver- Dados de utilizador limpos e padronizados. -
user_clicks_silver- Junta-seclicks_silver(streaming) a um instantâneo deusers_silver.
-
-
Camada de ouro
-
user_behavior_gold- Métricas agregadas de comportamento do utilizador. -
marketing_insights_gold- Segmento de utilizador para análises de mercado.
-
Etapa 1: preencher tabelas com dados de exemplo
O código seguinte cria estas duas tabelas para este exemplo e preenche-as com dados de exemplo:
-
source_userscontém dados dimensionais sobre os usuários. Esta tabela contém uma coluna PII chamadaemail. -
source_clickscontém dados de eventos sobre atividades realizadas pelos usuários. Ele contém uma coluna PII chamadaip_address.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType
catalog = "users"
schema = "name"
# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])
users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]
users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")
# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType
clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])
clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]
clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")
Etapa 2: Criar um pipeline que processa dados de PII
O código a seguir cria as camadas de bronze, prata e ouro da arquitetura do medalhão mostrada acima.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr
catalog = "users"
schema = "name"
# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------
@dp.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)
@dp.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)
# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------
@dp.create_streaming_table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)
@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)
@dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.users_silver",
source="users_bronze_view",
keys=["user_id"],
sequence_by="registration_date",
)
@dp.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)
@dp.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame - each refresh
# will use a snapshot of the users_silver table.
users = spark.read.table(f"{catalog}.{schema}.users_silver")
# Read clicks_silver as a streaming DataFrame.
clicks = spark.readStream \
.table('clicks_silver')
# Perform the join - join of a static dataset with a
# streaming dataset creates a streaming table.
joined_df = clicks.join(users, on='user_id', how='inner')
return joined_df
# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------
@dp.materialized_view(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)
@dp.materialized_view(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)
Etapa 3: Excluir dados em tabelas de origem
Neste passo, apaga dados em todas as tabelas onde são encontradas as PII. A função seguinte remove todas as instâncias de PII de um utilizador das tabelas que contêm PII.
catalog = "users"
schema = "name"
def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]
for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")
Etapa 4: Adicionar skipChangeCommits às definições de tabelas de streaming afetadas
Nesta etapa, deve indicar ao Lakeflow Spark Declarative Pipelines para ignorar linhas não adicionadas. Adicione a opção skipChangeCommits aos seguintes métodos. Não tens de atualizar as definições das visualizações materializadas porque elas tratam automaticamente das atualizações e eliminações.
users_bronzeusers_silverclicks_bronzeclicks_silveruser_clicks_silver
O código a seguir mostra como atualizar o método users_bronze:
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)
Quando executares o pipeline novamente, ele será atualizado com sucesso.