Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Dit recept laat zien hoe u SynapseML- en Azure AI-services op Apache Spark gebruikt voor anomaliedetectie met meerdere variabelen. Multivariate anomaliedetectie omvat het detecteren van afwijkingen tussen veel variabelen of tijdreeksen, waarbij rekening wordt gehouden met alle intercorrelaties en afhankelijkheden tussen de verschillende variabelen. In dit scenario worden SynapseML en de Azure AI-services gebruikt om een model te trainen voor anomaliedetectie met meerdere variabelen. Vervolgens gebruiken we het model om afwijkingen met meerdere variabelen af te stellen binnen een gegevensset die synthetische metingen van drie IoT-sensoren bevat.
Belangrijk
Vanaf 20 september 2023 kunt u geen nieuwe Anomaly Detector-resources maken. De Anomaly Detector-service wordt op 1 oktober 2026 buiten gebruik gesteld.
Ga voor meer informatie over de Azure AI Anomaly Detector naar de Anomaly Detector-informatiebron.
Vereisten
- Een Azure-abonnement - Een gratis abonnement maken
- Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of een lakehouse te maken.
Instellingen
Vanaf een bestaande Anomaly Detector resource kunt u manieren verkennen om gegevens van verschillende formulieren te verwerken. De catalogus met services in Azure AI biedt verschillende opties:
Een Anomaly Detector-resource maken
- Selecteer maken in de Azure-portal in uw resourcegroep en typ vervolgens Anomaly Detector. Selecteer de Anomaly Detector-resource.
- Geef de resource een naam en gebruik idealiter dezelfde regio als de rest van uw resourcegroep. Gebruik de standaardopties voor de rest en selecteer Vervolgens Beoordelen + Maken en vervolgens Maken.
- Nadat u de Anomaly Detector-resource hebt gemaakt, opent u deze en selecteert u het
Keys and Endpointsdeelvenster in het linkernavigatievenster. Kopieer de sleutel voor de Anomaly Detector-resource naar deANOMALY_API_KEYomgevingsvariabele of sla deze op in deanomalyKeyvariabele.
Een opslagaccountresource maken
Als u tussenliggende gegevens wilt opslaan, moet u een Azure Blob Storage-account maken. Maak binnen dat opslagaccount een container voor het opslaan van de tussenliggende gegevens. Noteer de containernaam en kopieer de verbindingsreeks naar die container. U hebt deze nodig om de containerName variabele en de BLOB_CONNECTION_STRING omgevingsvariabele later te vullen.
Voer uw servicesleutels in
Stel eerst de omgevingsvariabelen in voor onze servicesleutels. In de volgende cel worden de ANOMALY_API_KEY en de BLOB_CONNECTION_STRING omgevingsvariabelen ingesteld op basis van de waarden die zijn opgeslagen in azure Key Vault. Als u deze zelfstudie uitvoert in uw eigen omgeving, moet u deze omgevingsvariabelen instellen voordat u verdergaat:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lees de ANOMALY_API_KEY en BLOB_CONNECTION_STRING omgevingsvariabelen en stel de containerName en location variabelen in:
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Maak verbinding met ons opslagaccount, zodat de anomaliedetector tussenliggende resultaten in dat opslagaccount kan opslaan:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importeer alle benodigde modules:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Lees de voorbeeldgegevens in een Spark DataFrame:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
We kunnen nu een estimator object maken dat we gebruiken om ons model te trainen. We geven de begin- en eindtijden op voor de trainingsgegevens. We geven ook de invoerkolommen op die moeten worden gebruikt en de naam van de kolom die de tijdstempels bevat. Ten slotte geven we het aantal gegevenspunten op dat moet worden gebruikt in het schuifvenster voor anomaliedetectie en stellen we de verbindingsreeks in op het Azure Blob Storage-account:
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Laten we de estimator op de data aanpassen.
model = estimator.fit(df)
Zodra de training is voltooid, kunnen we het model gebruiken voor deductie. De code in de volgende cel geeft de begin- en eindtijden op voor de gegevens waarin we de afwijkingen willen detecteren:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
In de vorige cel toonde .show(5) ons de eerste vijf dataframe-rijen. De resultaten waren allemaal null omdat ze buiten het inferencevenster terechtkwamen.
Om de resultaten alleen voor de afgeleide gegevens weer te geven, selecteert u de benodigde kolommen. Vervolgens kunnen we de rijen in het gegevensframe rangschikken op oplopende volgorde en het resultaat filteren om alleen de rijen in het bereik van het deductievenster weer te geven. Hier komt inferenceEndTime overeen met de laatste rij in het dataframe, dus kan die genegeerd worden.
Als u de resultaten beter wilt uitzetten, converteert u het Spark-gegevensframe naar een Pandas-dataframe:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Maak de contributors kolom op waarin de bijdragescore van elke sensor wordt opgeslagen in de gedetecteerde afwijkingen. De volgende cel verwerkt dit en splitst de bijdragescore van elke sensor in een eigen kolom:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
We hebben nu de bijdragescores van sensoren 1, 2 en 3 in respectievelijk de series_0, series_1en series_2 kolommen.
Als u de resultaten wilt uitzetten, voert u de volgende cel uit. De minSeverity parameter geeft de minimale ernst van de afwijkingen op die moeten worden uitgezet:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
De plots tonen de onbewerkte gegevens van de sensoren (in het deductievenster) in oranje, groen en blauw. De rode verticale lijnen in de eerste afbeelding tonen de gedetecteerde afwijkingen met een ernst die groter is dan of gelijk is aan minSeverity.
In het tweede diagram ziet u de ernstscore van alle gedetecteerde afwijkingen, met de minSeverity drempelwaarde die wordt weergegeven in de rode stippellijn.
Ten slotte toont de laatste plot de bijdrage van de gegevens van elke sensor aan de gedetecteerde afwijkingen. Het helpt ons bij het vaststellen en begrijpen van de meest waarschijnlijke oorzaak van elke anomalie.