Analyze traces

This page shows patterns for analyzing traces in real-world scenarios.

Error monitoring

Monitor and analyze errors in your production environment:

import mlflow
import time
import pandas as pd

def monitor_errors(experiment_name: str, hours: int = 1):
    """Monitor errors in the last N hours for the given experiment."""

    # Resolve the experiment so the search is scoped to it
    experiment = mlflow.get_experiment_by_name(experiment_name)

    # Calculate time window
    current_time_ms = int(time.time() * 1000)
    cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)

    # Find all failed traces in the time window
    failed_traces = mlflow.search_traces(
        experiment_ids=[experiment.experiment_id],
        filter_string=f"attributes.status = 'ERROR' AND "
                      f"attributes.timestamp_ms > {cutoff_time_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(failed_traces) == 0:
        print(f"No errors found in the last {hours} hour(s)")
        return

    # Analyze error patterns
    print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")

    # Group by function name
    error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
    print("Errors by function:")
    print(error_by_function.to_string())

    # Show recent error samples
    print("\nRecent error samples:")
    for _, trace in failed_traces.head(5).iterrows():
        print(f"- {trace['request_preview'][:60]}...")
        print(f"  Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
        print(f"  Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
        print()

    return failed_traces
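
For example, to review the last 24 hours of errors (the experiment name below is a placeholder; substitute your own):

# "production-chatbot" is an example experiment name
recent_errors = monitor_errors("production-chatbot", hours=24)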

Performance monitoring

Analyze performance characteristics and identify bottlenecks:

def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
    """Profile performance metrics for traces."""

    # Build filter
    filter_parts = []
    if function_name:
        filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

    filter_string = " AND ".join(filter_parts) if filter_parts else None

    # Get traces
    traces = mlflow.search_traces(filter_string=filter_string)

    if len(traces) == 0:
        print("No traces found")
        return

    # Calculate percentiles
    perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])

    print(f"Performance Analysis ({len(traces)} traces)")
    print("=" * 40)
    for p in percentiles:
        print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
    print(f"Mean: {perf_stats['mean']:.1f}ms")
    print(f"Max: {perf_stats['max']:.1f}ms")

    # Find outliers (>P99)
    if 99 in percentiles:
        p99_threshold = perf_stats['99%']
        outliers = traces[traces['execution_time_ms'] > p99_threshold]

        if len(outliers) > 0:
            print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
            for _, trace in outliers.head(3).iterrows():
                print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")

    return traces
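
To profile a single entry point, pass its trace name; the name below is an example and should match the traced function you want to inspect:

# Profile one traced function, then all traces in the active experiment
pipeline_traces = profile_performance(function_name="rag_pipeline")
all_traces = profile_performance()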

User activity analysis

Track and analyze user behavior patterns:

def analyze_user_activity(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""

    cutoff_ms = int((time.time() - days * 86400) * 1000)

    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
                     f"attributes.timestamp_ms > {cutoff_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return

    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total requests: {len(traces)}")

    # Daily activity
    traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print(f"\nDaily activity:")
    print(daily_activity.to_string())

    # Query categories
    if 'tags.query_category' in traces.columns:
        categories = traces['tags.query_category'].value_counts()
        print(f"\nQuery categories:")
        print(categories.to_string())

    # Performance stats
    print(f"\nPerformance:")
    print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
    print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")

    return traces
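
For example, to review a week of activity for a single user (the ID below is a placeholder and must match the mlflow.user metadata recorded on your traces):

# "U12345" is a placeholder user ID
user_traces = analyze_user_activity("U12345", days=7)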

Complex RAG pipeline trace

Create a comprehensive trace that exercises custom tags, retriever, LLM, and tool spans, error handling, and assessments:

import mlflow
import time
from mlflow.entities import SpanType

# Create a complex RAG application trace
@mlflow.trace(span_type=SpanType.CHAIN)
def rag_pipeline(question: str):
    """Main RAG pipeline that orchestrates retrieval and generation."""
    # Add custom tags and metadata
    mlflow.update_current_trace(
        tags={
            "environment": "production",
            "version": "2.1.0",
            "user_id": "U12345",
            "session_id": "S98765",
            "mlflow.traceName": "rag_pipeline"
        }
    )

    # Retrieve relevant documents
    documents = retrieve_documents(question)

    # Generate response with context
    response = generate_answer(question, documents)

    # Simulate tool usage
    fact_check_result = fact_check_tool(response)

    return {
        "answer": response,
        "fact_check": fact_check_result,
        "sources": [doc["metadata"]["doc_uri"] for doc in documents]
    }

@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
    """Retrieve relevant documents from vector store."""
    time.sleep(0.1)  # Simulate retrieval time

    # Get current span to set outputs properly
    span = mlflow.get_current_active_span()

    # Create document objects following MLflow schema
    from mlflow.entities import Document
    documents = [
        Document(
            page_content="MLflow Tracing provides observability for GenAI apps...",
            metadata={
                "doc_uri": "docs/mlflow/tracing_guide.md",
                "chunk_id": "chunk_001",
                "relevance_score": 0.95
            }
        ),
        Document(
            page_content="Traces consist of spans that capture execution steps...",
            metadata={
                "doc_uri": "docs/mlflow/trace_concepts.md",
                "chunk_id": "chunk_042",
                "relevance_score": 0.87
            }
        )
    ]

    # Set span outputs properly for RETRIEVER type
    span.set_outputs(documents)

    return [doc.to_dict() for doc in documents]

@mlflow.trace(span_type=SpanType.CHAT_MODEL)
def generate_answer(question: str, documents: list):
    """Generate answer using LLM with retrieved context."""
    time.sleep(0.2)  # Simulate LLM processing

    # Set chat-specific attributes
    from mlflow.tracing import set_span_chat_messages, set_span_chat_tools

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Use the provided context to answer questions."},
        {"role": "user", "content": f"Context: {documents}\n\nQuestion: {question}"}
    ]

    # Define available tools
    tools = [
        {
            "type": "function",
            "function": {
                "name": "fact_check",
                "description": "Verify facts in the response",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "statement": {"type": "string"}
                    },
                    "required": ["statement"]
                }
            }
        }
    ]

    span = mlflow.get_current_active_span()
    set_span_chat_messages(span, messages)
    set_span_chat_tools(span, tools)

    # Simulate token usage
    span.set_attribute("llm.token_usage.input_tokens", 150)
    span.set_attribute("llm.token_usage.output_tokens", 75)
    span.set_attribute("llm.token_usage.total_tokens", 225)

    return "MLflow Tracing provides comprehensive observability for GenAI applications by capturing detailed execution information through spans."

@mlflow.trace(span_type=SpanType.TOOL)
def fact_check_tool(statement: str):
    """Tool to verify facts in the generated response."""
    time.sleep(0.05)

    # Simulate an error for demonstration
    if "comprehensive" in statement:
        raise ValueError("Fact verification service unavailable")

    return {"verified": True, "confidence": 0.92}

# Execute the pipeline
try:
    result = rag_pipeline("What is MLflow Tracing?")
except Exception as e:
    print(f"Pipeline error: {e}")

# Get the trace
trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id)

# Log assessments to the trace
from mlflow.entities import AssessmentSource, AssessmentSourceType

# Add human feedback
mlflow.log_feedback(
    trace_id=trace_id,
    name="helpfulness",
    value=4,
    source=AssessmentSource(
        source_type=AssessmentSourceType.HUMAN,
        source_id="reviewer_alice@company.com"
    ),
    rationale="Clear and accurate response with good context usage"
)

# Add LLM judge assessment
mlflow.log_feedback(
    trace_id=trace_id,
    name="relevance_score",
    value=0.92,
    source=AssessmentSource(
        source_type=AssessmentSourceType.LLM_JUDGE,
        source_id="gpt-4-evaluator"
    ),
    metadata={"evaluation_prompt_version": "v2.1"}
)

# Add ground truth expectation
mlflow.log_expectation(
    trace_id=trace_id,
    name="expected_facts",
    value=["observability", "spans", "GenAI applications"],
    source=AssessmentSource(
        source_type=AssessmentSourceType.HUMAN,
        source_id="subject_matter_expert"
    )
)

# Add span-specific feedback
retriever_span = trace.search_spans(name="retrieve_documents")[0]
mlflow.log_feedback(
    trace_id=trace_id,
    span_id=retriever_span.span_id,
    name="retrieval_quality",
    value="excellent",
    source=AssessmentSource(
        source_type=AssessmentSourceType.CODE,
        source_id="retrieval_evaluator.py"
    )
)

# Refresh trace to get assessments
trace = mlflow.get_trace(trace_id)
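
The refreshed trace now carries the logged assessments, which can be read back directly:

# List the assessments attached to the trace
for assessment in trace.info.assessments:
    print(f"{assessment.name}: {assessment.value} (from {assessment.source.source_id})")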

Comprehensive trace analysis

Build a trace analysis utility that walks a single trace and reports its metadata, tags, token usage, spans, retrieval results, assessments, and performance breakdown:

import datetime

def analyze_trace(trace_id: str):
    """Comprehensive analysis of a trace."""

    # Get the trace
    trace = mlflow.get_trace(trace_id)

    print(f"=== TRACE ANALYSIS: {trace_id} ===\n")

    # 1. Basic Information
    print("1. BASIC INFORMATION")
    print(f"   State: {trace.info.state}")
    print(f"   Duration: {trace.info.execution_duration}ms")
    print(f"   Start time: {datetime.datetime.fromtimestamp(trace.info.request_time/1000)}")

    if trace.info.experiment_id:
        print(f"   Experiment: {trace.info.experiment_id}")

    # Show request/response previews for quick context
    if trace.info.request_preview:
        print(f"   Request preview: {trace.info.request_preview}")

    if trace.info.response_preview:
        print(f"   Response preview: {trace.info.response_preview}")

    # 2. Tags Analysis
    print("\n2. TAGS")
    for key, value in sorted(trace.info.tags.items()):
        print(f"   {key}: {value}")

    # 3. Token Usage
    print("\n3. TOKEN USAGE")
    if tokens := trace.info.token_usage:
        print(f"   Input: {tokens.get('input_tokens', 0)}")
        print(f"   Output: {tokens.get('output_tokens', 0)}")
        print(f"   Total: {tokens.get('total_tokens', 0)}")
    else:
        # Fall back to aggregating token counts recorded on individual LLM spans
        total_input = 0
        total_output = 0
        for span in trace.data.spans:
            if span.span_type == SpanType.CHAT_MODEL:
                total_input += span.get_attribute("llm.token_usage.input_tokens") or 0
                total_output += span.get_attribute("llm.token_usage.output_tokens") or 0

        if total_input or total_output:
            print(f"   (From spans - Input: {total_input}, Output: {total_output})")

    # 4. Span Analysis
    print("\n4. SPAN ANALYSIS")
    span_types = {}
    error_spans = []

    for span in trace.data.spans:
        # Count by type
        span_types[span.span_type] = span_types.get(span.span_type, 0) + 1

        # Collect errors
        if span.status.status_code.name == "ERROR":
            error_spans.append(span)

    print("   Span counts by type:")
    for span_type, count in sorted(span_types.items()):
        print(f"     {span_type}: {count}")

    if error_spans:
        print(f"\n   Error spans ({len(error_spans)}):")
        for span in error_spans:
            print(f"     - {span.name}: {span.status.description}")

    # 5. Retrieval Analysis
    print("\n5. RETRIEVAL ANALYSIS")
    retriever_spans = trace.search_spans(span_type=SpanType.RETRIEVER)
    if retriever_spans:
        for r_span in retriever_spans:
            if r_span.outputs:
                docs = r_span.outputs
                print(f"   Retrieved {len(docs)} documents:")
                for doc in docs[:3]:  # Show first 3
                    if isinstance(doc, dict):
                        uri = doc.get('metadata', {}).get('doc_uri', 'Unknown')
                        score = doc.get('metadata', {}).get('relevance_score', 'N/A')
                        print(f"     - {uri} (score: {score})")

    # 6. Assessment Summary
    print("\n6. ASSESSMENTS")
    assessments = trace.search_assessments()

    # Group by source type
    by_source = {}
    for assessment in assessments:
        source_type = assessment.source.source_type
        if source_type not in by_source:
            by_source[source_type] = []
        by_source[source_type].append(assessment)

    for source_type, items in by_source.items():
        print(f"\n   {source_type} ({len(items)}):")
        for assessment in items:
            value_str = f"{assessment.value}"
            if assessment.rationale:
                value_str += f" - {assessment.rationale[:50]}..."
            print(f"     {assessment.name}: {value_str}")

    # 7. Performance Breakdown
    print("\n7. PERFORMANCE BREAKDOWN")
    root_span = next((s for s in trace.data.spans if s.parent_id is None), None)
    if root_span:
        total_duration_ns = root_span.end_time_ns - root_span.start_time_ns

        # Calculate time spent in each span type
        time_by_type = {}
        for span in trace.data.spans:
            duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
            if span.span_type not in time_by_type:
                time_by_type[span.span_type] = 0
            time_by_type[span.span_type] += duration_ms

        print("   Time by span type:")
        for span_type, duration_ms in sorted(time_by_type.items(),
                                           key=lambda x: x[1], reverse=True):
            percentage = (duration_ms / (total_duration_ns / 1_000_000)) * 100
            print(f"     {span_type}: {duration_ms:.1f}ms ({percentage:.1f}%)")

    # 8. Data Flow
    print("\n8. DATA FLOW")
    if intermediate := trace.data.intermediate_outputs:
        print("   Intermediate outputs:")
        for name, output in intermediate.items():
            output_str = str(output)[:100] + "..." if len(str(output)) > 100 else str(output)
            print(f"     {name}: {output_str}")

    return trace

# Run the analysis
analysis_result = analyze_trace(trace_id)

Build reusable trace utilities

Wrap the analysis patterns above in a reusable class that can be applied to any trace:

import json

class TraceAnalyzer:
    """Utility class for advanced trace analysis."""

    def __init__(self, trace: mlflow.entities.Trace):
        self.trace = trace

    def get_error_summary(self):
        """Get summary of all errors in the trace."""
        errors = []

        # Check trace status
        if self.trace.info.state == "ERROR":
            errors.append({
                "level": "trace",
                "message": "Trace failed",
                "details": self.trace.info.response_preview
            })

        # Check span errors
        for span in self.trace.data.spans:
            if span.status.status_code.name == "ERROR":
                errors.append({
                    "level": "span",
                    "span_name": span.name,
                    "span_type": span.span_type,
                    "message": span.status.description,
                    "span_id": span.span_id
                })

        # Check assessment errors
        for assessment in self.trace.info.assessments:
            if assessment.error:
                errors.append({
                    "level": "assessment",
                    "assessment_name": assessment.name,
                    "error": str(assessment.error)
                })

        return errors

    def get_llm_usage_summary(self):
        """Aggregate LLM usage across all spans."""
        usage = {
            "total_llm_calls": 0,
            "total_input_tokens": 0,
            "total_output_tokens": 0,
            "spans": []
        }

        for span in self.trace.data.spans:
            if span.span_type in [SpanType.CHAT_MODEL, SpanType.LLM]:
                usage["total_llm_calls"] += 1

                input_tokens = span.get_attribute("llm.token_usage.input_tokens") or 0
                output_tokens = span.get_attribute("llm.token_usage.output_tokens") or 0

                usage["total_input_tokens"] += input_tokens
                usage["total_output_tokens"] += output_tokens
                usage["spans"].append({
                    "name": span.name,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens
                })

        usage["total_tokens"] = usage["total_input_tokens"] + usage["total_output_tokens"]
        return usage

    def get_retrieval_metrics(self):
        """Extract retrieval quality metrics."""
        metrics = []

        for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
            if span.outputs:
                docs = span.outputs
                relevance_scores = []

                for doc in docs:
                    if isinstance(doc, dict) and 'metadata' in doc:
                        if score := doc['metadata'].get('relevance_score'):
                            relevance_scores.append(score)

                metrics.append({
                    "span_name": span.name,
                    "num_documents": len(docs),
                    "avg_relevance": sum(relevance_scores) / len(relevance_scores) if relevance_scores else None,
                    "max_relevance": max(relevance_scores) if relevance_scores else None,
                    "min_relevance": min(relevance_scores) if relevance_scores else None
                })

        return metrics

    def get_span_hierarchy(self):
        """Build a hierarchical view of spans."""
        # Find root spans (spans with no parent)
        roots = [span for span in self.trace.data.spans if span.parent_id is None]

        def build_tree(span, indent=0):
            result = []
            duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
            result.append({
                "indent": indent,
                "name": span.name,
                "type": span.span_type,
                "duration_ms": duration_ms,
                "status": span.status.status_code.name
            })

            # Find children
            children = [s for s in self.trace.data.spans if s.parent_id == span.span_id]
            for child in sorted(children, key=lambda s: s.start_time_ns):
                result.extend(build_tree(child, indent + 1))

            return result

        hierarchy = []
        for root in roots:
            hierarchy.extend(build_tree(root))

        return hierarchy

    def export_for_evaluation(self):
        """Export trace data in a format suitable for evaluation."""
        # Get root span data
        request = response = None
        if self.trace.data.request:
            request = json.loads(self.trace.data.request)
        if self.trace.data.response:
            response = json.loads(self.trace.data.response)

        # Get expected values from assessments
        expectations = self.trace.search_assessments(type="expectation")
        expected_values = {exp.name: exp.value for exp in expectations}

        # Get retrieval context
        retrieved_context = []
        for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
            if span.outputs:
                for doc in span.outputs:
                    if isinstance(doc, dict) and 'page_content' in doc:
                        retrieved_context.append(doc['page_content'])

        return {
            "trace_id": self.trace.info.trace_id,
            "request": request,
            "response": response,
            "retrieved_context": retrieved_context,
            "expected_facts": expected_values.get("expected_facts", []),
            "metadata": {
                "user_id": self.trace.info.tags.get("user_id"),
                "session_id": self.trace.info.tags.get("session_id"),
                "duration_ms": self.trace.info.execution_duration,
                "timestamp": self.trace.info.request_time
            }
        }

# Use the analyzer
analyzer = TraceAnalyzer(trace)

# Get various analyses
errors = analyzer.get_error_summary()
print(f"\nErrors found: {len(errors)}")
for error in errors:
    print(f"  - {error['level']}: {error.get('message', error.get('error'))}")

llm_usage = analyzer.get_llm_usage_summary()
print(f"\nLLM Usage: {llm_usage['total_tokens']} total tokens across {llm_usage['total_llm_calls']} calls")

retrieval_metrics = analyzer.get_retrieval_metrics()
print(f"\nRetrieval Metrics:")
for metric in retrieval_metrics:
    print(f"  - {metric['span_name']}: {metric['num_documents']} docs, avg relevance: {metric['avg_relevance']}")

# Export for evaluation
eval_data = analyzer.export_for_evaluation()
print(f"\nExported evaluation data with {len(eval_data['retrieved_context'])} context chunks")

Next steps