Partager via


AnomalyDetection_ChangePoint

✅ Azure Stream Analytics ✅ Fabric Eventstream

Détecte les anomalies persistantes dans un flux d’événements de série chronologique. Le modèle Machine Learning sous-jacent utilise l’algorithme Exchangeability Martingales.

Syntaxe

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Les arguments

scalar_expression

Colonne d’événement ou champ calculé sur lequel le modèle effectue la détection des anomalies. Les valeurs autorisées pour ce paramètre incluent des types de données FLOAT ou BIGINT qui retournent une valeur unique (scalaire).

L’expression générique * n’est pas autorisée. En outre, scalar_expression ne peut pas contenir d’autres fonctions analytiques ou fonctions externes.

confiance

Nombre de pourcentages compris entre 1,00 et 100 (inclus) qui définit la sensibilité du modèle Machine Learning. Plus la confiance est faible, plus le nombre d’anomalies est élevé et vice versa. Commencez à partir d’un nombre arbitraire compris entre 70 et 90 et ajustez-le en fonction des résultats observés lors du développement ou du test.

historySize

Nombre d’événements dans une fenêtre glissante à partir de laquelle le modèle apprend en permanence et utilise pour noter l’événement suivant pour une anomalie. En règle générale, cela doit représenter la période de comportement normal pour permettre au modèle de marquer une anomalie ultérieure. Commencez par une estimation instruite à l’aide des journaux historiques et ajustez en fonction des résultats observés dans le développement ou le test.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Utilisé pour partitionner l’entraînement d’un modèle en fonction d’une colonne particulière dans les événements. Le modèle applique les mêmes paramètres de fonction sur toutes les partitions.

limit_duration_clause DURATION(unité, length)

Taille de la fenêtre glissante dans Stream Analytics en termes de temps. La taille recommandée de cette fenêtre de temps est l’équivalent du temps nécessaire à la génération d’historiqueSize du nombre d’événements dans un état stable.

when_clause

Spécifie la condition booléenne pour que les événements soient fournis au modèle pour effectuer la détection d’anomalie. La when_clause est facultative.

Types de retour

La fonction retourne un enregistrement imbriqué composé des colonnes suivantes :

IsAnomaly

UN BIGINT (0 ou 1) indiquant si l’événement était anormal ou non.

Score

Score Martingale calculé (float) indiquant la façon dont un événement est anormal. Ce score augmente de façon exponentielle avec des valeurs anormales.

Exemples

Dans l’exemple de requête suivant, la première requête suppose un événement toutes les 5 minutes, et la deuxième requête suppose un événement toutes les secondes. Le niveau de confiance est défini à 75 pour les deux modèles.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Exemple en supposant un taux d’entrée uniforme de 1 événement par seconde dans une fenêtre glissante de 20 minutes avec une taille d’historique de 1200 événements. L’instruction SELECT finale extrait et génère le score et l’état d’anomalie avec un niveau de confiance de 80%.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemple avec un flux d’entrée non uniforme qui est rendu uniforme à l’aide d’une fenêtre bascule de 1 seconde :

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemple avec une requête partitionnée pour entraîner un modèle distinct par capteur :

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep