引入 Workday 报表

本页介绍如何使用 Lakeflow Connect 引入 Workday 报表并将其加载到 Azure Databricks 中。

在您开始之前

若要创建引入管道,必须满足以下要求:

  • 你的工作区必须启用 Unity Catalog。

  • 必须为工作区启用无服务器计算。 请参阅 无服务器计算要求

  • 如果计划创建新的连接:你必须对元存储拥有 CREATE CONNECTION 特权。

    如果连接器支持基于 UI 的管道创作,管理员可以通过完成此页面上的步骤,同时创建连接和管道。 但是,如果创建管道的用户使用基于 API 的管道创作或非管理员用户,则管理员必须先在目录资源管理器中创建连接。 请参阅连接到托管的数据引入源

  • 如果计划使用现有连接:您必须在连接对象上具有 USE CONNECTION 特权或 ALL PRIVILEGES

  • 你必须对目标目录拥有 USE CATALOG 特权。

  • 你必须对现有架构拥有USE SCHEMACREATE TABLE特权,或者对目标目录拥有CREATE SCHEMA特权。

若要从 Workday 引入,必须完成 源设置

配置网络

如果已启用无服务器出口控制,则允许列出报表 URL 的主机名。 例如,报表 URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json 具有主机名 https://ww1.workday.com。 请参阅 管理无服务器出口控制的网络策略

选项 1:Azure Databricks UI

  1. 在 Azure Databricks 工作区的边栏中,单击 “数据引入”。

  2. 在“添加数据”页面上的“Databricks 连接器”下,单击“Workday 报表”。

    此时会打开引入向导。

  3. 在向导的 引入管道 页上,输入管道的唯一名称。

  4. 对于 事件日志位置,请选择用于存储管道事件日志的目录和架构。

  5. 选择存储访问源数据所需的凭据的 Unity 目录连接。

    如果没有与源的现有连接,请单击“ 创建连接 ”,然后输入从 源设置获取的身份验证详细信息。 你必须对元存储拥有 CREATE CONNECTION 权限。

  6. 单击“ 创建管道”并继续

  7. 在“ 报表 ”页上,单击“ 添加报表 ”并输入报表 URL。 对要引入的每个报表重复此作,然后单击“ 下一步”。

  8. “目标 ”页上,选择要写入的 Unity 目录和架构。

    如果不想使用现有架构,请单击“ 创建架构”。 你必须对父目录拥有 USE CATALOGCREATE SCHEMA 权限。

  9. 单击“ 保存管道”并继续

  10. (可选)在 “设置” 页上,单击“ 创建计划”。 设置刷新目标表的频率。

  11. (可选)设置管道操作成功或失败的电子邮件通知。

  12. 单击“ 保存”并运行管道

选项 2:Databricks 资产包

本部分介绍如何使用 Databricks 资产捆绑包部署引入管道。 捆绑包可以包含作业和任务的 YAML 定义,使用 Databricks CLI 进行管理,并且可以在不同的目标工作区(如开发、过渡和生产)中共享和运行。 有关详细信息,请参阅 Databricks 资产捆绑包

可以在管道定义中使用以下表配置属性,来选择或取消选择需要导入的特定列:

  • include_columns:(可选)指定要添加且用于引入的列的列表。 如果使用此选项显式地添加列,则管道会自动排除将来添加到源的列。 若要引入将来的列,必须将它们添加到列表中。
  • exclude_columns:(可选)指定要从引入中排除的列的列表。 如果使用此选项显式排除某些列,管道会自动包含将来在源中新增的列。 若要引入将来的列,必须将它们添加到列表中。

还可以在报表 URL (source_url)中指定提示,以便引入筛选的报表。

  1. 确认存在与 Workday 的 Unity 目录连接。 有关创建连接的步骤,请参阅 “连接到托管引入源”。

  2. 使用 Databricks CLI 创建新捆绑包:

    databricks bundle init
    
  3. 将两个新资源文件添加到捆绑包:

    • 管道定义文件(resources/workday_pipeline.yml)。
    • 控制数据引入频率(resources/workday_job.yml)的工作流文件。

    以下是一个示例 resources/workday_pipeline.yml 文件:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key is used as
              # the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                    primary_keys:
                      - Employee_ID
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
    

    以下是一个示例 resources/workday_job.yml 文件:

    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
    
  4. 使用 Databricks CLI 部署管道:

    databricks bundle deploy
    

选项 3:Azure Databricks 笔记本

可以在管道定义中使用以下表配置属性,来选择或取消选择需要导入的特定列:

  • include_columns:(可选)指定要添加且用于引入的列的列表。 如果使用此选项显式地添加列,则管道会自动排除将来添加到源的列。 若要引入将来的列,必须将它们添加到列表中。
  • exclude_columns:(可选)指定要从引入中排除的列的列表。 如果使用此选项显式排除某些列,管道会自动包含将来在源中新增的列。 若要引入将来的列,必须将它们添加到列表中。

还可以在报表 URL (source_url)中指定提示,以便引入筛选的报表。

  1. 确认存在与 Workday 的 Unity 目录连接。 有关创建连接的步骤,请参阅 “连接到托管引入源”。

  2. 生成个人访问令牌。

  3. 将以下代码粘贴到一个 Python 笔记本单元格中,并修改 <personal-access-token> 值:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  4. 将以下代码粘贴到第二个笔记本单元格中:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  5. 将以下代码粘贴到第三个笔记本单元中,并进行修改以反映管道规范:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"],
                   "scd_type": "SCD_TYPE_2",
                   "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
                }
             }
          }
       ]
    }
    }
    """
    
    create_pipeline(pipeline_spec)
    
  6. 使用个人访问令牌运行第一个笔记本单元。

  7. 运行第二个笔记本单元格。

  8. 使用管道详细信息运行第三个笔记本单元。 此操作运行 create_pipeline

    • list_pipeline 会返回管道 ID 及其详细信息。
    • edit_pipeline 允许编辑管道定义。
    • delete_pipeline 会删除管道。

选项 4:Databricks CLI

可以在管道定义中使用以下表配置属性,来选择或取消选择需要导入的特定列:

  • include_columns:(可选)指定要添加且用于引入的列的列表。 如果使用此选项显式地添加列,则管道会自动排除将来添加到源的列。 若要引入将来的列,必须将它们添加到列表中。
  • exclude_columns:(可选)指定要从引入中排除的列的列表。 如果使用此选项显式排除某些列,管道会自动包含将来在源中新增的列。 若要引入将来的列,必须将它们添加到列表中。

还可以在报表 URL (source_url)中指定提示,以便引入筛选的报表。

  1. 确认存在与 Workday 的 Unity 目录连接。 有关创建连接的步骤,请参阅 “连接到托管引入源”。
  2. 运行以下命令以创建管道:
databricks pipelines create --json "<pipeline-definition OR json-file-path>"

管道定义模板

下面是 JSON 管道定义模板:

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "report": {

           "source_url": "<report-url>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

              "primary_keys": ["<primary-key>"],

              "scd_type": "SCD_TYPE_2",

              "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

在管道上启动、计划和设置警报

可以在管道详细信息页上为管道创建计划。

  1. 创建管道后,重新访问 Azure Databricks 工作区,然后单击 管道

    新管道将显示在管道列表中。

  2. 若要查看管道详细信息,请单击管道名称。

  3. 在管道详细信息页上,可以通过单击“计划”来计划管道。

  4. 若要在管道上设置通知,请单击 设置,然后添加通知。

对于添加到管道的每个计划,Lakeflow Connect 会自动为其创建作业。 引入管道是作业中的任务。 可以选择将更多任务添加到作业。

示例:将两个 Workday 报表引入单独的架构

本节中的示例管道定义将两个 Workday 报表引入到单独的架构中。 多目标管道支持仅限 API。

resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>

其他资源