重要
标准 SharePoint 连接器为 Beta 版。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 管理 Azure Databricks 预览版。
了解如何将结构化、半结构化和非结构化文件从 Microsoft SharePoint 引入 Delta 表。 SharePoint 连接器支持通过批处理 API 和流式 API 对 SharePoint 文件进行增量导入,包括 Auto Loader,spark.read 和 COPY INTO,并且所有这些都符合 Unity Catalog 的治理要求。
选择 SharePoint 连接器
Lakeflow Connect 提供两个互补的 SharePoint 连接器。 它们都访问 SharePoint 中的数据,但它们支持不同的目标。
| 注意事项 | 托管 SharePoint 连接器 | 标准 SharePoint 连接器 |
|---|---|---|
| 管理和自定义 | 完全托管的连接器。 用于将数据引入 Delta 表并使其与源保持同步的企业应用程序的简单低维护连接器。 请参阅 Lakeflow Connect 中的托管连接器。 |
使用 SQL、PySpark 或 Lakeflow Spark 声明性管道,通过批处理和流式处理 API(例如 read_files、spark.read、COPY INTO 和自动加载器)生成自定义数据引入管道。提供灵活性,可在引入过程中执行复杂的转换,同时让您在管理和维护管道时承担更大的责任。 |
| 输出格式 | 统一二进制内容表。 以二进制格式引入每个文件(每行一个文件),以及文件元数据 附加列。 |
结构化Delta表。 将结构化文件(如 CSV 和 Excel)引入为 Delta 表。 还可用于引入 采用二进制格式的非结构化文件。 |
| 粒度、筛选和选择 | 目前没有子文件夹或文件级别选择。 无基于模式的筛选。 引入指定 SharePoint 文档库中的所有文件。 |
粒度和自定义。 基于 URL 的选择用于从文档库、子文件夹或单个文件中导入。 还支持使用 pathGlobFilter 选项基于模式的筛选。 |
主要功能
标准 SharePoint 连接器提供:
- 引入结构化、半结构化和非结构化文件
- 精细引入:引入特定网站、子网站、文档库、文件夹或单个文件
- 使用
spark.read、Auto Loader 和流式及批量引入COPY INTO - 结构化和半结构化格式(如 CSV 和 Excel)的自动架构推理和演变
- 使用 Unity 目录连接保护凭据存储
- 使用
pathGlobFilter进行模式匹配的文件选择
要求
若要从 SharePoint 引入文件,必须具有以下各项:
- 启用了 Unity Catalog 的工作区
-
CREATE CONNECTION创建 SharePoint 连接的特权(或使用现有的USE CONNECTION连接) - 使用 Databricks Runtime 版本 17.3 LTS 或更高版本的计算
- 使用
Sites.Read.All权限范围设置的 OAuth 身份验证 - SharePoint Beta 功能是从 预览 页面启用的。 请参阅 管理 Azure Databricks 预览版
- 可选:启用 Excel Beta 功能以分析 Excel 文件。 请参阅 “读取 Excel 文件”
创建连接
创建用于存储 SharePoint 凭据的 Unity 目录连接。 连接设置过程在标准连接器和托管 SharePoint 连接器之间共享。
有关包括 OAuth 身份验证选项在内的完整连接设置说明,请参阅 SharePoint 引入设置概述。
从 SharePoint 读取文件
若要使用 Spark 从 SharePoint 读取文件,请使用数据源选项指定在上一步 databricks.connection 中创建的连接,并提供要访问的 SharePoint 资源的 URL。 此 URL 可以引用特定文件、文件夹、文档库(驱动器)或整个网站。 示例包括:
https://mytenant.sharepoint.com/sites/test-site/https://mytenant.sharepoint.com/sites/test-site/test-subsitehttps://mytenant.sharepoint.com/sites/test-site/test-drivehttps://mytenant.sharepoint.com/sites/test-site/Shared%20Documents/Forms/AllItems.aspxhttps://mytenant.sharepoint.com/sites/test-site/test-drive/test-folderhttps://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder/test.csvhttps://mytenant.sharepoint.com/sites/test-site/another-subsite/another-drive/test.csv
例子
查找使用标准 SharePoint 连接器读取文件的代码示例。
使用自动加载程序流式传输 SharePoint 文件
自动加载程序提供了从 SharePoint 增量引入结构化文件的最有效方法。 它会自动检测新文件,并在它们到达时对其进行处理。 它还可以使用自动架构推理和演变引入结构化文件和半结构化文件,例如 CSV 和 JSON。 有关自动加载程序使用情况的详细信息,请参阅 常见数据加载模式。
# Incrementally ingest new PDF files
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("cloudFiles.schemaLocation", <path to a schema location>)
.option("pathGlobFilter", "*.pdf")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)
# Incrementally ingest CSV files with automatic schema inference and evolution
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("inferColumnTypes", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)
使用 Spark 批处理读取 SharePoint 文件
# Read unstructured data as binary files
df = (spark.read
.format("binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("recursiveFileLookup", True)
.option("pathGlobFilter", "*.pdf") # optional. Example: only ingest PDFs
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"))
# Read a batch of CSV files, infer the schema, and load the data into a DataFrame
df = (spark.read
.format("csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("recursiveFileLookup", True)
.option("inferSchema", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"))
# Read a specific Excel file from SharePoint, infer the schema, and load the data into a DataFrame
df = (spark.read
.format("excel")
.option("databricks.connection", "my_sharepoint_conn")
.option("headerRows", 1) # optional
.option("dataAddress", "'Sheet1'!A1:M20") # optional
.load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"))
使用 Spark SQL 读取 SharePoint 文件
以下示例演示如何使用 read_files 表值函数在 SQL 中引入 SharePoint 文件。 有关 read_files 的用法详细信息,请参阅 read_files 表值函数。
-- Read pdf files
CREATE TABLE my_table AS
SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
`databricks.connection` => "my_sharepoint_conn",
format => "binaryFile",
pathGlobFilter => "*.pdf", -- optional. Example: only ingest PDFs
schemaEvolutionMode => "none"
);
-- Read a specific Excel sheet and range
CREATE TABLE my_sheet_table AS
SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
`databricks.connection` => "my_sharepoint_conn",
format => "excel",
headerRows => 1, -- optional
dataAddress => "'Sheet1'!A2:D10", -- optional
schemaEvolutionMode => "none"
);
使用增量摄入进行引入COPY INTO
COPY INTO 提供文件到 Delta 表中的幂等增量加载。 有关COPY INTO使用的详细信息,请参阅使用COPY INTO的常见数据加载模式。
CREATE TABLE IF NOT EXISTS sharepoint_pdf_table;
CREATE TABLE IF NOT EXISTS sharepoint_csv_table;
CREATE TABLE IF NOT EXISTS sharepoint_excel_table;
# Incrementally ingest new PDF files
COPY INTO sharepoint_pdf_table
FROM "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"
FILEFORMAT = BINARYFILE
PATTERN = '*.pdf'
COPY_OPTIONS ('mergeSchema' = 'true');
# Incrementally ingest CSV files with automatic schema inference and evolution
COPY INTO sharepoint_csv_table
FROM "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
# Ingest a single Excel file
COPY INTO sharepoint_excel_table
FROM "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"
FILEFORMAT = EXCEL
FORMAT_OPTIONS('headerRows' = '1')
COPY_OPTIONS ('mergeSchema' = 'true');
在 Lakeflow Spark 声明性管道中引入 SharePoint 文件
注释
SharePoint 连接器需要 Databricks Runtime 17.3 或更高版本。 这在 Lakeflow Spark 声明式管道的版本中尚不可用。 若要查看用于 Lakeflow Spark 声明性管道版本的 Databricks Runtime 版本,请参阅该版本的 发行说明 。
以下示例演示如何在 Lakeflow Spark 声明性管道中使用自动加载程序读取 SharePoint 文件:
Python
from pyspark import pipelines as dp
# Incrementally ingest new PDF files
@dp.table
def sharepoint_pdf_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.pdf")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)
# Incrementally ingest CSV files with automatic schema inference and evolution
@dp.table
def sharepoint_csv_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("inferColumnTypes", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)
# Read a specific Excel file from SharePoint in a materialized view
@dp.table
def sharepoint_excel_table():
return (spark.read.format("excel")
.option("databricks.connection", "my_sharepoint_conn")
.option("headerRows", 1) # optional
.option("inferColumnTypes", True) # optional
.option("dataAddress", "'Sheet1'!A1:M20") # optional
.load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx")
SQL
-- Incrementally ingest new PDF files
CREATE OR REFRESH STREAMING TABLE sharepoint_pdf_table
AS SELECT * FROM STREAM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
format => "binaryFile",
`databricks.connection` => "my_sharepoint_conn",
pathGlobFilter => "*.pdf");
-- Incrementally ingest CSV files with automatic schema inference and evolution
CREATE OR REFRESH STREAMING TABLE sharepoint_csv_table
AS SELECT * FROM STREAM read_files(
"https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs",
format => "csv",
`databricks.connection` => "my_sharepoint_conn",
pathGlobFilter => "*.csv",
"header", "true");
-- Read a specific Excel file from SharePoint in a materialized view
CREATE OR REFRESH MATERIALIZED VIEW sharepoint_excel_table
AS SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
`databricks.connection` => "my_sharepoint_conn",
format => "excel",
headerRows => 1, -- optional
dataAddress => "'Sheet1'!A2:D10", -- optional
`cloudFiles.schemaEvolutionMode` => "none"
);
局限性
标准 SharePoint 连接器具有以下限制:
- 无多站点引入:不能使用相同的查询引入多个站点。 若要从两个站点引入,必须编写两个单独的查询。
-
筛选:可以使用选项
pathGlobFilter按名称筛选文件。 不支持基于文件夹路径的筛选。 - 不支持的格式:不支持 SharePoint 列表和.aspx网站页面。 仅支持文档库中的文件。
- 不支持将数据写回到 SharePoint 服务器。
- 不支持自动加载程序
cleanSource(在引入后删除或存档源上的文件)。
后续步骤
- 了解高级流式引入模式的自动加载程序
- 探索 COPY INTO 幂等增量加载
- 与 云对象存储引入 模式进行比较
- 设置 作业计划 以自动执行引入工作流
- 使用 Lakeflow Spark 声明性管道 生成具有转换的端到端数据管道