Compartilhar via


Receita: serviços de IA da Azure – Detector de Anomalias Multivariadas

Esta receita mostra como usar os serviços SynapseML e IA do Azure, no Apache Spark, para detecção de anomalias multivariadas. A detecção de anomalias multivariadas envolve a detecção de anomalias entre muitas variáveis ou séries temporais, ao mesmo tempo em que contabiliza todas as inter correlações e dependências entre as diferentes variáveis. Esse cenário usa o SynapseML e os serviços de IA do Azure para treinar um modelo para detecção de anomalias multivariadas. Em seguida, usamos o modelo para inferir anomalias multivariadas em um conjunto de dados que contém medidas sintéticas de três sensores de IoT.

Importante

A partir de 20 de setembro de 2023, você não pode criar novos recursos do Detector de Anomalias. O serviço Detector de Anomalias será desativado em 1º de outubro de 2026.

Para obter mais informações sobre o Detector de Anomalias de IA do Azure, visite o recurso de informações do Detector de Anomalias .

Pré-requisitos

  • Uma assinatura do Azure – crie uma gratuitamente
  • Anexe seu bloco de anotações a um lakehouse. No lado esquerdo, selecione Adicionar para adicionar um lakehouse existente ou criar um.

Instalação

Começando com um recurso existente Anomaly Detector , você pode explorar maneiras de lidar com dados de vários formulários. O catálogo de serviços dentro da IA do Azure fornece várias opções:

Criar um recurso do Detector de Anomalias

  • No portal do Azure, selecione Criar em seu grupo de recursos e digite Detector de Anomalias. Selecione o recurso Detector de Anomalias.
  • Nomeie o recurso e, idealmente, use a mesma região que o restante do grupo de recursos. Use as opções padrão para o restante e selecione Revisar e Criar e, em seguida, Criar.
  • Depois de criar o recurso detector de anomalias, abra-o e selecione o Keys and Endpoints painel na navegação esquerda. Copie a chave do recurso Detector de Anomalias na variável de ambiente ANOMALY_API_KEY ou armazene-a na variável anomalyKey.

Criar um recurso de Conta de Armazenamento

Para salvar dados intermediários, você deve criar uma Conta de Armazenamento de Blobs do Azure. Dentro dessa conta de armazenamento, crie um contêiner para armazenar os dados intermediários. Observe que o nome do contêiner e copie a cadeia de caracteres de conexão para esse contêiner. Você precisa dela para preencher posteriormente a containerName variável e a variável de BLOB_CONNECTION_STRING ambiente.

Inserir suas chaves de serviço

Primeiro, configure as variáveis de ambiente para nossas chaves de serviço. A próxima célula define as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING com base nos valores armazenados em nosso Azure Key Vault. Se você executar este tutorial em seu próprio ambiente, certifique-se de definir essas variáveis de ambiente antes de continuar:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Leia as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, e defina as variáveis containerName e location:

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Conecte-se à nossa conta de armazenamento para que o detector de anomalias possa salvar resultados intermediários nessa conta de armazenamento:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Importe todos os módulos necessários:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Leia os dados de exemplo em um DataFrame do Spark:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Agora podemos criar um estimator objeto, que usamos para treinar nosso modelo. Especificamos as horas de início e término para os dados de treinamento. Também especificamos as colunas de entrada que serão utilizadas e o nome da coluna que contém os carimbos de data/hora. Por fim, especificamos o número de pontos de dados a serem usados na janela deslizante de detecção de anomalias e definimos a cadeia de conexão para a Conta de Armazenamento de Blobs do Azure:

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Vamos ajustar o estimator aos dados:

model = estimator.fit(df)

Depois que o treinamento for concluído, podemos usar o modelo para inferência. O código na próxima célula especifica os horários de início e término para os dados nos quais gostaríamos de detectar as anomalias:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Na célula anterior, .show(5) nos mostrou as cinco primeiras linhas de dataframe. Os resultados foram todos null porque caíram fora da janela de inferência.

Para mostrar os resultados apenas para os dados inferidos, selecione as colunas necessárias. Em seguida, podemos ordenar as linhas no dataframe por ordem crescente e filtrar o resultado para mostrar apenas as linhas no intervalo da janela de inferência. Aqui, inferenceEndTime corresponde à última linha no dataframe, portanto, pode ignorá-la.

Por fim, para plotar melhor os resultados, converta o dataframe do Spark em um dataframe do Pandas:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Formate a coluna contributors que armazena a pontuação de contribuição de cada sensor para as anomalias detectadas. A próxima célula lida com isso e divide a pontuação de contribuição de cada sensor em sua própria coluna:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Agora temos as pontuações de contribuição dos sensores 1, 2 e 3 nas colunas series_0, series_1 e series_2, respectivamente.

Para plotar os resultados, execute a próxima célula. O minSeverity parâmetro especifica a gravidade mínima das anomalias a serem plotdas:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Captura de tela do gráfico de resultados de detecção de anomalias multivariadas.

Os gráficos mostram os dados brutos dos sensores (dentro da janela de inferência) em laranja, verde e azul. As linhas verticais vermelhas na primeira figura mostram as anomalias detectadas com uma gravidade maior ou igual a minSeverity.

O segundo gráfico mostra a pontuação de gravidade de todas as anomalias detectadas, com o limite minSeverity mostrado na linha vermelha pontilhada.

Finalmente, o último gráfico mostra a contribuição dos dados de cada sensor para as anomalias detectadas. Isso nos ajuda a diagnosticar e entender a causa mais provável de cada anomalia.