注释
此功能处于预览状态。
JDBC(Java 数据库连接)是一种广为采用的标准,使客户端应用程序能够连接到数据库和大数据平台中的数据并处理这些数据。
使用 Microsoft JDBC Driver for Fabric 数据工程,可以使用 JDBC 标准的可靠性与简单性连接、查询和管理 Microsoft Fabric 中的 Spark 工作负载。 该驱动程序基于 Microsoft Fabric 的 Livy API 构建,为 Java 应用程序和 BI 工具提供安全灵活的 Spark SQL 连接。 通过此集成,可以直接提交和执行 Spark 代码,而无需创建单独的笔记本或 Spark 作业定义项目。
主要功能
- JDBC 4.2 合规:JDBC 4.2 规范的完整实现
- Microsoft Entra ID 身份验证:多个身份验证流,包括交互式身份验证、客户端凭据和基于证书的身份验证
- 企业连接池:具有运行状况监视和自动恢复的内置连接池
- Spark SQL 本机查询支持:直接执行没有翻译的 Spark SQL 语句
- 全面的数据类型支持:支持所有 Spark SQL 数据类型,包括复杂类型(ARRAY、MAP、STRUCT)
- 异步结果集预提取:后台数据加载以提高性能
- 断路器模式:通过自动重试来保护系统免受级联故障的影响
- 自动重新连接:连接失败时透明会话恢复
- 代理支持:适用于企业环境的 HTTP 和 SOCKS 代理配置
先决条件
在使用适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序之前,请确保:
- Java 开发工具包(JDK):版本 11 或更高版本(建议使用 Java 21)
- Microsoft Fabric 访问:访问 Microsoft Fabric 工作区
- Azure Entra ID 凭据:用于身份验证的相应凭据
- 工作区和 Lakehouse ID:Fabric 工作区和 Lakehouse 的 GUID 标识符
下载和安装
Microsoft JDBC Driver for Microsoft Fabric 数据工程版本 1.0.0 是公共预览版,支持 Java 11、17 和 21。 我们正在不断改进 Java 连接支持,并建议你使用最新版本的 Microsoft JDBC 驱动程序。
- 下载适用于 Microsoft Fabric 数据工程的 JDBC 驱动程序Microsoft (zip)
- 下载适用于 Microsoft Fabric 数据工程的 JDBC 驱动程序Microsoft (tar)
- 从上面的链接下载 zip 或 tar 文件。
- 提取下载的文件以访问驱动程序 JAR 文件。
- 选择与 JRE 版本匹配的 JAR 文件:
- 对于 Java 11:
ms-sparksql-jdbc-1.0.0.jre11.jar - 对于 Java 17:
ms-sparksql-jdbc-1.0.0.jre17.jar - 对于 Java 21:
ms-sparksql-jdbc-1.0.0.jre21.jar
- 对于 Java 11:
- 将所选 JAR 文件添加到应用程序的类路径。
- 对于 JDBC 客户端,请配置 JDBC 驱动程序类:
com.microsoft.spark.livy.jdbc.LivyDriver
快速入门示例
此示例演示如何使用 Microsoft JDBC Driver for Microsoft Fabric 数据工程连接到 Microsoft Fabric 并执行查询。 运行此代码之前,请确保已完成先决条件并安装了驱动程序。
import java.sql.*;
public class QuickStartExample {
public static void main(String[] args) {
// Connection string with required parameters
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" + // Interactive browser authentication
"LogLevel=INFO";
try (Connection conn = DriverManager.getConnection(url)) {
// Execute a simple query
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 'Hello from Fabric!' as message")) {
if (rs.next()) {
System.out.println(rs.getString("message"));
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
连接字符串格式
基本连接字符串
适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序使用以下连接字符串格式:
jdbc:fabricspark://<hostname>[:<port>][;<parameter1>=<value1>;<parameter2>=<value2>;...]
连接字符串组件
| 组件 | Description | Example |
|---|---|---|
| 协议 | JDBC URL 协议标识符 | jdbc:fabricspark:// |
| 主机名 | Microsoft Fabric 终结点主机名 | api.fabric.microsoft.com |
| 端口 | 可选端口号(默认值:443) | :443 |
| 参数 | 分号分隔键=值对 | FabricWorkspaceID=<guid> |
连接字符串示例
基本连接(交互式身份验证)
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;AuthFlow=2
使用 Spark 资源配置
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2;AuthFlow=2
使用 Spark 会话属性
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;spark.sql.adaptive.enabled=true;spark.sql.shuffle.partitions=200;AuthFlow=2
Authentication
适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序通过 Microsoft Entra ID(前 Azure Active Directory)支持多种身份验证方法。 身份验证是使用 AuthFlow 连接字符串中的参数配置的。
身份验证流
| AuthFlow | 身份验证方法 | 用例 |
|---|---|---|
| 0 | Azure CLI 凭据 | 使用 Azure CLI 进行开发 |
| 1 | 客户端凭据(服务主体) | 服务间自动身份验证 |
| 2 | 交互式浏览器 | 交互式用户身份验证(默认值) |
| 3 | SPN | 服务主体身份验证 |
| 4 | 基于证书 | 基于证书的服务主体身份验证 |
| 5 | 访问令牌 | 预获取的访问令牌 |
交互式浏览器身份验证
最适合: 开发和交互式应用程序
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" + // Interactive browser authentication
"AuthTenantID=<tenant-id>;" + // Optional
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
参数:
-
AuthFlow=2:指定交互式浏览器身份验证 -
AuthTenantID(可选):Azure 租户 ID -
AuthClientID(可选):应用程序(客户端)ID
行为:
- 打开用于用户身份验证的浏览器窗口
- 凭据将被缓存以供后续连接,直到过期
- 适用于单用户应用程序
客户端凭据身份验证
最适合: 自动化服务和后台作业
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=1;" + // Client credentials authentication
"AuthClientID=<client-id>;" +
"AuthClientSecret=<client-secret>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
必需参数:
-
AuthFlow=1:指定客户端凭据身份验证 -
AuthClientID:Microsoft Entra ID 中的应用程序(客户端)ID -
AuthClientSecret:来自 Microsoft Entra ID 的客户端密码 -
AuthTenantID:Azure 租户 ID
最佳做法:
- 安全地存储机密(Azure Key Vault、环境变量)
- 尽可能使用托管标识
- 定期轮换机密
基于证书的身份验证
最适合: 需要基于证书的身份验证的企业应用程序
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=4;" + // Certificate-based authentication
"AuthClientID=<client-id>;" +
"AuthCertificatePath=/path/to/certificate.pfx;" +
"AuthCertificatePassword=<certificate-password>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
必需参数:
-
AuthFlow=4:指定基于证书的身份验证 -
AuthClientID:应用程序(客户端)ID -
AuthCertificatePath:PFX/PKCS12 证书文件的路径 -
AuthCertificatePassword:证书密码 -
AuthTenantID:Azure 租户 ID
服务主体身份验证
最适合: 无外设环境和远程会话
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=3;" + // Device code authentication
"AuthClientID=<client-id>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
行为:
- 在控制台中显示设备代码和 URL
- 用户访问 URL 并输入代码
- 身份验证在用户验证后完成
访问令牌身份验证
最适合: 自定义身份验证方案
// Acquire token through custom mechanism
String accessToken = acquireTokenFromCustomSource();
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=5;" + // Access token authentication
"AuthAccessToken=" + accessToken + ";" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
身份验证缓存
驱动程序会自动缓存身份验证令牌以提高性能:
// Enable/disable caching (enabled by default)
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" +
"AuthEnableCaching=true;" + // Enable token caching
"AuthCacheTTLMS=3600000"; // Cache TTL: 1 hour
Connection conn = DriverManager.getConnection(url);
配置参数
必需参数
这些参数必须存在于每个连接字符串中:
| 参数 | 类型 | Description | Example |
|---|---|---|---|
FabricWorkspaceID |
唯一通用识别码 (UUID) | Microsoft Fabric 工作区标识符 | <workspace-id> |
FabricLakehouseID |
唯一通用识别码 (UUID) | Microsoft Fabric lakehouse 标识符 | <lakehouse-id> |
AuthFlow |
整数 | 身份验证流类型 (0-5) | 2 |
可选参数
API 版本配置
| 参数 | 类型 | 违约 | Description |
|---|---|---|---|
FabricVersion |
String | v1 |
Microsoft Fabric API 版本 |
LivyApiVersion |
String | 2023-12-01 |
Livy API 版本 |
环境配置
| 参数 | 类型 | 违约 | Description |
|---|---|---|---|
FabricEnvironmentID |
唯一通用识别码 (UUID) | None | 用于标识 Spark 会话环境项的 Fabric 环境标识符 |
Spark 配置
会话资源配置
配置 Spark 会话资源以实现最佳性能:
| 参数 | 类型 | 违约 | Description | Example |
|---|---|---|---|---|
DriverCores |
整数 | Spark 默认值 | 驱动程序的 CPU 核心数 | 4 |
DriverMemory |
String | Spark 默认值 | 驱动程序的内存分配 | 4g |
ExecutorCores |
整数 | Spark 默认值 | 每个执行程序的 CPU 核心数 | 4 |
ExecutorMemory |
String | Spark 默认值 | 每个执行程序的内存分配 | 8g |
NumExecutors |
整数 | Spark 默认值 | 执行程序数 | 2 |
Example:
DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2
自定义 Spark 会话属性
具有前缀 spark. 的任何参数都会自动应用于 Spark 会话:
Spark 配置示例:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.shuffle.partitions=200
spark.sql.autoBroadcastJoinThreshold=10485760
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.executor.memoryOverhead=1g
本机执行引擎 (NEE):
spark.nee.enabled=true
完整示例:
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<guid>;FabricLakehouseID=<guid>;DriverMemory=4g;ExecutorMemory=8g;NumExecutors=2;spark.sql.adaptive.enabled=true;spark.nee.enabled=true;AuthFlow=2
HTTP 连接池配置
配置 HTTP 连接池以实现最佳网络性能:
| 参数 | 类型 | 违约 | Description |
|---|---|---|---|
HttpMaxTotalConnections |
整数 | 100 | 最大 HTTP 连接总数 |
HttpMaxConnectionsPerRoute |
整数 | 50 | 每个路由的最大连接数 |
HttpConnectionTimeoutInSeconds |
整数 | 30 | 连接超时 |
HttpSocketTimeoutInSeconds |
整数 | 60 | 套接字读取超时 |
HttpReadTimeoutInSeconds |
整数 | 60 | HTTP 读取超时 |
HttpConnectionRequestTimeoutSeconds |
整数 | 30 | 来自池的连接请求超时 |
HttpEnableKeepAlive |
布尔 | 是 | 启用 HTTP 保持活动状态 |
HttpKeepAliveTimeoutSeconds |
整数 | 60 | 保持活动状态超时 |
HttpFollowRedirects |
布尔 | 是 | 遵循 HTTP 重定向 |
HttpUseAsyncIO |
布尔 | 假 | 使用异步 HTTP I/O |
Example:
HttpMaxTotalConnections=200;HttpMaxConnectionsPerRoute=100;HttpConnectionTimeoutInSeconds=60
代理配置
为企业环境配置 HTTP 和 SOCKS 代理设置:
| 参数 | 类型 | 违约 | Description |
|---|---|---|---|
UseProxy |
布尔 | 假 | 启用代理 |
ProxyTransport |
String | http |
代理传输类型 (http/tcp) |
ProxyHost |
String | None | 代理主机名 |
ProxyPort |
整数 | None | 代理端口 |
ProxyAuthEnabled |
布尔 | 假 | 启用代理身份验证 |
ProxyUsername |
String | None | 代理身份验证用户名 |
ProxyPassword |
String | None | 代理身份验证密码 |
ProxyAuthScheme |
String | basic |
身份验证方案 (basic/digest/ntlm) |
ProxySocksVersion |
整数 | 5 | SOCKS 版本(4/5) |
HTTP 代理示例:
UseProxy=true;ProxyTransport=http;ProxyHost=proxy.company.com;ProxyPort=8080;ProxyAuthEnabled=true;ProxyUsername=user;ProxyPassword=pass
SOCKS 代理示例:
UseProxy=true;ProxyTransport=tcp;ProxyHost=socks.company.com;ProxyPort=1080;ProxySocksVersion=5
日志记录配置
| 参数 | 类型 | 违约 | Description |
|---|---|---|---|
LogLevel |
String | INFO |
日志记录级别:TRACE、DEBUG、INFO、WARN、ERROR |
Example:
LogLevel=DEBUG
默认日志位置:
${user.home}/.microsoft/livy-jdbc-driver/driver.log
自定义日志配置: 在类路径上使用自定义 log4j2.xml 或 logback.xml 文件。
用法示例
基本连接
import java.sql.*;
public class BasicConnectionExample {
public static void main(String[] args) {
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2";
try (Connection conn = DriverManager.getConnection(url)) {
System.out.println("Connected successfully!");
System.out.println("Database: " + conn.getMetaData().getDatabaseProductName());
System.out.println("Driver: " + conn.getMetaData().getDriverName());
System.out.println("Driver Version: " + conn.getMetaData().getDriverVersion());
} catch (SQLException e) {
System.err.println("Connection failed: " + e.getMessage());
e.printStackTrace();
}
}
}
执行查询
简单查询
public void executeSimpleQuery(Connection conn) throws SQLException {
String sql = "SELECT current_timestamp() as now";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
Timestamp now = rs.getTimestamp("now");
System.out.println("Current timestamp: " + now);
}
}
}
使用筛选器进行查询
public void executeQueryWithFilter(Connection conn) throws SQLException {
String sql = "SELECT * FROM sales WHERE amount > 1000 ORDER BY amount DESC";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
while (rs.next()) {
int id = rs.getInt("id");
double amount = rs.getDouble("amount");
Date date = rs.getDate("sale_date");
System.out.printf("ID: %d, Amount: %.2f, Date: %s%n",
id, amount, date);
}
}
}
具有限制的查询
public void executeQueryWithLimit(Connection conn) throws SQLException {
String sql = "SELECT * FROM customers LIMIT 10";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// Print column names
for (int i = 1; i <= columnCount; i++) {
System.out.print(metaData.getColumnName(i) + "\t");
}
System.out.println();
// Print rows
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + "\t");
}
System.out.println();
}
}
}
使用结果集
浏览结果集
public void navigateResultSet(Connection conn) throws SQLException {
String sql = "SELECT id, name, amount FROM orders";
try (Statement stmt = conn.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
ResultSet rs = stmt.executeQuery(sql)) {
// Move to first row
if (rs.first()) {
System.out.println("First row: " + rs.getString("name"));
}
// Move to last row
if (rs.last()) {
System.out.println("Last row: " + rs.getString("name"));
System.out.println("Total rows: " + rs.getRow());
}
// Move to specific row
if (rs.absolute(5)) {
System.out.println("Row 5: " + rs.getString("name"));
}
}
}
处理大型结果集
public void processLargeResultSet(Connection conn) throws SQLException {
String sql = "SELECT * FROM large_table";
try (Statement stmt = conn.createStatement()) {
// Set fetch size for efficient memory usage
stmt.setFetchSize(1000);
try (ResultSet rs = stmt.executeQuery(sql)) {
int rowCount = 0;
while (rs.next()) {
// Process row
processRow(rs);
rowCount++;
if (rowCount % 10000 == 0) {
System.out.println("Processed " + rowCount + " rows");
}
}
System.out.println("Total rows processed: " + rowCount);
}
}
}
private void processRow(ResultSet rs) throws SQLException {
// Process individual row
}
使用预处理语句
public void usePreparedStatement(Connection conn) throws SQLException {
String sql = "SELECT * FROM products WHERE category = ? AND price > ?";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// Set parameters
pstmt.setString(1, "Electronics");
pstmt.setDouble(2, 100.0);
try (ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String name = rs.getString("name");
double price = rs.getDouble("price");
System.out.printf("Product: %s, Price: $%.2f%n", name, price);
}
}
}
}
批量操作
public void executeBatchInsert(Connection conn) throws SQLException {
String sql = "INSERT INTO logs (timestamp, level, message) VALUES (?, ?, ?)";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
conn.setAutoCommit(false); // Disable auto-commit for batch
// Add multiple statements to batch
for (int i = 0; i < 1000; i++) {
pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
pstmt.setString(2, "INFO");
pstmt.setString(3, "Log message " + i);
pstmt.addBatch();
// Execute batch every 100 statements
if (i % 100 == 0) {
pstmt.executeBatch();
pstmt.clearBatch();
}
}
// Execute remaining statements
pstmt.executeBatch();
conn.commit();
System.out.println("Batch insert completed successfully");
} catch (SQLException e) {
conn.rollback();
throw e;
} finally {
conn.setAutoCommit(true);
}
}
数据类型映射
驱动程序将 Spark SQL 数据类型映射到 JDBC SQL 类型和 Java 类型:
| Spark SQL 类型 | JDBC SQL 类型 | Java 类型 | 注释 |
|---|---|---|---|
BOOLEAN |
BOOLEAN |
Boolean |
|
BYTE |
TINYINT |
Byte |
|
SHORT |
SMALLINT |
Short |
|
INT |
INTEGER |
Integer |
|
LONG |
BIGINT |
Long |
|
FLOAT |
FLOAT |
Float |
|
DOUBLE |
DOUBLE |
Double |
|
DECIMAL |
DECIMAL |
BigDecimal |
保留精度和范围 |
STRING |
VARCHAR |
String |
|
VARCHAR(n) |
VARCHAR |
String |
|
CHAR(n) |
CHAR |
String |
|
BINARY |
BINARY |
byte[] |
|
DATE |
DATE |
java.sql.Date |
|
TIMESTAMP |
TIMESTAMP |
java.sql.Timestamp |
|
ARRAY |
VARCHAR |
String |
序列化为 JSON |
MAP |
VARCHAR |
String |
序列化为 JSON |
STRUCT |
VARCHAR |
String |
序列化为 JSON |
相关内容
- Fabric 中的 Apache Spark 运行时
- Fabric 运行时 1.3
- 什么是用于数据工程的 Livy API