Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este tutorial, aprenderán a copiar solo los datos nuevos o modificados de un almacén de datos a un Lakehouse. Este enfoque se denomina carga incremental y resulta útil cuando desea mantener los datos up-to-date sin copiar todo cada vez.
Este es el diseño de alto nivel de la solución:
Seleccione una columna de marca de agua. Elija una columna en la tabla de origen que ayude a realizar un seguimiento de los registros nuevos o modificados. Normalmente, esta columna contiene valores que aumentan cuando se agregan o actualizan filas (como una marca de tiempo o un identificador). Usaremos el valor más alto en esta columna como nuestra "marca de agua" para saber dónde nos quedamos.
Configure una tabla para almacenar el último valor de watermark.
Cree una canalización que haga lo siguiente:
La tubería incluye estas actividades:
- Dos actividades de búsqueda. El primero obtiene el último valor de marca de agua (donde nos detuvimos la última vez). La segunda obtiene el nuevo valor de marca de referencia (indicando hasta dónde llegaremos esta vez). Ambos valores se pasan a la actividad de copia.
- Actividad de copia que busca filas donde el valor de la columna de marca de agua está entre las marcas de agua antiguas y nuevas. A continuación, copia estos datos desde el almacén de datos a su sistema Lakehouse como un nuevo archivo.
- Actividad de procedimiento almacenado que guarda el nuevo valor de marca de agua para que la siguiente ejecución de canalización sepa dónde empezar.
Requisitos previos
- Almacenamiento de datos. Utilizará el almacén de datos como su almacén de datos de origen. Si no tiene una, consulte Creación de un almacenamiento de datos para obtener instrucciones.
-
Almacén de lago. Usará Lakehouse como almacén de datos de destino. Si no tiene uno, consulte Crear un Lakehouse para obtener instrucciones.
- Cree una carpeta denominada IncrementalCopy para almacenar los datos copiados.
Prepara tu fuente
Vamos a configurar las tablas y el procedimiento almacenado que necesita en su almacén de datos antes de configurar la canalización de copia incremental.
1. Cree una tabla de origen de datos en el almacenamiento de datos
Ejecute el siguiente comando SQL en Data Warehouse para crear una tabla denominada data_source_table como tabla de origen. Lo usaremos como datos de ejemplo para la copia incremental.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Los datos de la tabla de origen tienen este aspecto:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
En este tutorial, usaremos LastModifytime como columna de marca de agua.
2. Cree otra tabla en el almacenamiento de datos para almacenar el último valor de referencia
Ejecute el siguiente comando SQL en el almacenamiento de datos para crear una tabla denominada watermarktable para almacenar el último valor de referencia:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Establezca el valor predeterminado del último indicador de progreso utilizando el nombre de su tabla fuente. En este tutorial, el nombre de la tabla es data_source_table y estableceremos el valor
1/1/2010 12:00:00 AMpredeterminado en .INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Compruebe los datos en su watermarktable.
Select * from watermarktableSalida:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Cree un procedimiento almacenado en el almacenamiento de datos
Ejecute el siguiente comando para crear un procedimiento almacenado en el almacenamiento de datos. Este procedimiento almacenado actualiza el último valor de marca de agua después de cada ejecución de canalización.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configuración de una canalización para copia incremental
Paso 1: crear una canalización
Vaya a Power BI.
Seleccione el icono de Power BI en la parte inferior izquierda de la pantalla y, a continuación, seleccione Tejido.
Seleccione Mi área de trabajo para abrir el área de trabajo de Fabric.
Seleccione + Nuevo elemento y, después, canalización y escriba un nombre de canalización para crear una canalización.
Paso 2: agregar una actividad de búsqueda para la última referencia
En este paso, cree una actividad de búsqueda para obtener el último valor de marca temporal. Obtendremos el valor 1/1/2010 12:00:00 AM predeterminado que se estableció anteriormente.
Seleccione Actividad de canalización y Búsqueda en la lista desplegable.
En la pestaña General , cambie el nombre de esta actividad a LookupOldWaterMarkActivity.
En la pestaña Configuración , configure lo siguiente:
- Conexión: en Almacenamiento , seleccione Examinar todo y seleccione el almacenamiento de datos en la lista.
- Usar consulta: elija Tabla.
- Tabla: elija dbo.watermarktable.
- Solo primera fila: seleccionado.
Paso 3: agregar una actividad de búsqueda para la referencia nueva
En este paso, creará una actividad de búsqueda para obtener el nuevo valor de la marca de agua. Vas a usar una consulta para obtener la nueva marca de agua de la tabla de datos de origen. Obtendremos el valor más alto en la columna LastModifytime de data_source_table.
En la barra superior, seleccione Búsqueda en la pestaña Actividades para agregar la segunda actividad de búsqueda.
En la pestaña General , cambie el nombre de esta actividad a LookupNewWaterMarkActivity.
En la pestaña Configuración , configure lo siguiente:
Conexión: bajo Almacén de datos, seleccione Consultar todo y seleccione su almacén de datos en la lista o seleccione su almacén de datos en Conexiones de Elementos de Fabric.
Usar consulta: elija Consulta.
Consulta: escriba la siguiente consulta para elegir la hora máxima de la última modificación como nueva referencia:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tableSolo primera fila: seleccionado.
Paso 4: agregar la actividad de copia para copiar datos incrementales
En este paso, agregará una actividad de copia para copiar los datos incrementales entre la última marca de agua y la nueva marca de agua desde el Data Warehouse al Lakehouse.
Seleccione Actividades en la barra superior y seleccione Copiar datos ->Agregar al lienzo para obtener la actividad de copia.
En la pestaña General , cambie el nombre de esta actividad a IncrementalCopyActivity.
Conecte ambas actividades de búsqueda a la actividad de copia arrastrando el botón verde (Con éxito) desde las actividades de búsqueda hasta la actividad de copia. Suelte el botón del mouse cuando vea el color del borde de la actividad de copia cambiar a verde.
En la pestaña Origen , configure lo siguiente:
Conexión: bajo Almacén de datos, seleccione Consultar todo y seleccione su almacén de datos en la lista o seleccione su almacén de datos en Conexiones de Elementos de Fabric.
Almacenamiento: seleccione su almacén.
Usar consulta: elija Consulta.
Consulta: escriba la siguiente consulta para copiar datos incrementales entre la última marca de agua y la nueva marca de agua.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
En la pestaña Destino , configure lo siguiente:
- Conexión: En Lakehouse seleccione Examinar todo y elija su Lakehouse de la lista o seleccione su Lakehouse en las Conexiones de elementos de Fabric.
- Almacén de lago: seleccione su almacén de lago.
- Carpeta raíz: elija Archivos.
-
Ruta de acceso del archivo: elija la carpeta donde desea almacenar los datos copiados. Seleccione Examinar para seleccionar la carpeta. Para el nombre de archivo, abra Agregar contenido dinámico y escriba
@CONCAT('Incremental-', pipeline().RunId, '.txt')en la ventana abierta para crear nombres de archivo para el archivo de datos copiado en el almacén de lago. - Formato de archivo: seleccione el tipo de formato de los datos.
Paso 5: Agregar una actividad de procedimiento almacenado
En este paso, añadirá una actividad de un procedimiento almacenado para actualizar el valor de indicador más reciente para la ejecución siguiente de la canalización.
Seleccione Actividades en la barra superior y seleccione Procedimiento almacenado para agregar una actividad de procedimiento almacenado.
En la pestaña General , cambie el nombre de esta actividad a StoredProceduretoWriteWatermarkActivity.
Conecte la salida verde (en caso de éxito) de la actividad de copia con la actividad de procedimiento almacenado.
En la pestaña Configuración , configure lo siguiente:
Almacenamiento de datos: seleccione su almacenamiento de datos.
Nombre del procedimiento almacenado: elija el procedimiento almacenado que creó en el almacenamiento de datos: [dbo].[ usp_write_watermark].
Expanda Parámetros de procedimiento almacenado. Para establecer valores para los parámetros del procedimiento almacenado, seleccione Importar y escriba los valores siguientes para los parámetros:
Nombre Tipo Value LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Paso 6: Ejecución de la canalización y supervisión del resultado
En la barra superior, seleccione Ejecutar en la pestaña Inicio . A continuación, seleccione Guardar y ejecutar. La pipeline comienza a ejecutarse y puede monitorearla en la pestaña Salida.
Vaya a Lakehouse y encontrará el archivo de datos en la carpeta que eligió. Puede seleccionar el archivo para obtener una vista previa de los datos copiados.
Agregar más datos para ver los resultados de la copia incremental
Después de finalizar la primera ejecución de la canalización, vamos a agregar más datos a la tabla de origen del almacén de datos para ver si esta canalización puede copiar los datos añadidos de forma incremental.
Paso 1: incorporación de más datos al origen
Para insertar nuevos datos en el almacenamiento de datos, ejecute la consulta siguiente:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Los datos actualizados para data_source_table son:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Paso 2: Desencadenar otra ejecución de canalización y supervisar el resultado
Volver a la página de la canalización. En la barra superior, seleccione Ejecutar de nuevo en la pestaña Inicio . La pipeline comienza a ejecutarse y puede supervisarla en Salida.
Vaya a Lakehouse y encontrará el nuevo archivo de datos copiado en la carpeta que eligió. Puede seleccionar el archivo para obtener una vista previa de los datos copiados. Verá que los datos incrementales se muestran en este archivo.
Contenido relacionado
A continuación, obtenga más información sobre cómo copiar desde Azure Blob Storage a Lakehouse.