Non-conversational AI agents using MLflow

Non-conversational agents process structured inputs to produce specific outputs without maintaining conversation state. Each request is independent and self-contained, which makes these agents well suited to task-specific operations such as document classification, data extraction, batch analysis, and structured question answering.

Unlike conversational agents, which manage multi-turn dialogs, non-conversational agents focus on executing well-defined tasks efficiently. This streamlined architecture allows higher throughput for independent requests.
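Because every request carries everything it needs, a batch of requests can be fanned out across workers without any shared conversation state. The following is a minimal sketch of that idea, not part of the example agent: `handle_request` and `handle_batch` are hypothetical names, and the keyword check merely stands in for a real agent call.

```python
from concurrent.futures import ThreadPoolExecutor


def handle_request(request: dict) -> dict:
    """Hypothetical stand-in for one agent call; it reads only its own input."""
    text = request["document_text"]
    return {"id": request["id"], "mentions_assets": "assets" in text.lower()}


def handle_batch(requests: list[dict], max_workers: int = 4) -> list[dict]:
    """Process independent requests in parallel; map() preserves input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(handle_request, requests))


requests = [
    {"id": 1, "document_text": "Total assets: $2,300,000."},
    {"id": 2, "document_text": "Revenues: $1,700,000."},
]
batch_results = handle_batch(requests)
print(batch_results)
```

No request depends on the result of another, so the order of execution does not affect the output.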

This page shows how to:

  • Build a non-conversational agent
  • Implement comprehensive MLflow tracing and observability
  • Deploy the agent to Model Serving with automatic trace collection
  • Configure production monitoring with MLflow 3 scorers

Requirements

Dependencies:

  • MLflow 3.2.0 or above
  • databricks-agents 1.2.0 or above
  • databricks-sdk[openai] for LLM integration
  • Python 3.10 or above

Workspace access:

  • Access to Foundation Model APIs (default: Claude 3.7 Sonnet, configurable)
  • Access to a catalog and schema for registering the model

%pip install --upgrade mlflow[databricks]==3.6.0 pydantic databricks-sdk[openai] databricks-agents databricks-sdk
%restart_python

Example scenario

The agent in this example processes structured questions about the content of financial documents and returns yes/no answers with reasoning. Users provide both the document text and the questions directly in the input, so this simplified example needs no vector search infrastructure. It demonstrates how non-conversational agents can handle well-defined tasks without conversation context.

You can extend this example for production use cases by integrating additional tools and capabilities. Examples include vector search for document retrieval, Model Context Protocol (MCP) tools for external integrations, or other Databricks agents such as Genie for structured data access.

Set up the service principal

Non-conversational agents do not support automatic authentication passthrough for writing traces from Model Serving. Instead, you must implement a custom MLflow 3 tracing integration and handle authentication manually using a service principal.

  1. Create a service principal with OAuth credentials.
  2. Store the credentials in a secret scope:

# TODO: Configuration constants - Update these for your environment
CATALOG = "main"
SCHEMA = "default"  # Replace with your schema name
SECRET_SCOPE = "<YOUR_SECRET_SCOPE>"  # Replace with your secret scope name
DATABRICKS_HOST = (
    "https://host.databricks.com"  # Replace with your workspace URL
)

# TODO: If you have not yet stored your service principal's OAuth client id and client secret as Databricks secrets,
# uncomment the following code and replace the <client_id> and <client_secret> with your service principal's id and secret.

# from databricks.sdk import WorkspaceClient

# w = WorkspaceClient()
# w.secrets.put_secret(SECRET_SCOPE, "client_id", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_ID>")
# w.secrets.put_secret(SECRET_SCOPE, "client_secret", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET>")

Configure the MLflow experiment:

  • Create the experiment if it does not exist.
  • Grant the service principal the CAN_EDIT permission on the experiment.

# MLflow experiment to capture traces
EXPERIMENT_NAME = "/Workspace/Shared/non-conversational"

# LLM Configuration
LLM_MODEL = "databricks-claude-3-7-sonnet"  # Change this to use different models

# Model and endpoint names - do not need to be changed
MODEL_NAME = "document_analyser"
ENDPOINT_NAME = "document_analyser_agent"
REGISTERED_MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.ml import ExperimentAccessControlRequest
from databricks.sdk.service.iam import PermissionLevel
import mlflow

# Set experiment and get the experiment object directly
experiment = mlflow.set_experiment(EXPERIMENT_NAME)
experiment_id = experiment.experiment_id

# Fetch the service principal client_id from secret scope
client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="client_id")

# Set permissions for the SPN which will later write the traces from the serving endpoint
w = WorkspaceClient()
# Set CAN_EDIT permissions for the service principal
w.experiments.set_permissions(
    experiment_id=experiment_id,
    access_control_list=[
        ExperimentAccessControlRequest(
            service_principal_name=client_id,
            permission_level=PermissionLevel.CAN_EDIT
        )
    ]
)

print(f"✓ CAN_EDIT permissions granted to SPN {client_id[:8]}... for experiment: {experiment_id}")

Input and output format

Unlike conversational agents, which use flexible chat message formats, non-conversational agents require structured Pydantic models for inputs and outputs.

  1. Create input schemas with all the fields required to execute the task.
  2. Include tracing metadata (trace_id, span_id) in output schemas to enable feedback logging.
  3. Design outputs that provide detailed reasoning or explanations of the chain of thought where appropriate.
  4. Validate schemas during development to catch errors before deployment.

Input format (AgentInput)

{
  "document_text": "Document content to analyze...",
  "questions": [
    { "text": "Do the documents contain a balance sheet?" },
    { "text": "Do the documents contain an income statement?" },
    { "text": "Do the documents contain a cash flow statement?" }
  ]
}

Output format (AgentOutput)

{
  "results": [
    {
      "question_text": "Do the documents contain a balance sheet?",
      "answer": "Yes",
      "chain_of_thought": "Detailed reasoning for the answer...",
      "span_id": "abc123def456"
    }
  ],
  "trace_id": "tr-xyz789abc123"
}

  • Structured input: Users provide the document text and the questions in a single request.
  • Detailed reasoning: Each answer includes a step-by-step chain of thought.
  • Traceability: The response includes trace_id and span_id for feedback collection.
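A client typically has a document and a list of plain question strings and needs to turn them into the AgentInput shape. The helper below is a minimal sketch using only the standard library; `build_agent_input` is a hypothetical name, and the two emptiness checks are assumptions added for illustration, not rules enforced by the schema in this article.

```python
import json


def build_agent_input(document_text: str, questions: list[str]) -> dict:
    """Assemble a request body in the AgentInput shape."""
    # Assumption: empty documents or question lists are rejected client-side.
    if not document_text.strip():
        raise ValueError("document_text must not be empty")
    if not questions:
        raise ValueError("at least one question is required")
    return {
        "document_text": document_text,
        "questions": [{"text": question} for question in questions],
    }


payload = build_agent_input(
    "Total assets: $2,300,000. Total liabilities: $1,200,000.",
    ["Do the documents contain a balance sheet?"],
)
print(json.dumps(payload, indent=2))
```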

Build the non-conversational agent

Build the non-conversational agent using MLflow for tracing. The agent uses @mlflow.trace decorators to capture LLM calls automatically and make the entire request flow fully observable.

Users provide both the document text and the questions directly in the input.

%%writefile model.py
import json
import logging
import os

from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.pyfunc import PythonModel
from mlflow.tracing import set_destination
from mlflow.tracing.destination import Databricks
from mlflow.entities import SpanType

from pydantic import BaseModel, Field


class Question(BaseModel):
    """Represents a question in the input."""

    text: str = Field(..., description="Yes/no question about document content")


class AgentInput(BaseModel):
    """Input model for the document analyser agent."""

    document_text: str = Field(..., description="The document text to analyze")
    questions: list[Question] = Field(..., description="List of yes/no questions")


class Answer(BaseModel):
    """Represents a structured response from the LLM."""

    answer: str = Field(..., description="Yes or No answer")
    chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")


class AnalysisResult(BaseModel):
    """Represents an analysis result in the output."""

    question_text: str = Field(..., description="Original question text")
    answer: str = Field(..., description="Yes or No answer")
    chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
    span_id: str | None = Field(None, description="MLflow span ID for this specific answer (None during offline evaluation)")


class AgentOutput(BaseModel):
    """Output model for the document analyser agent."""

    results: list[AnalysisResult] = Field(..., description="List of analysis results")
    trace_id: str | None = Field(None, description="MLflow trace ID for user feedback collection (None during offline evaluation)")


class DocumentAnalyser(PythonModel):
    """Non-conversational agent for document analysis using MLflow model serving."""

    def __init__(self) -> None:
        """Initialize the document analyser.

        Sets up logging configuration, initializes model properties, and prepares
        the model for serving.
        """
        self._setup_logging()
        self.model_name = "document_analyser_v1"
        self.logger.debug(f"Initialized {self.model_name}")

    def _setup_logging(self) -> None:
        """Set up logging configuration for Model Serving.

        Configures a logger that uses stderr for better visibility in Model Serving
        environments. Log level can be controlled via MODEL_LOG_LEVEL environment
        variable (defaults to INFO).
        """
        self.logger = logging.getLogger("ModelLogger")
        # Set log level from environment variable or default to INFO
        log_level = os.getenv("MODEL_LOG_LEVEL", "INFO").upper()
        self.logger.setLevel(getattr(logging, log_level, logging.INFO))
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setLevel(getattr(logging, log_level, logging.INFO))
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def load_context(self, context) -> None:
        """Load model context and initialize clients.

        This method is called once when the model is loaded in the serving environment.
        It sets up MLflow tracing destination, initializes the Databricks workspace
        client, and configures the OpenAI-compatible client for LLM inference.

        Args:
            context: MLflow model context containing artifacts and configuration
        """
        self.logger.debug("Loading model context")
        set_destination(Databricks(experiment_id=os.getenv("MONITORING_EXPERIMENT_ID")))

        self.logger.debug("Instantiate workspace client")
        self.w = WorkspaceClient()
        # You can load any artifacts here if needed
        # self.artifacts = context.artifacts

        self.logger.debug("Instantiate openai client")
        # Get an OpenAI-compatible client configured for Databricks serving endpoints
        self.openai_client = self.w.serving_endpoints.get_open_ai_client()

    @mlflow.trace(name="answer_question", span_type=SpanType.LLM)
    def answer_question(self, question_text: str, document_text: str) -> tuple[object, str | None]:
        """Answer a question using LLM with structured response format.

        Uses the OpenAI-compatible client to call a language model with a structured
        JSON response format. The LLM analyzes the provided document text and returns
        a yes/no answer with reasoning.

        Args:
            question_text (str): The yes/no question to answer about the document
            document_text (str): The document text to analyze

        Returns:
            tuple: (openai.ChatCompletion, str|None) - LLM response and span_id
        """
        # Create a chat completion request with structured response for questions

        question_prompt = f"""
        You are a document analysis expert. Answer the following yes/no question based on the provided document.

        Question: "{question_text}"

        Document:
        {document_text}

        Analyze the document and provide a structured response.
        """

        # Create a separate sub-span for the actual OpenAI API call
        llm_response = self._call_openai_completion(question_prompt)

        # Get the current span ID for this specific answer
        current_span = mlflow.get_current_active_span()
        span_id = current_span.span_id if current_span is not None else None

        return llm_response, span_id

    @mlflow.trace(name="openai_completion", span_type=SpanType.LLM)
    def _call_openai_completion(self, prompt: str):
        """Make the actual OpenAI API call with its own sub-span.

        Args:
            prompt (str): The formatted prompt to send to the LLM

        Returns:
            OpenAI ChatCompletion response
        """
        return self.openai_client.chat.completions.create(
            model=os.getenv("LLM_MODEL", "databricks-claude-3-7-sonnet"),  # Configurable LLM model
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            response_format={
                "type": "json_schema",
                "json_schema": {
                    "name": "question_response",
                    "schema": Answer.model_json_schema()
                }
            }
        )

    @mlflow.trace(name="document_analysis")
    def predict(self, context, model_input: list[AgentInput]) -> list[AgentOutput]:
        """Process document analysis questions with yes/no answers.

        Args:
            context: MLflow model context
            model_input: List of structured inputs containing document text and questions

        Returns:
            List of AgentOutput with yes/no answers and reasoning
        """
        self.logger.debug(f"Processing {len(model_input)} classification request(s)")

        # Get the current trace ID for user feedback collection
        # Will be None during offline evaluation when no active span exists
        current_span = mlflow.get_current_active_span()
        trace_id = current_span.trace_id if current_span is not None else None

        results = []
        for input_data in model_input:
            self.logger.debug(f"Number of questions: {len(input_data.questions)}")
            self.logger.debug(f"Document length: {len(input_data.document_text)} characters")

            analysis_results = []

            for question in input_data.questions:
                self.logger.debug(f"Processing question: {question.text}")

                # Answer the question using LLM with structured response
                llm_response, answer_span_id = self.answer_question(question.text, input_data.document_text)

                # Parse structured JSON response
                try:
                    response_data = json.loads(llm_response.choices[0].message.content)
                    answer_obj = Answer(**response_data)
                except Exception as e:
                    self.logger.debug(f"Failed to parse structured response: {e}")
                    # Fallback to default response
                    answer_obj = Answer(
                        answer="No",
                        chain_of_thought="Unable to process the question due to parsing error."
                    )

                analysis_results.append(AnalysisResult(
                    question_text=question.text,
                    answer=answer_obj.answer,
                    chain_of_thought=answer_obj.chain_of_thought,
                    span_id=answer_span_id
                ))

            self.logger.debug(f"Generated {len(analysis_results)} analysis results")

            results.append(AgentOutput(
                results=analysis_results,
                trace_id=trace_id
            ))

        return results

mlflow.models.set_model(DocumentAnalyser())
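The fallback used in predict when the LLM response cannot be parsed can be exercised in isolation. The sketch below mirrors that logic with the standard library only (no Pydantic); `parse_answer` is a hypothetical helper, and normalizing the answer's case and rejecting values other than Yes or No are assumptions that go beyond what the agent code above does.

```python
import json

FALLBACK = {
    "answer": "No",
    "chain_of_thought": "Unable to process the question due to parsing error.",
}


def parse_answer(raw_content: str) -> dict:
    """Parse the LLM's JSON content; return the fallback answer on any failure."""
    try:
        data = json.loads(raw_content)
        answer = str(data["answer"]).strip().capitalize()
        reasoning = str(data["chain_of_thought"])
    except (json.JSONDecodeError, KeyError, TypeError):
        # Invalid JSON, a missing field, or a non-object payload.
        return dict(FALLBACK)
    if answer not in ("Yes", "No"):
        # Assumption: anything other than a yes/no answer counts as a failure.
        return dict(FALLBACK)
    return {"answer": answer, "chain_of_thought": reasoning}


print(parse_answer('{"answer": "yes", "chain_of_thought": "Assets are listed."}'))
print(parse_answer("not json at all"))
```

Returning a default "No" keeps the output schema valid, but it also hides failures from the caller, so logging the error at a level above debug may be worth considering.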

Log and register the agent

Before the agent can be deployed to a serving endpoint, it must be logged to an MLflow experiment and registered in Unity Catalog.

import os
import mlflow

import json
from mlflow.pyfunc import PythonModel
from pydantic import BaseModel, Field
from model import DocumentAnalyser, AgentInput, Question


# Create example input for signature inference
def create_example_input() -> AgentInput:
    """Create example input for the non-conversational agent."""
    return AgentInput(
        document_text="Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
        questions=[
            Question(text="Do the documents contain a balance sheet?"),
            Question(text="Do the documents contain an income statement?"),
            Question(text="Do the documents contain a cash flow statement?"),
        ],
    )


input_example = create_example_input()

with mlflow.start_run(run_name="deploy_non_conversational_agent"):
    active_run = mlflow.active_run()
    current_experiment_id = active_run.info.experiment_id
    # Set environment variables for the model using current notebook experiment
    os.environ["MONITORING_EXPERIMENT_ID"] = current_experiment_id
    print(
        f"✓ Using current notebook experiment ID for tracing: {current_experiment_id}"
    )

    # Log the non-conversational agent with auto-inferred dependencies
    model_info = mlflow.pyfunc.log_model(
        name=MODEL_NAME,
        python_model="model.py",  # Path to the model code file
        input_example=[create_example_input().model_dump()],
        registered_model_name=REGISTERED_MODEL_NAME,
    )

# Set logged model as current active model to associate it with the below evaluation results
mlflow.set_active_model(model_id=mlflow.last_logged_model().model_id)

print(f"✓ Model logged and registered: {REGISTERED_MODEL_NAME}")
print(f"✓ Model version: {model_info.registered_model_version}")

Evaluate the agent

Before deploying to production, evaluate the agent's performance using the MLflow GenAI evaluation framework with predefined scorers. Some scorers require a ground truth dataset.

import mlflow
import mlflow.genai.datasets
from requests import HTTPError

# Create an evaluation dataset in Unity Catalog
uc_schema = f"{CATALOG}.{SCHEMA}"
evaluation_dataset_table_name = "document_analyser_eval"

try:
    # Try to create a new evaluation dataset
    eval_dataset = mlflow.genai.datasets.create_dataset(
        uc_table_name=f"{uc_schema}.{evaluation_dataset_table_name}",
    )
    print(f"✓ Created evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}")
except HTTPError as e:
    # Check if it's a TABLE_ALREADY_EXISTS error
    if e.response.status_code == 400 and "TABLE_ALREADY_EXISTS" in str(e):
        print(
            f"Dataset {uc_schema}.{evaluation_dataset_table_name} already exists, loading existing dataset..."
        )
        eval_dataset = mlflow.genai.datasets.get_dataset(
            uc_table_name=f"{uc_schema}.{evaluation_dataset_table_name}"
        )
        print(
            f"✓ Loaded existing evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}"
        )
    else:
        # Different HTTP error, re-raise
        raise

# Define comprehensive test cases with expected facts for ground truth comparison
sample_document = "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000"

evaluation_examples = [
    {
        "inputs": {
            "document_text": sample_document,
            "questions": [{"text": "Do the documents contain a balance sheet?"}],
        },
        "expectations": {
            "expected_facts": [
                "answer is Yes",
                "balance sheet information",
                "total assets mentioned",
                "total liabilities mentioned",
                "shareholder's equity mentioned",
            ]
        },
    },
    {
        "inputs": {
            "document_text": sample_document,
            "questions": [{"text": "Do the documents contain an income statement?"}],
        },
        "expectations": {
            "expected_facts": [
                "answer is Yes",
                "income statement information",
                "net income mentioned",
                "revenues mentioned",
                "expenses mentioned",
            ]
        },
    },
    {
        "inputs": {
            "document_text": sample_document,
            "questions": [{"text": "Do the documents contain a cash flow statement?"}],
        },
        "expectations": {
            "expected_facts": [
                "answer is Yes",
                "cash flow information",
                "operating activities mentioned",
                "investing activities mentioned",
                "cash flows mentioned",
            ]
        },
    },
    {
        "inputs": {
            "document_text": sample_document,
            "questions": [
                {
                    "text": "Do the documents contain information about employee benefits?"
                }
            ],
        },
        "expectations": {
            "expected_facts": [
                "answer is No",
                "no employee benefits information",
                "financial statements focus",
                "no HR-related content",
            ]
        },
    },
]

# Add the examples to the evaluation dataset
eval_dataset.merge_records(evaluation_examples)
print(f"✓ Added {len(evaluation_examples)} records to evaluation dataset")

# Preview the dataset
df = eval_dataset.to_df()
print(f"✓ Dataset preview - Total records: {len(df)}")
display(df)  # Databricks notebook built-in; to_df() returns a pandas DataFrame, which has no .display() method

import warnings
import mlflow
from mlflow.genai.scorers import (
    RelevanceToQuery,
    Correctness,
    Guidelines,
)

# Suppress harmless threadpoolctl warnings that can appear in Databricks environments
warnings.filterwarnings("ignore", message=".*threadpoolctl.*")
warnings.filterwarnings("ignore", category=UserWarning, module="threadpoolctl")

# Load the logged model for evaluation
model_uri = f"models:/{REGISTERED_MODEL_NAME}/{model_info.registered_model_version}"
print(f"Loading model for evaluation: {model_uri}")

# Load the model as a predict function
loaded_model = mlflow.pyfunc.load_model(model_uri)


def my_app(document_text, questions):
    """Wrapper function for the model prediction."""
    # The evaluation dataset's inputs field contains {"document_text": "...", "questions": [...]}
    # but the predict_fn parameter names must match the keys in inputs
    input_data = {"document_text": document_text, "questions": questions}
    return loaded_model.predict([input_data])


# Define scorers for evaluation including ground truth comparison
correctness_scorer = Correctness()  # Compares against expected_facts
relevance_scorer = RelevanceToQuery()  # Evaluates relevance of response to question
response_schema_scorer = Guidelines(
    name="response_schema",
    guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning. There also needs to be a 'question_text' field that contains the question that was asked. All these fields are part of the 'results' array field.",
)  # Validates structured output format

# This creates an evaluation run using the MLflow-managed dataset
results = mlflow.genai.evaluate(
    data=eval_dataset,  # Use the MLflow-managed dataset
    predict_fn=my_app,
    scorers=[
        correctness_scorer,
        relevance_scorer,
        response_schema_scorer,
    ],
)

# Access the run ID
print(f"✓ Evaluation completed")
print(f"Evaluation run ID: {results.run_id}")

# Display evaluation results summary
if hasattr(results, "metrics") and results.metrics:
    print("\n📊 Evaluation Results Summary:")
    for metric_name, metric_value in results.metrics.items():
        if isinstance(metric_value, (int, float)):
            print(f"  • {metric_name}: {metric_value:.3f}")
        else:
            print(f"  • {metric_name}: {metric_value}")
else:
    print("✓ Evaluation completed - view detailed results in the evaluation experiment")

# Display link to the evaluation dataset
print(f"\n📊 Evaluation Dataset: {uc_schema}.{evaluation_dataset_table_name}")
print(f"🔗 View dataset in Unity Catalog Data Explorer")
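The comment in the wrapper function notes that the predict_fn parameter names must match the keys of each record's inputs. That constraint can be checked before running an evaluation. The following is a small sketch using the standard library; `missing_or_extra_keys` and `my_app_stub` are hypothetical names introduced here for illustration.

```python
import inspect


def missing_or_extra_keys(predict_fn, inputs: dict) -> tuple[set, set]:
    """Compare a record's `inputs` keys with predict_fn's parameter names.

    Returns (parameters with no matching key, keys with no matching parameter).
    """
    params = set(inspect.signature(predict_fn).parameters)
    keys = set(inputs)
    return params - keys, keys - params


def my_app_stub(document_text, questions):
    """Stub with the same signature as the evaluation wrapper."""
    return None


record_inputs = {
    "document_text": "Total assets: $2,300,000.",
    "questions": [{"text": "Do the documents contain a balance sheet?"}],
}
missing, extra = missing_or_extra_keys(my_app_stub, record_inputs)
print(f"missing: {missing}, extra: {extra}")
```

Two empty sets mean the record can be passed to the function as keyword arguments without a mismatch.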

Deploy to Model Serving

Deploy the evaluated agent to a Model Serving endpoint with the environment variables required for MLflow 3 tracing. This ensures that all production requests are automatically traced and logged to the specified MLflow experiment.

import mlflow
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    ServedEntityInput,
    ServingModelWorkloadType,
    EndpointCoreConfigInput,
)

from model import DocumentAnalyser, AgentInput, Question

workspace = WorkspaceClient()

# Use the model version from the logged model
model_version = model_info.registered_model_version

print(f"Using model version: {model_version}")

new_entity = ServedEntityInput(
    entity_name=REGISTERED_MODEL_NAME,
    entity_version=model_version,
    name=f"{MODEL_NAME}-{model_version}",
    workload_size="Small",
    workload_type=ServingModelWorkloadType.CPU,
    scale_to_zero_enabled=True,
    environment_vars={
        "DATABRICKS_CLIENT_ID": f"{{{{secrets/{SECRET_SCOPE}/client_id}}}}",
        "DATABRICKS_CLIENT_SECRET": f"{{{{secrets/{SECRET_SCOPE}/client_secret}}}}",
        "DATABRICKS_HOST": DATABRICKS_HOST,
        "MLFLOW_TRACKING_URI": "databricks",
        "MONITORING_EXPERIMENT_ID": current_experiment_id,
        "MODEL_LOG_LEVEL": "INFO",
        "LLM_MODEL": LLM_MODEL,
    },
)

# Check whether the endpoint exists, then update or create it accordingly.
# Catching only NotFound avoids masking unrelated errors (for example, a failed update).
from databricks.sdk.errors import NotFound

try:
    workspace.serving_endpoints.get(ENDPOINT_NAME)
    endpoint_exists = True
except NotFound:
    endpoint_exists = False

if endpoint_exists:
    print(
        f"Endpoint {ENDPOINT_NAME} exists, updating with model version {model_version}"
    )

    # Update existing endpoint with new model version
    workspace.serving_endpoints.update_config(
        name=ENDPOINT_NAME, served_entities=[new_entity]
    )
    print("Endpoint update initiated, waiting for completion...")
else:
    print(f"Endpoint {ENDPOINT_NAME} doesn't exist, creating new endpoint...")

    workspace.serving_endpoints.create(
        name=ENDPOINT_NAME,
        config=EndpointCoreConfigInput(name=ENDPOINT_NAME, served_entities=[new_entity]),
    )
    print("Endpoint creation initiated, waiting for completion...")

# Wait for the update or creation to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint is ready")

# Final status check
endpoint_status = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(f"Final endpoint status: {endpoint_status.state}")
print(
    f"Endpoint URL: https://{DATABRICKS_HOST.replace('https://', '')}/serving-endpoints/{ENDPOINT_NAME}/invocations"
)
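The secret references in environment_vars rely on f-string brace escaping: inside an f-string, each "{{" or "}}" produces one literal brace, so four braces on each side yield the doubled braces of a `{{secrets/<scope>/<key>}}` reference. The short sketch below shows the resulting string; the scope name `my_scope` and the helper `secret_ref` are hypothetical.

```python
SECRET_SCOPE = "my_scope"  # hypothetical scope name for illustration

# Same expression as in environment_vars: four braces collapse to two.
client_id_ref = f"{{{{secrets/{SECRET_SCOPE}/client_id}}}}"


def secret_ref(scope: str, key: str) -> str:
    """Equivalent helper that avoids counting braces."""
    return "{{" + f"secrets/{scope}/{key}" + "}}"


print(client_id_ref)
print(secret_ref(SECRET_SCOPE, "client_secret"))
```

The endpoint receives the reference string, not the secret value itself, so the credentials never appear in the notebook output.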

Set up production monitoring with scorers

Configure automatic quality evaluation of production traffic using MLflow 3 scorers. Scorers automatically analyze traces logged from production requests, enabling continuous quality monitoring.

from mlflow.genai.scorers import (
    RelevanceToQuery,
    Guidelines,
    ScorerSamplingConfig,
    list_scorers,
    get_scorer,
)

# Set the active experiment for scoring (use the current notebook's experiment)
print(f"Setting experiment to: {current_experiment_id}")
mlflow.set_experiment(experiment_id=current_experiment_id)

# Verify the experiment is set correctly
current_experiment = mlflow.get_experiment(current_experiment_id)
print(
    f"Current experiment: {current_experiment.name} (ID: {current_experiment.experiment_id})"
)

# Setup scorers for production monitoring
print("Setting up production monitoring scorers...")

# Relevance scorer - always create new to avoid conflicts
relevance_scorer = RelevanceToQuery().register(name="financial_relevance_check")
relevance_scorer = relevance_scorer.start(
    sampling_config=ScorerSamplingConfig(sample_rate=0.5)
)
print("✅ Created relevance scorer (50% sampling)")

# Guidelines scorer for response schema validation
response_schema_scorer = Guidelines(
    name="response_schema",
    guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning.",
).register(name="response_schema_check")
response_schema_scorer = response_schema_scorer.start(
    sampling_config=ScorerSamplingConfig(sample_rate=0.4)
)
print("✅ Created response schema scorer (40% sampling)")

# List all active scorers
print(f"\nActive Scorers in Experiment {current_experiment_id}:")
scorers = list_scorers()
for scorer in scorers:
    print(f"• {scorer.name}: {scorer.sample_rate*100}% sampling")
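A sample rate of 0.5 or 0.4 means that roughly half, or two fifths, of the production traces are scored. The sketch below only illustrates what such a rate implies for volume. It is not how MLflow selects traces internally, and `sample_traces` is a hypothetical function.

```python
import random


def sample_traces(trace_ids: list[str], sample_rate: float, seed: int = 0) -> list[str]:
    """Keep each trace independently with probability `sample_rate`."""
    rng = random.Random(seed)  # seeded so the illustration is repeatable
    return [trace_id for trace_id in trace_ids if rng.random() < sample_rate]


trace_ids = [f"tr-{i}" for i in range(1000)]
relevance_sample = sample_traces(trace_ids, sample_rate=0.5)
schema_sample = sample_traces(trace_ids, sample_rate=0.4, seed=1)
print(f"relevance scorer: {len(relevance_sample)} of {len(trace_ids)} traces")
print(f"schema scorer: {len(schema_sample)} of {len(trace_ids)} traces")
```

Lower rates reduce scoring cost at the price of slower detection of quality regressions.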

Test the deployed agent

Test the deployed agent with sample inputs. Each request automatically generates MLflow 3 traces that capture the complete request flow. The production scorers evaluate these traces for quality monitoring.

from databricks.sdk import WorkspaceClient

# Test the non-conversational agent endpoint using Databricks SDK
workspace = WorkspaceClient()

# Example payload with structured input for the non-conversational agent
test_input = {
    "inputs": [
        {
            "document_text": "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
            "questions": [
                {"text": "Do the documents contain a balance sheet?"},
                {"text": "Do the documents contain an income statement?"},
                {"text": "Do the documents contain a cash flow statement?"},
            ],
        }
    ]
}

# Query the serving endpoint using the workspace client
response = workspace.serving_endpoints.query(
    name=ENDPOINT_NAME, inputs=test_input["inputs"]
)

print("Endpoint Response:")
print(response.as_dict())

# Generate MLflow experiment URL
experiment_url = f"{DATABRICKS_HOST}/ml/experiments/{current_experiment_id}"
print(f"\nMLflow Experiment URL: {experiment_url}")
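Applications outside a notebook can call the same invocations URL over REST. The following sketch builds such a request with the standard library and does not send it; `build_invocation_request` is a hypothetical helper, and the token is a placeholder for whatever Databricks access token the caller holds.

```python
import json
import urllib.request


def build_invocation_request(
    host: str, endpoint_name: str, token: str, inputs: list[dict]
) -> urllib.request.Request:
    """Build a POST request for a serving endpoint's invocations URL."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
    body = json.dumps({"inputs": inputs}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_invocation_request(
    host="https://host.databricks.com",  # placeholder workspace URL
    endpoint_name="document_analyser_agent",
    token="<YOUR_TOKEN>",  # placeholder, never hard-code real credentials
    inputs=[
        {
            "document_text": "Total assets: $2,300,000.",
            "questions": [{"text": "Do the documents contain a balance sheet?"}],
        }
    ],
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request)
```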

Log user feedback

Even for non-conversational agents, collecting user feedback is essential for continuous improvement. User-facing front-end applications can let users accept or reject the individual answers the agent returns. That feedback can then be logged to MLflow using the trace_id and span_id included in the response.

Common feedback scenarios for non-conversational agents:

  • Accuracy feedback: "Was this yes/no answer correct?"
  • Relevance feedback: "Was the reasoning appropriate for the question?"
  • Quality feedback: "Was the supporting evidence sufficient?"
  • Error reporting: "Did the agent misunderstand the document content?"

The following cell demonstrates how to log feedback for a single answer using the span_id returned in the response.

import mlflow
from mlflow.entities import AssessmentSource

# Get the response from the previous test (extract span_id from first result)
# In a real application, this would come from the API response
response_dict = response.as_dict()
first_prediction = response_dict["predictions"][0]
first_result = first_prediction["results"][0]

# Assert we have the required IDs for feedback logging
assert (
    first_result.get("span_id") is not None
), "span_id is required for feedback logging"
assert (
    first_prediction.get("trace_id") is not None
), "trace_id is required for feedback logging"

span_id = first_result["span_id"]
trace_id = first_prediction["trace_id"]
question_text = first_result["question_text"]
answer = first_result["answer"]

print(f"Logging feedback for question: '{question_text}'")
print(f"Agent answer: {answer}")
print(f"Span ID: {span_id}")
print(f"Trace ID: {trace_id}")

try:
    # Example: User provides positive feedback on this specific answer
    mlflow.log_feedback(
        trace_id=trace_id,
        span_id=span_id,
        name="user_feedback",
        value=True,  # True for positive, False for negative
        source=AssessmentSource(source_type="HUMAN"),
        rationale="Answer was accurate and well-reasoned",
    )

    print("✅ Feedback logged successfully!")

except Exception as e:
    print(f"Note: Could not log feedback in this environment: {e}")
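When a front end collects accept or reject verdicts for several answers at once, each verdict has to be paired with the span_id of its answer. The sketch below prepares the keyword arguments for one feedback call per answer without calling MLflow; `feedback_calls` is a hypothetical helper, and keying verdicts by question text is an assumption made for illustration.

```python
def feedback_calls(prediction: dict, verdicts: dict[str, bool]) -> list[dict]:
    """Turn per-question verdicts into keyword arguments for feedback logging."""
    calls = []
    for result in prediction["results"]:
        question = result["question_text"]
        if question not in verdicts or result.get("span_id") is None:
            continue  # no verdict given, or no span to attach the feedback to
        calls.append(
            {
                "trace_id": prediction["trace_id"],
                "span_id": result["span_id"],
                "name": "user_feedback",
                "value": verdicts[question],
            }
        )
    return calls


prediction = {
    "trace_id": "tr-xyz789abc123",
    "results": [
        {"question_text": "Do the documents contain a balance sheet?", "answer": "Yes", "span_id": "abc123def456"},
        {"question_text": "Do the documents contain an income statement?", "answer": "Yes", "span_id": None},
    ],
}
calls = feedback_calls(
    prediction,
    {
        "Do the documents contain a balance sheet?": True,
        "Do the documents contain an income statement?": False,
    },
)
print(calls)
```

Each dictionary can then be passed to mlflow.log_feedback together with a source, as in the cell above.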

Next steps

Example notebook

Non-conversational AI agents using MLflow
