Deploy batch inference pipelines

Important

This feature is in Public Preview.

This page shows how you can integrate AI Functions with other Databricks data and AI products to build complete batch inference pipelines. These pipelines can run end-to-end workflows that include ingestion, preprocessing, inference, and post-processing. Pipelines can be written in SQL or Python and deployed as:

  • Lakeflow Spark Declarative Pipelines
  • Scheduled workflows using Databricks Workflows
  • Streaming inference workflows using Structured Streaming

Requirements

  • A workspace in a region where Foundation Model APIs are supported.
  • Databricks Runtime 15.4 LTS or above is required for batch inference workloads using AI Functions.
  • Query permission on the Unity Catalog Delta table that contains the data you want to use.
  • Set the pipelines.channel table property to 'preview' to use ai_query(), as shown in the example below.
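
For example, a minimal sketch of setting this property from Python; catalog.schema.my_table is a hypothetical table name you would replace with your own Unity Catalog table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder table name; replace with the Unity Catalog table you plan to query.
spark.sql(
    "ALTER TABLE catalog.schema.my_table "
    "SET TBLPROPERTIES ('pipelines.channel' = 'preview')"
)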

Run incremental batch inference in Lakeflow Spark Declarative Pipelines

The following example performs incremental batch inference using Lakeflow Spark Declarative Pipelines, for cases where your data is continuously updated.

Step 1: Ingest raw news data from a volume

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));

Python

Import the packages and define the JSON schema of the LLM response as a Python variable.

from pyspark import pipelines as dp
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Ingest your data from a Unity Catalog volume.

@dp.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Step 2: Use LLM inference to extract the title and category

SQL


CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;

Python

@dp.materialized_view(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Step 3: Validate the LLM inference output before summarization

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;

Python

@dp.materialized_view(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dp.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dp.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Step 4: Summarize news articles from the validated data

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;

Python

@dp.materialized_view(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )
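
To run these table definitions as a pipeline, attach the notebook or file that contains them to a Lakeflow Spark Declarative Pipeline. A hedged sketch using the Databricks SDK for Python follows; the notebook path, catalog, and target schema are placeholders, and creating the pipeline from the workspace UI works just as well:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines as p

w = WorkspaceClient()

# Hypothetical notebook path, catalog, and schema; replace with your own values.
created = w.pipelines.create(
    name="news-batch-inference",
    libraries=[
        p.PipelineLibrary(
            notebook=p.NotebookLibrary(path="/Workspace/Users/me@example.com/news_pipeline")
        )
    ],
    catalog="main",
    target="news",
    serverless=True,
    continuous=False,
)
print(f"Created pipeline {created.pipeline_id}")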

Automate batch inference jobs using Databricks Workflows

Schedule batch inference jobs and automate AI pipelines.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

Python


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)
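
To run the query above on a schedule, one option is to place it in a notebook and create a recurring job with the Databricks SDK for Python. A hedged sketch; the notebook path and cron expression are placeholders:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Hypothetical notebook containing the batch inference query; replace the path with your own.
job = w.jobs.create(
    name="nightly-product-review-opinion-mining",
    tasks=[
        jobs.Task(
            task_key="batch_inference",
            notebook_task=jobs.NotebookTask(notebook_path="/Workspace/Users/me@example.com/opinion_mining"),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",  # daily at 02:00
        timezone_id="UTC",
    ),
)
print(f"Created job {job.job_id}")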

AI Functions using Structured Streaming

Apply AI inference in near real-time or micro-batch scenarios using ai_query and Structured Streaming.

Step 1. Read your static Delta table

Read your static Delta table as if it were a stream.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
# Optional: repartition the table to increase read parallelism for the stream.
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
# Spark processes the existing rows exactly once, split into micro-batches sized by maxBytesPerTrigger.
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Step 2. Apply ai_query

Spark processes this only once for static data, unless new rows arrive in the table.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Step 3: Write the summarized output

Write the summarized output to another Delta table.


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()
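
If you only want to process the rows that already exist in the static table and then stop, rather than leaving the stream running, a one-shot trigger is an option. A minimal sketch assuming the df_transformed DataFrame from Step 2; the checkpoint path is a placeholder:

# Process everything currently available in the source in a single run, then stop.
query_once = (
    df_transformed.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary_once")  # placeholder path
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("enterprise.docs_summary")
)

query_once.awaitTermination()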