Search and analyze traces programmatically using mlflow.search_traces().
Quick reference
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)
Key rules
- Always use prefixes: attributes., tags., or metadata.
- Use backticks when tag or attribute names contain dots: tags.`mlflow.traceName`
- Single quotes only: 'value', not "value"
- Milliseconds for time: 1749006880539, not dates
- AND only: no OR support
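A single filter that applies all of these rules, built from the quick-reference values above:
# Prefixes, backticked dotted names, single quotes, millisecond timestamps, AND
mlflow.search_traces(
    "attributes.status = 'OK' AND "
    "tags.`mlflow.traceName` = 'my_function' AND "
    "attributes.timestamp_ms > 1749006880539"
)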
Databricks-specific parameters
The following parameters are specific to Databricks:
- sql_warehouse_id: Optional Databricks SQL warehouse ID. When specified, trace queries are executed using that warehouse for improved performance on large trace datasets.
- model_id: Optional model ID from the Databricks Model Registry. When specified, searches for traces associated with the given registered model.
SQL Warehouse integration
Execute trace queries using a Databricks SQL warehouse for improved performance on large trace datasets:
# Use SQL warehouse for better performance
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    sql_warehouse_id="your-warehouse-id"
)
Model Registry integration
Search for traces associated with registered models in Databricks:
# Find traces for a specific registered model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)
# Analyze model performance from traces
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
Search examples
Search by status
# Find successful, failed, or in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
Search by timestamp
import time
from datetime import datetime
# Recent traces (last 5 minutes)
current_time_ms = int(time.time() * 1000)
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)
# Date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Can also use 'timestamp' alias instead of 'timestamp_ms'
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
Search by execution time
# Find slow traces
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Can also use 'latency' alias instead of 'execution_time_ms'
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
Search by tags
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
# System tags (require backticks for dotted names)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)
Complex filters
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
    f"attributes.timestamp_ms > {one_hour_ago} AND "
    f"tags.environment = 'production'"
)
# Fast traces from specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
    "metadata.`mlflow.user` = 'alice@company.com'"
)
# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
    "attributes.execution_time_ms > 1000"
)
Query by context metadata
These examples demonstrate how to search across multiple traces using context metadata like user IDs, sessions, environments, and feature flags. For details on adding context metadata to traces, see Add context to traces.
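Context metadata of this kind is attached from inside your instrumented code. A minimal sketch, assuming an MLflow version whose mlflow.update_current_trace accepts a metadata argument (the user and session IDs are placeholders):
import mlflow

@mlflow.trace
def handle_request(message: str) -> str:
    # Attach searchable context to the active trace.
    # "user-123" and "session-abc" are placeholder values.
    mlflow.update_current_trace(
        metadata={
            "mlflow.trace.user": "user-123",
            "mlflow.trace.session": "session-abc",
        }
    )
    return f"processed: {message}"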
Analyze user behavior
from mlflow.client import MlflowClient
client = MlflowClient()
def analyze_user_behavior(user_id: str, experiment_id: str):
    """Analyze a specific user's interaction patterns."""
    # Search for all traces from a specific user
    user_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.user` = '{user_id}'",
        max_results=1000
    )
    # Calculate key metrics
    total_interactions = len(user_traces)
    unique_sessions = len(
        set(t.info.metadata.get("mlflow.trace.session", "") for t in user_traces)
    )
    # Guard against division by zero when the user has no traces
    avg_response_time = (
        sum(t.info.execution_time_ms for t in user_traces) / total_interactions
        if total_interactions
        else 0
    )
    return {
        "total_interactions": total_interactions,
        "unique_sessions": unique_sessions,
        "avg_response_time": avg_response_time
    }
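Example usage, with placeholder user and experiment IDs:
# stats = analyze_user_behavior("alice@company.com", "your_exp_id")
# print(stats)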
Analyze session flow
def analyze_session_flow(session_id: str, experiment_id: str):
    """Analyze conversation flow within a session."""
    # Get all traces from a session, ordered chronologically
    session_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
        order_by=["timestamp ASC"]
    )
    # Build a timeline of the conversation
    conversation_turns = []
    for i, trace in enumerate(session_traces):
        conversation_turns.append({
            "turn": i + 1,
            "timestamp": trace.info.timestamp,
            "duration_ms": trace.info.execution_time_ms,
            "status": trace.info.status
        })
    return conversation_turns
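Example usage, again with placeholder IDs:
# conversation = analyze_session_flow("session-abc", "your_exp_id")
# print(conversation)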
Compare error rates across versions
def compare_version_error_rates(experiment_id: str, versions: list):
    """Compare error rates across different app versions in production."""
    error_rates = {}
    for version in versions:
        traces = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
        )
        if not traces:
            error_rates[version] = None  # Or 0 if no traces means no errors
            continue
        error_count = sum(1 for t in traces if t.info.status == "ERROR")
        error_rates[version] = (error_count / len(traces)) * 100
    return error_rates
# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)
Analyze feature flag performance
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
    """Analyze performance differences between feature flag states."""
    control_latency = []
    treatment_latency = []
    control_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.feature_flag_{flag_name} = 'false'",
    )
    for t in control_traces:
        control_latency.append(t.info.execution_time_ms)
    treatment_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.feature_flag_{flag_name} = 'true'",
    )
    for t in treatment_traces:
        treatment_latency.append(t.info.execution_time_ms)
    avg_control_latency = sum(control_latency) / len(control_latency) if control_latency else 0
    avg_treatment_latency = sum(treatment_latency) / len(treatment_latency) if treatment_latency else 0
    return {
        f"avg_latency_{flag_name}_off": avg_control_latency,
        f"avg_latency_{flag_name}_on": avg_treatment_latency
    }
# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)
DataFrame operations
The DataFrame returned by mlflow.search_traces contains these columns:
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
# 'request', 'response', 'request_metadata', 'spans', 'tags']
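Because the result is a plain pandas DataFrame, you can summarize it with ordinary pandas operations. A quick sketch, assuming the status column holds strings such as 'OK' and 'ERROR' as the filter examples above suggest:
# Summarize error rate and latency with standard pandas operations
error_rate = (traces_df["status"] == "ERROR").mean() * 100
p95_latency_ms = traces_df["execution_time_ms"].quantile(0.95)
print(f"Error rate: {error_rate:.1f}%")
print(f"p95 latency: {p95_latency_ms:.0f}ms")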
Extract span fields
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)
# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})
Next steps
- Build evaluation datasets - Convert queried traces into test datasets