Partilhar via


Tutorial: Criar um aplicativo de aprendizado de máquina com o Apache Spark MLlib e o Azure Synapse Analytics

Neste artigo, você aprenderá a usar o Apache Spark MLlib para criar um aplicativo de aprendizado de máquina que faz análise preditiva simples em um conjunto de dados aberto do Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa a classificação por meio de regressão logística.

SparkML e MLlib são bibliotecas Spark principais que fornecem muitos utilitários que são úteis para tarefas de aprendizado de máquina, incluindo utilitários que são adequados para:

  • Classificação
  • Regressão
  • Clusterização
  • Modelagem de tópicos
  • Decomposição de Valores Singulares (DVS) e Análise de Componentes Principais (ACP)
  • Teste de hipóteses e cálculo de estatísticas amostrais

Compreender a classificação e a regressão logística

A classificação, uma tarefa popular de aprendizado de máquina, é o processo de classificar dados de entrada em categorias. É tarefa de um algoritmo de classificação descobrir como atribuir rótulos aos dados de entrada que você fornece. Por exemplo, você pode pensar em um algoritmo de aprendizado de máquina que aceita informações de ações como entrada e dividir as ações em duas categorias: ações que você deve vender e ações que você deve manter.

A regressão logística é um algoritmo que pode ser usado para classificação. A API de regressão logística do Spark é útil para classificação binária ou classificação de dados de entrada em um de dois grupos. Para obter mais informações sobre regressão logística, consulte Wikipedia.

Em resumo, o processo de regressão logística produz uma função logística que você pode usar para prever a probabilidade de que um vetor de entrada pertença a um grupo ou outro.

Exemplo de análise preditiva em dados de táxi de Nova York

Neste exemplo, você usa o Spark para executar algumas análises preditivas em dados de dicas de viagem de táxi de Nova York. Os dados estão disponíveis através dos Conjuntos de Dados Abertos do Azure. Este subconjunto do conjunto de dados contém informações sobre viagens de táxi amarelo, incluindo informações sobre cada viagem, a hora e locais de início e fim, o custo e outros atributos interessantes.

Importante

Pode haver cobranças adicionais para extrair esses dados de seu local de armazenamento.

Nas etapas a seguir, você desenvolve um modelo para prever se uma viagem específica inclui uma dica ou não.

Criar um modelo de aprendizado de máquina do Apache Spark

  1. Crie um bloco de anotações usando o kernel do PySpark. Para obter instruções, consulte Criar um bloco de anotações.

  2. Importe os tipos necessários para este aplicativo. Copie e cole o código a seguir em uma célula vazia e pressione Shift+Enter. Ou execute a célula usando o ícone de reprodução azul à esquerda do código.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Devido ao kernel do PySpark, você não precisa criar nenhum contexto explicitamente. O contexto do Spark é criado automaticamente para você quando você executa a primeira célula de código.

Construir o DataFrame de entrada

Como os dados brutos estão em um formato Parquet, você pode usar o contexto do Spark para carregar o arquivo diretamente para a memória como um DataFrame. Embora o código nas etapas a seguir use as opções padrão, é possível forçar o mapeamento de tipos de dados e outros atributos de esquema, se necessário.

  1. Execute as seguintes linhas para criar um Spark DataFrame colando o código em uma nova célula. Esta etapa recupera os dados por meio da API Open Datasets. Extrair todos esses dados gera cerca de 1,5 bilhão de linhas.

    Dependendo do tamanho do seu pool Apache Spark sem servidor, os dados brutos podem ser muito grandes ou levar muito tempo para operar. Você pode filtrar esses dados para algo menor. O exemplo de código a seguir usa start_date e end_date para aplicar um filtro que retorna um único mês de dados.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. A desvantagem da filtragem simples é que, de uma perspetiva estatística, ela pode introduzir enviesamento nos dados. Outra abordagem é usar a amostragem incorporada no Spark.

    O código a seguir reduz o conjunto de dados para cerca de 2.000 linhas, se for aplicado após o código anterior. Você pode usar esta etapa de amostragem em vez do filtro simples ou em conjunto com o filtro simples.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. Agora é possível olhar para os dados para ver o que foi lido. Normalmente, é melhor revisar dados com um subconjunto em vez do conjunto completo, dependendo do tamanho do conjunto de dados.

    O código a seguir oferece duas maneiras de exibir os dados. A primeira maneira é básica. A segunda maneira fornece uma experiência de grade muito mais rica, juntamente com a capacidade de visualizar os dados graficamente.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. Dependendo do tamanho do conjunto de dados gerado e da sua necessidade de experimentar ou executar o bloco de anotações muitas vezes, convém armazenar o conjunto de dados em cache localmente no espaço de trabalho. Há três maneiras de executar o cache explícito:

    • Salve o DataFrame localmente como um arquivo.
    • Salve o DataFrame como uma tabela ou exibição temporária.
    • Salve o DataFrame como uma tabela permanente.

As duas primeiras dessas abordagens estão incluídas nos exemplos de código a seguir.

A criação de uma tabela ou exibição temporária fornece caminhos de acesso diferentes aos dados, mas dura apenas a duração da sessão de instância do Spark.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

Preparar os dados

Os dados em sua forma bruta muitas vezes não são adequados para passar diretamente para um modelo. Você deve executar uma série de ações nos dados para colocá-los em um estado em que o modelo possa consumi-los.

No código a seguir, você executa quatro classes de operações:

  • A remoção de valores atípicos ou incorretos através da filtragem.
  • A remoção de colunas, que não são necessárias.
  • A criação de novas colunas derivadas dos dados brutos para fazer o modelo funcionar de forma mais eficaz. Esta operação é às vezes chamada de featurização.
  • Rotulagem. Como você está realizando uma classificação binária (haverá uma gorjeta ou não em uma determinada viagem), há a necessidade de converter o valor da gorjeta em um valor 0 ou 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Em seguida, faz-se uma segunda passagem sobre os dados para adicionar as características finais.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Criar um modelo de regressão logística

A tarefa final é converter os dados rotulados em um formato que possa ser analisado por meio de regressão logística. A entrada para um algoritmo de regressão logística precisa ser um conjunto de pares vetoriais de etiqueta/característica, onde o vetor de características é um vetor de números que representam o ponto de dados.

Então, você precisa converter as colunas categóricas em números. Especificamente, você precisa converter as trafficTimeBins colunas e weekdayString em representações inteiras. Existem várias abordagens para realizar a conversão. O exemplo a seguir adota a OneHotEncoder abordagem, que é comum.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Essa ação resulta em um novo DataFrame com todas as colunas no formato certo para treinar um modelo.

Treinar um modelo de regressão logística

A primeira tarefa é dividir o conjunto de dados em um conjunto de treinamento e um conjunto de teste ou validação. A divisão aqui é arbitrária. Experimente diferentes configurações de divisão para ver se elas afetam o modelo.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Agora que há dois DataFrames, a próxima tarefa é criar a fórmula do modelo e executá-la no DataFrame de treinamento. Em seguida, você pode validar em relação ao DataFrame de teste. Experimente diferentes versões da fórmula do modelo para ver o impacto de diferentes combinações.

Observação

Para salvar o modelo, atribua a função de Colaborador de Dados de Blob de Armazenamento ao escopo de recursos do servidor do Banco de Dados SQL do Azure. Para obter etapas detalhadas, consulte Atribuir funções do Azure usando o portal do Azure. Somente membros com privilégios de proprietário podem executar esta etapa.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

A saída desta célula é:

Area under ROC = 0.9779470729751403

Criar uma representação visual da previsão

Agora você pode construir uma visualização final para ajudá-lo a raciocinar sobre os resultados deste teste. Uma curva ROC é uma forma de rever o resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Gráfico que mostra a curva ROC para regressão logística no modelo de ponta.

Encerrar a instância do Spark

Depois de terminar de executar o aplicativo, desligue o bloco de anotações para liberar os recursos fechando a guia. Ou selecione Encerrar sessão no painel de status na parte inferior do bloco de anotações.

Ver também

Próximos passos

Observação

Parte da documentação oficial do Apache Spark depende do uso do console do Spark, que não está disponível no Apache Spark no Azure Synapse Analytics. Em vez disso, use o notebook ou as experiências do IntelliJ .