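Access trace data

This page shows how to access each part of a trace, including its metadata, spans, and assessments. Once you know how to access trace data, see "Analyze traces".

An MLflow Trace object consists of two main components: trace.info, a TraceInfo object with metadata about the trace (IDs, state, timing, tags, and assessments), and trace.data, a TraceData object with the trace content (spans and request/response data). The examples on this page assume a Trace object named trace, for example one fetched with mlflow.get_trace(trace_id).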

Basic metadata properties

API reference: TraceInfo

# Primary identifiers
print(f"Trace ID: {trace.info.trace_id}")
print(f"Client Request ID: {trace.info.client_request_id}")

# Status information
print(f"State: {trace.info.state}")  # OK, ERROR, IN_PROGRESS
print(f"Status (deprecated): {trace.info.status}")  # Use state instead

# Request/response previews (truncated)
print(f"Request preview: {trace.info.request_preview}")
print(f"Response preview: {trace.info.response_preview}")

Storage location and experiment

# Trace storage location
location = trace.info.trace_location
print(f"Location type: {location.type}")

# If stored in MLflow experiment
if location.mlflow_experiment:
    print(f"Experiment ID: {location.mlflow_experiment.experiment_id}")
    # Shortcut property
    print(f"Experiment ID: {trace.info.experiment_id}")

# If stored in Databricks inference table
if location.inference_table:
    print(f"Table: {location.inference_table.full_table_name}")

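Request and response previews

The request_preview and response_preview properties provide truncated summaries of the full request and response data. They let you quickly see what happened without loading the full payloads.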

request_preview = trace.info.request_preview
response_preview = trace.info.response_preview

print(f"Request preview: {request_preview}")
print(f"Response preview: {response_preview}")

# Compare with full request/response data
full_request = trace.data.request  # Complete request text
full_response = trace.data.response  # Complete response text

if full_request and request_preview:
    print(f"Full request length: {len(full_request)} characters")
    print(f"Preview is {len(request_preview)/len(full_request)*100:.1f}% of full request")

Timing properties

API reference: TraceInfo

# Timestamps (milliseconds since epoch)
print(f"Start time (ms): {trace.info.request_time}")
print(f"Timestamp (ms): {trace.info.timestamp_ms}")  # Alias for request_time

# Duration
print(f"Execution duration (ms): {trace.info.execution_duration}")
print(f"Execution time (ms): {trace.info.execution_time_ms}")  # Alias

# Convert to human-readable format
import datetime
start_time = datetime.datetime.fromtimestamp(trace.info.request_time / 1000)
print(f"Started at: {start_time}")
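The timestamps are raw millisecond integers. When datetime.fromtimestamp() is called without a tz argument, it interprets them in the machine's local time zone. The following minimal sketch uses a hypothetical helper, trace_time_window (not part of MLflow), to turn request_time and execution_duration into a UTC start/end window. It takes plain integers, so it runs without a trace. It also assumes execution_duration can be None for a trace that has not finished.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def trace_time_window(
    request_time_ms: int, execution_duration_ms: Optional[int]
) -> Tuple[datetime, Optional[datetime]]:
    """Return (start, end) as timezone-aware UTC datetimes.

    request_time_ms stands in for trace.info.request_time and
    execution_duration_ms for trace.info.execution_duration. end is None when
    no duration is available (assumed here for unfinished traces).
    """
    start = datetime.fromtimestamp(request_time_ms / 1000, tz=timezone.utc)
    if execution_duration_ms is None:
        return start, None
    return start, start + timedelta(milliseconds=execution_duration_ms)


# Hypothetical values in place of a real trace's fields
start, end = trace_time_window(1_700_000_000_000, 1_250)
print(start.isoformat())  # 2023-11-14T22:13:20+00:00
print(end.isoformat())    # 2023-11-14T22:13:21.250000+00:00
```

With a real trace you would call trace_time_window(trace.info.request_time, trace.info.execution_duration).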

Tags and metadata

API reference: TraceInfo.tags, TraceInfo.trace_metadata

# Tags (mutable, can be updated after creation)
print("Tags:")
for key, value in trace.info.tags.items():
    print(f"  {key}: {value}")

# Access specific tags
print(f"Environment: {trace.info.tags.get('environment')}")
print(f"User ID: {trace.info.tags.get('user_id')}")

# Trace metadata (immutable, set at creation)
print("\nTrace metadata:")
for key, value in trace.info.trace_metadata.items():
    print(f"  {key}: {value}")

# Deprecated alias
print(f"Request metadata: {trace.info.request_metadata}")  # Same as trace_metadata
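A trace's tag dictionary can contain both tags you set yourself and tags that MLflow records automatically. This sketch assumes the automatically recorded keys share the mlflow. prefix; check the keys on your own traces before relying on that. Under that assumption, a small helper can separate the two groups. split_tags and the sample tag values are hypothetical.

```python
from typing import Dict, Tuple


def split_tags(
    tags: Dict[str, str], reserved_prefix: str = "mlflow."
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Split a tag dictionary into (system_tags, user_tags) by key prefix."""
    system_tags = {k: v for k, v in tags.items() if k.startswith(reserved_prefix)}
    user_tags = {k: v for k, v in tags.items() if not k.startswith(reserved_prefix)}
    return system_tags, user_tags


# Hypothetical tags standing in for trace.info.tags
example_tags = {
    "mlflow.traceName": "chat_pipeline",
    "environment": "production",
    "user_id": "u-42",
}
system_tags, user_tags = split_tags(example_tags)
print(sorted(system_tags))  # ['mlflow.traceName']
print(sorted(user_tags))    # ['environment', 'user_id']
```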

Token usage information

MLflow Tracing can record the token usage of LLM calls, using the token counts returned by the LLM provider's API.

# Get aggregated token usage (if available)
token_usage = trace.info.token_usage
if token_usage:
    print(f"Input tokens: {token_usage.get('input_tokens')}")
    print(f"Output tokens: {token_usage.get('output_tokens')}")
    print(f"Total tokens: {token_usage.get('total_tokens')}")

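How token usage is tracked depends on the LLM provider. The following table lists the ways to track token usage across providers and platforms.

Scenario | How to track token usage
Databricks Foundation Model APIs | Use the OpenAI client, and confirm that MLflow Tracing records token usage automatically.
LLM providers with native MLflow Tracing support | See the provider's integration page under MLflow Tracing integrations to check whether native token tracking is supported.
Providers without native MLflow Tracing support | Log token usage manually with Span.set_attribute, as shown in this example.
Monitoring multiple endpoints across the AI platform | Use AI Gateway usage tracking to log token usage for serving endpoints to system tables.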
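If you record token usage manually for each LLM call, you may need to roll the per-call counts up yourself. This sketch assumes each call's usage is a dict that uses the same input_tokens, output_tokens, and total_tokens keys as trace.info.token_usage. sum_token_usage and the sample numbers are hypothetical and are not an MLflow API.

```python
from typing import Dict, Iterable, Optional


def sum_token_usage(per_call_usage: Iterable[Optional[Dict[str, int]]]) -> Dict[str, int]:
    """Sum per-call token counts into trace-level totals.

    Each item is a dict with optional "input_tokens", "output_tokens", and
    "total_tokens" keys, or None when a call reported no usage.
    """
    totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
    for usage in per_call_usage:
        if not usage:
            continue  # skip calls that reported no token counts
        input_tokens = usage.get("input_tokens") or 0
        output_tokens = usage.get("output_tokens") or 0
        totals["input_tokens"] += input_tokens
        totals["output_tokens"] += output_tokens
        # Fall back to input + output when a provider omits the total
        total = usage.get("total_tokens")
        totals["total_tokens"] += total if total is not None else input_tokens + output_tokens
    return totals


# Hypothetical usage reported by three LLM calls in one trace
calls = [
    {"input_tokens": 120, "output_tokens": 30, "total_tokens": 150},
    {"input_tokens": 80, "output_tokens": 20},  # no total reported
    None,                                       # call without token counts
]
print(sum_token_usage(calls))
# {'input_tokens': 200, 'output_tokens': 50, 'total_tokens': 250}
```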

Assessments

Find assessments with search_assessments().

API reference: Trace.search_assessments

# 1. Get all assessments
all_assessments = trace.search_assessments()
print(f"Total assessments: {len(all_assessments)}")

# 2. Search by name
helpfulness = trace.search_assessments(name="helpfulness")
if helpfulness:
    assessment = helpfulness[0]
    print(f"Helpfulness: {assessment.value}")
    print(f"Source: {assessment.source.source_type} - {assessment.source.source_id}")
    print(f"Rationale: {assessment.rationale}")

# 3. Search by type
feedback_only = trace.search_assessments(type="feedback")
expectations_only = trace.search_assessments(type="expectation")
print(f"Feedback assessments: {len(feedback_only)}")
print(f"Expectation assessments: {len(expectations_only)}")

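# 4. Search by span ID (this example uses the first RETRIEVER span in the trace)
from mlflow.entities import SpanType
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
span_assessments = trace.search_assessments(span_id=retriever_span.span_id)
print(f"Assessments for retriever span: {len(span_assessments)}")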

# 5. Get all assessments including overridden ones
all_including_invalid = trace.search_assessments(all=True)
print(f"All assessments (including overridden): {len(all_including_invalid)}")

# 6. Combine criteria
helpfulness_feedback = trace.search_assessments(
    type="feedback",
    name="helpfulness"
)
for fb in helpfulness_feedback:
    print(f"Helpfulness feedback: {fb.name} = {fb.value}")
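The results of search_assessments(type="feedback") can be passed to a small aggregation step. This sketch uses a hypothetical helper that groups feedback by name and averages numeric or boolean values. It relies only on the name and value attributes shown above. SimpleNamespace stand-ins let it run without a trace; with real data you would call summarize_feedback(trace.search_assessments(type="feedback")).

```python
from statistics import mean
from types import SimpleNamespace
from typing import Dict, List


def summarize_feedback(assessments) -> Dict[str, Dict[str, float]]:
    """Return {name: {"count": n, "mean": m}} for numeric or boolean values."""
    values: Dict[str, List[float]] = {}
    for a in assessments:
        # bool is a subclass of int, so True/False count as 1/0
        if isinstance(a.value, (int, float)):
            values.setdefault(a.name, []).append(float(a.value))
    return {name: {"count": len(v), "mean": mean(v)} for name, v in values.items()}


# Hypothetical stand-ins with the .name and .value attributes used above
sample = [
    SimpleNamespace(name="helpfulness", value=True),
    SimpleNamespace(name="helpfulness", value=False),
    SimpleNamespace(name="relevance", value=0.8),
    SimpleNamespace(name="comment", value="Too long"),  # non-numeric, skipped
]
print(summarize_feedback(sample))
# {'helpfulness': {'count': 2, 'mean': 0.5}, 'relevance': {'count': 1, 'mean': 0.8}}
```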

Access assessment details

# Get detailed assessment information
for assessment in trace.info.assessments:
    print(f"\nAssessment: {assessment.name}")
    print(f"  Type: {type(assessment).__name__}")
    print(f"  Value: {assessment.value}")
    print(f"  Source: {assessment.source.source_type}")
    print(f"  Source ID: {assessment.source.source_id}")

    # Optional fields
    if assessment.rationale:
        print(f"  Rationale: {assessment.rationale}")
    if assessment.metadata:
        print(f"  Metadata: {assessment.metadata}")
    if assessment.error:
        print(f"  Error: {assessment.error}")
    if hasattr(assessment, 'span_id') and assessment.span_id:
        print(f"  Span ID: {assessment.span_id}")

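Working with spans

Spans are the basic units of a trace; each one represents a single operation or unit of work. The Span class represents an immutable, completed span retrieved from a trace.

Access span properties

API reference: TraceData.spans, Span, SpanType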

# Access all spans from a trace
spans = trace.data.spans
print(f"Total spans: {len(spans)}")

# Get a specific span
span = spans[0]

# Basic properties
print(f"Span ID: {span.span_id}")
print(f"Name: {span.name}")
print(f"Type: {span.span_type}")
print(f"Trace ID: {span.trace_id}")  # Which trace this span belongs to
print(f"Parent ID: {span.parent_id}")  # None for root spans

# Timing information (nanoseconds)
print(f"Start time: {span.start_time_ns}")
print(f"End time: {span.end_time_ns}")
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
print(f"Duration: {duration_ms:.2f}ms")

# Status information
print(f"Status: {span.status}")
print(f"Status code: {span.status.status_code}")
print(f"Status description: {span.status.description}")

# Inputs and outputs
print(f"Inputs: {span.inputs}")
print(f"Outputs: {span.outputs}")

# Iterate through all spans
for span in spans:
    print(f"\nSpan: {span.name}")
    print(f"  ID: {span.span_id}")
    print(f"  Type: {span.span_type}")
    print(f"  Duration (ms): {(span.end_time_ns - span.start_time_ns) / 1_000_000:.2f}")

    # Parent-child relationships
    if span.parent_id:
        print(f"  Parent ID: {span.parent_id}")
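Child spans run inside their parent, so a parent's duration includes its children's time. To see where time is actually spent, you can compute each span's self time, meaning its duration minus the time of its direct children. The following is a hypothetical helper built only on the span_id, parent_id, start_time_ns, and end_time_ns properties shown above. The stand-in spans let it run without a trace; in practice you would pass trace.data.spans.

```python
from collections import defaultdict
from types import SimpleNamespace


def self_time_ms(spans):
    """Map span_id -> time spent in the span itself, excluding direct children.

    Children that overlap (for example, parallel tool calls) can make the sum of
    child durations exceed the parent's, so the result is clamped at 0.
    """
    def duration_ns(s):
        return s.end_time_ns - s.start_time_ns

    child_ns = defaultdict(int)
    for s in spans:
        if s.parent_id:
            child_ns[s.parent_id] += duration_ns(s)
    return {
        s.span_id: max(duration_ns(s) - child_ns[s.span_id], 0) / 1_000_000
        for s in spans
    }


# Hypothetical spans: a 100 ms root with two sequential children
example_spans = [
    SimpleNamespace(span_id="root", parent_id=None, start_time_ns=0, end_time_ns=100_000_000),
    SimpleNamespace(span_id="llm", parent_id="root", start_time_ns=10_000_000, end_time_ns=70_000_000),
    SimpleNamespace(span_id="tool", parent_id="root", start_time_ns=70_000_000, end_time_ns=90_000_000),
]
print(self_time_ms(example_spans))  # {'root': 20.0, 'llm': 60.0, 'tool': 20.0}
```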

Find specific spans

Use search_spans() to find spans that match specific criteria:

import re
from mlflow.entities import SpanType

# 1. Search by exact name
retriever_spans = trace.search_spans(name="retrieve_documents")
print(f"Found {len(retriever_spans)} retriever spans")

# 2. Search by regex pattern
pattern = re.compile(r".*_tool$")
tool_spans = trace.search_spans(name=pattern)
print(f"Found {len(tool_spans)} tool spans")

# 3. Search by span type
chat_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
llm_spans = trace.search_spans(span_type="CHAT_MODEL")  # String also works
print(f"Found {len(chat_spans)} chat model spans")

# 4. Search by span ID (guarded, since the trace may contain no retriever spans)
if retriever_spans:
    specific_span = trace.search_spans(span_id=retriever_spans[0].span_id)
    print(f"Found span: {specific_span[0].name if specific_span else 'Not found'}")

# 5. Combine criteria
tool_fact_check = trace.search_spans(
    name="fact_check_tool",
    span_type=SpanType.TOOL
)
print(f"Found {len(tool_fact_check)} fact check tool spans")

# 6. Get all spans of a type
all_tools = trace.search_spans(span_type=SpanType.TOOL)
for tool in all_tools:
    print(f"Tool: {tool.name}")
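Each search_spans() call returns one filtered list. For an overview of every span type at once, a single pass that groups spans by span_type can be simpler. The following sketch uses a hypothetical helper, slowest_by_type, that reads only name, span_type, and the timing properties. Plain strings stand in for SpanType values, as the "String also works" example above suggests.

```python
from collections import defaultdict
from types import SimpleNamespace


def slowest_by_type(spans, top_n=1):
    """Return {span_type: [(name, duration_ms), ...]} sorted slowest first."""
    groups = defaultdict(list)
    for s in spans:
        groups[s.span_type].append((s.name, (s.end_time_ns - s.start_time_ns) / 1_000_000))
    return {
        span_type: sorted(items, key=lambda item: item[1], reverse=True)[:top_n]
        for span_type, items in groups.items()
    }


# Hypothetical stand-ins for trace.data.spans
example_spans = [
    SimpleNamespace(name="search_tool", span_type="TOOL", start_time_ns=0, end_time_ns=5_000_000),
    SimpleNamespace(name="fact_check_tool", span_type="TOOL", start_time_ns=0, end_time_ns=12_000_000),
    SimpleNamespace(name="chat", span_type="CHAT_MODEL", start_time_ns=0, end_time_ns=40_000_000),
]
print(slowest_by_type(example_spans))
# {'TOOL': [('fact_check_tool', 12.0)], 'CHAT_MODEL': [('chat', 40.0)]}
```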

Intermediate outputs

# Get intermediate outputs from non-root spans
intermediate = trace.data.intermediate_outputs
if intermediate:
    print("\nIntermediate outputs:")
    for span_name, output in intermediate.items():
        print(f"  {span_name}: {output}")

Span attributes

API reference: Span.get_attribute, SpanAttributeKey

from mlflow.entities import SpanType
from mlflow.tracing.constant import SpanAttributeKey

# Get a chat model span
chat_span = trace.search_spans(span_type=SpanType.CHAT_MODEL)[0]

# Get all attributes
print("All span attributes:")
for key, value in chat_span.attributes.items():
    print(f"  {key}: {value}")

# Get specific attribute
specific_attr = chat_span.get_attribute("custom_attribute")
print(f"Custom attribute: {specific_attr}")

# Access chat-specific attributes using SpanAttributeKey
messages = chat_span.get_attribute(SpanAttributeKey.CHAT_MESSAGES)
tools = chat_span.get_attribute(SpanAttributeKey.CHAT_TOOLS)

print(f"Chat messages: {messages}")
print(f"Available tools: {tools}")

# Access token usage from span
input_tokens = chat_span.get_attribute("llm.token_usage.input_tokens")
output_tokens = chat_span.get_attribute("llm.token_usage.output_tokens")
print(f"Span token usage - Input: {input_tokens}, Output: {output_tokens}")

Create and modify spans during execution

When you create spans during execution, you work with mutable LiveSpan objects:

import time

import mlflow
from mlflow.entities import SpanType, SpanStatus, SpanStatusCode

@mlflow.trace(span_type=SpanType.CHAIN)
def process_data(data: dict):
    # Get the current active span (LiveSpan)
    span = mlflow.get_current_active_span()

    # Set span type (if not set via decorator)
    span.set_span_type(SpanType.CHAIN)

    # Set inputs
    span.set_inputs({"data": data, "timestamp": time.time()})

    # Set individual attributes
    span.set_attribute("processing_version", "2.0")
    span.set_attribute("data_size", len(str(data)))

    # Set multiple attributes at once
    span.set_attributes({
        "environment": "production",
        "region": "us-west-2",
        "custom_metadata": {"key": "value"}
    })

    try:
        # Process the data
        result = {"processed": True, "count": len(data)}

        # Set outputs
        span.set_outputs(result)

        # Set success status
        span.set_status(SpanStatusCode.OK)

    except Exception as e:
        # Record the exception
        span.record_exception(e)
        # This automatically sets status to ERROR and adds an exception event
        raise

    return result

# Example with manual span creation
with mlflow.start_span(name="manual_span", span_type=SpanType.TOOL) as span:
    # Add events during execution
    from mlflow.entities import SpanEvent

    span.add_event(SpanEvent(
        name="processing_started",
        attributes={
            "stage": "initialization",
            "memory_usage_mb": 256
        }
    ))

    # Do some work...
    time.sleep(0.1)

    # Add another event
    span.add_event(SpanEvent(
        name="checkpoint_reached",
        attributes={"progress": 0.5}
    ))

    # Manually end the span with outputs and status
    span.end(
        outputs={"result": "success"},
        attributes={"final_metric": 0.95},
        status=SpanStatusCode.OK
    )

Span events

Events record specific occurrences during a span's lifetime.

API reference: SpanEvent

import time

import mlflow
from mlflow.entities import SpanEvent

# Create an event with current timestamp
event = SpanEvent(
    name="validation_completed",
    attributes={
        "records_validated": 1000,
        "errors_found": 3,
        "validation_type": "schema"
    }
)

# Create an event with specific timestamp (nanoseconds)
specific_time_event = SpanEvent(
    name="data_checkpoint",
    timestamp=int(time.time() * 1e9),
    attributes={"checkpoint_id": "ckpt_123"}
)

# Create an event from an exception
try:
    raise ValueError("Invalid input format")
except Exception as e:
    error_event = SpanEvent.from_exception(e)
    # This creates an event with name="exception" and attributes containing:
    # - exception.message
    # - exception.type
    # - exception.stacktrace

    # Add to the current span (get_current_active_span() returns None
    # when no span is active)
    span = mlflow.get_current_active_span()
    if span is not None:
        span.add_event(error_event)
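After a span has finished, you can turn its events into a timeline relative to the span's start. This sketch assumes that span.events returns SpanEvent objects whose timestamp is in nanoseconds, as in the example above. event_timeline is a hypothetical helper, and SimpleNamespace stand-ins replace a real span so the code runs on its own.

```python
from types import SimpleNamespace


def event_timeline(span):
    """Return [(offset_ms, event_name), ...] sorted by time, relative to span start."""
    return sorted(
        ((e.timestamp - span.start_time_ns) / 1_000_000, e.name) for e in span.events
    )


# Hypothetical completed span with two events recorded out of order
example_span = SimpleNamespace(
    start_time_ns=1_000_000_000,
    events=[
        SimpleNamespace(name="checkpoint_reached", timestamp=1_150_000_000),
        SimpleNamespace(name="processing_started", timestamp=1_000_500_000),
    ],
)
print(event_timeline(example_span))
# [(0.5, 'processing_started'), (150.0, 'checkpoint_reached')]
```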

Span status

Set and query a span's execution status.

API reference: SpanStatus

import mlflow
from mlflow.entities import SpanStatus, SpanStatusCode

# Create status objects
success_status = SpanStatus(SpanStatusCode.OK)
error_status = SpanStatus(
    SpanStatusCode.ERROR,
    description="Failed to connect to database"
)

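# Set status on a live span (inside a traced function or span context;
# get_current_active_span() returns None when no span is active)
span = mlflow.get_current_active_span()
if span is not None:
    span.set_status(success_status)

    # Or use string shortcuts
    span.set_status("OK")
    span.set_status("ERROR")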

# Query status from completed spans
for span in trace.data.spans:
    if span.status.status_code == SpanStatusCode.ERROR:
        print(f"Error in {span.name}: {span.status.description}")

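Working with RETRIEVER spans

RETRIEVER spans have specific output requirements: set their outputs to a list of Document objects so the retrieved documents render correctly in the UI.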

import mlflow
from mlflow.entities import Document, SpanType

@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
    span = mlflow.get_current_active_span()

    # Create Document objects (required for RETRIEVER spans)
    documents = [
        Document(
            page_content="The content of the document...",
            metadata={
                "doc_uri": "path/to/document.md",
                "chunk_id": "chunk_001",
                "relevance_score": 0.95,
                "source": "knowledge_base"
            },
            id="doc_123"  # Optional document ID
        ),
        Document(
            page_content="Another relevant section...",
            metadata={
                "doc_uri": "path/to/other.md",
                "chunk_id": "chunk_042",
                "relevance_score": 0.87
            }
        )
    ]

    # Set outputs as Document objects for proper UI rendering
    span.set_outputs(documents)

    # Return in your preferred format
    return [doc.to_dict() for doc in documents]

# Access retriever outputs
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
if retriever_span.outputs:
    for doc in retriever_span.outputs:
        if isinstance(doc, dict):
            content = doc.get('page_content', '')
            uri = doc.get('metadata', {}).get('doc_uri', '')
            score = doc.get('metadata', {}).get('relevance_score', 0)
            print(f"Document from {uri} (score: {score})")
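Retrievers often return several chunks from the same source. This sketch assumes the retriever outputs are dicts in the page_content/metadata shape used above, with doc_uri and relevance_score stored in metadata. That metadata layout comes from this example and is not a requirement of MLflow. Under those assumptions, a hypothetical helper can keep the best chunk per document and rank the sources.

```python
def top_documents(outputs, k=2):
    """Return [(doc_uri, best_score), ...] for the k highest-scoring sources."""
    best = {}
    for doc in outputs or []:
        if not isinstance(doc, dict):
            continue  # this sketch only handles dict-shaped documents
        metadata = doc.get("metadata") or {}
        uri = metadata.get("doc_uri", "")
        score = metadata.get("relevance_score", 0)
        # Keep only the highest-scoring chunk for each source document
        if uri not in best or score > best[uri]:
            best[uri] = score
    ranked = sorted(best.items(), key=lambda item: item[1], reverse=True)
    return ranked[:k]


# Hypothetical outputs standing in for retriever_span.outputs
example_outputs = [
    {"page_content": "Section A", "metadata": {"doc_uri": "path/to/document.md", "relevance_score": 0.95}},
    {"page_content": "Section B", "metadata": {"doc_uri": "path/to/document.md", "relevance_score": 0.60}},
    {"page_content": "Section C", "metadata": {"doc_uri": "path/to/other.md", "relevance_score": 0.87}},
    {"page_content": "Section D", "metadata": {"doc_uri": "path/to/faq.md", "relevance_score": 0.50}},
]
print(top_documents(example_outputs))
# [('path/to/document.md', 0.95), ('path/to/other.md', 0.87)]
```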

Advanced span operations

Convert spans to and from dictionaries

# Convert span to dictionary
span_dict = span.to_dict()
print(f"Span dict keys: {span_dict.keys()}")

# Recreate span from dictionary
from mlflow.entities import Span
reconstructed_span = Span.from_dict(span_dict)
print(f"Reconstructed span: {reconstructed_span.name}")

Advanced span analysis

from collections import defaultdict

from mlflow.entities import SpanStatusCode, SpanType


def analyze_span_tree(trace):
    """Analyze the span hierarchy and relationships."""
    spans = trace.data.spans

    def duration_ms(span):
        return (span.end_time_ns - span.start_time_ns) / 1_000_000

    # Build parent-child relationships
    children = defaultdict(list)
    for span in spans:
        if span.parent_id:
            children[span.parent_id].append(span)

    # Find root spans
    roots = [s for s in spans if s.parent_id is None]

    def print_tree(span, indent=0):
        status_icon = "✓" if span.status.status_code == SpanStatusCode.OK else "✗"
        print(f"{'  ' * indent}{status_icon} {span.name} ({span.span_type}) - {duration_ms(span):.1f}ms")

        # Print children in start-time order
        for child in sorted(children.get(span.span_id, []),
                            key=lambda s: s.start_time_ns):
            print_tree(child, indent + 1)

    print("Span Hierarchy:")
    for root in roots:
        print_tree(root)

    # Calculate span statistics. Child spans run inside their parents, so summing
    # every span's duration would double-count; use the root spans' wall-clock time.
    total_time = sum(duration_ms(s) for s in roots)
    llm_time = sum(duration_ms(s) for s in spans
                   if s.span_type in [SpanType.LLM, SpanType.CHAT_MODEL])
    retrieval_time = sum(duration_ms(s) for s in spans
                         if s.span_type == SpanType.RETRIEVER)

    print("\nSpan Statistics:")
    print(f"  Total spans: {len(spans)}")
    print(f"  Total time: {total_time:.1f}ms")
    if total_time > 0:  # avoid dividing by zero for empty or instantaneous traces
        print(f"  LLM time: {llm_time:.1f}ms ({llm_time / total_time * 100:.1f}%)")
        print(f"  Retrieval time: {retrieval_time:.1f}ms ({retrieval_time / total_time * 100:.1f}%)")

    # Find the critical path: starting from a root, follow the longest-running
    # child at each level. Child durations are nested inside their parent's,
    # so they are not added together.
    def find_critical_path(span):
        path = [span]
        kids = children.get(span.span_id, [])
        while kids:
            longest = max(kids, key=duration_ms)
            path.append(longest)
            kids = children.get(longest.span_id, [])
        return path

    if roots:
        slowest_root = max(roots, key=duration_ms)
        critical_path = find_critical_path(slowest_root)

        print(f"\nCritical Path ({duration_ms(slowest_root):.1f}ms total):")
        for span in critical_path:
            print(f"  → {span.name} ({duration_ms(span):.1f}ms)")

# Use the analyzer
analyze_span_tree(trace)

Request and response data

# Get root span request/response (backward compatibility)
request_json = trace.data.request
response_json = trace.data.response

# Parse JSON strings
import json
if request_json:
    request_data = json.loads(request_json)
    print(f"Request: {request_data}")

if response_json:
    response_data = json.loads(response_json)
    print(f"Response: {response_data}")
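json.loads() raises json.JSONDecodeError when a payload is not valid JSON. If you are not sure every trace stores JSON here, you can use a defensive variant that falls back to the raw string instead of failing. parse_payload is a hypothetical helper in the following sketch.

```python
import json
from typing import Any, Optional


def parse_payload(raw: Optional[str]) -> Any:
    """Decode a JSON payload string; return the raw string if it is not valid JSON."""
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw  # keep non-JSON payloads readable instead of raising


print(parse_payload('{"query": "What is MLflow?"}'))  # {'query': 'What is MLflow?'}
print(parse_payload("plain text answer"))             # plain text answer
print(parse_payload(None))                            # None
```

With a real trace you would call parse_payload(trace.data.request) and parse_payload(trace.data.response).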

Data export and conversion

Convert to a dictionary

API reference: Trace.to_dict

# Convert entire trace to dictionary
trace_dict = trace.to_dict()
print(f"Trace dict keys: {trace_dict.keys()}")
print(f"Info keys: {trace_dict['info'].keys()}")
print(f"Data keys: {trace_dict['data'].keys()}")

# Convert individual components
info_dict = trace.info.to_dict()
data_dict = trace.data.to_dict()

# Reconstruct trace from dictionary
from mlflow.entities import Trace
reconstructed_trace = Trace.from_dict(trace_dict)
print(f"Reconstructed trace ID: {reconstructed_trace.info.trace_id}")

JSON serialization

API reference: Trace.to_json

# Convert to JSON string
trace_json = trace.to_json()
print(f"JSON length: {len(trace_json)} characters")

# Pretty print JSON
trace_json_pretty = trace.to_json(pretty=True)
print("Pretty JSON (first 500 chars):")
print(trace_json_pretty[:500])

# Load trace from JSON
from mlflow.entities import Trace
loaded_trace = Trace.from_json(trace_json)
print(f"Loaded trace ID: {loaded_trace.info.trace_id}")

Pandas DataFrame conversion

API reference: Trace.to_pandas_dataframe_row

# Convert trace to DataFrame row
row_data = trace.to_pandas_dataframe_row()
print(f"DataFrame row keys: {list(row_data.keys())}")

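# Create a DataFrame from multiple traces
import mlflow
import pandas as pd

# mlflow.search_traces() already returns a pandas DataFrame, one row per trace
traces_df = mlflow.search_traces(max_results=5)
print(f"Search results shape: {traces_df.shape}")

# If you have individual Trace objects, build the DataFrame from their rows
trace_rows = [t.to_pandas_dataframe_row() for t in [trace]]
df = pd.DataFrame(trace_rows)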

print(f"DataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

# Access specific data from DataFrame
print(f"Trace IDs: {df['trace_id'].tolist()}")
print(f"States: {df['state'].tolist()}")
print(f"Durations: {df['execution_duration'].tolist()}")
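If you only need a few columns in a flat file, you can also write the row dictionaries from to_pandas_dataframe_row() with the standard csv module, without using pandas. This is a sketch with hypothetical rows and a hypothetical rows_to_csv helper. The column names match the ones used above. Keys that are not listed, such as nested span data, are dropped, and missing values become empty cells.

```python
import csv
import io


def rows_to_csv(rows, columns):
    """Write selected keys from each row dict to CSV text."""
    buffer = io.StringIO()
    # extrasaction="ignore" drops unlisted keys; missing keys are written as ""
    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Hypothetical rows standing in for trace.to_pandas_dataframe_row() output
example_rows = [
    {"trace_id": "tr-1", "state": "OK", "execution_duration": 1250, "spans": ["span-a"]},
    {"trace_id": "tr-2", "state": "ERROR"},  # no duration recorded
]
csv_text = rows_to_csv(example_rows, ["trace_id", "state", "execution_duration"])
print(csv_text)
```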

Next steps