This page shows patterns for querying and analyzing MLflow traces in real-world scenarios, including error monitoring, performance profiling, user activity analysis, and building reusable trace analysis utilities.
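The examples below assume MLflow is already pointed at the tracking server and experiment that contains your traces. A minimal setup sketch (the tracking URI and experiment path are placeholders):

```python
import mlflow

# Placeholder values - replace with your own tracking server and experiment
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Shared/genai-app-traces")
```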
Error monitoring
Monitor and analyze errors in your production environment:
import mlflow
import time
import pandas as pd
def monitor_errors(experiment_name: str, hours: int = 1):
"""Monitor errors in the last N hours."""
# Calculate time window
current_time_ms = int(time.time() * 1000)
cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)
    # Scope the search to the specified experiment
    mlflow.set_experiment(experiment_name)
    # Find all errors
    failed_traces = mlflow.search_traces(
filter_string=f"attributes.status = 'ERROR' AND "
f"attributes.timestamp_ms > {cutoff_time_ms}",
order_by=["attributes.timestamp_ms DESC"]
)
if len(failed_traces) == 0:
print(f"No errors found in the last {hours} hour(s)")
return
# Analyze error patterns
print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")
# Group by function name
error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
print("Errors by function:")
print(error_by_function.to_string())
# Show recent error samples
print("\nRecent error samples:")
for _, trace in failed_traces.head(5).iterrows():
print(f"- {trace['request_preview'][:60]}...")
print(f" Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
print(f" Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
print()
return failed_traces
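A minimal usage sketch for the monitor above; the experiment name is a placeholder, and the call could run on a schedule (for example, an hourly job):

```python
# Placeholder experiment name - replace with your own
failed = monitor_errors("/Shared/genai-app-traces", hours=24)

# The function returns the failed traces as a DataFrame for further analysis
if failed is not None:
    print(f"Total failed traces in the window: {len(failed)}")
```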
Performance monitoring
Analyze performance characteristics and identify bottlenecks:
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
"""Profile performance metrics for traces."""
# Build filter
filter_parts = []
if function_name:
filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
filter_string = " AND ".join(filter_parts) if filter_parts else None
# Get traces
traces = mlflow.search_traces(filter_string=filter_string)
if len(traces) == 0:
print("No traces found")
return
# Calculate percentiles
perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])
print(f"Performance Analysis ({len(traces)} traces)")
print("=" * 40)
for p in percentiles:
print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
print(f"Mean: {perf_stats['mean']:.1f}ms")
print(f"Max: {perf_stats['max']:.1f}ms")
# Find outliers (>P99)
if 99 in percentiles:
p99_threshold = perf_stats['99%']
outliers = traces[traces['execution_time_ms'] > p99_threshold]
if len(outliers) > 0:
print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
for _, trace in outliers.head(3).iterrows():
print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")
return traces
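For example, you might compare the overall workload against a single entry point; `rag_pipeline` here is an illustrative function name:

```python
# Profile all traces in the active experiment
profile_performance()

# Profile a single traced function ("rag_pipeline" is an illustrative name)
profile_performance(function_name="rag_pipeline", percentiles=[50, 90, 99])
```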
User activity analysis
Track and analyze user behavior patterns:
def analyze_user_activity(user_id: str, days: int = 7):
"""Analyze activity patterns for a specific user."""
cutoff_ms = int((time.time() - days * 86400) * 1000)
traces = mlflow.search_traces(
filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
f"attributes.timestamp_ms > {cutoff_ms}",
order_by=["attributes.timestamp_ms DESC"]
)
if len(traces) == 0:
print(f"No activity found for user {user_id}")
return
print(f"User {user_id} Activity Report ({days} days)")
print("=" * 50)
print(f"Total requests: {len(traces)}")
# Daily activity
traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
daily_activity = traces.groupby('date').size()
print(f"\nDaily activity:")
print(daily_activity.to_string())
# Query categories
if 'tags.query_category' in traces.columns:
categories = traces['tags.query_category'].value_counts()
print(f"\nQuery categories:")
print(categories.to_string())
# Performance stats
print(f"\nPerformance:")
print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")
return traces
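A usage sketch with placeholder user IDs; the same report can be looped over several users to compare behavior:

```python
# Single-user report ("U12345" is a placeholder ID)
analyze_user_activity("U12345", days=7)

# Compare activity across several users (placeholder IDs)
for uid in ["U12345", "U67890"]:
    analyze_user_activity(uid, days=30)
```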
Complex RAG pipeline trace
Create a comprehensive trace that demonstrates span types, tags, chat attributes, token usage, error handling, and assessments:
import mlflow
import time
from mlflow.entities import SpanType
# Create a complex RAG application trace
@mlflow.trace(span_type=SpanType.CHAIN)
def rag_pipeline(question: str):
"""Main RAG pipeline that orchestrates retrieval and generation."""
# Add custom tags and metadata
mlflow.update_current_trace(
tags={
"environment": "production",
"version": "2.1.0",
"user_id": "U12345",
"session_id": "S98765",
"mlflow.traceName": "rag_pipeline"
}
)
# Retrieve relevant documents
documents = retrieve_documents(question)
# Generate response with context
response = generate_answer(question, documents)
# Simulate tool usage
fact_check_result = fact_check_tool(response)
return {
"answer": response,
"fact_check": fact_check_result,
"sources": [doc["metadata"]["doc_uri"] for doc in documents]
}
@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
"""Retrieve relevant documents from vector store."""
time.sleep(0.1) # Simulate retrieval time
# Get current span to set outputs properly
span = mlflow.get_current_active_span()
# Create document objects following MLflow schema
from mlflow.entities import Document
documents = [
Document(
page_content="MLflow Tracing provides observability for GenAI apps...",
metadata={
"doc_uri": "docs/mlflow/tracing_guide.md",
"chunk_id": "chunk_001",
"relevance_score": 0.95
}
),
Document(
page_content="Traces consist of spans that capture execution steps...",
metadata={
"doc_uri": "docs/mlflow/trace_concepts.md",
"chunk_id": "chunk_042",
"relevance_score": 0.87
}
)
]
# Set span outputs properly for RETRIEVER type
span.set_outputs(documents)
return [doc.to_dict() for doc in documents]
@mlflow.trace(span_type=SpanType.CHAT_MODEL)
def generate_answer(question: str, documents: list):
"""Generate answer using LLM with retrieved context."""
time.sleep(0.2) # Simulate LLM processing
# Set chat-specific attributes
from mlflow.tracing import set_span_chat_messages, set_span_chat_tools
messages = [
{"role": "system", "content": "You are a helpful assistant. Use the provided context to answer questions."},
{"role": "user", "content": f"Context: {documents}\n\nQuestion: {question}"}
]
# Define available tools
tools = [
{
"type": "function",
"function": {
"name": "fact_check",
"description": "Verify facts in the response",
"parameters": {
"type": "object",
"properties": {
"statement": {"type": "string"}
},
"required": ["statement"]
}
}
}
]
span = mlflow.get_current_active_span()
set_span_chat_messages(span, messages)
set_span_chat_tools(span, tools)
# Simulate token usage
span.set_attribute("llm.token_usage.input_tokens", 150)
span.set_attribute("llm.token_usage.output_tokens", 75)
span.set_attribute("llm.token_usage.total_tokens", 225)
return "MLflow Tracing provides comprehensive observability for GenAI applications by capturing detailed execution information through spans."
@mlflow.trace(span_type=SpanType.TOOL)
def fact_check_tool(statement: str):
"""Tool to verify facts in the generated response."""
time.sleep(0.05)
# Simulate an error for demonstration
if "comprehensive" in statement:
raise ValueError("Fact verification service unavailable")
return {"verified": True, "confidence": 0.92}
# Execute the pipeline
try:
result = rag_pipeline("What is MLflow Tracing?")
except Exception as e:
print(f"Pipeline error: {e}")
# Get the trace
trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id)
# Log assessments to the trace
from mlflow.entities import AssessmentSource, AssessmentSourceType
# Add human feedback
mlflow.log_feedback(
trace_id=trace_id,
name="helpfulness",
value=4,
source=AssessmentSource(
source_type=AssessmentSourceType.HUMAN,
source_id="reviewer_alice@company.com"
),
rationale="Clear and accurate response with good context usage"
)
# Add LLM judge assessment
mlflow.log_feedback(
trace_id=trace_id,
name="relevance_score",
value=0.92,
source=AssessmentSource(
source_type=AssessmentSourceType.LLM_JUDGE,
source_id="gpt-4-evaluator"
),
metadata={"evaluation_prompt_version": "v2.1"}
)
# Add ground truth expectation
mlflow.log_expectation(
trace_id=trace_id,
name="expected_facts",
value=["observability", "spans", "GenAI applications"],
source=AssessmentSource(
source_type=AssessmentSourceType.HUMAN,
source_id="subject_matter_expert"
)
)
# Add span-specific feedback
retriever_span = trace.search_spans(name="retrieve_documents")[0]
mlflow.log_feedback(
trace_id=trace_id,
span_id=retriever_span.span_id,
name="retrieval_quality",
value="excellent",
source=AssessmentSource(
source_type=AssessmentSourceType.CODE,
source_id="retrieval_evaluator.py"
)
)
# Refresh trace to get assessments
trace = mlflow.get_trace(trace_id)
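To confirm that the feedback and expectations were attached, you can iterate over the assessments on the refreshed trace; a quick sketch:

```python
# Print every assessment now attached to the trace
for assessment in trace.info.assessments:
    print(f"{assessment.name} ({assessment.source.source_type}): {assessment.value}")
```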
Comprehensive trace analysis
Build a complete trace analysis utility that extracts all meaningful information:
import datetime
def analyze_trace(trace_id: str):
"""Comprehensive analysis of a trace."""
# Get the trace
trace = mlflow.get_trace(trace_id)
print(f"=== TRACE ANALYSIS: {trace_id} ===\n")
# 1. Basic Information
print("1. BASIC INFORMATION")
print(f" State: {trace.info.state}")
print(f" Duration: {trace.info.execution_duration}ms")
print(f" Start time: {datetime.datetime.fromtimestamp(trace.info.request_time/1000)}")
if trace.info.experiment_id:
print(f" Experiment: {trace.info.experiment_id}")
# Show request/response previews for quick context
if trace.info.request_preview:
print(f" Request preview: {trace.info.request_preview}")
if trace.info.response_preview:
print(f" Response preview: {trace.info.response_preview}")
# 2. Tags Analysis
print("\n2. TAGS")
for key, value in sorted(trace.info.tags.items()):
print(f" {key}: {value}")
# 3. Token Usage
print("\n3. TOKEN USAGE")
if tokens := trace.info.token_usage:
print(f" Input: {tokens.get('input_tokens', 0)}")
print(f" Output: {tokens.get('output_tokens', 0)}")
print(f" Total: {tokens.get('total_tokens', 0)}")
# Calculate from spans if not in metadata
total_input = 0
total_output = 0
for span in trace.data.spans:
if span.span_type == SpanType.CHAT_MODEL:
total_input += span.get_attribute("llm.token_usage.input_tokens") or 0
total_output += span.get_attribute("llm.token_usage.output_tokens") or 0
if total_input or total_output:
print(f" (From spans - Input: {total_input}, Output: {total_output})")
# 4. Span Analysis
print("\n4. SPAN ANALYSIS")
span_types = {}
error_spans = []
for span in trace.data.spans:
# Count by type
span_types[span.span_type] = span_types.get(span.span_type, 0) + 1
# Collect errors
if span.status.status_code.name == "ERROR":
error_spans.append(span)
print(" Span counts by type:")
for span_type, count in sorted(span_types.items()):
print(f" {span_type}: {count}")
if error_spans:
print(f"\n Error spans ({len(error_spans)}):")
for span in error_spans:
print(f" - {span.name}: {span.status.description}")
# 5. Retrieval Analysis
print("\n5. RETRIEVAL ANALYSIS")
retriever_spans = trace.search_spans(span_type=SpanType.RETRIEVER)
if retriever_spans:
for r_span in retriever_spans:
if r_span.outputs:
docs = r_span.outputs
print(f" Retrieved {len(docs)} documents:")
for doc in docs[:3]: # Show first 3
if isinstance(doc, dict):
uri = doc.get('metadata', {}).get('doc_uri', 'Unknown')
score = doc.get('metadata', {}).get('relevance_score', 'N/A')
print(f" - {uri} (score: {score})")
# 6. Assessment Summary
print("\n6. ASSESSMENTS")
assessments = trace.search_assessments()
# Group by source type
by_source = {}
for assessment in assessments:
source_type = assessment.source.source_type
if source_type not in by_source:
by_source[source_type] = []
by_source[source_type].append(assessment)
for source_type, items in by_source.items():
print(f"\n {source_type} ({len(items)}):")
for assessment in items:
value_str = f"{assessment.value}"
if assessment.rationale:
value_str += f" - {assessment.rationale[:50]}..."
print(f" {assessment.name}: {value_str}")
# 7. Performance Breakdown
print("\n7. PERFORMANCE BREAKDOWN")
root_span = next((s for s in trace.data.spans if s.parent_id is None), None)
if root_span:
total_duration_ns = root_span.end_time_ns - root_span.start_time_ns
# Calculate time spent in each span type
time_by_type = {}
for span in trace.data.spans:
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
if span.span_type not in time_by_type:
time_by_type[span.span_type] = 0
time_by_type[span.span_type] += duration_ms
print(" Time by span type:")
for span_type, duration_ms in sorted(time_by_type.items(),
key=lambda x: x[1], reverse=True):
percentage = (duration_ms / (total_duration_ns / 1_000_000)) * 100
print(f" {span_type}: {duration_ms:.1f}ms ({percentage:.1f}%)")
# 8. Data Flow
print("\n8. DATA FLOW")
if intermediate := trace.data.intermediate_outputs:
print(" Intermediate outputs:")
for name, output in intermediate.items():
output_str = str(output)[:100] + "..." if len(str(output)) > 100 else str(output)
print(f" {name}: {output_str}")
return trace
# Run the analysis
analysis_result = analyze_trace(trace_id)
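The same routine works in bulk; for example, a sketch that re-analyzes the most recent failed traces returned by `mlflow.search_traces` (the `trace_id` column name is assumed; older MLflow versions expose it as `request_id`):

```python
# Re-run the analysis on the three most recent failed traces
recent_failures = mlflow.search_traces(
    filter_string="attributes.status = 'ERROR'",
    order_by=["attributes.timestamp_ms DESC"],
    max_results=3,
)
if len(recent_failures) > 0:
    for failed_trace_id in recent_failures["trace_id"]:
        analyze_trace(failed_trace_id)
```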
Build reusable trace utilities
Encapsulate common trace analysis operations in a reusable class:
import json  # used by export_for_evaluation below

class TraceAnalyzer:
"""Utility class for advanced trace analysis."""
def __init__(self, trace: mlflow.entities.Trace):
self.trace = trace
def get_error_summary(self):
"""Get summary of all errors in the trace."""
errors = []
# Check trace status
if self.trace.info.state == "ERROR":
errors.append({
"level": "trace",
"message": "Trace failed",
"details": self.trace.info.response_preview
})
# Check span errors
for span in self.trace.data.spans:
if span.status.status_code.name == "ERROR":
errors.append({
"level": "span",
"span_name": span.name,
"span_type": span.span_type,
"message": span.status.description,
"span_id": span.span_id
})
# Check assessment errors
for assessment in self.trace.info.assessments:
if assessment.error:
errors.append({
"level": "assessment",
"assessment_name": assessment.name,
"error": str(assessment.error)
})
return errors
def get_llm_usage_summary(self):
"""Aggregate LLM usage across all spans."""
usage = {
"total_llm_calls": 0,
"total_input_tokens": 0,
"total_output_tokens": 0,
"spans": []
}
for span in self.trace.data.spans:
if span.span_type in [SpanType.CHAT_MODEL, "LLM"]:
usage["total_llm_calls"] += 1
input_tokens = span.get_attribute("llm.token_usage.input_tokens") or 0
output_tokens = span.get_attribute("llm.token_usage.output_tokens") or 0
usage["total_input_tokens"] += input_tokens
usage["total_output_tokens"] += output_tokens
usage["spans"].append({
"name": span.name,
"input_tokens": input_tokens,
"output_tokens": output_tokens
})
usage["total_tokens"] = usage["total_input_tokens"] + usage["total_output_tokens"]
return usage
def get_retrieval_metrics(self):
"""Extract retrieval quality metrics."""
metrics = []
for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
if span.outputs:
docs = span.outputs
relevance_scores = []
for doc in docs:
if isinstance(doc, dict) and 'metadata' in doc:
if score := doc['metadata'].get('relevance_score'):
relevance_scores.append(score)
metrics.append({
"span_name": span.name,
"num_documents": len(docs),
"avg_relevance": sum(relevance_scores) / len(relevance_scores) if relevance_scores else None,
"max_relevance": max(relevance_scores) if relevance_scores else None,
"min_relevance": min(relevance_scores) if relevance_scores else None
})
return metrics
def get_span_hierarchy(self):
"""Build a hierarchical view of spans."""
# Create span lookup
span_dict = {span.span_id: span for span in self.trace.data.spans}
# Find root spans
roots = [span for span in self.trace.data.spans if span.parent_id is None]
def build_tree(span, indent=0):
result = []
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
result.append({
"indent": indent,
"name": span.name,
"type": span.span_type,
"duration_ms": duration_ms,
"status": span.status.status_code.name
})
# Find children
children = [s for s in self.trace.data.spans if s.parent_id == span.span_id]
for child in sorted(children, key=lambda s: s.start_time_ns):
result.extend(build_tree(child, indent + 1))
return result
hierarchy = []
for root in roots:
hierarchy.extend(build_tree(root))
return hierarchy
def export_for_evaluation(self):
"""Export trace data in a format suitable for evaluation."""
# Get root span data
request = response = None
if self.trace.data.request:
request = json.loads(self.trace.data.request)
if self.trace.data.response:
response = json.loads(self.trace.data.response)
# Get expected values from assessments
expectations = self.trace.search_assessments(type="expectation")
expected_values = {exp.name: exp.value for exp in expectations}
# Get retrieval context
retrieved_context = []
for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
if span.outputs:
for doc in span.outputs:
if isinstance(doc, dict) and 'page_content' in doc:
retrieved_context.append(doc['page_content'])
return {
"trace_id": self.trace.info.trace_id,
"request": request,
"response": response,
"retrieved_context": retrieved_context,
"expected_facts": expected_values.get("expected_facts", []),
"metadata": {
"user_id": self.trace.info.tags.get("user_id"),
"session_id": self.trace.info.tags.get("session_id"),
"duration_ms": self.trace.info.execution_duration,
"timestamp": self.trace.info.request_time
}
}
# Use the analyzer
analyzer = TraceAnalyzer(trace)
# Get various analyses
errors = analyzer.get_error_summary()
print(f"\nErrors found: {len(errors)}")
for error in errors:
print(f" - {error['level']}: {error.get('message', error.get('error'))}")
llm_usage = analyzer.get_llm_usage_summary()
print(f"\nLLM Usage: {llm_usage['total_tokens']} total tokens across {llm_usage['total_llm_calls']} calls")
retrieval_metrics = analyzer.get_retrieval_metrics()
print(f"\nRetrieval Metrics:")
for metric in retrieval_metrics:
print(f" - {metric['span_name']}: {metric['num_documents']} docs, avg relevance: {metric['avg_relevance']}")
# Export for evaluation
eval_data = analyzer.export_for_evaluation()
print(f"\nExported evaluation data with {len(eval_data['retrieved_context'])} context chunks")
Next steps
- Search and analyze traces - Learn to query multiple traces
- Build evaluation datasets - Convert traces to evaluation datasets
- Log assessments - Add feedback to traces