Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este tutorial apresenta um exemplo de ponta a ponta de um fluxo de trabalho de Ciência de Dados do Synapse, no Microsoft Fabric. Você aprenderá a criar, treinar e avaliar modelos de elevação e aplicar técnicas de modelagem de elevação.
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou cadastre-se para uma avaliação gratuita do Microsoft Fabric.
Entre no Microsoft Fabric.
Alterne para o Fabric usando o seletor de experiência no lado inferior esquerdo da sua página inicial.
- Familiaridade com notebooks do Microsoft Fabric
- Um lakehouse para este notebook, para armazenar dados para este exemplo. Para obter mais informações, visite Adicionar um lakehouse ao seu notebook
Acompanhe em um bloco de anotações
Você pode acompanhar em um notebook de uma das duas maneiras:
- Abra e execute o notebook integrado.
- Carregue seu bloco de anotações no GitHub.
Abrir o bloco de anotações interno
O notebook de Modelagem de elevação de exemplo acompanha este tutorial.
Para abrir o bloco de anotações de exemplo para este tutorial, siga as instruções em Preparar seu sistema para tutoriais de ciência de dados.
Certifique-se de anexar um lakehouse ao notebook antes de começar a executar o código.
Importar o notebook do GitHub
O notebook AIsample - Uplift Modeling.ipynb acompanha este tutorial.
Para abrir o bloco de anotações que acompanha este tutorial, siga as instruções em Preparar seu sistema para tutoriais de ciência de dados, para importar o bloco de anotações para seu workspace.
Você pode criar um novo notebook ou, se preferir, copiar e colar o código desta página.
Certifique-se de anexar um lakehouse ao notebook antes de começar a executar o código.
Etapa 1: Carregar os dados
Conjunto de Dados
O Laboratório de IA do Criteo criou o conjunto de dados. Esse conjunto de dados tem 13 milhões de linhas. Cada linha representa um usuário. Cada linha tem 12 recursos, um indicador de tratamento e dois rótulos binários que incluem visita e conversão.
- f0 – f11: valores de recurso (valores densos e flutuantes)
- tratamento: se um usuário foi ou não direcionado aleatoriamente para tratamento (por exemplo, publicidade) (1 = tratamento, 0 = controle)
- conversão: se ocorreu uma conversão (por exemplo, fez uma compra) para um usuário (binário, rótulo)
- visita: se ocorreu uma conversão (por exemplo, fez uma compra) para um usuário (binário, rótulo)
Citação
- Página inicial do conjunto de dados: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
O conjunto de dados usado para este notebook requer esta citação bibTex:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Dica
Ao definir os parâmetros a seguir, você pode aplicar esse notebook a conjuntos de dados diferentes facilmente.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Importar bibliotecas
Antes do processamento, você deve importar as bibliotecas Spark e SynapseML necessárias. Você também deve importar uma biblioteca de visualização de dados – por exemplo, Seaborn, uma biblioteca de visualização de dados do Python. Uma biblioteca de visualização de dados fornece uma interface de alto nível para criar recursos visuais em DataFrames e matrizes. Saiba mais sobre Spark, SynapseML e Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Baixar um conjunto de dados e carregar no lakehouse
Esse código baixa uma versão publicamente disponível do conjunto de dados e armazena o recurso em um Lakehouse do Fabric.
Importante
Certifique-se de Adicionar um lakehouse ao notebook antes de executá-lo. A falha ao fazer isso resultará em um erro.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Comece a gravar o runtime deste notebook.
# Record the notebook running time
import time
ts = time.time()
Configurar o acompanhamento de experimentos do MLflow
Para estender os recursos de log do MLflow, o registro automático captura automaticamente os valores dos parâmetros de entrada e das métricas de saída de um modelo de machine learning durante seu treinamento. Essas informações são então registradas no workspace, onde as APIs do MLflow ou o experimento correspondente no workspace podem acessá-la e visualizá-la. Visite este recurso para obter mais informações sobre o registro automático.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Nota
Para desabilitar o registro automático do Microsoft Fabric em uma sessão de notebook, chame mlflow.autolog() e defina disable=True.
Ler dados do lakehouse
Leia dados brutos da seção Arquivos do lakehouse e adicione mais colunas para diferentes partes de data. As mesmas informações são usadas para criar uma tabela delta particionada.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Etapa 2: Análise de dados exploratória
Use o comando display para exibir estatísticas de alto nível sobre o conjunto de dados. Você também pode mostrar as exibições de gráfico para visualizar facilmente subconjuntos do conjunto de dados.
display(raw_df.limit(20))
Examine o percentual dos usuários que visitam, o percentual de usuários que convertem e o percentual dos visitantes que convertem.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
A análise indica que 4,9% de usuários do grupo de tratamento – usuários que receberam o tratamento ou publicidade – visitaram a loja online. Apenas 3,8% de usuários do grupo de controle - usuários que nunca receberam o tratamento ou nunca foram oferecidos ou expostos à publicidade - fizeram o mesmo. Além disso, 0,31% de todos os usuários do grupo de tratamento converteram ou fizeram uma compra - enquanto apenas 0,19% de usuários do grupo de controle fizeram isso. Como resultado, a taxa de conversão de visitantes que fizeram uma compra, que também eram membros do grupo de tratamento, é 6,36%, em comparação com apenas 5,07%** para os usuários do grupo de controle. Com base nesses resultados, o tratamento pode potencialmente aprimorar a taxa de visitas em cerca de 1%e a taxa de conversão de visitantes em cerca de 1,3%. O tratamento leva a uma melhora significativa.
Etapa 3: Definir o modelo para treinamento
Preparar o treinamento e testar os conjuntos de dados
Aqui, você ajusta um transformador Featurize ao DataFrame raw_df, para extrair recursos das colunas de entrada especificadas e gerar esses recursos para uma nova coluna chamada features.
O DataFrame resultante é armazenado em um novo DataFrame chamado df.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Preparar os conjuntos de dados de tratamento e controle
Depois de criar os conjuntos de dados de treinamento e teste, você também precisa formar os conjuntos de dados de tratamento e controle para treinar os modelos de machine learning para medir a elevação.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Agora que você preparou seus dados, poderá continuar a treinar um modelo com o LightGBM.
Modelagem de elevação: T-Learner com LightGBM
Meta-aprendizes são um conjunto de algoritmos, criados com base em algoritmos de aprendizado de máquina como LightGBM, Xgboost etc. Eles ajudam a estimar o CATE (efeito de tratamento médio condicional). O T-learner é um meta-aprendiz que não usa um único modelo. Em vez disso, o T-learner usa um modelo por variável de tratamento. Portanto, dois modelos são desenvolvidos e nos referimos ao meta-aprendiz como T-learner. O T-learner usa vários modelos de machine learning para superar o problema de descartar totalmente o tratamento, forçando o aprendiz a primeiro dividir por ele.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Usar o conjunto de dados de teste para uma previsão
Aqui, você usa o treatment_model e control_model, ambos definidos anteriormente, para transformar o conjunto de dados de teste de test_df. Em seguida, você calcula a elevação prevista. Você define a elevação prevista como a diferença entre o resultado do tratamento previsto e o resultado do controle previsto. Quanto maior essa diferença de elevação prevista, maior a eficácia do tratamento (por exemplo, publicidade) em um indivíduo ou em um subgrupo.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Executar avaliação de modelo
Como a elevação real não pode ser observada para cada indivíduo, você precisa medir a elevação sobre um grupo de indivíduos. Você usa uma Curva de Elevação que plota a elevação real e cumulativa em toda a população.
O eixo x representa a proporção da população selecionada para o tratamento. Um valor de 0 sugere nenhum grupo de tratamento - ninguém é exposto ou oferecido ao tratamento. Um valor de 1 sugere um grupo de tratamento completo - todos são expostos ou oferecidos ao tratamento. O eixo y mostra a medida de elevação. O objetivo é localizar o tamanho do grupo de tratamento ou o percentual da população que seria oferecida ou exposta ao tratamento (por exemplo, publicidade). Essa abordagem otimiza a seleção de destino para otimizar o resultado.
Primeiro, organize a ordem do DataFrame de teste pela elevação prevista. A elevação prevista é a diferença entre o resultado do tratamento previsto e o resultado do controle previsto.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Em seguida, calcule o percentual cumulativo de visitas nos grupos de tratamento e controle.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Por fim, em cada porcentagem, calcule a elevação do grupo como a diferença entre o percentual cumulativo de visitas entre os grupos de tratamento e controle.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Agora, plote a curva de elevação para a previsão do conjunto de dados de teste. Você deve converter o DataFrame do PySpark em um DataFrame do Pandas antes de plotar.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
O eixo x representa a proporção da população selecionada para o tratamento. Um valor de 0 sugere nenhum grupo de tratamento - ninguém é exposto ou oferecido ao tratamento. Um valor de 1 sugere um grupo de tratamento completo - todos são expostos ou oferecidos ao tratamento. O eixo y mostra a medida de elevação. O objetivo é localizar o tamanho do grupo de tratamento ou o percentual da população que seria oferecida ou exposta ao tratamento (por exemplo, publicidade). Essa abordagem otimiza a seleção de destino para otimizar o resultado.
Primeiro, organize a ordem do DataFrame de teste pela elevação prevista. A elevação prevista é a diferença entre o resultado do tratamento previsto e o resultado do controle previsto.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Em seguida, calcule o percentual cumulativo de visitas nos grupos de tratamento e controle.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Por fim, em cada porcentagem, calcule a elevação do grupo como a diferença entre o percentual cumulativo de visitas entre os grupos de tratamento e controle.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Agora, plote a curva de elevação para a previsão do conjunto de dados de teste. Você deve converter o DataFrame do PySpark em um DataFrame do Pandas antes de plotar.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
A análise e a curva de elevação mostram que os 20% superiores da população, conforme classificados pela previsão, teriam um grande ganho caso recebessem o tratamento. Isso significa que os 20% melhores da população representam o grupo de persuadíveis. Portanto, você pode então definir a pontuação de corte para o tamanho desejado do grupo de tratamento em 20%, para identificar os clientes-alvo selecionados de modo a obter o maior impacto.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Etapa 4: Registrar o modelo de ML final
Você usa o MLflow para acompanhar e registrar todos os experimentos em grupos de tratamento e controle. Este acompanhamento e registro incluem os parâmetros, as métricas e os modelos correspondentes. Essas informações são registradas no nome do experimento, no workspace, para uso posterior.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Para visualizar seus experimentos:
- No painel esquerdo, selecione seu workspace.
- Localize e selecione o nome do experimento, neste caso, aisample-upliftmodelling.
Etapa 5: salvar os resultados da previsão
O Microsoft Fabric oferece PREDICT – uma função escalonável que dá suporte à pontuação em lote em qualquer mecanismo de computação. Ele permite que os clientes operacionalizem modelos de machine learning. Os usuários podem criar previsões em lotes diretamente de um bloco de anotações ou da página de item para um modelo específico. Visite este recurso para saber mais sobre PREDICT e saiba como usar PREDICT no Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")