从 SharePoint 引入文件

重要

标准 SharePoint 连接器为 Beta 版。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 管理 Azure Databricks 预览版

了解如何将结构化、半结构化和非结构化文件从 Microsoft SharePoint 引入 Delta 表。 SharePoint 连接器支持通过批处理 API 和流式 API 对 SharePoint 文件进行增量导入,包括 Auto Loader,spark.readCOPY INTO,并且所有这些都符合 Unity Catalog 的治理要求。

选择 SharePoint 连接器

Lakeflow Connect 提供两个互补的 SharePoint 连接器。 它们都访问 SharePoint 中的数据,但它们支持不同的目标。

注意事项 托管 SharePoint 连接器 标准 SharePoint 连接器
管理和自定义 完全托管的连接器。
用于将数据引入 Delta 表并使其与源保持同步的企业应用程序的简单低维护连接器。 请参阅 Lakeflow Connect 中的托管连接器
使用 SQL、PySpark 或 Lakeflow Spark 声明性管道,通过批处理和流式处理 API(例如 read_filesspark.readCOPY INTO 和自动加载器)生成自定义数据引入管道。
提供灵活性,可在引入过程中执行复杂的转换,同时让您在管理和维护管道时承担更大的责任。
输出格式 统一二进制内容表。 以二进制格式引入每个文件(每行一个文件),以及文件元数据
附加列。
结构化Delta表。 将结构化文件(如 CSV 和 Excel)引入为 Delta 表。 还可用于引入
采用二进制格式的非结构化文件。
粒度、筛选和选择 目前没有子文件夹或文件级别选择。 无基于模式的筛选。
引入指定 SharePoint 文档库中的所有文件。
粒度和自定义。
基于 URL 的选择用于从文档库、子文件夹或单个文件中导入。 还支持使用 pathGlobFilter 选项基于模式的筛选。

主要功能

标准 SharePoint 连接器提供:

  • 引入结构化、半结构化和非结构化文件
  • 精细引入:引入特定网站、子网站、文档库、文件夹或单个文件
  • 使用 spark.read、Auto Loader 和流式及批量引入 COPY INTO
  • 结构化和半结构化格式(如 CSV 和 Excel)的自动架构推理和演变
  • 使用 Unity 目录连接保护凭据存储
  • 使用 pathGlobFilter 进行模式匹配的文件选择

要求

若要从 SharePoint 引入文件,必须具有以下各项:

  • 启用了 Unity Catalog 的工作区
  • CREATE CONNECTION 创建 SharePoint 连接的特权(或使用现有的 USE CONNECTION 连接)
  • 使用 Databricks Runtime 版本 17.3 LTS 或更高版本的计算
  • 使用 Sites.Read.All 权限范围设置的 OAuth 身份验证
  • SharePoint Beta 功能是从 预览 页面启用的。 请参阅 管理 Azure Databricks 预览版
  • 可选:启用 Excel Beta 功能以分析 Excel 文件。 请参阅 “读取 Excel 文件”

创建连接

创建用于存储 SharePoint 凭据的 Unity 目录连接。 连接设置过程在标准连接器和托管 SharePoint 连接器之间共享。

有关包括 OAuth 身份验证选项在内的完整连接设置说明,请参阅 SharePoint 引入设置概述

从 SharePoint 读取文件

若要使用 Spark 从 SharePoint 读取文件,请使用数据源选项指定在上一步 databricks.connection 中创建的连接,并提供要访问的 SharePoint 资源的 URL。 此 URL 可以引用特定文件、文件夹、文档库(驱动器)或整个网站。 示例包括:

  • https://mytenant.sharepoint.com/sites/test-site/
  • https://mytenant.sharepoint.com/sites/test-site/test-subsite
  • https://mytenant.sharepoint.com/sites/test-site/test-drive
  • https://mytenant.sharepoint.com/sites/test-site/Shared%20Documents/Forms/AllItems.aspx
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder/test.csv
  • https://mytenant.sharepoint.com/sites/test-site/another-subsite/another-drive/test.csv

例子

查找使用标准 SharePoint 连接器读取文件的代码示例。

使用自动加载程序流式传输 SharePoint 文件

自动加载程序提供了从 SharePoint 增量引入结构化文件的最有效方法。 它会自动检测新文件,并在它们到达时对其进行处理。 它还可以使用自动架构推理和演变引入结构化文件和半结构化文件,例如 CSV 和 JSON。 有关自动加载程序使用情况的详细信息,请参阅 常见数据加载模式

# Incrementally ingest new PDF files
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("cloudFiles.schemaLocation", <path to a schema location>)
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)

# Incrementally ingest CSV files with automatic schema inference and evolution
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.csv")
    .option("inferColumnTypes", True)
    .option("header", True)
    .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)

使用 Spark 批处理读取 SharePoint 文件

# Read unstructured data as binary files
df = (spark.read
        .format("binaryFile")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("recursiveFileLookup", True)
        .option("pathGlobFilter", "*.pdf") # optional. Example: only ingest PDFs
        .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"))

# Read a batch of CSV files, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("csv")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("pathGlobFilter", "*.csv")
        .option("recursiveFileLookup", True)
        .option("inferSchema", True)
        .option("header", True)
        .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"))

# Read a specific Excel file from SharePoint, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("excel")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("headerRows", 1)                   # optional
        .option("dataAddress", "'Sheet1'!A1:M20")  # optional
        .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"))

使用 Spark SQL 读取 SharePoint 文件

以下示例演示如何使用 read_files 表值函数在 SQL 中引入 SharePoint 文件。 有关 read_files 的用法详细信息,请参阅 read_files 表值函数

-- Read pdf files
CREATE TABLE my_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  `databricks.connection` => "my_sharepoint_conn",
  format => "binaryFile",
  pathGlobFilter => "*.pdf", -- optional. Example: only ingest PDFs
  schemaEvolutionMode => "none"
);

-- Read a specific Excel sheet and range
CREATE TABLE my_sheet_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "'Sheet1'!A2:D10", -- optional
  schemaEvolutionMode => "none"
);

使用增量摄入进行引入COPY INTO

COPY INTO 提供文件到 Delta 表中的幂等增量加载。 有关COPY INTO使用的详细信息,请参阅使用COPY INTO的常见数据加载模式

CREATE TABLE IF NOT EXISTS sharepoint_pdf_table;
CREATE TABLE IF NOT EXISTS sharepoint_csv_table;
CREATE TABLE IF NOT EXISTS sharepoint_excel_table;

# Incrementally ingest new PDF files
COPY INTO sharepoint_pdf_table
  FROM "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"
  FILEFORMAT = BINARYFILE
  PATTERN = '*.pdf'
  COPY_OPTIONS ('mergeSchema' = 'true');

# Incrementally ingest CSV files with automatic schema inference and evolution
COPY INTO sharepoint_csv_table
  FROM "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"
  FILEFORMAT = CSV
  PATTERN = '*.csv'
  FORMAT_OPTIONS('header' = 'true', 'inferSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

# Ingest a single Excel file
COPY INTO sharepoint_excel_table
  FROM "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"
  FILEFORMAT = EXCEL
  FORMAT_OPTIONS('headerRows' = '1')
  COPY_OPTIONS ('mergeSchema' = 'true');

在 Lakeflow Spark 声明性管道中引入 SharePoint 文件

注释

SharePoint 连接器需要 Databricks Runtime 17.3 或更高版本。 这在 Lakeflow Spark 声明式管道的版本中尚不可用。 若要查看用于 Lakeflow Spark 声明性管道版本的 Databricks Runtime 版本,请参阅该版本的 发行说明

以下示例演示如何在 Lakeflow Spark 声明性管道中使用自动加载程序读取 SharePoint 文件:

Python

from pyspark import pipelines as dp

# Incrementally ingest new PDF files
@dp.table
def sharepoint_pdf_table():
  return (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
  )

# Incrementally ingest CSV files with automatic schema inference and evolution
@dp.table
def sharepoint_csv_table():
  return (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("databricks.connection", "my_sharepoint_conn")
      .option("pathGlobFilter", "*.csv")
      .option("inferColumnTypes", True)
      .option("header", True)
      .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
  )

# Read a specific Excel file from SharePoint in a materialized view
@dp.table
def sharepoint_excel_table():
  return (spark.read.format("excel")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("headerRows", 1)                   # optional
    .option("inferColumnTypes", True)            # optional
    .option("dataAddress", "'Sheet1'!A1:M20")  # optional
    .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx")

SQL

-- Incrementally ingest new PDF files
CREATE OR REFRESH STREAMING TABLE sharepoint_pdf_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  format => "binaryFile",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.pdf");

-- Incrementally ingest CSV files with automatic schema inference and evolution
CREATE OR REFRESH STREAMING TABLE sharepoint_csv_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs",
  format => "csv",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.csv",
  "header", "true");

-- Read a specific Excel file from SharePoint in a materialized view
CREATE OR REFRESH MATERIALIZED VIEW sharepoint_excel_table
AS SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "'Sheet1'!A2:D10", -- optional
  `cloudFiles.schemaEvolutionMode` => "none"
);

局限性

标准 SharePoint 连接器具有以下限制:

  • 无多站点引入:不能使用相同的查询引入多个站点。 若要从两个站点引入,必须编写两个单独的查询。
  • 筛选:可以使用选项 pathGlobFilter 按名称筛选文件。 不支持基于文件夹路径的筛选。
  • 不支持的格式:不支持 SharePoint 列表和.aspx网站页面。 仅支持文档库中的文件。
  • 不支持将数据写回到 SharePoint 服务器。
  • 不支持自动加载程序 cleanSource (在引入后删除或存档源上的文件)。

后续步骤