Compartir a través de


Carga incremental de datos de Data Warehouse a almacenes de lago

En este tutorial, aprenderán a copiar solo los datos nuevos o modificados de un almacén de datos a un Lakehouse. Este enfoque se denomina carga incremental y resulta útil cuando desea mantener los datos up-to-date sin copiar todo cada vez.

Este es el diseño de alto nivel de la solución:

Diagrama que muestra la lógica de datos de carga incremental.

  1. Seleccione una columna de marca de agua. Elija una columna en la tabla de origen que ayude a realizar un seguimiento de los registros nuevos o modificados. Normalmente, esta columna contiene valores que aumentan cuando se agregan o actualizan filas (como una marca de tiempo o un identificador). Usaremos el valor más alto en esta columna como nuestra "marca de agua" para saber dónde nos quedamos.

  2. Configure una tabla para almacenar el último valor de watermark.

  3. Cree una canalización que haga lo siguiente:

    La tubería incluye estas actividades:

    • Dos actividades de búsqueda. El primero obtiene el último valor de marca de agua (donde nos detuvimos la última vez). La segunda obtiene el nuevo valor de marca de referencia (indicando hasta dónde llegaremos esta vez). Ambos valores se pasan a la actividad de copia.
    • Actividad de copia que busca filas donde el valor de la columna de marca de agua está entre las marcas de agua antiguas y nuevas. A continuación, copia estos datos desde el almacén de datos a su sistema Lakehouse como un nuevo archivo.
    • Actividad de procedimiento almacenado que guarda el nuevo valor de marca de agua para que la siguiente ejecución de canalización sepa dónde empezar.

Requisitos previos

  • Almacenamiento de datos. Utilizará el almacén de datos como su almacén de datos de origen. Si no tiene una, consulte Creación de un almacenamiento de datos para obtener instrucciones.
  • Almacén de lago. Usará Lakehouse como almacén de datos de destino. Si no tiene uno, consulte Crear un Lakehouse para obtener instrucciones.
    • Cree una carpeta denominada IncrementalCopy para almacenar los datos copiados.

Prepara tu fuente

Vamos a configurar las tablas y el procedimiento almacenado que necesita en su almacén de datos antes de configurar la canalización de copia incremental.

1. Cree una tabla de origen de datos en el almacenamiento de datos

Ejecute el siguiente comando SQL en Data Warehouse para crear una tabla denominada data_source_table como tabla de origen. Lo usaremos como datos de ejemplo para la copia incremental.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Los datos de la tabla de origen tienen este aspecto:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

En este tutorial, usaremos LastModifytime como columna de marca de agua.

2. Cree otra tabla en el almacenamiento de datos para almacenar el último valor de referencia

  1. Ejecute el siguiente comando SQL en el almacenamiento de datos para crear una tabla denominada watermarktable para almacenar el último valor de referencia:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Establezca el valor predeterminado del último indicador de progreso utilizando el nombre de su tabla fuente. En este tutorial, el nombre de la tabla es data_source_table y estableceremos el valor 1/1/2010 12:00:00 AMpredeterminado en .

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Compruebe los datos en su watermarktable.

    Select * from watermarktable
    

    Salida:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Cree un procedimiento almacenado en el almacenamiento de datos

Ejecute el siguiente comando para crear un procedimiento almacenado en el almacenamiento de datos. Este procedimiento almacenado actualiza el último valor de marca de agua después de cada ejecución de canalización.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configuración de una canalización para copia incremental

Paso 1: crear una canalización

  1. Vaya a Power BI.

  2. Seleccione el icono de Power BI en la parte inferior izquierda de la pantalla y, a continuación, seleccione Tejido.

  3. Seleccione Mi área de trabajo para abrir el área de trabajo de Fabric.

  4. Seleccione + Nuevo elemento y, después, canalización y escriba un nombre de canalización para crear una canalización.

    Captura de pantalla que muestra el botón de nueva tubería en el espacio de trabajo recién creado.

    Captura de pantalla que muestra el nombre de la creación de una nueva canalización.

Paso 2: agregar una actividad de búsqueda para la última referencia

En este paso, cree una actividad de búsqueda para obtener el último valor de marca temporal. Obtendremos el valor 1/1/2010 12:00:00 AM predeterminado que se estableció anteriormente.

  1. Seleccione Actividad de canalización y Búsqueda en la lista desplegable.

  2. En la pestaña General , cambie el nombre de esta actividad a LookupOldWaterMarkActivity.

  3. En la pestaña Configuración , configure lo siguiente:

    • Conexión: en Almacenamiento , seleccione Examinar todo y seleccione el almacenamiento de datos en la lista.
    • Usar consulta: elija Tabla.
    • Tabla: elija dbo.watermarktable.
    • Solo primera fila: seleccionado.

    Captura de pantalla que muestra la búsqueda de una referencia antigua.

Paso 3: agregar una actividad de búsqueda para la referencia nueva

En este paso, creará una actividad de búsqueda para obtener el nuevo valor de la marca de agua. Vas a usar una consulta para obtener la nueva marca de agua de la tabla de datos de origen. Obtendremos el valor más alto en la columna LastModifytime de data_source_table.

  1. En la barra superior, seleccione Búsqueda en la pestaña Actividades para agregar la segunda actividad de búsqueda.

  2. En la pestaña General , cambie el nombre de esta actividad a LookupNewWaterMarkActivity.

  3. En la pestaña Configuración , configure lo siguiente:

    • Conexión: bajo Almacén de datos, seleccione Consultar todo y seleccione su almacén de datos en la lista o seleccione su almacén de datos en Conexiones de Elementos de Fabric.

    • Usar consulta: elija Consulta.

    • Consulta: escriba la siguiente consulta para elegir la hora máxima de la última modificación como nueva referencia:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Solo primera fila: seleccionado.

    Captura de pantalla que muestra la búsqueda de una referencia nueva.

Paso 4: agregar la actividad de copia para copiar datos incrementales

En este paso, agregará una actividad de copia para copiar los datos incrementales entre la última marca de agua y la nueva marca de agua desde el Data Warehouse al Lakehouse.

  1. Seleccione Actividades en la barra superior y seleccione Copiar datos ->Agregar al lienzo para obtener la actividad de copia.

  2. En la pestaña General , cambie el nombre de esta actividad a IncrementalCopyActivity.

  3. Conecte ambas actividades de búsqueda a la actividad de copia arrastrando el botón verde (Con éxito) desde las actividades de búsqueda hasta la actividad de copia. Suelte el botón del mouse cuando vea el color del borde de la actividad de copia cambiar a verde.

    Captura de pantalla que muestra la conexión de actividades de búsqueda y copia.

  4. En la pestaña Origen , configure lo siguiente:

    • Conexión: bajo Almacén de datos, seleccione Consultar todo y seleccione su almacén de datos en la lista o seleccione su almacén de datos en Conexiones de Elementos de Fabric.

    • Almacenamiento: seleccione su almacén.

    • Usar consulta: elija Consulta.

    • Consulta: escriba la siguiente consulta para copiar datos incrementales entre la última marca de agua y la nueva marca de agua.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Captura de pantalla que muestra la configuración del origen de la copia.

  5. En la pestaña Destino , configure lo siguiente:

    • Conexión: En Lakehouse seleccione Examinar todo y elija su Lakehouse de la lista o seleccione su Lakehouse en las Conexiones de elementos de Fabric.
    • Almacén de lago: seleccione su almacén de lago.
    • Carpeta raíz: elija Archivos.
    • Ruta de acceso del archivo: elija la carpeta donde desea almacenar los datos copiados. Seleccione Examinar para seleccionar la carpeta. Para el nombre de archivo, abra Agregar contenido dinámico y escriba @CONCAT('Incremental-', pipeline().RunId, '.txt') en la ventana abierta para crear nombres de archivo para el archivo de datos copiado en el almacén de lago.
    • Formato de archivo: seleccione el tipo de formato de los datos.

    Captura de pantalla que muestra la configuración del destino de la copia.

Paso 5: Agregar una actividad de procedimiento almacenado

En este paso, añadirá una actividad de un procedimiento almacenado para actualizar el valor de indicador más reciente para la ejecución siguiente de la canalización.

  1. Seleccione Actividades en la barra superior y seleccione Procedimiento almacenado para agregar una actividad de procedimiento almacenado.

  2. En la pestaña General , cambie el nombre de esta actividad a StoredProceduretoWriteWatermarkActivity.

  3. Conecte la salida verde (en caso de éxito) de la actividad de copia con la actividad de procedimiento almacenado.

  4. En la pestaña Configuración , configure lo siguiente:

    • Almacenamiento de datos: seleccione su almacenamiento de datos.

    • Nombre del procedimiento almacenado: elija el procedimiento almacenado que creó en el almacenamiento de datos: [dbo].[ usp_write_watermark].

    • Expanda Parámetros de procedimiento almacenado. Para establecer valores para los parámetros del procedimiento almacenado, seleccione Importar y escriba los valores siguientes para los parámetros:

      Nombre Tipo Value
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Captura de pantalla que muestra la configuración de la actividad del procedimiento almacenado.

Paso 6: Ejecución de la canalización y supervisión del resultado

En la barra superior, seleccione Ejecutar en la pestaña Inicio . A continuación, seleccione Guardar y ejecutar. La pipeline comienza a ejecutarse y puede monitorearla en la pestaña Salida.

Captura de pantalla que muestra los resultados de la ejecución de canalización.

Vaya a Lakehouse y encontrará el archivo de datos en la carpeta que eligió. Puede seleccionar el archivo para obtener una vista previa de los datos copiados.

Captura de pantalla que muestra los datos del almacén de lago para la primera ejecución de canalización.

Captura de pantalla que muestra la versión preliminar de los datos del almacén de lago para la primera ejecución de canalización.

Agregar más datos para ver los resultados de la copia incremental

Después de finalizar la primera ejecución de la canalización, vamos a agregar más datos a la tabla de origen del almacén de datos para ver si esta canalización puede copiar los datos añadidos de forma incremental.

Paso 1: incorporación de más datos al origen

Para insertar nuevos datos en el almacenamiento de datos, ejecute la consulta siguiente:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Los datos actualizados para data_source_table son:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Paso 2: Desencadenar otra ejecución de canalización y supervisar el resultado

Volver a la página de la canalización. En la barra superior, seleccione Ejecutar de nuevo en la pestaña Inicio . La pipeline comienza a ejecutarse y puede supervisarla en Salida.

Vaya a Lakehouse y encontrará el nuevo archivo de datos copiado en la carpeta que eligió. Puede seleccionar el archivo para obtener una vista previa de los datos copiados. Verá que los datos incrementales se muestran en este archivo.

Captura de pantalla que muestra los datos del almacén de lago para la segunda ejecución de canalización.

Captura de pantalla que muestra la versión preliminar de los datos del almacén de lago para la segunda ejecución de canalización.

A continuación, obtenga más información sobre cómo copiar desde Azure Blob Storage a Lakehouse.