Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Esta receita mostra como usar os serviços SynapseML e Azure AI, no Apache Spark, para deteção de anomalias multivariadas. A deteção multivariada de anomalias envolve a deteção de anomalias entre muitas variáveis ou séries temporais, contabilizando todas as intercorrelações e dependências entre as diferentes variáveis. Este cenário usa o SynapseML e os serviços de IA do Azure para treinar um modelo para deteção de anomalias multivariadas. Em seguida, usamos o modelo para inferir anomalias multivariadas dentro de um conjunto de dados que contém medições sintéticas de três sensores IoT.
Importante
A partir de 20 de setembro de 2023, você não poderá criar novos recursos do Detetor de Anomalias. O serviço de Detetor de Anomalias será desativado a 1 de outubro de 2026.
Para obter mais informações sobre o Detetor de Anomalias do Azure AI, visite o recurso de informações do Detetor de Anomalias .
Pré-requisitos
- Uma assinatura do Azure - Crie uma gratuitamente
- Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo, selecione Adicionar para adicionar uma casa de lago existente ou criar uma casa de lago.
Configurar
Começando com um recurso existente Anomaly Detector , você pode explorar maneiras de lidar com dados de vários formulários. O catálogo de serviços dentro da IA do Azure fornece várias opções:
Criar um recurso de Detetor de Anomalias
- No portal do Azure, selecione Criar no seu grupo de recursos e digite Detetor de Anomalias. Selecione o recurso Detetor de anomalias.
- Nomeie o recurso e, idealmente, use a mesma região que o restante do grupo de recursos. Utilize as opções predefinidas para o resto e, em seguida, selecione Rever + Criar e, em seguida , Criar.
- Depois de criar o recurso Detetor de anomalias, abra-o e selecione o
Keys and Endpointspainel no painel de navegação esquerdo. Copie a chave do recurso Detetor de anomalias para aANOMALY_API_KEYvariável de ambiente ou armazene-aanomalyKeyna variável.
Criar um recurso de Conta de Armazenamento
Para salvar dados intermediários, você deve criar uma Conta de Armazenamento de Blob do Azure. Dentro dessa conta de armazenamento, crie um contêiner para armazenar os dados intermediários. Anote o nome do contêiner e copie a cadeia de conexão para esse contêiner. Você precisa dele para preencher posteriormente a containerName variável e a BLOB_CONNECTION_STRING variável de ambiente.
Introduza as suas chaves de serviço
Primeiro, configure as variáveis de ambiente para nossas chaves de serviço. A próxima célula define as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, com base nos valores armazenados no Cofre de Chaves do Azure. Se você executar este tutorial em seu próprio ambiente, certifique-se de definir estas variáveis de ambiente antes de prosseguir:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Leia as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, e defina as variáveis containerName e location.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Conecte-se à nossa conta de armazenamento, para que o detetor de anomalias possa salvar resultados intermediários nessa conta de armazenamento:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importe todos os módulos necessários:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Leia os dados de exemplo em um DataFrame do Spark:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Agora podemos criar um estimator objeto, que usamos para treinar nosso modelo. Especificamos os horários de início e término dos dados de treinamento. Também especificamos as colunas de entrada a serem usadas e o nome da coluna que contém os carimbos de data/hora. Por fim, especificamos o número de pontos de dados a serem usados na janela deslizante de deteção de anomalias e definimos a cadeia de conexão para a Conta de Armazenamento de Blob do Azure:
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Vamos ajustar o estimator aos dados.
model = estimator.fit(df)
Uma vez feito o treinamento, podemos usar o modelo para inferência. O código na célula seguinte especifica as horas de início e fim dos dados nos quais gostaríamos de detetar as anomalias:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
Na célula anterior, .show(5) mostrava-nos as primeiras cinco linhas do dataframe. Os resultados foram todos null porque se situaram fora da janela de inferência.
Para mostrar os resultados apenas para os dados inferidos, selecione as colunas necessárias. Em seguida, podemos ordenar as linhas no dataframe por ordem crescente e filtrar o resultado para mostrar apenas as linhas no intervalo da janela de inferência. Aqui, inferenceEndTime corresponde à última linha no dataframe, portanto, pode ignorá-lo.
Finalmente, para plotar melhor os resultados, converta o dataframe do Spark em um dataframe Pandas:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Formate a contributors coluna que armazena a pontuação de contribuição de cada sensor para as anomalias detetadas. A próxima célula lida com isso e divide a pontuação de contribuição de cada sensor em sua própria coluna:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Agora temos as pontuações de contribuição dos sensores 1, 2 e 3 nas series_0colunas , series_1e series_2 respectivamente.
Para plotar os resultados, execute a próxima célula. O minSeverity parâmetro especifica a gravidade mínima das anomalias a serem plotadas:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Os gráficos mostram os dados brutos dos sensores (dentro da janela de inferência) em laranja, verde e azul. As linhas verticais vermelhas na primeira figura mostram as anomalias detetadas que têm uma gravidade maior ou igual a minSeverity.
O segundo gráfico mostra a pontuação de gravidade de todas as anomalias detetadas, com o minSeverity limiar mostrado na linha vermelha pontilhada.
Finalmente, o último gráfico mostra a contribuição dos dados de cada sensor para as anomalias detetadas. Ajuda-nos a diagnosticar e compreender a causa mais provável de cada anomalia.