次の方法で共有


Jupyter Notebook のコード例

この記事では、Jupyter Notebook を使用して Microsoft Sentinel Lake データ (プレビュー) と対話し、Microsoft Sentinel データ レイクのセキュリティ データを分析する方法を示すサンプル コード スニペットをいくつか紹介します。 これらの例は、Microsoft Entra ID サインイン ログ、グループ情報、デバイス ネットワーク イベントなど、さまざまなテーブルのデータにアクセスして分析する方法を示しています。 コード スニペットは、Microsoft Sentinel 拡張機能を使用して Visual Studio Code 内の Jupyter ノートブックで実行するように設計されています。

これらの例を実行するには、必要なアクセス許可と、Microsoft Sentinel 拡張機能と共に Visual Studio Code がインストールされている必要があります。 詳細については、「 Microsoft Sentinel Data Lake のアクセス許可 」および「 Microsoft Sentinel Data Lake で Jupyter Notebook を使用する」を参照してください。

失敗したサインイン試行の分析

この例では、サインイン試行に失敗したユーザーを識別します。 これを行うには、次のノートブックの例で 2 つのテーブルのサインイン データを処理します。

  • SigninLogs
  • AADNonInteractiveUserSignInLogs(AAD非対話型ユーザーサインインログ)

ノートブックは、次の手順を実行します。

  1. 指定したテーブルのデータを処理する関数を作成します。これには次のものが含まれます。
    1. 指定したテーブルから DataFrames にデータを読み込みます。
    2. "Status" JSON フィールドを解析して 'errorCode' を抽出し、各サインイン試行が成功したか失敗したかを判断します。
    3. データを集計して、各ユーザーの失敗したサインイン試行と成功したサインイン試行の数をカウントします。
    4. 100 を超えるサインイン試行が失敗し、少なくとも 1 回のサインイン試行が成功したユーザーのみを含むようにデータをフィルター処理します。
    5. 失敗したサインイン試行回数で結果を並べ替えます。
  2. SigninLogsテーブルとAADNonInteractiveUserSignInLogsテーブルの両方に対して関数を呼び出します。
  3. 両方のテーブルの結果を 1 つの DataFrame に結合します。
  4. DataFrame を Pandas DataFrame に変換します。
  5. Pandas DataFrame をフィルター処理して、失敗したサインイン試行回数が最も多い上位 20 人のユーザーを表示します。
  6. サインイン試行の失敗回数が最も多いユーザーを視覚化する横棒グラフを作成します。

このノートブックは、ログ テーブル内のデータの量に応じて、Large プールで実行するのに約 10 分かかります。

# Import necessary libraries
import matplotlib.pyplot as plt
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

# Function to process data
def process_data(table_name,workspace_name):
    # Load data into DataFrame
    df = data_provider.read_table(table_name, workspace_name)
    
    # Define schema for parsing the 'Status' JSON field
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    # Parse the 'Status' JSON field to extract 'errorCode'
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    # Define success codes
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    
    # Determine FailureOrSuccess based on ResultType
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    
    # Summarize FailureCount and SuccessCount
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    
    # Filter where FailureCount > 100 and SuccessCount > 0
    df = df.filter((col("FailureCount") > 100) & (col("SuccessCount") > 0))
    
    # Order by FailureCount descending
    df = df.orderBy(desc("FailureCount"))
         
    return df

# Process the tables to a common schema
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs", workspace_name)

# Union the DataFrames
result_df = aad_signin.unionByName(aad_non_int)

# Show the result
result_df.show()

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = result_df.toPandas()

# Filter to show table with top 20 users with the highest failed sign-ins attempted
top_20_df = result_pd_df.nlargest(20, 'FailureCount')

# Create bar chart to show users by highest failed sign-ins attempted
plt.figure(figsize=(12, 6))
plt.bar(top_20_df['UserDisplayName'], top_20_df['FailureCount'], color='skyblue')
plt.xlabel('Users')
plt.ylabel('Number of Failed sign-ins')
plt.title('Top 20 Users with Failed sign-ins')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()  

次のスクリーンショットは、上のコードの出力のサンプルを示しています。失敗したサインイン試行回数が最も多い上位 20 人のユーザーが横棒グラフ形式で表示されています。

サインイン試行の失敗回数が最も多いユーザーの横棒グラフを示すスクリーンショット。

Access Lake 層の Microsoft Entra ID グループ テーブル

次のコード サンプルは、Microsoft Sentinel データ レイクの EntraGroups テーブルにアクセスする方法を示しています。 displayNamegroupTypesmailmailNicknamedescriptiontenantIdなど、さまざまなフィールドが表示されます。

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "EntraGroups"  
df = data_provider.read_table(table_name)  
df.select("displayName", "groupTypes", "mail", "mailNickname", "description", "tenantId").show(100, truncate=False)   

次のスクリーンショットは、上記のコードの出力のサンプルを示しています。Microsoft Entra ID グループ情報がデータフレーム形式で表示されています。

Microsoft Entra ID グループ テーブルからの出力例を示すスクリーンショット。

特定のユーザーの Microsoft Entra ID サインイン ログにアクセスする

次のコード サンプルは、Microsoft Entra ID SigninLogs テーブルにアクセスし、特定のユーザーの結果をフィルター処理する方法を示しています。 UserDisplayName、UserPrincipalName、UserId などのさまざまなフィールドを取得します。

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "SigninLogs"  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)  
df.select("UserDisplayName", "UserPrincipalName", "UserId", "CorrelationId", "UserType", 
 "ResourceTenantId", "RiskLevelDuringSignIn", "ResourceProvider", "IPAddress", "AppId", "AADTenantId")\
    .filter(df.UserPrincipalName == "bploni5@contoso.com")\
    .show(100, truncate=False) 

サインインの場所を確認する

次のコード サンプルは、Microsoft Entra ID SigninLogs テーブルからサインイン場所を抽出して表示する方法を示しています。 from_json関数を使用して、LocationDetails フィールドの JSON 構造を解析し、市区町村、都道府県、国や地域などの特定の場所の属性にアクセスできるようにします。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col  
from pyspark.sql.types import StructType, StructField, StringType  
 
data_provider = MicrosoftSentinelProvider(spark)  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
table_name = "SigninLogs"  
df = data_provider.read_table(table_name, workspace_name)  
 
location_schema = StructType([  
  StructField("city", StringType(), True),  
  StructField("state", StringType(), True),  
  StructField("countryOrRegion", StringType(), True)  
])  
 
# Extract location details from JSON  
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))  
df = df.select("UserPrincipalName", "CreatedDateTime", "IPAddress", 
 "LocationDetails.city", "LocationDetails.state", "LocationDetails.countryOrRegion")  
 
sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)  
sign_in_locations_df.show(100, truncate=False) 

通常とは異なる国からのサインイン

次のコード サンプルは、ユーザーの一般的なサインイン パターンに含まれていない国からのサインインを識別する方法を示しています。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)
table_name = "signinlogs"
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)

location_schema = StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("countryOrRegion", StringType(), True)
])

# Extract location details from JSON
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))
df = df.select(
    "UserPrincipalName",
    "CreatedDateTime",
    "IPAddress",
    "LocationDetails.city",
    "LocationDetails.state",
    "LocationDetails.countryOrRegion"
)

sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)
sign_in_locations_df.show(100, truncate=False)

失敗した複数のサインインからのブルート フォース攻撃

サインイン試行の失敗回数が多いアカウントのユーザー サインイン ログを分析して、ブルート フォース攻撃の可能性を特定します。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

def process_data(table_name, workspace_name):
    df = data_provider.read_table(table_name, workspace_name)
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    # Lower the brute force threshold to >10 failures and remove the success requirement
    df = df.filter(col("FailureCount") > 10)
    df = df.orderBy(desc("FailureCount"))
    df = df.withColumn("AccountCustomEntity", col("UserPrincipalName")) \
           .withColumn("IPCustomEntity", col("IPAddress"))
    return df
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs",workspace_name)
result_df = aad_signin.unionByName(aad_non_int)
result_df.show()

横移動の試行を検出する

DeviceNetworkEvents を使用して、エンドポイント間の SMB/RDP トラフィックの異常など、横移動を通知する可能性がある疑わしい内部 IP 接続を特定します。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, count, countDistinct, desc

deviceNetworkEventTable = "DeviceNetworkEvents"
workspace_name = "<your-workspace-name>"  # Replace with your actual workspace name
data_provider = MicrosoftSentinelProvider(spark)
device_network_events = data_provider.read_table(deviceNetworkEventTable, workspace_name)

# Define internal IP address range (example: 10.x.x.x, 192.168.x.x, 172.16.x.x - 172.31.x.x)
internal_ip_regex = r"^(10\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(1[6-9]|2[0-9]|3[0-1])\.\d{1,3}\.\d{1,3})$"

# Filter for internal-to-internal connections
internal_connections = device_network_events.filter(
    col("RemoteIP").rlike(internal_ip_regex) &
    col("LocalIP").rlike(internal_ip_regex)
)

# Group by source and destination, count connections
suspicious_lateral = (
    internal_connections.groupBy("LocalIP", "RemoteIP", "InitiatingProcessAccountName")
    .agg(count("*").alias("ConnectionCount"))
    .filter(col("ConnectionCount") > 10)  # Threshold can be adjusted
    .orderBy(desc("ConnectionCount"))
)
suspicious_lateral.show()

資格情報ダンプ ツールを見つけ出す

DeviceProcessEvents にクエリを実行して、mimikatz.exe や lsass.exe アクセスの予期しない実行などのプロセスを検索します。これは、資格情報の収集を示している可能性があります。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower

workspace_id = "<your-workspace-name>"
device_process_table = "DeviceProcessEvents"

data_provider = MicrosoftSentinelProvider(spark)
process_events = data_provider.read_table(device_process_table, workspace_id)

# Look for known credential dumping tools and suspicious access to lsass.exe
suspicious_processes = process_events.filter(
    (lower(col("FileName")).rlike("mimikatz|procdump|lsassy|nanodump|sekurlsa|dumpert")) |
    (
        (lower(col("FileName")) == "lsass.exe") &
        (~lower(col("InitiatingProcessFileName")).isin(["services.exe", "wininit.exe", "taskmgr.exe"]))
    )
)

suspicious_processes.select(
    "Timestamp",
    "DeviceName",
    "AccountName",
    "FileName",
    "FolderPath",
    "InitiatingProcessFileName",
    "InitiatingProcessCommandLine"
).show(50, truncate=False)

USB アクティビティと機密性の高いファイル アクセスの相関関係

ノートブックで DeviceEvents と DeviceFileEvents を組み合わせて、潜在的なデータ流出パターンを表示します。 視覚エフェクトを追加して、どのデバイス、ユーザー、またはファイルがいつ関係したかを示します。

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower, to_timestamp, expr
import matplotlib.pyplot as plt

data_provider = MicrosoftSentinelProvider(spark)
workspace_id = “<your-workspace-id>”

# Load DeviceEvents and DeviceFileEvents tables
device_events = data_provider.read_table("DeviceEvents", workspace_id)
device_file_events = data_provider.read_table("DeviceFileEvents", workspace_id)
device_info = data_provider.read_table("DeviceInfo", workspace_id)

# Filter for USB device activity (adjust 'ActionType' or 'AdditionalFields' as needed)
usb_events = device_events.filter(
    lower(col("ActionType")).rlike("usb|removable|storage")
)

# Filter for sensitive file access (e.g., files in Documents, Desktop, or with sensitive extensions)
sensitive_file_events = device_file_events.filter(
    lower(col("FolderPath")).rlike("documents|desktop|finance|confidential|secret|sensitive") |
    lower(col("FileName")).rlike(r"\.(docx|xlsx|pdf|csv|zip|7z|rar|pst|bak)$")
)

# Convert timestamps
usb_events = usb_events.withColumn("EventTime", to_timestamp(col("Timestamp")))
sensitive_file_events = sensitive_file_events.withColumn("FileEventTime", to_timestamp(col("Timestamp")))

# Join on DeviceId and time proximity (within 10 minutes) using expr for column operations
joined = usb_events.join(
    sensitive_file_events,
    (usb_events.DeviceId == sensitive_file_events.DeviceId) &
    (expr("abs(unix_timestamp(EventTime) - unix_timestamp(FileEventTime)) <= 600")),
    "inner"
) \
.join(device_info, usb_events.DeviceId == device_info.DeviceId, "inner")


# Select relevant columns
correlated = joined.select(
    device_info.DeviceName,
    usb_events.DeviceId,
    usb_events.AccountName,
    usb_events.EventTime.alias("USBEventTime"),
    sensitive_file_events.FileName,
    sensitive_file_events.FolderPath,
    sensitive_file_events.FileEventTime
)

correlated.show(50, truncate=False)

# Visualization: Number of sensitive file accesses per device
pd_df = correlated.toPandas()
if not pd_df.empty:
    plt.figure(figsize=(12, 6))
    pd_df.groupby('DeviceName').size().sort_values(ascending=False).head(10).plot(kind='bar')
    plt.title('Top Devices with Correlated USB and Sensitive File Access Events')
    plt.xlabel('DeviceName')
    plt.ylabel('Number of Events')
    plt.tight_layout()
    plt.show()
else:
    print("No correlated USB and sensitive file access events found in the selected period.")

ビーコン動作の検出

長時間にわたって低バイト ボリュームで定期的な送信接続をクラスタリングすることで、潜在的なコマンドアンドコントロールを検出します。

# Setup
from pyspark.sql.functions import col, to_timestamp, window, count, avg, stddev, hour, date_trunc
from sentinel_lake.providers import MicrosoftSentinelProvider 
import matplotlib.pyplot as plt
import pandas as pd

data_provider = MicrosoftSentinelProvider(spark)
device_net_events = "DeviceNetworkEvents"
workspace_id = "<your-workspace-id>"

network_df = data_provider.read_table(device_net_events, workspace_id)

# Add hour bucket to group by frequency
network_df = network_df.withColumn("HourBucket", date_trunc("hour", col("Timestamp")))

# Group by device and IP to count hourly traffic
hourly_traffic = network_df.groupBy("DeviceName", "RemoteIP", "HourBucket") \
    .agg(count("*").alias("ConnectionCount"))

# Count number of hours this IP talks to device
stats_df = hourly_traffic.groupBy("DeviceName", "RemoteIP") \
    .agg(
        count("*").alias("HoursSeen"),
        avg("ConnectionCount").alias("AvgConnPerHour"),
        stddev("ConnectionCount").alias("StdDevConnPerHour")
    )

# Filter beacon-like traffic: low stddev, repeated presence
beacon_candidates = stats_df.filter(
    (col("HoursSeen") > 10) &
    (col("AvgConnPerHour") < 5) &
    (col("StdDevConnPerHour") < 1.0)
)

beacon_candidates.show(truncate=False)

# Choose one Device + IP pair to plot
example = beacon_candidates.limit(1).collect()[0]
example_device = example["DeviceName"]
example_ip = example["RemoteIP"]

# Filter hourly traffic for that pair
example_df = hourly_traffic.filter(
    (col("DeviceName") == example_device) & 
    (col("RemoteIP") == example_ip)
).orderBy("HourBucket")

# Convert to Pandas and plot
example_pd = example_df.toPandas()
example_pd["HourBucket"] = pd.to_datetime(example_pd["HourBucket"])

plt.figure(figsize=(12, 5))
plt.plot(example_pd["HourBucket"], example_pd["ConnectionCount"], marker="o", linestyle="-")
plt.title(f"Outbound Connections – {example_device} to {example_ip}")
plt.xlabel("Time (Hourly)")
plt.ylabel("Connection Count")
plt.grid(True)
plt.tight_layout()
plt.show()