Share via


Search traces programmatically

Search and analyze traces programmatically using mlflow.search_traces().

Quick reference

# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")

# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")

# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")

# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")

# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)

Key rules

  • Always use prefixes: attributes., tags., or metadata.
  • Use backticks if tag, attribute, or metadata names contain dots: tags.`mlflow.traceName`
  • Single quotes only: 'value' not "value"
  • Milliseconds for time: 1749006880539 not dates
  • AND only: No OR support

Databricks-specific parameters

The following parameters are specific to Databricks:

  • sql_warehouse_id: Optional Databricks SQL warehouse ID. When specified, trace queries are executed using the specified SQL warehouse for improved performance on large trace datasets.
  • model_id: Optional model ID from the Databricks Model Registry. When specified, searches for traces associated with the given registered model.

SQL Warehouse integration

Execute trace queries using a Databricks SQL warehouse for improved performance on large trace datasets:

# Use SQL warehouse for better performance
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    sql_warehouse_id="your-warehouse-id"
)

Model Registry integration

Search for traces associated with registered models in Databricks:

# Find traces for a specific registered model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)

# Analyze model performance from traces
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")

Search examples

Search by status

# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Exclude failed traces (matches every status other than ERROR)
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")

Search by timestamp

import time
from datetime import datetime

# Recent traces (last 5 minutes)
current_time_ms = int(time.time() * 1000)  # epoch seconds -> milliseconds
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)

# Date range: strictly after 2024-01-01 00:00 and strictly before
# 2024-01-31 00:00, so traces recorded during Jan 31 are not included.
# These datetimes are naive, so .timestamp() interprets them in the local
# time zone; pass an explicit tzinfo if the bounds must be in UTC.
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)

# Can also use 'timestamp' alias instead of 'timestamp_ms'
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")

Search by execution time

# Find slow traces
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")

# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)

# Can also use 'latency' alias instead of 'execution_time_ms'
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")

Search by tags

# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")

# System tags (require backticks for dotted names)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)

Complex filters

# Successful production traces from the last hour
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - 60 * 60 * 1000

# Filters can only be combined with AND, so join the individual clauses.
filter_clauses = [
    "attributes.status = 'OK'",
    f"attributes.timestamp_ms > {one_hour_ago}",
    "tags.environment = 'production'",
]
traces = mlflow.search_traces(filter_string=" AND ".join(filter_clauses))

# Fast traces from specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
                 "metadata.`mlflow.user` = 'alice@company.com'"
)

# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
                 "attributes.execution_time_ms > 1000"
)

Query by context metadata

These examples demonstrate how to search across multiple traces using context metadata like user IDs, sessions, environments, and feature flags. For details on adding context metadata to traces, see Add context to traces.

Analyze user behavior

from mlflow.client import MlflowClient

client = MlflowClient()

def analyze_user_behavior(user_id: str, experiment_id: str):
    """Analyze a specific user's interaction patterns.

    Args:
        user_id: Value matched against the ``mlflow.trace.user`` trace metadata.
        experiment_id: ID of the experiment whose traces are searched.

    Returns:
        A dict with:
          * ``total_interactions`` - number of matching traces (the search
            is issued with ``max_results=1000``).
          * ``unique_sessions`` - number of distinct ``mlflow.trace.session``
            values; traces without a session ID are not counted.
          * ``avg_response_time`` - mean ``execution_time_ms`` over the
            matching traces, or ``0.0`` when the user has no traces.
    """

    # Search for all traces from a specific user
    user_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.user` = '{user_id}'",
        max_results=1000
    )

    total_interactions = len(user_traces)

    # Trace metadata is read through `request_metadata`, the same field name
    # the search_traces DataFrame exposes as a column.
    session_ids = {
        t.info.request_metadata.get("mlflow.trace.session") for t in user_traces
    }
    # Traces without a session must not be counted as one extra session.
    session_ids.discard(None)
    unique_sessions = len(session_ids)

    # Guard the division: a user with no traces would otherwise raise
    # ZeroDivisionError.
    if total_interactions:
        avg_response_time = (
            sum(t.info.execution_time_ms for t in user_traces) / total_interactions
        )
    else:
        avg_response_time = 0.0

    return {
        "total_interactions": total_interactions,
        "unique_sessions": unique_sessions,
        "avg_response_time": avg_response_time
    }

Analyze session flow

def analyze_session_flow(session_id: str, experiment_id: str):
    """Analyze conversation flow within a session.

    Args:
        session_id: Value matched against the ``mlflow.trace.session`` trace
            metadata.
        experiment_id: ID of the experiment whose traces are searched.

    Returns:
        A list with one dict per trace, in the order returned by the search
        (ascending timestamp). Each dict holds the 1-based ``turn`` number,
        the trace ``timestamp`` (milliseconds since epoch), its
        ``duration_ms`` and its ``status``. The list is empty when the
        session has no traces.
    """

    # Get all traces from a session, ordered chronologically
    session_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
        order_by=["timestamp ASC"]
    )

    # Build a timeline of the conversation. The start time is read from
    # `timestamp_ms`; `timestamp` is only an alias in filter/order strings.
    return [
        {
            "turn": turn,
            "timestamp": trace.info.timestamp_ms,
            "duration_ms": trace.info.execution_time_ms,
            "status": trace.info.status
        }
        for turn, trace in enumerate(session_traces, start=1)
    ]

Compare error rates across versions

def compare_version_error_rates(experiment_id: str, versions: list):
    """Compare error rates across different app versions in production.

    Args:
        experiment_id: ID of the experiment whose traces are searched.
        versions: App version strings, each matched against the
            ``app_version`` trace metadata.

    Returns:
        A dict mapping each version to the percentage (0-100) of its
        production traces whose status is ``"ERROR"``, or to ``None`` when
        no traces were found for that version.
    """
    rates_by_version = {}
    for app_version in versions:
        version_filter = (
            "metadata.`mlflow.source.type` = 'production' AND "
            f"metadata.app_version = '{app_version}'"
        )
        matching = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=version_filter
        )

        if matching:
            failed = [t for t in matching if t.info.status == "ERROR"]
            rates_by_version[app_version] = (len(failed) / len(matching)) * 100
        else:
            # No data for this version; use 0 instead if "no traces" should
            # be read as "no errors".
            rates_by_version[app_version] = None
    return rates_by_version

# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)

Analyze feature flag performance

def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
    """Analyze performance differences between feature flag states.

    Args:
        experiment_id: ID of the experiment whose traces are searched.
        flag_name: Flag name; traces are matched on the
            ``feature_flag_<flag_name>`` trace metadata being ``'false'``
            (control) or ``'true'`` (treatment).

    Returns:
        A dict with the mean ``execution_time_ms`` of each group under the
        keys ``avg_latency_<flag_name>_off`` and ``avg_latency_<flag_name>_on``.
        A group with no traces reports ``0``.
    """

    def _latencies_for(flag_value):
        # Execution times of every trace recorded with the flag in this state.
        found = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"metadata.feature_flag_{flag_name} = '{flag_value}'",
        )
        return [t.info.execution_time_ms for t in found]

    def _mean_or_zero(values):
        # Empty groups report 0 rather than dividing by zero.
        return sum(values) / len(values) if values else 0

    # Run both searches before averaging, control group first.
    off_latencies = _latencies_for("false")
    on_latencies = _latencies_for("true")

    return {
        f"avg_latency_{flag_name}_off": _mean_or_zero(off_latencies),
        f"avg_latency_{flag_name}_on": _mean_or_zero(on_latencies)
    }

# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)

DataFrame operations

The DataFrame returned by mlflow.search_traces contains these columns:

traces_df = mlflow.search_traces()

# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
#  'request', 'response', 'request_metadata', 'spans', 'tags']

Extract span fields

# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)

# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})

Next steps