Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Esta receta muestra cómo usar los servicios SynapseML y Azure AI, en Apache Spark, para la detección de anomalías multivariante. La detección de anomalías multivariante implica la detección de anomalías entre muchas variables o series temporales, a la vez que se contabilizan todas las correlaciones y dependencias entre las distintas variables. En este escenario se usa SynapseML y los servicios de Azure AI para entrenar un modelo para la detección de anomalías multivariante. A continuación, usamos el modelo para deducir anomalías multivariante dentro de un conjunto de datos que contiene medidas sintéticas de tres sensores de IoT.
Importante
A partir del 20 de septiembre de 2023, no puede crear nuevos recursos de Anomaly Detector. El servicio Anomaly Detector se retirará el 1 de octubre de 2026.
Para obtener más información sobre Azure AI Anomaly Detector, visite la fuente de información de Anomaly Detector.
Requisitos previos
- Una suscripción a Azure: cree una cuenta gratuita.
- Adjunte el cuaderno a un almacén de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.
Configurar
A partir de un recurso existente Anomaly Detector , puede explorar formas de controlar los datos de varios formularios. El catálogo de servicios de Azure AI proporciona varias opciones:
- Decisión
- Inteligencia de documentos
- Language
- Discurso
- traducción de
- Visión
- Búsqueda web
Creación de un recurso de Anomaly Detector
- En Azure Portal, seleccione Crear en el grupo de recursos y, a continuación, escriba Anomaly Detector. Seleccione el recurso Anomaly Detector.
- Asigne un nombre al recurso y, idealmente, use la misma región que el resto del grupo de recursos. Use las opciones predeterminadas para el resto, seleccione Revisar y crear y, a continuación, Crear.
- Después de crear el recurso Anomaly Detector, ábralo y seleccione el elemento
Keys and Endpointsen el panel de navegación izquierdo. Copie la clave del recurso Anomaly Detector en la variable de entornoANOMALY_API_KEYo almacénela en la variableanomalyKey.
Creación de un recurso de cuenta de almacenamiento
Para guardar los datos intermedios, debe crear una cuenta de Azure Blob Storage. Dentro de esa cuenta de almacenamiento, cree un contenedor para almacenar los datos intermedios. Anote el nombre del contenedor y copie la cadena de conexión en ese contenedor. Lo necesitará para rellenar posteriormente la containerName variable y la variable de BLOB_CONNECTION_STRING entorno.
Escriba las claves del servicio.
En primer lugar, configure las variables de entorno para nuestras claves de servicio. La siguiente celda establece las variables de entorno ANOMALY_API_KEY y BLOB_CONNECTION_STRING, en función de los valores almacenados en Azure Key Vault. Si ejecuta este tutorial en su propio entorno, asegúrese de establecer estas variables de entorno antes de continuar:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lea las ANOMALY_API_KEY variables de entorno y BLOB_CONNECTION_STRING y establezca las containerName variables y location :
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Conéctese a nuestra cuenta de almacenamiento para que el detector de anomalías pueda guardar los resultados intermedios en esa cuenta de almacenamiento:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importe todos los módulos necesarios:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Lea los datos de ejemplo en un dataframe de Spark:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Ahora podemos crear un estimator objeto , que usamos para entrenar nuestro modelo. Especificamos las horas de inicio y finalización de los datos de entrenamiento. También se especifican las columnas de entrada que se van a usar y el nombre de la columna que contiene las marcas de tiempo. Por último, se especifica el número de puntos de datos que se van a usar en la ventana deslizante de detección de anomalías y se establece la cadena de conexión en la cuenta de Azure Blob Storage:
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Vamos a ajustar el estimator a los datos:
model = estimator.fit(df)
Una vez realizado el entrenamiento, podemos usar el modelo para la inferencia. El código de la celda siguiente especifica las horas de inicio y finalización de los datos en los que nos gustaría detectar las anomalías:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
En la celda anterior, .show(5) nos mostró las cinco primeras filas de trama de datos. Los resultados fueron todos null porque quedaron fuera de la ventana de inferencia.
Para mostrar los resultados solo para los datos inferidos, seleccione las columnas necesarias. A continuación, podemos ordenar las filas del dataframe por orden ascendente y filtrar el resultado para mostrar solo las filas del intervalo de ventanas de inferencia. Aquí, inferenceEndTime coincide con la última fila del dataframe, por lo que puede omitirla.
Por último, para trazar mejor los resultados, convierta el dataframe de Spark en un dataframe de Pandas:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Dé formato a la columna contributors que almacena la puntuación de contribución de cada sensor a las anomalías detectadas. La celda siguiente controla esto y divide la puntuación de contribución de cada sensor en su propia columna:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Ahora tenemos las puntuaciones de contribución de los sensores 1, 2 y 3 en las columnas series_0, series_1 y series_2 respectivamente.
Para trazar los resultados, ejecute la celda siguiente. El minSeverity parámetro especifica la gravedad mínima de las anomalías que se van a trazar:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Los trazados muestran los datos sin procesar de los sensores (dentro de la ventana de inferencia) en naranja, verde y azul. Las líneas verticales rojas de la primera ilustración muestran las anomalías detectadas que tienen una gravedad mayor o igual a minSeverity.
En el segundo trazado se muestra la puntuación de gravedad de todas las anomalías detectadas, y el umbral minSeverity se muestra en la línea roja punteada.
Por último, el último gráfico muestra la contribución de los datos de cada sensor a las anomalías detectadas. Nos ayuda a diagnosticar y comprender la causa más probable de cada anomalía.