Udostępnij przez


Przepis: Usługi azure AI — wykrywanie anomalii wielowariancyjnej

Ten przepis pokazuje, jak używać usług SynapseML i Azure AI na platformie Apache Spark na potrzeby wykrywania anomalii wielowariancyjnych. Wykrywanie anomalii wielowariancyjnych obejmuje wykrywanie anomalii między wieloma zmiennymi lub szeregami czasowymi, a jednocześnie uwzględnia wszystkie korelacje między różnymi zmiennymi i zależności. W tym scenariuszu używane są usługi SynapseML i Azure AI do trenowania modelu na potrzeby wykrywania anomalii wielowariancyjnych. Następnie używamy modelu do wnioskowania wielowarianckich anomalii w zestawie danych, który zawiera syntetyczne pomiary z trzech czujników IoT.

Ważne

Od 20 września 2023 r. nie będzie można tworzyć nowych zasobów wykrywacza anomalii. Usługa wykrywania anomalii zostanie wycofana 1 października 2026 r.

Aby uzyskać więcej informacji na temat narzędzia do wykrywania anomalii w usłudze Azure AI, odwiedź zasób informacyjny narzędzia do wykrywania anomalii .

Wymagania wstępne

  • Subskrypcja platformy Azure — utwórz bezpłatnie
  • Dołącz notes do magazynu lakehouse. Po lewej stronie wybierz pozycję Dodaj , aby dodać istniejący obiekt lakehouse lub utworzyć jezioro.

Ustawienia

Począwszy od istniejącego Anomaly Detector zasobu, możesz eksplorować sposoby obsługi danych różnych formularzy. Wykaz usług w usłudze Azure AI oferuje kilka opcji:

Tworzenie zasobu Narzędzie do wykrywania anomalii

  • W witrynie Azure Portal wybierz pozycję Utwórz w grupie zasobów, a następnie wpisz Narzędzie do wykrywania anomalii. Wybierz zasób Narzędzie do wykrywania anomalii.
  • Nadaj zasobowi nazwę i najlepiej użyj tego samego regionu co pozostała część grupy zasobów. Użyj domyślnych opcji pozostałych, a następnie wybierz pozycję Przejrzyj i utwórz , a następnie pozycję Utwórz.
  • Po utworzeniu zasobu Anomaly Detector otwórz go i wybierz Keys and Endpoints w lewym panelu nawigacyjnym. Skopiuj klucz zasobu Narzędzie do wykrywania anomalii do ANOMALY_API_KEY zmiennej środowiskowej lub zapisz go w zmiennej anomalyKey .

Tworzenie zasobu konta magazynu

Aby zapisać dane pośrednie, musisz utworzyć konto usługi Azure Blob Storage. Na tym koncie magazynu utwórz kontener do przechowywania danych pośrednich. Zanotuj nazwę kontenera i skopiuj parametry połączenia do tego kontenera. Będzie ona potrzebna do późniejszego wypełnienia zmiennej containerName i zmiennej środowiskowej BLOB_CONNECTION_STRING .

Wprowadź klucze usługi

Najpierw skonfiguruj zmienne środowiskowe dla kluczy usługi. Następna komórka ustawia zmienne środowiskowe ANOMALY_API_KEY i BLOB_CONNECTION_STRING, bazując na wartościach przechowywanych w Azure Key Vault. Jeśli uruchomisz ten samouczek we własnym środowisku, przed kontynuowaniem pamiętaj o ustawieniu tych zmiennych środowiskowych:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Odczytaj zmienne środowiskowe ANOMALY_API_KEY oraz BLOB_CONNECTION_STRING, a następnie ustaw zmienne containerName i location.

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Połącz się z naszym kontem magazynowym, aby wykrywacz anomalii mógł zapisywać wyniki pośrednie na tym koncie magazynowym.

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Zaimportuj wszystkie niezbędne moduły:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Odczytywanie przykładowych danych do ramki danych platformy Spark:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Możemy teraz utworzyć estimator obiekt, którego używamy do trenowania modelu. Określamy czas rozpoczęcia i zakończenia danych treningowych. Określamy również kolumny wejściowe do użycia oraz nazwę kolumny zawierającej znaczniki czasu. Na koniec określamy liczbę punktów danych do wykorzystania w oknie przesuwnego wykrywania anomalii i ustawiamy ciąg połączenia do konta usługi Azure Blob Storage.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Dopasujmy estimator do danych.

model = estimator.fit(df)

Po zakończeniu trenowania możemy użyć modelu do wnioskowania. Kod w następnej komórce określa czas rozpoczęcia i zakończenia danych, w których chcemy wykryć anomalie:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

W poprzedniej komórce .show(5) pokazano nam pięć pierwszych wierszy ramki danych. Wszystkie wyniki były null, ponieważ wylądowały poza zakresem wnioskowania.

Aby wyświetlić wyniki tylko dla wnioskowanych danych, wybierz wymagane kolumny. Następnie możemy uporządkować wiersze w ramce danych według kolejności rosnącej i przefiltrować wynik, aby wyświetlić tylko wiersze w zakresie okien wnioskowania. inferenceEndTime W tym miejscu pasuje do ostatniego wiersza w ramce danych, więc może go zignorować.

Na koniec, aby lepiej wykreślić wyniki, przekonwertuj ramkę danych platformy Spark na ramkę danych biblioteki Pandas:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Sformatuj kolumnę contributors , która przechowuje wynik udziału z każdego czujnika do wykrytych anomalii. Następna komórka obsługuje to i dzieli wynik udziału każdego czujnika na własną kolumnę:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Teraz mamy wyniki udziału czujników 1, 2 i 3 odpowiednio w series_0kolumnach , series_1i series_2 .

Aby wykreślić wyniki, uruchom następną komórkę. Parametr minSeverity określa minimalną dotkliwość anomalii do przedstawienia:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Zrzut ekranu przedstawiający wykres wyników wykrywania anomalii wielowariancji.

Wykresy pokazują nieprzetworzone dane z czujników (wewnątrz okna wnioskowania) w kolorze pomarańczowym, zielonym i niebieskim. Czerwone linie pionowe na pierwszej ilustracji pokazują wykryte anomalie o ważności większej lub równej minSeverity.

Drugi wykres przedstawia wynik ważności wszystkich wykrytych anomalii z minSeverity progiem pokazanym w kropkowanej czerwonej linii.

Na koniec ostatni wykres przedstawia wkład danych z każdego czujnika do wykrytych anomalii. Pomaga nam to zdiagnozować i zrozumieć najbardziej prawdopodobną przyczynę każdej anomalii.