适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序(预览版)

注释

此功能处于预览状态。

JDBC(Java 数据库连接)是一种广为采用的标准,使客户端应用程序能够连接到数据库和大数据平台中的数据并处理这些数据。

使用 Microsoft JDBC Driver for Fabric 数据工程,可以使用 JDBC 标准的可靠性与简单性连接、查询和管理 Microsoft Fabric 中的 Spark 工作负载。 该驱动程序基于 Microsoft Fabric 的 Livy API 构建,为 Java 应用程序和 BI 工具提供安全灵活的 Spark SQL 连接。 通过此集成,可以直接提交和执行 Spark 代码,而无需创建单独的笔记本或 Spark 作业定义项目。

主要功能

  • JDBC 4.2 合规:JDBC 4.2 规范的完整实现
  • Microsoft Entra ID 身份验证:多个身份验证流,包括交互式身份验证、客户端凭据和基于证书的身份验证
  • 企业连接池:具有运行状况监视和自动恢复的内置连接池
  • Spark SQL 本机查询支持:直接执行没有翻译的 Spark SQL 语句
  • 全面的数据类型支持:支持所有 Spark SQL 数据类型,包括复杂类型(ARRAY、MAP、STRUCT)
  • 异步结果集预提取:后台数据加载以提高性能
  • 断路器模式:通过自动重试来保护系统免受级联故障的影响
  • 自动重新连接:连接失败时透明会话恢复
  • 代理支持:适用于企业环境的 HTTP 和 SOCKS 代理配置

先决条件

在使用适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序之前,请确保:

  • Java 开发工具包(JDK):版本 11 或更高版本(建议使用 Java 21)
  • Microsoft Fabric 访问:访问 Microsoft Fabric 工作区
  • Azure Entra ID 凭据:用于身份验证的相应凭据
  • 工作区和 Lakehouse ID:Fabric 工作区和 Lakehouse 的 GUID 标识符

下载和安装

Microsoft JDBC Driver for Microsoft Fabric 数据工程版本 1.0.0 是公共预览版,支持 Java 11、17 和 21。 我们正在不断改进 Java 连接支持,并建议你使用最新版本的 Microsoft JDBC 驱动程序。

  1. 从上面的链接下载 zip 或 tar 文件。
  2. 提取下载的文件以访问驱动程序 JAR 文件。
  3. 选择与 JRE 版本匹配的 JAR 文件:
    • 对于 Java 11: ms-sparksql-jdbc-1.0.0.jre11.jar
    • 对于 Java 17: ms-sparksql-jdbc-1.0.0.jre17.jar
    • 对于 Java 21: ms-sparksql-jdbc-1.0.0.jre21.jar
  4. 将所选 JAR 文件添加到应用程序的类路径。
  5. 对于 JDBC 客户端,请配置 JDBC 驱动程序类: com.microsoft.spark.livy.jdbc.LivyDriver

快速入门示例

此示例演示如何使用 Microsoft JDBC Driver for Microsoft Fabric 数据工程连接到 Microsoft Fabric 并执行查询。 运行此代码之前,请确保已完成先决条件并安装了驱动程序。

import java.sql.*;

public class QuickStartExample {
    public static void main(String[] args) {
        // Connection string with required parameters
        String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
                     "FabricWorkspaceID=<workspace-id>;" +
                     "FabricLakehouseID=<lakehouse-id>;" +
                     "AuthFlow=2;" +  // Interactive browser authentication
                     "LogLevel=INFO";
        
        try (Connection conn = DriverManager.getConnection(url)) {
            // Execute a simple query
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 'Hello from Fabric!' as message")) {
                
                if (rs.next()) {
                    System.out.println(rs.getString("message"));
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

连接字符串格式

基本连接字符串

适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序使用以下连接字符串格式:

jdbc:fabricspark://<hostname>[:<port>][;<parameter1>=<value1>;<parameter2>=<value2>;...]

连接字符串组件

组件 Description Example
协议 JDBC URL 协议标识符 jdbc:fabricspark://
主机名 Microsoft Fabric 终结点主机名 api.fabric.microsoft.com
端口 可选端口号(默认值:443) :443
参数 分号分隔键=值对 FabricWorkspaceID=<guid>

连接字符串示例

基本连接(交互式身份验证)

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;AuthFlow=2

使用 Spark 资源配置

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2;AuthFlow=2

使用 Spark 会话属性

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;spark.sql.adaptive.enabled=true;spark.sql.shuffle.partitions=200;AuthFlow=2

Authentication

适用于 Microsoft Fabric 数据工程的 Microsoft JDBC 驱动程序通过 Microsoft Entra ID(前 Azure Active Directory)支持多种身份验证方法。 身份验证是使用 AuthFlow 连接字符串中的参数配置的。

身份验证流

AuthFlow 身份验证方法 用例
0 Azure CLI 凭据 使用 Azure CLI 进行开发
1 客户端凭据(服务主体) 服务间自动身份验证
2 交互式浏览器 交互式用户身份验证(默认值)
3 SPN 服务主体身份验证
4 基于证书 基于证书的服务主体身份验证
5 访问令牌 预获取的访问令牌

交互式浏览器身份验证

最适合: 开发和交互式应用程序

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=2;" +  // Interactive browser authentication
             "AuthTenantID=<tenant-id>;" +  // Optional
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

参数:

  • AuthFlow=2:指定交互式浏览器身份验证
  • AuthTenantID (可选):Azure 租户 ID
  • AuthClientID (可选):应用程序(客户端)ID

行为:

  • 打开用于用户身份验证的浏览器窗口
  • 凭据将被缓存以供后续连接,直到过期
  • 适用于单用户应用程序

客户端凭据身份验证

最适合: 自动化服务和后台作业

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=1;" +  // Client credentials authentication
             "AuthClientID=<client-id>;" +
             "AuthClientSecret=<client-secret>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

必需参数:

  • AuthFlow=1:指定客户端凭据身份验证
  • AuthClientID:Microsoft Entra ID 中的应用程序(客户端)ID
  • AuthClientSecret:来自 Microsoft Entra ID 的客户端密码
  • AuthTenantID:Azure 租户 ID

最佳做法:

  • 安全地存储机密(Azure Key Vault、环境变量)
  • 尽可能使用托管标识
  • 定期轮换机密

基于证书的身份验证

最适合: 需要基于证书的身份验证的企业应用程序

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=4;" +  // Certificate-based authentication
             "AuthClientID=<client-id>;" +
             "AuthCertificatePath=/path/to/certificate.pfx;" +
             "AuthCertificatePassword=<certificate-password>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

必需参数:

  • AuthFlow=4:指定基于证书的身份验证
  • AuthClientID:应用程序(客户端)ID
  • AuthCertificatePath:PFX/PKCS12 证书文件的路径
  • AuthCertificatePassword:证书密码
  • AuthTenantID:Azure 租户 ID

服务主体身份验证

最适合: 无外设环境和远程会话

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=3;" +  // Device code authentication
             "AuthClientID=<client-id>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

行为:

  • 在控制台中显示设备代码和 URL
  • 用户访问 URL 并输入代码
  • 身份验证在用户验证后完成

访问令牌身份验证

最适合: 自定义身份验证方案

// Acquire token through custom mechanism
String accessToken = acquireTokenFromCustomSource();

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=5;" +  // Access token authentication
             "AuthAccessToken=" + accessToken + ";" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

身份验证缓存

驱动程序会自动缓存身份验证令牌以提高性能:

// Enable/disable caching (enabled by default)
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=2;" +
             "AuthEnableCaching=true;" +  // Enable token caching
             "AuthCacheTTLMS=3600000";    // Cache TTL: 1 hour

Connection conn = DriverManager.getConnection(url);

配置参数

必需参数

这些参数必须存在于每个连接字符串中:

参数 类型 Description Example
FabricWorkspaceID 唯一通用识别码 (UUID) Microsoft Fabric 工作区标识符 <workspace-id>
FabricLakehouseID 唯一通用识别码 (UUID) Microsoft Fabric lakehouse 标识符 <lakehouse-id>
AuthFlow 整数 身份验证流类型 (0-5) 2

可选参数

API 版本配置

参数 类型 违约 Description
FabricVersion String v1 Microsoft Fabric API 版本
LivyApiVersion String 2023-12-01 Livy API 版本

环境配置

参数 类型 违约 Description
FabricEnvironmentID 唯一通用识别码 (UUID) None 用于标识 Spark 会话环境项的 Fabric 环境标识符

Spark 配置

会话资源配置

配置 Spark 会话资源以实现最佳性能:

参数 类型 违约 Description Example
DriverCores 整数 Spark 默认值 驱动程序的 CPU 核心数 4
DriverMemory String Spark 默认值 驱动程序的内存分配 4g
ExecutorCores 整数 Spark 默认值 每个执行程序的 CPU 核心数 4
ExecutorMemory String Spark 默认值 每个执行程序的内存分配 8g
NumExecutors 整数 Spark 默认值 执行程序数 2

Example:

DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2

自定义 Spark 会话属性

具有前缀 spark. 的任何参数都会自动应用于 Spark 会话:

Spark 配置示例:

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.shuffle.partitions=200
spark.sql.autoBroadcastJoinThreshold=10485760
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.executor.memoryOverhead=1g

本机执行引擎 (NEE):

spark.nee.enabled=true

完整示例:

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<guid>;FabricLakehouseID=<guid>;DriverMemory=4g;ExecutorMemory=8g;NumExecutors=2;spark.sql.adaptive.enabled=true;spark.nee.enabled=true;AuthFlow=2

HTTP 连接池配置

配置 HTTP 连接池以实现最佳网络性能:

参数 类型 违约 Description
HttpMaxTotalConnections 整数 100 最大 HTTP 连接总数
HttpMaxConnectionsPerRoute 整数 50 每个路由的最大连接数
HttpConnectionTimeoutInSeconds 整数 30 连接超时
HttpSocketTimeoutInSeconds 整数 60 套接字读取超时
HttpReadTimeoutInSeconds 整数 60 HTTP 读取超时
HttpConnectionRequestTimeoutSeconds 整数 30 来自池的连接请求超时
HttpEnableKeepAlive 布尔 启用 HTTP 保持活动状态
HttpKeepAliveTimeoutSeconds 整数 60 保持活动状态超时
HttpFollowRedirects 布尔 遵循 HTTP 重定向
HttpUseAsyncIO 布尔 使用异步 HTTP I/O

Example:

HttpMaxTotalConnections=200;HttpMaxConnectionsPerRoute=100;HttpConnectionTimeoutInSeconds=60

代理配置

为企业环境配置 HTTP 和 SOCKS 代理设置:

参数 类型 违约 Description
UseProxy 布尔 启用代理
ProxyTransport String http 代理传输类型 (http/tcp)
ProxyHost String None 代理主机名
ProxyPort 整数 None 代理端口
ProxyAuthEnabled 布尔 启用代理身份验证
ProxyUsername String None 代理身份验证用户名
ProxyPassword String None 代理身份验证密码
ProxyAuthScheme String basic 身份验证方案 (basic/digest/ntlm)
ProxySocksVersion 整数 5 SOCKS 版本(4/5)

HTTP 代理示例:

UseProxy=true;ProxyTransport=http;ProxyHost=proxy.company.com;ProxyPort=8080;ProxyAuthEnabled=true;ProxyUsername=user;ProxyPassword=pass

SOCKS 代理示例:

UseProxy=true;ProxyTransport=tcp;ProxyHost=socks.company.com;ProxyPort=1080;ProxySocksVersion=5

日志记录配置

参数 类型 违约 Description
LogLevel String INFO 日志记录级别:TRACE、DEBUG、INFO、WARN、ERROR

Example:

LogLevel=DEBUG

默认日志位置:

${user.home}/.microsoft/livy-jdbc-driver/driver.log

自定义日志配置: 在类路径上使用自定义 log4j2.xmllogback.xml 文件。


用法示例

基本连接

import java.sql.*;

public class BasicConnectionExample {
    public static void main(String[] args) {
        String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
                     "FabricWorkspaceID=<workspace-id>;" +
                     "FabricLakehouseID=<lakehouse-id>;" +
                     "AuthFlow=2";
        
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("Connected successfully!");
            System.out.println("Database: " + conn.getMetaData().getDatabaseProductName());
            System.out.println("Driver: " + conn.getMetaData().getDriverName());
            System.out.println("Driver Version: " + conn.getMetaData().getDriverVersion());
        } catch (SQLException e) {
            System.err.println("Connection failed: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

执行查询

简单查询

public void executeSimpleQuery(Connection conn) throws SQLException {
    String sql = "SELECT current_timestamp() as now";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        if (rs.next()) {
            Timestamp now = rs.getTimestamp("now");
            System.out.println("Current timestamp: " + now);
        }
    }
}

使用筛选器进行查询

public void executeQueryWithFilter(Connection conn) throws SQLException {
    String sql = "SELECT * FROM sales WHERE amount > 1000 ORDER BY amount DESC";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        while (rs.next()) {
            int id = rs.getInt("id");
            double amount = rs.getDouble("amount");
            Date date = rs.getDate("sale_date");
            
            System.out.printf("ID: %d, Amount: %.2f, Date: %s%n", 
                            id, amount, date);
        }
    }
}

具有限制的查询

public void executeQueryWithLimit(Connection conn) throws SQLException {
    String sql = "SELECT * FROM customers LIMIT 10";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        
        // Print column names
        for (int i = 1; i <= columnCount; i++) {
            System.out.print(metaData.getColumnName(i) + "\t");
        }
        System.out.println();
        
        // Print rows
        while (rs.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(rs.getString(i) + "\t");
            }
            System.out.println();
        }
    }
}

使用结果集

public void navigateResultSet(Connection conn) throws SQLException {
    String sql = "SELECT id, name, amount FROM orders";
    
    try (Statement stmt = conn.createStatement(
            ResultSet.TYPE_SCROLL_INSENSITIVE,
            ResultSet.CONCUR_READ_ONLY);
         ResultSet rs = stmt.executeQuery(sql)) {
        
        // Move to first row
        if (rs.first()) {
            System.out.println("First row: " + rs.getString("name"));
        }
        
        // Move to last row
        if (rs.last()) {
            System.out.println("Last row: " + rs.getString("name"));
            System.out.println("Total rows: " + rs.getRow());
        }
        
        // Move to specific row
        if (rs.absolute(5)) {
            System.out.println("Row 5: " + rs.getString("name"));
        }
    }
}

处理大型结果集

public void processLargeResultSet(Connection conn) throws SQLException {
    String sql = "SELECT * FROM large_table";
    
    try (Statement stmt = conn.createStatement()) {
        // Set fetch size for efficient memory usage
        stmt.setFetchSize(1000);
        
        try (ResultSet rs = stmt.executeQuery(sql)) {
            int rowCount = 0;
            while (rs.next()) {
                // Process row
                processRow(rs);
                rowCount++;
                
                if (rowCount % 10000 == 0) {
                    System.out.println("Processed " + rowCount + " rows");
                }
            }
            System.out.println("Total rows processed: " + rowCount);
        }
    }
}

private void processRow(ResultSet rs) throws SQLException {
    // Process individual row
}

使用预处理语句

public void usePreparedStatement(Connection conn) throws SQLException {
    String sql = "SELECT * FROM products WHERE category = ? AND price > ?";
    
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        // Set parameters
        pstmt.setString(1, "Electronics");
        pstmt.setDouble(2, 100.0);
        
        try (ResultSet rs = pstmt.executeQuery()) {
            while (rs.next()) {
                String name = rs.getString("name");
                double price = rs.getDouble("price");
                System.out.printf("Product: %s, Price: $%.2f%n", name, price);
            }
        }
    }
}

批量操作

public void executeBatchInsert(Connection conn) throws SQLException {
    String sql = "INSERT INTO logs (timestamp, level, message) VALUES (?, ?, ?)";
    
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        conn.setAutoCommit(false);  // Disable auto-commit for batch
        
        // Add multiple statements to batch
        for (int i = 0; i < 1000; i++) {
            pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
            pstmt.setString(2, "INFO");
            pstmt.setString(3, "Log message " + i);
            pstmt.addBatch();
            
            // Execute batch every 100 statements
            if (i % 100 == 0) {
                pstmt.executeBatch();
                pstmt.clearBatch();
            }
        }
        
        // Execute remaining statements
        pstmt.executeBatch();
        conn.commit();
        
        System.out.println("Batch insert completed successfully");
    } catch (SQLException e) {
        conn.rollback();
        throw e;
    } finally {
        conn.setAutoCommit(true);
    }
}

数据类型映射

驱动程序将 Spark SQL 数据类型映射到 JDBC SQL 类型和 Java 类型:

Spark SQL 类型 JDBC SQL 类型 Java 类型 注释
BOOLEAN BOOLEAN Boolean
BYTE TINYINT Byte
SHORT SMALLINT Short
INT INTEGER Integer
LONG BIGINT Long
FLOAT FLOAT Float
DOUBLE DOUBLE Double
DECIMAL DECIMAL BigDecimal 保留精度和范围
STRING VARCHAR String
VARCHAR(n) VARCHAR String
CHAR(n) CHAR String
BINARY BINARY byte[]
DATE DATE java.sql.Date
TIMESTAMP TIMESTAMP java.sql.Timestamp
ARRAY VARCHAR String 序列化为 JSON
MAP VARCHAR String 序列化为 JSON
STRUCT VARCHAR String 序列化为 JSON