Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta página se explica cómo recuperar una canalización en las canalizaciones declarativas de Spark de Lakeflow cuando un punto de control de transmisión se invalida o se corrompe.
¿Qué es un punto de control de streaming?
En Apache Spark Structured Streaming, un punto de control es un mecanismo que se usa para conservar el estado de una consulta de streaming. Este estado incluye:
- Información de progreso: qué desplazamientos del origen se han procesado.
-
Estado intermedio: datos que deben mantenerse entre microprocesos para operaciones con estado (por ejemplo, agregaciones,
mapGroupsWithState). - Metadatos: información sobre la ejecución de la consulta de streaming.
Los puntos de control son esenciales para garantizar la tolerancia a errores y la coherencia de los datos en las aplicaciones de streaming:
- Tolerancia a errores: si se produce un error en una aplicación de streaming (por ejemplo, debido a un error de nodo, bloqueo de la aplicación), el punto de control permite que la aplicación se reinicie desde el último estado de punto de control correcto en lugar de volver a procesar todos los datos desde el principio. Esto evita la pérdida de datos y garantiza el procesamiento incremental.
- Procesamiento exactamente una vez: para muchos orígenes de streaming, puntos de control, junto con receptores idempotentes, habilitar el procesamiento exactamente una vez garantiza que cada registro se procese exactamente una vez, incluso en caso de errores, lo que impide duplicados u omisiones.
- Administración de estado: para las transformaciones con estado, los puntos de control conservan el estado interno de estas operaciones, lo que permite que la consulta de streaming continúe procesando correctamente los nuevos datos en función del estado histórico acumulado.
Puntos de control de canalización
Los pipelines están construidos sobre Structured Streaming y abstraen gran parte de la gestión subyacente de los puntos de control, ofreciendo una metodología declarativa. Al definir una tabla de streaming en la canalización, hay un estado de punto de comprobación para cada escritura de flujo en la tabla de streaming. Estas ubicaciones de punto de control son internas de la canalización y no son accesibles para los usuarios.
Normalmente no es necesario administrar ni comprender los puntos de control subyacentes para las tablas de streaming, excepto en los casos siguientes:
- Rebobinar y reproducir: si desea volver a procesar los datos desde un momento determinado mientras conserva el estado actual de la tabla, debe restablecer el punto de control de la tabla de streaming.
-
Recuperación de un error de punto de control o daños: si se ha producido un error en una consulta escrita en la tabla de streaming debido a errores relacionados con puntos de control, se produce un error difícil y la consulta no puede avanzar aún más. Hay tres enfoques que puede usar para recuperarse de esta clase de error:
- Actualización de tabla completa: restablece la tabla y borra los datos existentes.
- Actualización de tabla completa con copia de seguridad y reposición: realice una copia de seguridad de la tabla antes de realizar una actualización de tabla completa y rellene los datos antiguos, pero esto es muy caro y debe ser el último recurso.
- Restablecer el punto de control y continuar incrementalmente: si no puede permitirse perder los datos existentes, debe realizar un restablecimiento de punto de control selectivo para los flujos de streaming afectados.
Ejemplo: Error de canalización debido al cambio de código
Considere un escenario en el que tiene una tubería que procesa un flujo de datos de cambios junto con la instantánea de la tabla inicial de un sistema de almacenamiento en la nube, como Amazon S3, y escribe en una tabla de streaming SCD-1.
La canalización tiene dos flujos de streaming:
-
customers_incremental_flow: lee incrementalmente la fuente CDC de lacustomertabla de origen, filtra los registros duplicados y los sube a la tabla de destino. -
customers_snapshot_flow: lectura única de la instantánea inicial de lacustomerstabla de origen y upserts los registros en la tabla de destino.
@dp.temporary_view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)
@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)
dp.create_streaming_table("customers")
dp.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)
Después de implementar esta canalización, se ejecuta correctamente y comienza a procesar la fuente de distribución de datos modificados y la instantánea inicial.
Más adelante, se da cuenta de que la lógica de desduplicación de la customers_incremental_view consulta es redundante y provoca un cuello de botella de rendimiento. Quite para mejorar el dropDuplicates() rendimiento:
@dp.temporary_view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)
Después de quitar la dropDuplicates() API y volver a implementar la canalización, la actualización produce el siguiente error:
Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
Este error indica que no se permite el cambio debido a una falta de coincidencia entre el estado del punto de control y la definición de consulta actual, lo que impide que la canalización avance aún más.
Los errores relacionados con los puntos de comprobación pueden producirse por varios motivos más allá de simplemente quitar la dropDuplicates API. Entre los escenarios habituales se incluyen los siguientes:
- Agregar o quitar operadores con estado (por ejemplo, introducir o quitar
dropDuplicates()o agregar agregaciones) en una consulta de streaming existente. - Agregar, quitar o combinar orígenes de streaming en una consulta de punto de comprobación anterior (por ejemplo, unión de una consulta de streaming existente con una nueva o adición o eliminación de orígenes de una operación de unión existente).
- Modificar el esquema de estado de las operaciones de streaming con estado (por ejemplo, cambiar las columnas usadas para la desduplicación o agregación).
Para obtener una lista completa de los cambios admitidos y no admitidos, consulte la Guía de streaming estructurado de Spark y Tipos de cambios en las consultas de Structured Streaming.
Opciones de recuperación
Hay tres estrategias de recuperación, en función de los requisitos de durabilidad de los datos y las restricciones de recursos:
| Methods | Complejidad | Cost | Posible pérdida de datos | Posible duplicación de datos | Requiere instantánea inicial | Restablecimiento de tabla completa |
|---|---|---|---|---|---|---|
| Actualización de tabla completa | Low | Mediana | Sí (si no hay ninguna instantánea inicial disponible o si los archivos sin procesar se han eliminado en el origen). | No (para aplicar cambios en la tabla de destino). | Sí | Sí |
| Actualización de tabla completa con copia de seguridad y reposición | Mediana | High | No | No (Para receptores idempotentes. Por ejemplo, CDC automático). | No | No |
| Restablecer punto de control de tabla | Medium-High (medio para orígenes de solo anexión que proporcionan desplazamientos inmutables). | Low | No (requiere una consideración cuidadosa). | No (Para escritores idempotentes. Por ejemplo, CDC automático solo en la tabla de destino). | No | No |
Medium-High complejidad depende del tipo de origen de streaming y de la complejidad de la consulta.
Recommendations
- Use una actualización de tabla completa si no desea tratar con la complejidad de un restablecimiento de punto de control y puede volver a calcular toda la tabla. Esto también le dará una opción para realizar cambios en el código.
- Use la actualización de tabla completa con copia de seguridad y reposición si no desea tratar la complejidad del restablecimiento del punto de control y está bien con el costo adicional de realizar una copia de seguridad y rellenar los datos históricos.
- Use el punto de control de restablecimiento de la tabla si debe conservar los datos existentes en la tabla y continuar procesando los nuevos datos de forma incremental. Sin embargo, este enfoque requiere un control cuidadoso del restablecimiento del punto de control para comprobar que los datos existentes de la tabla no se pierden y que la canalización puede continuar procesando nuevos datos.
Restablecer el punto de control y continuar incrementalmente
Para restablecer el punto de control y continuar procesando incrementalmente, siga estos pasos:
Detener la canalización: asegúrese de que la canalización no tiene actualizaciones activas en ejecución.
Determine la posición inicial del nuevo punto de control: identifique el último desplazamiento correcto o marca de tiempo desde la que desea continuar el procesamiento. Normalmente, este es el desplazamiento más reciente procesado correctamente antes de que se produzca el error.
En el ejemplo anterior, dado que está leyendo los archivos JSON mediante el cargador automático, puede usar la opción para especificar la
modifiedAfterposición inicial del nuevo punto de control. Esta opción permite establecer una marca de tiempo para cuando el cargador automático debe iniciar el procesamiento de nuevos archivos.En el caso de los orígenes de Kafka, puede usar la
startingOffsetsopción para especificar los desplazamientos desde los que la consulta de streaming debe empezar a procesar nuevos datos.En el caso de los orígenes de Delta Lake, puede usar la opción para especificar la
startingVersionversión desde la que la consulta de streaming debe empezar a procesar nuevos datos.Realizar cambios en el código: puede modificar la consulta de streaming para quitar la
dropDuplicates()API o realizar otros cambios. Además, y compruebe que ha agregado lamodifiedAfteropción a la ruta de acceso de lectura del cargador automático.@dp.temporary_view(name="customers_incremental_view") def query(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .option("cloudFiles.includeExistingFiles", "true") .option("modifiedAfter", "2025-04-09T06:15:00") .load(customers_incremental_path) # .dropDuplicates(["customer_id"]) )Nota:
Proporcionar una marca de tiempo incorrecta
modifiedAfterpuede provocar la pérdida de datos o la duplicación. Compruebe que la marca de tiempo está configurada correctamente para evitar volver a procesar los datos antiguos o que falten nuevos datos.Si la consulta tiene una unión de stream-stream o una unión de stream-stream, debe aplicar la estrategia anterior para todos los orígenes de streaming participantes. Por ejemplo:
cdc_1 = spark.readStream.format("cloudFiles")... cdc_2 = spark.readStream.format("cloudFiles")... cdc_source = cdc_1..union(cdc_2)Identifique los nombres de flujo asociados a la tabla de streaming para la que desea restablecer el punto de control. En el ejemplo, es
customers_incremental_flow. Puede encontrar el nombre del flujo en el código de canalización o comprobando la interfaz de usuario de canalización o los registros de eventos de canalización.Restablecer el punto de control: cree un cuaderno de Python y adjunte a un clúster de Azure Databricks.
Necesitará la siguiente información para poder restablecer el punto de control:
- Dirección URL del área de trabajo de Azure Databricks
- ID de canalización
- Nombres de flujo para los que está restableciendo el punto de control
import requests import json # Define your Databricks instance and pipeline ID databricks_instance = "<DATABRICKS_URL>" token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() pipeline_id = "<YOUR_PIPELINE_ID>" flows_to_reset = ["<YOUR_FLOW_NAME>"] # Set up the API endpoint endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates" # Set up the request headers headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # Define the payload payload = { "reset_checkpoint_selection": flows_to_reset } # Make the POST request response = requests.post(endpoint, headers=headers, data=json.dumps(payload)) # Check the response if response.status_code == 200: print("Pipeline update started successfully.") else: print(f"Error: {response.status_code}, {response.text}")Ejecutar la canalización: la canalización comienza a procesar nuevos datos desde la posición inicial especificada con un punto de control nuevo, conservando los datos de tabla existentes mientras continúa el procesamiento incremental.
procedimientos recomendados
- Evite el uso de características de versión preliminar privada en producción.
- Pruebe los cambios antes de realizar cambios en el entorno de producción.
- Cree una canalización de prueba, idealmente en un entorno inferior. Si esto no es posible, intente usar un catálogo y un esquema diferentes para la prueba.
- Reproduzca el error.
- Aplique los cambios.
- Valide los resultados y tome una decisión sobre la marcha/no-go.
- Implemente los cambios en las canalizaciones de producción.