Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo apresenta alguns trechos de código de exemplo que demonstram como interagir com o lago de dados do Microsoft Sentinel (pré-visualização) usando blocos de anotações Jupyter para analisar dados de segurança no lago de dados do Microsoft Sentinel. Estes exemplos ilustram como aceder e analisar dados de várias tabelas, tais como registos de início de sessão do Microsoft Entra ID, informações de grupo e eventos de rede de dispositivos. Os trechos de código são projetados para serem executados em blocos de anotações Jupyter dentro do Visual Studio Code usando a extensão Microsoft Sentinel.
Para executar esses exemplos, deve ter as permissões necessárias e o Visual Studio Code instalado com a extensão Microsoft Sentinel. Para obter mais informações, consulte Permissões do data lake do Microsoft Sentinel e Utilizar cadernos Jupyter com o data lake do Microsoft Sentinel.
Análise de tentativas de início de sessão falhadas
Este exemplo identifica usuários com tentativas de entrada com falha. Para fazer isso, este exemplo de bloco de anotações processa dados de entrada de duas tabelas:
- SigninLogs
- Registos de Entrada de Utilizador Não-Interativo do AAD
O bloco de anotações executa as seguintes etapas:
- Crie uma função para processar dados das tabelas especificadas, que inclui:
- Carregue dados das tabelas especificadas em DataFrames.
- Analise o campo JSON 'Status' para extrair 'errorCode' e determine se cada tentativa de entrada foi um sucesso ou uma falha.
- Agregue os dados para contar o número de tentativas de entrada falhadas e bem-sucedidas para cada usuário.
- Filtre os dados para incluir apenas utilizadores com mais de 100 tentativas de início de sessão falhadas e pelo menos uma tentativa de início de sessão bem-sucedida.
- Organize os resultados por número de tentativas de iniciar sessão falhadas.
- Chame a função para as tabelas
SigninLogseAADNonInteractiveUserSignInLogs. - Combine os resultados de ambas as tabelas em um único DataFrame.
- Converta o DataFrame em um Pandas DataFrame.
- Filtre o Pandas DataFrame para mostrar os 20 principais utilizadores com o maior número de tentativas de início de sessão falhadas.
- Crie um gráfico de barras para visualizar os utilizadores com o maior número de tentativas de início de sessão falhadas.
Observação
Este notebook leva cerca de 10 minutos para ser executado no Large pool, dependendo do volume de dados nas tabelas de logs.
# Import necessary libraries
import matplotlib.pyplot as plt
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType
data_provider = MicrosoftSentinelProvider(spark)
# Function to process data
def process_data(table_name,workspace_name):
# Load data into DataFrame
df = data_provider.read_table(table_name, workspace_name)
# Define schema for parsing the 'Status' JSON field
status_schema = StructType([StructField("errorCode", StringType(), True)])
# Parse the 'Status' JSON field to extract 'errorCode'
df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
.withColumn("ResultType", col("Status_json.errorCode"))
# Define success codes
success_codes = ["0", "50125", "50140", "70043", "70044"]
# Determine FailureOrSuccess based on ResultType
df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
# Summarize FailureCount and SuccessCount
df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
.agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
# Filter where FailureCount > 100 and SuccessCount > 0
df = df.filter((col("FailureCount") > 100) & (col("SuccessCount") > 0))
# Order by FailureCount descending
df = df.orderBy(desc("FailureCount"))
return df
# Process the tables to a common schema
workspace_name = "your-workspace-name" # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs", workspace_name)
# Union the DataFrames
result_df = aad_signin.unionByName(aad_non_int)
# Show the result
result_df.show()
# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = result_df.toPandas()
# Filter to show table with top 20 users with the highest failed sign-ins attempted
top_20_df = result_pd_df.nlargest(20, 'FailureCount')
# Create bar chart to show users by highest failed sign-ins attempted
plt.figure(figsize=(12, 6))
plt.bar(top_20_df['UserDisplayName'], top_20_df['FailureCount'], color='skyblue')
plt.xlabel('Users')
plt.ylabel('Number of Failed sign-ins')
plt.title('Top 20 Users with Failed sign-ins')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()
A captura de ecrã a seguir mostra uma amostra da saída do código acima, exibindo os 20 principais utilizadores com o maior número de tentativas falhadas de início de sessão em um formato de gráfico de barras.
Tabela do Grupo do Microsoft Entra ID da camada de acesso ao lago
O exemplo de código a seguir demonstra como acessar a EntraGroups tabela no data lake do Microsoft Sentinel. Ele exibe vários campos, como displayName, groupTypes, mail, mailNickname, descriptione tenantId.
from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
table_name = "EntraGroups"
df = data_provider.read_table(table_name)
df.select("displayName", "groupTypes", "mail", "mailNickname", "description", "tenantId").show(100, truncate=False)
A captura de tela a seguir mostra um exemplo da saída do código acima, exibindo as informações do grupo Microsoft Entra ID em um formato de dataframe.
Aceder aos registos de início de sessão do Microsoft Entra ID para um utilizador específico
O exemplo de código a seguir demonstra como acessar a tabela de ID SigninLogs do Microsoft Entra e filtrar os resultados para um usuário específico. Ele recupera vários campos, como UserDisplayName, UserPrincipalName, UserId e muito mais.
from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
table_name = "SigninLogs"
workspace_name = "your-workspace-name" # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)
df.select("UserDisplayName", "UserPrincipalName", "UserId", "CorrelationId", "UserType",
"ResourceTenantId", "RiskLevelDuringSignIn", "ResourceProvider", "IPAddress", "AppId", "AADTenantId")\
.filter(df.UserPrincipalName == "bploni5@contoso.com")\
.show(100, truncate=False)
Examinar locais de entrada
O exemplo de código a seguir demonstra como extrair e exibir locais de entrada da tabela Microsoft Entra ID SigninLogs. Ele usa a from_json função para analisar a estrutura JSON do campo, permitindo que você acesse atributos de LocationDetails localização específicos, como cidade, estado e país ou região.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
data_provider = MicrosoftSentinelProvider(spark)
workspace_name = "your-workspace-name" # Replace with your actual workspace name
table_name = "SigninLogs"
df = data_provider.read_table(table_name, workspace_name)
location_schema = StructType([
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("countryOrRegion", StringType(), True)
])
# Extract location details from JSON
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))
df = df.select("UserPrincipalName", "CreatedDateTime", "IPAddress",
"LocationDetails.city", "LocationDetails.state", "LocationDetails.countryOrRegion")
sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)
sign_in_locations_df.show(100, truncate=False)
Entradas de países incomuns
O exemplo de código a seguir demonstra como identificar entradas de países que não fazem parte do padrão de entrada típico de um usuário.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
data_provider = MicrosoftSentinelProvider(spark)
table_name = "signinlogs"
workspace_name = "your-workspace-name" # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)
location_schema = StructType([
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("countryOrRegion", StringType(), True)
])
# Extract location details from JSON
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))
df = df.select(
"UserPrincipalName",
"CreatedDateTime",
"IPAddress",
"LocationDetails.city",
"LocationDetails.state",
"LocationDetails.countryOrRegion"
)
sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)
sign_in_locations_df.show(100, truncate=False)
Ataque de força bruta a partir de vários logins com falha
Identifique possíveis ataques de força bruta analisando os logs de entrada do usuário para contas com um alto número de tentativas de entrada com falha.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType
data_provider = MicrosoftSentinelProvider(spark)
def process_data(table_name, workspace_name):
df = data_provider.read_table(table_name, workspace_name)
status_schema = StructType([StructField("errorCode", StringType(), True)])
df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
.withColumn("ResultType", col("Status_json.errorCode"))
success_codes = ["0", "50125", "50140", "70043", "70044"]
df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
.agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
# Lower the brute force threshold to >10 failures and remove the success requirement
df = df.filter(col("FailureCount") > 10)
df = df.orderBy(desc("FailureCount"))
df = df.withColumn("AccountCustomEntity", col("UserPrincipalName")) \
.withColumn("IPCustomEntity", col("IPAddress"))
return df
workspace_name = "your-workspace-name" # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs",workspace_name)
result_df = aad_signin.unionByName(aad_non_int)
result_df.show()
Detetar tentativas de movimento lateral
Utilizar DeviceNetworkEvents para identificar conexões IP internas suspeitas que podem sinalizar movimento lateral, por exemplo, tráfego SMB/RDP anormal entre terminais.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, count, countDistinct, desc
deviceNetworkEventTable = "DeviceNetworkEvents"
workspace_name = "<your-workspace-name>" # Replace with your actual workspace name
data_provider = MicrosoftSentinelProvider(spark)
device_network_events = data_provider.read_table(deviceNetworkEventTable, workspace_name)
# Define internal IP address range (example: 10.x.x.x, 192.168.x.x, 172.16.x.x - 172.31.x.x)
internal_ip_regex = r"^(10\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(1[6-9]|2[0-9]|3[0-1])\.\d{1,3}\.\d{1,3})$"
# Filter for internal-to-internal connections
internal_connections = device_network_events.filter(
col("RemoteIP").rlike(internal_ip_regex) &
col("LocalIP").rlike(internal_ip_regex)
)
# Group by source and destination, count connections
suspicious_lateral = (
internal_connections.groupBy("LocalIP", "RemoteIP", "InitiatingProcessAccountName")
.agg(count("*").alias("ConnectionCount"))
.filter(col("ConnectionCount") > 10) # Threshold can be adjusted
.orderBy(desc("ConnectionCount"))
)
suspicious_lateral.show()
Descubra ferramentas para extração de credenciais
Consultar DeviceProcessEvents para localizar processos como mimikatz.exe ou execução inesperada de acesso lsass.exe, o que pode indicar a captura de credenciais.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower
workspace_id = "<your-workspace-name>"
device_process_table = "DeviceProcessEvents"
data_provider = MicrosoftSentinelProvider(spark)
process_events = data_provider.read_table(device_process_table, workspace_id)
# Look for known credential dumping tools and suspicious access to lsass.exe
suspicious_processes = process_events.filter(
(lower(col("FileName")).rlike("mimikatz|procdump|lsassy|nanodump|sekurlsa|dumpert")) |
(
(lower(col("FileName")) == "lsass.exe") &
(~lower(col("InitiatingProcessFileName")).isin(["services.exe", "wininit.exe", "taskmgr.exe"]))
)
)
suspicious_processes.select(
"Timestamp",
"DeviceName",
"AccountName",
"FileName",
"FolderPath",
"InitiatingProcessFileName",
"InitiatingProcessCommandLine"
).show(50, truncate=False)
Correlação da atividade USB com o acesso sensível a arquivos
Combine DeviceEvents e DeviceFileEvents em um bloco de anotações para revelar possíveis padrões de exfiltração de dados. Adicione visualizações para mostrar quais dispositivos, usuários ou arquivos estavam envolvidos e quando.
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower, to_timestamp, expr
import matplotlib.pyplot as plt
data_provider = MicrosoftSentinelProvider(spark)
workspace_id = “<your-workspace-id>”
# Load DeviceEvents and DeviceFileEvents tables
device_events = data_provider.read_table("DeviceEvents", workspace_id)
device_file_events = data_provider.read_table("DeviceFileEvents", workspace_id)
device_info = data_provider.read_table("DeviceInfo", workspace_id)
# Filter for USB device activity (adjust 'ActionType' or 'AdditionalFields' as needed)
usb_events = device_events.filter(
lower(col("ActionType")).rlike("usb|removable|storage")
)
# Filter for sensitive file access (e.g., files in Documents, Desktop, or with sensitive extensions)
sensitive_file_events = device_file_events.filter(
lower(col("FolderPath")).rlike("documents|desktop|finance|confidential|secret|sensitive") |
lower(col("FileName")).rlike(r"\.(docx|xlsx|pdf|csv|zip|7z|rar|pst|bak)$")
)
# Convert timestamps
usb_events = usb_events.withColumn("EventTime", to_timestamp(col("Timestamp")))
sensitive_file_events = sensitive_file_events.withColumn("FileEventTime", to_timestamp(col("Timestamp")))
# Join on DeviceId and time proximity (within 10 minutes) using expr for column operations
joined = usb_events.join(
sensitive_file_events,
(usb_events.DeviceId == sensitive_file_events.DeviceId) &
(expr("abs(unix_timestamp(EventTime) - unix_timestamp(FileEventTime)) <= 600")),
"inner"
) \
.join(device_info, usb_events.DeviceId == device_info.DeviceId, "inner")
# Select relevant columns
correlated = joined.select(
device_info.DeviceName,
usb_events.DeviceId,
usb_events.AccountName,
usb_events.EventTime.alias("USBEventTime"),
sensitive_file_events.FileName,
sensitive_file_events.FolderPath,
sensitive_file_events.FileEventTime
)
correlated.show(50, truncate=False)
# Visualization: Number of sensitive file accesses per device
pd_df = correlated.toPandas()
if not pd_df.empty:
plt.figure(figsize=(12, 6))
pd_df.groupby('DeviceName').size().sort_values(ascending=False).head(10).plot(kind='bar')
plt.title('Top Devices with Correlated USB and Sensitive File Access Events')
plt.xlabel('DeviceName')
plt.ylabel('Number of Events')
plt.tight_layout()
plt.show()
else:
print("No correlated USB and sensitive file access events found in the selected period.")
Deteção de comportamento de beacon
Detete possíveis comandos e controles agrupando conexões de saída regulares em volumes de bytes baixos durante longas durações.
# Setup
from pyspark.sql.functions import col, to_timestamp, window, count, avg, stddev, hour, date_trunc
from sentinel_lake.providers import MicrosoftSentinelProvider
import matplotlib.pyplot as plt
import pandas as pd
data_provider = MicrosoftSentinelProvider(spark)
device_net_events = "DeviceNetworkEvents"
workspace_id = "<your-workspace-id>"
network_df = data_provider.read_table(device_net_events, workspace_id)
# Add hour bucket to group by frequency
network_df = network_df.withColumn("HourBucket", date_trunc("hour", col("Timestamp")))
# Group by device and IP to count hourly traffic
hourly_traffic = network_df.groupBy("DeviceName", "RemoteIP", "HourBucket") \
.agg(count("*").alias("ConnectionCount"))
# Count number of hours this IP talks to device
stats_df = hourly_traffic.groupBy("DeviceName", "RemoteIP") \
.agg(
count("*").alias("HoursSeen"),
avg("ConnectionCount").alias("AvgConnPerHour"),
stddev("ConnectionCount").alias("StdDevConnPerHour")
)
# Filter beacon-like traffic: low stddev, repeated presence
beacon_candidates = stats_df.filter(
(col("HoursSeen") > 10) &
(col("AvgConnPerHour") < 5) &
(col("StdDevConnPerHour") < 1.0)
)
beacon_candidates.show(truncate=False)
# Choose one Device + IP pair to plot
example = beacon_candidates.limit(1).collect()[0]
example_device = example["DeviceName"]
example_ip = example["RemoteIP"]
# Filter hourly traffic for that pair
example_df = hourly_traffic.filter(
(col("DeviceName") == example_device) &
(col("RemoteIP") == example_ip)
).orderBy("HourBucket")
# Convert to Pandas and plot
example_pd = example_df.toPandas()
example_pd["HourBucket"] = pd.to_datetime(example_pd["HourBucket"])
plt.figure(figsize=(12, 5))
plt.plot(example_pd["HourBucket"], example_pd["ConnectionCount"], marker="o", linestyle="-")
plt.title(f"Outbound Connections – {example_device} to {example_ip}")
plt.xlabel("Time (Hourly)")
plt.ylabel("Connection Count")
plt.grid(True)
plt.tight_layout()
plt.show()
Conteúdo relacionado
- Referência de classe do Microsoft Sentinel Provider (visualização)
- Visão geral do data lake Microsoft Sentinel (pré-visualização)
- Permissões do lago de dados do Microsoft Sentinel (visualização)
- Explore o lago de dados do Microsoft Sentinel usando notebooks Jupyter (visualização)
- Notebooks Jupyter e o data lake do Microsoft Sentinel (preview)