Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Niet-gespreksagenten verwerken gestructureerde invoer om specifieke uitvoer te produceren zonder de gespreksstatus te behouden. Elke aanvraag is onafhankelijk en zelfstandig, waardoor deze agents ideaal zijn voor taakspecifieke bewerkingen, zoals documentclassificatie, gegevensextractie, batchanalyse en gestructureerde vragen beantwoorden.
In tegenstelling tot gespreksagenten die meerdere gespreksrondes beheren, richten niet-conversatieagenten zich op het efficiënt uitvoeren van goed gedefinieerde taken. Deze gestroomlijnde architectuur maakt een hogere doorvoer mogelijk voor onafhankelijke aanvragen.
U leert het volgende:
- Een niet-converserende agent maken
- Uitgebreide MLflow-tracering en waarneembaarheid implementeren
- De agent implementeren in Model Serving met automatische traceringsverzameling
- Productiebewaking instellen met MLflow 3-scorers
Requirements
Afhankelijkheden:
- MLflow 3.2.0 of hoger
- databricks-agents 1.2.0 of hoger
- databricks-sdk[openai] voor LLM-integratie
- Python 3.10 of hoger
Toegang tot werkruimte:
- Toegang tot Foundation Model-API's (standaard: Claude 4.5 Sonnet, configureerbaar)
- Toegang tot een catalogus en schema voor registratie van het AI-model
%pip install --upgrade mlflow[databricks]==3.6.0 pydantic databricks-sdk[openai] databricks-agents databricks-sdk
%restart_python
Voorbeeldscenario
De agent in dit voorbeeld verwerkt gestructureerde vragen over inhoud van financiële documenten en biedt ja/nee-antwoorden met redenering. Gebruikers bieden zowel de documenttekst als vragen rechtstreeks in de invoer, waardoor in dit vereenvoudigde voorbeeld geen vectorzoekinfrastructuur meer nodig is. Dit laat zien hoe niet-gespreksagenten goed gedefinieerde taken kunnen afhandelen zonder gesprekscontext.
U kunt dit voorbeeld uitbreiden voor gebruiksvoorbeelden voor productie door aanvullende hulpprogramma's en mogelijkheden te integreren. Voorbeelden hiervan zijn vectorzoekopdrachten voor het ophalen van documenten, MCP-hulpprogramma's (Model Context Protocol) voor externe integraties of andere Databricks-agents zoals Genie voor gestructureerde gegevenstoegang.
De service-principal configureren
Niet-gespreksagenten bieden geen ondersteuning voor automatische verificatiepassthrough voor het schrijven van traceringen van Model Serving. In plaats daarvan moet u aangepaste MLflow 3-traceringsintegratie implementeren en verificatie handmatig afhandelen met behulp van een service-principal.
- Maak een service-principal met OAuth-referenties, en verleen het gebruikersrechten voor uw werkruimte.
- Sla referenties op in een geheim bereik.
# TODO: Configuration constants - After running this cell, set these values for your environment
dbutils.widgets.text("catalog", "main")
dbutils.widgets.text("schema", "default")
dbutils.widgets.text("secret_scope", "")
dbutils.widgets.dropdown("clean_up_resources", "False", ["True", "False"])
CATALOG = dbutils.widgets.get("catalog")
SCHEMA = dbutils.widgets.get("schema")
SECRET_SCOPE = dbutils.widgets.get("secret_scope")
CLEAN_UP_RESOURCES = dbutils.widgets.get("clean_up_resources")
from databricks.sdk import WorkspaceClient
workspace = WorkspaceClient()
DATABRICKS_HOST = workspace.config.host # Your workspace URL
Als u de OAuth-client-id en het clientgeheim van uw service-principal nog niet hebt opgeslagen als Databricks-geheimen, verwijdert u de volgende code en vervangt u de waarden door de id en het geheim van uw service-principal. U kunt de geheimen ook maken met behulp van de Databricks CLI.
# TODO: Store your service principal ID and secret.
# workspace.secrets.put_secret(SECRET_SCOPE, "client_id", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_ID>")
# workspace.secrets.put_secret(SECRET_SCOPE, "client_secret", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET>")
Configureer het MLflow-experiment:
- Maak het experiment als het niet bestaat.
- Verleen de service-principal
CAN_EDITmachtigingen aan het experiment.
# MLflow experiment to capture traces
EXPERIMENT_NAME = "/Workspace/Shared/non-conversational"
# LLM Configuration
LLM_MODEL = "databricks-claude-sonnet-4-5" # Change this to use different models
# Model and endpoint names - do not need to be changed
MODEL_NAME = "document_analyzer"
ENDPOINT_NAME = "document_analyzer_agent"
REGISTERED_MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.ml import ExperimentAccessControlRequest
from databricks.sdk.service.iam import PermissionLevel
import mlflow
# Set experiment and get the experiment object directly
experiment = mlflow.set_experiment(EXPERIMENT_NAME)
experiment_id = experiment.experiment_id
# Fetch the service principal client_id from secret scope
client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="client_id")
# Set permissions for the SPN which will later write the traces from the serving endpoint
workspace = WorkspaceClient()
# Set CAN_EDIT permissions for the service principal
workspace.experiments.set_permissions(
experiment_id=experiment_id,
access_control_list=[
ExperimentAccessControlRequest(
service_principal_name=client_id,
permission_level=PermissionLevel.CAN_EDIT
)
]
)
print(f"✓ CAN_EDIT permissions granted to SPN {client_id[:8]}... for experiment: {experiment_id}")
Invoer- en uitvoerindeling
In tegenstelling tot gespreksagenten die gebruikmaken van flexibele indelingen voor chatberichten, vereisen niet-gespreksagenten gestructureerde Pydantic-modellen voor invoer en uitvoer:
- Invoerschema's maken met alle vereiste velden voor taakuitvoering.
- Neem traceringsmetagegevens (
trace_id,span_id) op in uitvoerschema's om logboekregistratie van feedback in te schakelen. - Ontwerpresultaten die, indien gepast, gedetailleerde redeneringen of uitleg van redeneerketens geven.
- Valideer schema's tijdens de ontwikkeling om fouten te ondervangen vóór de implementatie.
Invoerindeling (AgentInput)
{
"document_text": "Document content to analyze...",
"questions": [
{ "text": "Do the documents contain a balance sheet?" },
{ "text": "Do the documents contain an income statement?" },
{ "text": "Do the documents contain a cash flow statement?" }
]
}
Uitvoerindeling (AgentOutput)
{
"results": [
{
"question_text": "Do the documents contain a balance sheet?",
"answer": "Yes",
"chain_of_thought": "Detailed reasoning for the answer...",
"span_id": "abc123def456"
}
],
"trace_id": "tr-xyz789abc123"
}
- Gestructureerde invoer: gebruikers bieden zowel documenttekst als vragen in één aanvraag
- Gedetailleerde redenering: elk antwoord bevat stapsgewijze gedachtenketen
-
Traceerbaarheid: Reacties omvatten
trace_idenspan_idvoor feedbackverzameling
Bouw de niet-conversatiële agent
Maak de niet-converserende agent met MLflow-tracering. De agent gebruikt @mlflow.trace decorators om LLM-aanroepen automatisch te registreren en het volledige aanvraagtraject voor maximale waarneembaarheid in kaart te brengen.
Gebruikers leveren zowel de documenttekst als vragen rechtstreeks bij de invoer.
%%writefile model.py
import json
import logging
from typing import Optional
import uuid
import os
import sys
from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.pyfunc import PythonModel
from mlflow.tracing import set_destination
from mlflow.entities import SpanType
from mlflow.entities.trace_location import MlflowExperimentLocation
from pydantic import BaseModel, Field
class Question(BaseModel):
"""Represents a question in the input."""
text: str = Field(..., description="Yes/no question about document content")
class AgentInput(BaseModel):
"""Input model for the document analyzer agent."""
document_text: str = Field(..., description="The document text to analyze")
questions: list[Question] = Field(..., description="List of yes/no questions")
class Answer(BaseModel):
"""Represents a structured response from the LLM."""
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
class AnalysisResult(BaseModel):
"""Represents an analysis result in the output."""
question_text: str = Field(..., description="Original question text")
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
span_id: str | None = Field(None, description="MLflow span ID for this specific answer (None during offline evaluation)")
class AgentOutput(BaseModel):
"""Output model for the document analyzer agent."""
results: list[AnalysisResult] = Field(..., description="List of analysis results")
trace_id: str | None = Field(None, description="MLflow trace ID for user feedback collection (None during offline evaluation)")
class DocumentAnalyzer(PythonModel):
"""Non-conversational agent for document analysis using MLflow model serving."""
def __init__(self) -> None:
"""Initialize the document analyzer.
Sets up logging configuration, initializes model properties, and prepares
the model for serving.
"""
self._setup_logging()
self.model_name = "document_analyzer_v1"
self.logger.debug(f"Initialized {self.model_name}")
def _setup_logging(self) -> None:
"""Set up logging configuration for Model Serving.
Configures a logger that uses stderr for better visibility in Model Serving
environments. Log level can be controlled via MODEL_LOG_LEVEL environment
variable (defaults to INFO).
"""
self.logger = logging.getLogger("ModelLogger")
# Set log level from environment variable or default to INFO
log_level = os.getenv("MODEL_LOG_LEVEL", "INFO").upper()
self.logger.setLevel(getattr(logging, log_level, logging.INFO))
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, log_level, logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def load_context(self, context) -> None:
"""Load model context and initialize clients.
This method is called once when the model is loaded in the serving environment.
It sets up MLflow tracing destination, initializes the Databricks workspace
client, and configures the OpenAI-compatible client for LLM inference.
Args:
context: MLflow model context containing artifacts and configuration
"""
self.logger.debug("Loading model context")
set_destination(MlflowExperimentLocation(experiment_id=os.getenv("MONITORING_EXPERIMENT_ID")))
self.logger.debug("Instantiate workspace client")
self.workspace = WorkspaceClient()
# You can load any artifacts here if needed
# self.artifacts = context.artifacts
self.logger.debug("Instantiate openai client")
# Get an OpenAI-compatible client configured for Databricks serving endpoints
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
@mlflow.trace(name="answer_question", span_type=SpanType.LLM)
def answer_question(self, question_text: str, document_text: str) -> tuple[object, str | None]:
"""Answer a question using LLM with structured response format.
Uses the OpenAI-compatible client to call a language model with a structured
JSON response format. The LLM analyzes the provided document text and returns
a yes/no answer with reasoning.
Args:
question_text (str): The yes/no question to answer about the document
document_text (str): The document text to analyze
Returns:
tuple: (openai.ChatCompletion, str|None) - LLM response and span_id
"""
# Create a chat completion request with structured response for questions
question_prompt = f"""
You are a document analysis expert. Answer the following yes/no question based on the provided document.
Question: "{question_text}"
Document:
{document_text}
Analyze the document and provide a structured response.
"""
# Create a separate sub-span for the actual OpenAI API call
llm_response = self._call_openai_completion(question_prompt)
# Get the current span ID for this specific answer
current_span = mlflow.get_current_active_span()
span_id = current_span.span_id if current_span is not None else None
return llm_response, span_id
@mlflow.trace(name="openai_completion", span_type=SpanType.LLM)
def _call_openai_completion(self, prompt: str):
"""Make the actual OpenAI API call with its own sub-span.
Args:
prompt (str): The formatted prompt to send to the LLM
Returns:
OpenAI ChatCompletion response
"""
return self.openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "databricks-claude-sonnet-4-5"), # Configurable LLM model
messages=[
{
"role": "user",
"content": prompt
}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": "question_response",
"schema": Answer.model_json_schema()
}
}
)
@mlflow.trace(name="document_analysis")
def predict(self, context, model_input: list[AgentInput]) -> list[AgentOutput]:
"""Process document analysis questions with yes/no answers.
Args:
context: MLflow model context
model_input: List of structured inputs containing document text and questions
Returns:
List of AgentOutput with yes/no answers and reasoning
"""
self.logger.debug(f"Processing {len(model_input)} classification request(s)")
# Get the current trace ID for user feedback collection
# Will be None during offline evaluation when no active span exists
current_span = mlflow.get_current_active_span()
trace_id = current_span.trace_id if current_span is not None else None
results = []
for input_data in model_input:
self.logger.debug(f"Number of questions: {len(input_data.questions)}")
self.logger.debug(f"Document length: {len(input_data.document_text)} characters")
analysis_results = []
for question in input_data.questions:
self.logger.debug(f"Processing question: {question.text}")
# Answer the question using LLM with structured response
llm_response, answer_span_id = self.answer_question(question.text, input_data.document_text)
# Parse structured JSON response
try:
response_data = json.loads(llm_response.choices[0].message.content)
answer_obj = Answer(**response_data)
except Exception as e:
self.logger.debug(f"Failed to parse structured response: {e}")
# Fallback to default response
answer_obj = Answer(
answer="No",
chain_of_thought="Unable to process the question due to parsing error."
)
analysis_results.append(AnalysisResult(
question_text=question.text,
answer=answer_obj.answer,
chain_of_thought=answer_obj.chain_of_thought,
span_id=answer_span_id
))
self.logger.debug(f"Generated {len(analysis_results)} analysis results")
results.append(AgentOutput(
results=analysis_results,
trace_id=trace_id
))
return results
mlflow.models.set_model(DocumentAnalyzer())
De agent loggen en registreren
Voordat de agent kan worden geïmplementeerd naar een service-eindpunt, moet deze worden geregistreerd bij een MLflow-experiment en geregistreerd in Unity Catalog.
import os
import mlflow
import json
from mlflow.pyfunc import PythonModel
from pydantic import BaseModel, Field
from model import DocumentAnalyzer, AgentInput, Question
# Create example input for signature inference
def create_example_input() -> AgentInput:
"""Create example input for the non-conversational agent."""
return AgentInput(
document_text="Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
questions=[
Question(text="Do the documents contain a balance sheet?"),
Question(text="Do the documents contain an income statement?"),
Question(text="Do the documents contain a cash flow statement?"),
],
)
input_example = create_example_input()
with mlflow.start_run(run_name="deploy_non_conversational_agent"):
active_run = mlflow.active_run()
current_experiment_id = active_run.info.experiment_id
# Set environment variables for the model using current notebook experiment
os.environ["MONITORING_EXPERIMENT_ID"] = current_experiment_id
print(
f"✓ Using current notebook experiment ID for tracing: {current_experiment_id}"
)
# Log the non-conversational agent with auto-inferred dependencies
model_info = mlflow.pyfunc.log_model(
name=MODEL_NAME,
python_model="model.py", # Path to the model code file
input_example=[create_example_input().model_dump()],
registered_model_name=REGISTERED_MODEL_NAME,
)
# Set logged model as current active model to associate it with the below evaluation results
mlflow.set_active_model(model_id=mlflow.last_logged_model().model_id)
print(f"✓ Model logged and registered: {REGISTERED_MODEL_NAME}")
print(f"✓ Model version: {model_info.registered_model_version}")
De agent evalueren
Voordat u implementeert in productie, evalueert u de prestaties van de agent met behulp van het GenAI-evaluatieframework van MLflow met vooraf gebouwde scorers. Voor sommige scorers is een grondwaarheidsdataset nodig.
import mlflow
import mlflow.genai.datasets
from requests import HTTPError
# Create an evaluation dataset in Unity Catalog
uc_schema = f"{CATALOG}.{SCHEMA}"
evaluation_dataset_table_name = "document_analyzer_eval"
try:
# Try to create a new evaluation dataset
eval_dataset = mlflow.genai.datasets.create_dataset(
name=f"{uc_schema}.{evaluation_dataset_table_name}",
)
print(f"✓ Created evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}")
except HTTPError as e:
# Check if it's a TABLE_ALREADY_EXISTS error
if e.response.status_code == 400 and "TABLE_ALREADY_EXISTS" in str(e):
print(
f"Dataset {uc_schema}.{evaluation_dataset_table_name} already exists, loading existing dataset..."
)
eval_dataset = mlflow.genai.datasets.get_dataset(
name=f"{uc_schema}.{evaluation_dataset_table_name}"
)
print(
f"✓ Loaded existing evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}"
)
else:
# Different HTTP error, re-raise
raise
# Define comprehensive test cases with expected facts for ground truth comparison
sample_document = "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000"
evaluation_examples = [
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a balance sheet?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"balance sheet information",
"total assets mentioned",
"total liabilities mentioned",
"shareholder's equity mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain an income statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"income statement information",
"net income mentioned",
"revenues mentioned",
"expenses mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a cash flow statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"cash flow information",
"operating activities mentioned",
"investing activities mentioned",
"cash flows mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [
{
"text": "Do the documents contain information about employee benefits?"
}
],
},
"expectations": {
"expected_facts": [
"answer is No",
"no employee benefits information",
"financial statements focus",
"no HR-related content",
]
},
},
]
# Add the examples to the evaluation dataset
eval_dataset.merge_records(evaluation_examples)
print(f"✓ Added {len(evaluation_examples)} records to evaluation dataset")
# Preview the dataset
df = eval_dataset.to_df()
print(f"✓ Dataset preview - Total records: {len(df)}")
df.display()
import warnings
import mlflow
from mlflow.genai.scorers import (
RelevanceToQuery,
Correctness,
Guidelines,
)
# Suppress harmless threadpoolctl warnings that can appear in Databricks environments
warnings.filterwarnings("ignore", message=".*threadpoolctl.*")
warnings.filterwarnings("ignore", category=UserWarning, module="threadpoolctl")
# Load the logged model for evaluation
model_uri = f"models:/{REGISTERED_MODEL_NAME}/{model_info.registered_model_version}"
print(f"Loading model for evaluation: {model_uri}")
# Load the model as a predict function
loaded_model = mlflow.pyfunc.load_model(model_uri)
def my_app(document_text, questions):
"""Wrapper function for the model prediction."""
# The evaluation dataset's inputs field contains {"document_text": "...", "questions": [...]}
# but the predict_fn parameter names must match the keys in inputs
input_data = {"document_text": document_text, "questions": questions}
return loaded_model.predict([input_data])
# Define scorers for evaluation including ground truth comparison
correctness_scorer = Correctness() # Compares against expected_facts
relevance_scorer = RelevanceToQuery() # Evaluates relevance of response to question
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning. There also needs to be a 'question_text' field that contains the question that was asked. All these fields are part of the 'results' array field.",
) # Validates structured output format
# This creates an evaluation run using the MLflow-managed dataset
results = mlflow.genai.evaluate(
data=eval_dataset, # Use the MLflow-managed dataset
predict_fn=my_app,
scorers=[
correctness_scorer,
relevance_scorer,
response_schema_scorer,
],
)
# Access the run ID
print(f"✓ Evaluation completed")
print(f"Evaluation run ID: {results.run_id}")
# Display evaluation results summary
if hasattr(results, "metrics") and results.metrics:
print("\n📊 Evaluation Results Summary:")
for metric_name, metric_value in results.metrics.items():
if isinstance(metric_value, (int, float)):
print(f" • {metric_name}: {metric_value:.3f}")
else:
print(f" • {metric_name}: {metric_value}")
else:
print("✓ Evaluation completed - view detailed results in the evaluation experiment")
# Display link to the evaluation dataset
print(f"\n📊 Evaluation Dataset: {uc_schema}.{evaluation_dataset_table_name}")
print(f"🔗 View dataset in Unity Catalog Data Explorer")
Inzetten voor modelbediening
Implementeer de geëvalueerde agent naar een Model Serving-eindpunt met de benodigde omgevingsvariabelen voor MLflow 3-tracing. Dit zorgt ervoor dat alle productieaanvragen automatisch worden getraceerd en geregistreerd bij het opgegeven MLflow-experiment.
import mlflow
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
ServedEntityInput,
ServingModelWorkloadType,
EndpointCoreConfigInput,
)
from model import DocumentAnalyzer, AgentInput, Question
workspace = WorkspaceClient()
# Use the model version from the logged model
model_version = model_info.registered_model_version
print(f"Using model version: {model_version}")
new_entity = ServedEntityInput(
entity_name=REGISTERED_MODEL_NAME,
entity_version=model_version,
name=f"{MODEL_NAME}-{model_version}",
workload_size="Small",
workload_type=ServingModelWorkloadType.CPU,
scale_to_zero_enabled=True,
environment_vars={
"DATABRICKS_CLIENT_ID": f"{{{{secrets/{SECRET_SCOPE}/client_id}}}}",
"DATABRICKS_CLIENT_SECRET": f"{{{{secrets/{SECRET_SCOPE}/client_secret}}}}",
"DATABRICKS_HOST": DATABRICKS_HOST,
"MLFLOW_TRACKING_URI": "databricks",
"MONITORING_EXPERIMENT_ID": current_experiment_id,
"MODEL_LOG_LEVEL": "INFO",
"LLM_MODEL": LLM_MODEL,
},
)
# Check if endpoint exists and create or update accordingly
try:
# Try to get the existing endpoint
existing_endpoint = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(
f"Endpoint {ENDPOINT_NAME} exists, updating with model version {model_version}"
)
# Update existing endpoint with new model version
workspace.serving_endpoints.update_config(
name=ENDPOINT_NAME, served_entities=[new_entity]
)
print("Endpoint update initiated, waiting for completion...")
# Wait for update to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint updated successfully and is ready")
except Exception as e:
# Endpoint does not exist or you do not have permissions. Try to create it.
print(f"Endpoint {ENDPOINT_NAME} does not exist or you lack permissions. Trying to create new endpoint...")
workspace.serving_endpoints.create(
name=ENDPOINT_NAME,
config=EndpointCoreConfigInput(name=ENDPOINT_NAME, served_entities=[new_entity]),
)
print("Endpoint creation initiated, waiting for completion...")
# Wait for creation to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint created successfully and is ready")
# Final status check
endpoint_status = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(f"Final endpoint status: {endpoint_status.state}")
print(
f"Endpoint URL: https://{DATABRICKS_HOST.replace('https://', '')}/serving-endpoints/{ENDPOINT_NAME}/invocations"
)
Productiebewaking instellen met gebruik van scorers
Configureer automatische kwaliteitsevaluatie voor productieverkeer met MLflow 3-scorers. Scorers analyseren automatisch logbestanden die zijn vastgelegd vanuit productieaanvragen om voortdurende kwaliteitsbewaking te bieden.
from mlflow.genai.scorers import (
RelevanceToQuery,
Guidelines,
ScorerSamplingConfig,
list_scorers,
get_scorer,
)
# Set the active experiment for scoring (use the current notebook's experiment)
print(f"Setting experiment to: {current_experiment_id}")
mlflow.set_experiment(experiment_id=current_experiment_id)
# Verify the experiment is set correctly
current_experiment = mlflow.get_experiment(current_experiment_id)
print(
f"Current experiment: {current_experiment.name} (ID: {current_experiment.experiment_id})"
)
# Setup scorers for production monitoring
print("Setting up production monitoring scorers...")
# Relevance scorer - always create new to avoid conflicts
relevance_scorer = RelevanceToQuery().register(name="financial_relevance_check")
relevance_scorer = relevance_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.5)
)
print("✅ Created relevance scorer (50% sampling)")
# Guidelines scorer for response schema validation
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning.",
).register(name="response_schema_check")
response_schema_scorer = response_schema_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.4)
)
print("✅ Created response schema scorer (40% sampling)")
# List all active scorers
print(f"\nActive Scorers in Experiment {current_experiment_id}:")
scorers = list_scorers()
for scorer in scorers:
print(f"• {scorer.name}: {scorer.sample_rate*100}% sampling")
De geïmplementeerde agent testen
Test de geïmplementeerde agent met voorbeeldinvoer. Elke aanvraag genereert automatisch MLflow 3-traceringen die de volledige aanvraagstroom vastleggen en de productiescorers evalueren deze traceringen voor kwaliteitsbewaking.
from databricks.sdk import WorkspaceClient
# Test the non-conversational agent endpoint using Databricks SDK
workspace = WorkspaceClient()
# Example payload with structured input for the non-conversational agent
test_input = {
"inputs": [
{
"document_text": "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
"questions": [
{"text": "Do the documents contain a balance sheet?"},
{"text": "Do the documents contain an income statement?"},
{"text": "Do the documents contain a cash flow statement?"},
],
}
]
}
# Query the serving endpoint using the workspace client
response = workspace.serving_endpoints.query(
name=ENDPOINT_NAME, inputs=test_input["inputs"]
)
print("Endpoint Response:")
print(response.as_dict())
# Generate MLflow experiment URL
experiment_url = f"{DATABRICKS_HOST}/ml/experiments/{current_experiment_id}"
print(f"\nMLflow Experiment URL: {experiment_url}")
Feedback van gebruikers registreren
Zelfs voor niet-gespreksagenten is het verzamelen van feedback van gebruikers cruciaal voor continue verbetering. Gebruikersgerichte front-endtoepassingen kunnen gebruikers toestaan afzonderlijke antwoorden van de agent te accepteren of af te wijzen. Deze feedback kan vervolgens worden geregistreerd bij MLflow met behulp van de trace_id en span_id opgenomen in het antwoord.
Veelvoorkomende feedbackscenario's voor niet-conversationele agents:
- Nauwkeurigheidsfeedback: "Was dit ja/nee-antwoord correct?"
- Feedback op relevantie: 'Was de redenering geschikt voor de vraag?'
- Kwaliteitsfeedback: "Was het ondersteunende bewijs voldoende?"
- Foutrapportage: 'Heeft de agent de documentinhoud verkeerd begrepen?'
In de volgende cel wordt getoond hoe u feedback kunt vastleggen voor een afzonderlijk antwoord met behulp van de span_id die in het antwoord is geretourneerd.
import mlflow
from mlflow.entities import AssessmentSource
import time
# Wait 1 second to make sure the trace is ready for feedback logging
time.sleep(1)
# Get the response from the previous test (extract span_id from first result)
# In a real application, this would come from the API response
response_dict = response.as_dict()
first_prediction = response_dict["predictions"][0]
first_result = first_prediction["results"][0]
# Assert we have the required IDs for feedback logging
assert (
first_result.get("span_id") is not None
), "span_id is required for feedback logging"
assert (
first_prediction.get("trace_id") is not None
), "trace_id is required for feedback logging"
span_id = first_result["span_id"]
trace_id = first_prediction["trace_id"]
question_text = first_result["question_text"]
answer = first_result["answer"]
print(f"Logging feedback for question: '{question_text}'")
print(f"Agent answer: {answer}")
print(f"Span ID: {span_id}")
print(f"Trace ID: {trace_id}")
try:
# Example: User provides positive feedback on this specific answer
mlflow.log_feedback(
trace_id=trace_id,
span_id=span_id,
name="user_feedback",
value=True, # True for positive, False for negative
source=AssessmentSource(source_type="HUMAN"),
rationale="Answer was accurate and well-reasoned",
)
print("✅ Feedback logged successfully!")
except Exception as e:
print(f"Note: Could not log feedback in this environment: {e}")
De hulpbronnen opschonen
import os
from mlflow import MlflowClient
from mlflow.genai.scorers import delete_scorer
from mlflow.deployments import get_deploy_client
if CLEAN_UP_RESOURCES:
try:
# Uncomment the lines below to delete the Model Serving endpoint:
client = get_deploy_client("databricks")
client.delete_endpoint(endpoint=ENDPOINT_NAME)
# Model registered to UC
client = MlflowClient()
model_versions = client.search_model_versions(f"name='{REGISTERED_MODEL_NAME}'")
for mv in model_versions:
client.delete_model_version(name=REGISTERED_MODEL_NAME, version=mv.version)
client.delete_registered_model(name=REGISTERED_MODEL_NAME)
# Evaluation dataset registered to UC
mlflow.genai.datasets.delete_dataset(eval_dataset.name)
# Model file in workspace
os.remove(os.path.join(os.getcwd(), "model.py"))
# Scorers registered to MLflow experiment
delete_scorer(name="financial_relevance_check")
delete_scorer(name="response_schema_check")
except Exception as e:
print(f"Failed to clean up all resources. Error: {e}")
Volgende stappen
- Meer informatie over het toevoegen van hulpprogramma's aan uw agent om mogelijkheden uit te breiden
- Raadpleeg de documentatie voor MLflow 3-tracering voor geavanceerde waarneembaarheidsfuncties
- Documentatie voor productiebewaking