Compartir a través de


Ejemplos de código de Jupyter Notebook

En este artículo se presentan algunos fragmentos de código de ejemplo que muestran cómo interactuar con los datos de lago de Microsoft Sentinel (versión preliminar) mediante cuadernos de Jupyter Notebook para analizar los datos de seguridad en el lago de datos de Microsoft Sentinel. En estos ejemplos se muestra cómo acceder y analizar datos de varias tablas, como registros de inicio de sesión de Id. de Microsoft Entra, información de grupo y eventos de red de dispositivos. Los fragmentos de código están diseñados para ejecutarse en cuadernos de Jupyter Notebook en Visual Studio Code mediante la extensión microsoft Sentinel.

Para ejecutar estos ejemplos, debe tener instalados los permisos necesarios y Visual Studio Code con la extensión microsoft Sentinel. Para más información, consulte Permisos del lago de datos de Microsoft Sentinel y Uso de Jupyter Notebooks con el lago de datos de Microsoft Sentinel.

Análisis de intentos de inicio de sesión fallidos

En este ejemplo se identifican los usuarios con intentos de inicio de sesión erróneos. Para ello, en este ejemplo de cuaderno se procesan los datos de inicio de sesión de dos tablas:

  • SigninLogs
  • AADNonInteractiveUserSignInLogs

El cuaderno realiza los pasos siguientes:

  1. Cree una función para procesar datos de las tablas especificadas, lo que incluye:
    1. Cargue datos de las tablas especificadas en DataFrames.
    2. Analice el campo JSON "Status" para extraer "errorCode" y determine si cada intento de inicio de sesión fue correcto o erróneo.
    3. Agregue los datos para contar el número de intentos de inicio de sesión erróneos y correctos para cada usuario.
    4. Filtre los datos para incluir solo los usuarios con más de 100 intentos de inicio de sesión erróneos y al menos un intento de inicio de sesión correcto.
    5. Ordene los resultados por el número de intentos de inicio de sesión erróneos.
  2. Llame a la función para las tablas SigninLogs y AADNonInteractiveUserSignInLogs.
  3. Combine los resultados de ambas tablas en un único DataFrame.
  4. Convierta el DataFrame en un DataFrame de Pandas.
  5. Filtre el DataFrame de Pandas para mostrar los 20 primeros usuarios con el mayor número de intentos de inicio de sesión con errores.
  6. Cree un gráfico de barras para visualizar los usuarios con el mayor número de intentos de inicio de sesión erróneos.

Nota:

Este cuaderno de trabajo tarda unos 10 minutos en ejecutarse en el grupo grande, dependiendo del volumen de datos en las tablas de registros

# Import necessary libraries
import matplotlib.pyplot as plt
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

# Function to process data
def process_data(table_name,workspace_name):
    # Load data into DataFrame
    df = data_provider.read_table(table_name, workspace_name)
    
    # Define schema for parsing the 'Status' JSON field
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    # Parse the 'Status' JSON field to extract 'errorCode'
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    # Define success codes
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    
    # Determine FailureOrSuccess based on ResultType
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    
    # Summarize FailureCount and SuccessCount
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    
    # Filter where FailureCount > 100 and SuccessCount > 0
    df = df.filter((col("FailureCount") > 100) & (col("SuccessCount") > 0))
    
    # Order by FailureCount descending
    df = df.orderBy(desc("FailureCount"))
         
    return df

# Process the tables to a common schema
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs", workspace_name)

# Union the DataFrames
result_df = aad_signin.unionByName(aad_non_int)

# Show the result
result_df.show()

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = result_df.toPandas()

# Filter to show table with top 20 users with the highest failed sign-ins attempted
top_20_df = result_pd_df.nlargest(20, 'FailureCount')

# Create bar chart to show users by highest failed sign-ins attempted
plt.figure(figsize=(12, 6))
plt.bar(top_20_df['UserDisplayName'], top_20_df['FailureCount'], color='skyblue')
plt.xlabel('Users')
plt.ylabel('Number of Failed sign-ins')
plt.title('Top 20 Users with Failed sign-ins')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()  

En la captura de pantalla siguiente se muestra un ejemplo de la salida del código anterior, en la que se muestran los 20 usuarios principales con el mayor número de intentos de inicio de sesión erróneos en un formato de gráfico de barras.

Captura de pantalla que muestra un gráfico de barras de los usuarios con el mayor número de intentos de inicio de sesión erróneos.

Tabla de grupos Microsoft Entra ID de capa de lago de acceso

En el ejemplo de código siguiente se muestra cómo acceder a la EntraGroups tabla en el lago de datos de Microsoft Sentinel. Muestra varios campos como displayName, , groupTypesmail, mailNickname, , descriptiony tenantId.

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "EntraGroups"  
df = data_provider.read_table(table_name)  
df.select("displayName", "groupTypes", "mail", "mailNickname", "description", "tenantId").show(100, truncate=False)   

En la captura de pantalla siguiente se muestra un ejemplo de la salida del código anterior, que presenta la información del grupo de Microsoft Entra ID en un formato de marco de datos.

Recorte de pantalla que muestra la salida de ejemplo de la tabla del grupo de Microsoft Entra ID.

Acceso a los registros de inicio de sesión de Microsoft Entra ID para un usuario específico

En el ejemplo de código siguiente se muestra cómo acceder a la tabla Microsoft Entra ID SigninLogs y filtrar los resultados de un usuario específico. Recupera varios campos, como UserDisplayName, UserPrincipalName, UserId, etc.

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "SigninLogs"  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)  
df.select("UserDisplayName", "UserPrincipalName", "UserId", "CorrelationId", "UserType", 
 "ResourceTenantId", "RiskLevelDuringSignIn", "ResourceProvider", "IPAddress", "AppId", "AADTenantId")\
    .filter(df.UserPrincipalName == "bploni5@contoso.com")\
    .show(100, truncate=False) 

Examen de las ubicaciones de inicio de sesión

En el ejemplo de código siguiente se muestra cómo extraer y mostrar ubicaciones de inicio de sesión de la tabla SigninLogs de id. de Microsoft Entra. Usa la from_json función para analizar la estructura JSON del LocationDetails campo, lo que le permite acceder a atributos de ubicación específicos, como ciudad, estado y país o región.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col  
from pyspark.sql.types import StructType, StructField, StringType  
 
data_provider = MicrosoftSentinelProvider(spark)  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
table_name = "SigninLogs"  
df = data_provider.read_table(table_name, workspace_name)  
 
location_schema = StructType([  
  StructField("city", StringType(), True),  
  StructField("state", StringType(), True),  
  StructField("countryOrRegion", StringType(), True)  
])  
 
# Extract location details from JSON  
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))  
df = df.select("UserPrincipalName", "CreatedDateTime", "IPAddress", 
 "LocationDetails.city", "LocationDetails.state", "LocationDetails.countryOrRegion")  
 
sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)  
sign_in_locations_df.show(100, truncate=False) 

Inicios de sesión desde ubicaciones inusuales

En el ejemplo de código siguiente se muestra cómo identificar inicios de sesión de países que no forman parte del patrón de inicio de sesión típico de un usuario.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)
table_name = "signinlogs"
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)

location_schema = StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("countryOrRegion", StringType(), True)
])

# Extract location details from JSON
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))
df = df.select(
    "UserPrincipalName",
    "CreatedDateTime",
    "IPAddress",
    "LocationDetails.city",
    "LocationDetails.state",
    "LocationDetails.countryOrRegion"
)

sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)
sign_in_locations_df.show(100, truncate=False)

Ataque de fuerza bruta debido a múltiples intentos fallidos de inicio de sesión

Identifique posibles ataques por fuerza bruta mediante el análisis de registros de inicio de sesión de usuario para cuentas con un gran número de intentos de inicio de sesión erróneos.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

def process_data(table_name, workspace_name):
    df = data_provider.read_table(table_name, workspace_name)
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    # Lower the brute force threshold to >10 failures and remove the success requirement
    df = df.filter(col("FailureCount") > 10)
    df = df.orderBy(desc("FailureCount"))
    df = df.withColumn("AccountCustomEntity", col("UserPrincipalName")) \
           .withColumn("IPCustomEntity", col("IPAddress"))
    return df
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs",workspace_name)
result_df = aad_signin.unionByName(aad_non_int)
result_df.show()

Detección de intentos de movimiento lateral

Use DeviceNetworkEvents para identificar conexiones IP internas sospechosas que puedan indicar movimiento lateral, por ejemplo, tráfico SMB/RDP anómalo entre puntos de conexión.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, count, countDistinct, desc

deviceNetworkEventTable = "DeviceNetworkEvents"
workspace_name = "<your-workspace-name>"  # Replace with your actual workspace name
data_provider = MicrosoftSentinelProvider(spark)
device_network_events = data_provider.read_table(deviceNetworkEventTable, workspace_name)

# Define internal IP address range (example: 10.x.x.x, 192.168.x.x, 172.16.x.x - 172.31.x.x)
internal_ip_regex = r"^(10\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(1[6-9]|2[0-9]|3[0-1])\.\d{1,3}\.\d{1,3})$"

# Filter for internal-to-internal connections
internal_connections = device_network_events.filter(
    col("RemoteIP").rlike(internal_ip_regex) &
    col("LocalIP").rlike(internal_ip_regex)
)

# Group by source and destination, count connections
suspicious_lateral = (
    internal_connections.groupBy("LocalIP", "RemoteIP", "InitiatingProcessAccountName")
    .agg(count("*").alias("ConnectionCount"))
    .filter(col("ConnectionCount") > 10)  # Threshold can be adjusted
    .orderBy(desc("ConnectionCount"))
)
suspicious_lateral.show()

Descubrimiento de herramientas de volcado de credenciales

Consulte DeviceProcessEvents para buscar procesos como mimikatz.exe o ejecución inesperada de lsass.exe acceso, lo que podría indicar la recolección de credenciales.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower

workspace_id = "<your-workspace-name>"
device_process_table = "DeviceProcessEvents"

data_provider = MicrosoftSentinelProvider(spark)
process_events = data_provider.read_table(device_process_table, workspace_id)

# Look for known credential dumping tools and suspicious access to lsass.exe
suspicious_processes = process_events.filter(
    (lower(col("FileName")).rlike("mimikatz|procdump|lsassy|nanodump|sekurlsa|dumpert")) |
    (
        (lower(col("FileName")) == "lsass.exe") &
        (~lower(col("InitiatingProcessFileName")).isin(["services.exe", "wininit.exe", "taskmgr.exe"]))
    )
)

suspicious_processes.select(
    "Timestamp",
    "DeviceName",
    "AccountName",
    "FileName",
    "FolderPath",
    "InitiatingProcessFileName",
    "InitiatingProcessCommandLine"
).show(50, truncate=False)

Correlación de la actividad USB con acceso confidencial a archivos

Combine DeviceEvents y DeviceFileEvents en un cuaderno para exponer posibles patrones de filtración de datos. Agregue visualizaciones para mostrar qué dispositivos, usuarios o archivos estaban implicados y cuándo.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower, to_timestamp, expr
import matplotlib.pyplot as plt

data_provider = MicrosoftSentinelProvider(spark)
workspace_id = “<your-workspace-id>”

# Load DeviceEvents and DeviceFileEvents tables
device_events = data_provider.read_table("DeviceEvents", workspace_id)
device_file_events = data_provider.read_table("DeviceFileEvents", workspace_id)
device_info = data_provider.read_table("DeviceInfo", workspace_id)

# Filter for USB device activity (adjust 'ActionType' or 'AdditionalFields' as needed)
usb_events = device_events.filter(
    lower(col("ActionType")).rlike("usb|removable|storage")
)

# Filter for sensitive file access (e.g., files in Documents, Desktop, or with sensitive extensions)
sensitive_file_events = device_file_events.filter(
    lower(col("FolderPath")).rlike("documents|desktop|finance|confidential|secret|sensitive") |
    lower(col("FileName")).rlike(r"\.(docx|xlsx|pdf|csv|zip|7z|rar|pst|bak)$")
)

# Convert timestamps
usb_events = usb_events.withColumn("EventTime", to_timestamp(col("Timestamp")))
sensitive_file_events = sensitive_file_events.withColumn("FileEventTime", to_timestamp(col("Timestamp")))

# Join on DeviceId and time proximity (within 10 minutes) using expr for column operations
joined = usb_events.join(
    sensitive_file_events,
    (usb_events.DeviceId == sensitive_file_events.DeviceId) &
    (expr("abs(unix_timestamp(EventTime) - unix_timestamp(FileEventTime)) <= 600")),
    "inner"
) \
.join(device_info, usb_events.DeviceId == device_info.DeviceId, "inner")


# Select relevant columns
correlated = joined.select(
    device_info.DeviceName,
    usb_events.DeviceId,
    usb_events.AccountName,
    usb_events.EventTime.alias("USBEventTime"),
    sensitive_file_events.FileName,
    sensitive_file_events.FolderPath,
    sensitive_file_events.FileEventTime
)

correlated.show(50, truncate=False)

# Visualization: Number of sensitive file accesses per device
pd_df = correlated.toPandas()
if not pd_df.empty:
    plt.figure(figsize=(12, 6))
    pd_df.groupby('DeviceName').size().sort_values(ascending=False).head(10).plot(kind='bar')
    plt.title('Top Devices with Correlated USB and Sensitive File Access Events')
    plt.xlabel('DeviceName')
    plt.ylabel('Number of Events')
    plt.tight_layout()
    plt.show()
else:
    print("No correlated USB and sensitive file access events found in the selected period.")

Detección de comportamiento de baliza

Detecte posibles comandos y controles mediante la agrupación en clústeres de conexiones salientes normales en volúmenes de bytes bajos durante largas duraciones.

# Setup
from pyspark.sql.functions import col, to_timestamp, window, count, avg, stddev, hour, date_trunc
from sentinel_lake.providers import MicrosoftSentinelProvider 
import matplotlib.pyplot as plt
import pandas as pd

data_provider = MicrosoftSentinelProvider(spark)
device_net_events = "DeviceNetworkEvents"
workspace_id = "<your-workspace-id>"

network_df = data_provider.read_table(device_net_events, workspace_id)

# Add hour bucket to group by frequency
network_df = network_df.withColumn("HourBucket", date_trunc("hour", col("Timestamp")))

# Group by device and IP to count hourly traffic
hourly_traffic = network_df.groupBy("DeviceName", "RemoteIP", "HourBucket") \
    .agg(count("*").alias("ConnectionCount"))

# Count number of hours this IP talks to device
stats_df = hourly_traffic.groupBy("DeviceName", "RemoteIP") \
    .agg(
        count("*").alias("HoursSeen"),
        avg("ConnectionCount").alias("AvgConnPerHour"),
        stddev("ConnectionCount").alias("StdDevConnPerHour")
    )

# Filter beacon-like traffic: low stddev, repeated presence
beacon_candidates = stats_df.filter(
    (col("HoursSeen") > 10) &
    (col("AvgConnPerHour") < 5) &
    (col("StdDevConnPerHour") < 1.0)
)

beacon_candidates.show(truncate=False)

# Choose one Device + IP pair to plot
example = beacon_candidates.limit(1).collect()[0]
example_device = example["DeviceName"]
example_ip = example["RemoteIP"]

# Filter hourly traffic for that pair
example_df = hourly_traffic.filter(
    (col("DeviceName") == example_device) & 
    (col("RemoteIP") == example_ip)
).orderBy("HourBucket")

# Convert to Pandas and plot
example_pd = example_df.toPandas()
example_pd["HourBucket"] = pd.to_datetime(example_pd["HourBucket"])

plt.figure(figsize=(12, 5))
plt.plot(example_pd["HourBucket"], example_pd["ConnectionCount"], marker="o", linestyle="-")
plt.title(f"Outbound Connections – {example_device} to {example_ip}")
plt.xlabel("Time (Hourly)")
plt.ylabel("Connection Count")
plt.grid(True)
plt.tight_layout()
plt.show()