次の方法で共有


Microsoft Fabric Data Engineering 用 Microsoft JDBC ドライバー (プレビュー)

この機能はプレビュー段階にあります。

JDBC (Java Database Connectivity) は、クライアント アプリケーションがデータベースやビッグ データ プラットフォームのデータに接続して操作できるようにする、広く採用されている標準です。

Microsoft JDBC Driver for Fabric Data Engineering を使用すると、JDBC 標準の信頼性とシンプルさを備えた Microsoft Fabric の Spark ワークロードの接続、クエリ、管理を行うことができます。 Microsoft Fabric の Livy API 上に構築されたドライバーは、Java アプリケーションと BI ツールに安全で柔軟な Spark SQL 接続を提供します。 この統合により、ノートブックまたは Spark ジョブ定義の成果物を個別に作成しなくても、Spark コードを直接送信して実行できます。

主な機能

  • JDBC 4.2 準拠: JDBC 4.2 仕様の完全な実装
  • Microsoft Entra ID 認証: 対話型、クライアント資格情報、証明書ベースの認証を含む複数の認証フロー
  • エンタープライズ接続プール: ヘルスモニタリングと自動リカバリ機能を備えた組み込みの接続プール
  • Spark SQL ネイティブ クエリのサポート: 変換なしで Spark SQL ステートメントを直接実行する
  • 包括的なデータ型のサポート: 複合型 (ARRAY、MAP、STRUCT) を含むすべての Spark SQL データ型のサポート
  • 非同期の結果セットプリフェッチ: パフォーマンス向上のためのバックグラウンド データ読み込み
  • サーキット ブレーカー パターン: 自動再試行による連鎖障害に対する保護
  • 自動再接続: 接続エラー時の透過的なセッション復旧
  • プロキシのサポート: エンタープライズ環境向けの HTTP および SOCKS プロキシの構成

[前提条件]

Microsoft JDBC Driver for Microsoft Fabric Data Engineering を使用する前に、次のことを確認してください。

  • Java Development Kit (JDK): バージョン 11 以降 (Java 21 を推奨)
  • Microsoft Fabric Access: Microsoft Fabric ワークスペースへのアクセス
  • Azure Entra ID 資格情報: 認証に適した資格情報
  • ワークスペース ID とレイクハウス ID: Fabric ワークスペースとレイクハウスの GUID 識別子

ダウンロードとインストール

Microsoft JDBC Driver for Microsoft Fabric Data Engineering バージョン 1.0.0 はパブリック プレビュー バージョンであり、Java 11、17、21 をサポートしています。 Java 接続のサポートは継続的に改善されており、Microsoft JDBC ドライバーの最新バージョンを使用することをお勧めします。

  1. 上記のリンクから zip または tar ファイルをダウンロードします。
  2. ダウンロードしたファイルを抽出して、ドライバー JAR ファイルにアクセスします。
  3. JRE バージョンに一致する JAR ファイルを選択します。
    • Java 11 の場合: ms-sparksql-jdbc-1.0.0.jre11.jar
    • Java 17 の場合: ms-sparksql-jdbc-1.0.0.jre17.jar
    • Java 21 の場合: ms-sparksql-jdbc-1.0.0.jre21.jar
  4. 選択した JAR ファイルをアプリケーションのクラスパスに追加します。
  5. JDBC クライアントの場合は、JDBC ドライバー クラスを構成します。 com.microsoft.spark.livy.jdbc.LivyDriver

クイック スタートの例

この例では、Microsoft Fabric に接続し、Microsoft JDBC Driver for Microsoft Fabric Data Engineering を使用してクエリを実行する方法を示します。 このコードを実行する前に、前提条件が満たされ、ドライバーがインストールされていることを確認してください。

import java.sql.*;

public class QuickStartExample {
    public static void main(String[] args) {
        // Connection string with required parameters
        String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
                     "FabricWorkspaceID=<workspace-id>;" +
                     "FabricLakehouseID=<lakehouse-id>;" +
                     "AuthFlow=2;" +  // Interactive browser authentication
                     "LogLevel=INFO";
        
        try (Connection conn = DriverManager.getConnection(url)) {
            // Execute a simple query
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 'Hello from Fabric!' as message")) {
                
                if (rs.next()) {
                    System.out.println(rs.getString("message"));
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

接続文字列の形式

基本的な接続文字列

Microsoft JDBC Driver for Microsoft Fabric Data Engineering では、次の接続文字列形式が使用されます。

jdbc:fabricspark://<hostname>[:<port>][;<parameter1>=<value1>;<parameter2>=<value2>;...]

接続文字列コンポーネント

コンポーネント Description Example
プロトコル JDBC URL プロトコル識別子 jdbc:fabricspark://
ホスト名 Microsoft Fabric エンドポイントのホスト名 api.fabric.microsoft.com
ポート オプションのポート番号 (既定値: 443) :443
パラメーター セミコロンで区切られたキーと値のペア FabricWorkspaceID=<guid>

接続文字列の例

基本接続 (対話型認証)

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;AuthFlow=2

Spark リソース構成を使用して

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2;AuthFlow=2

Spark セッションプロパティを用いて

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;spark.sql.adaptive.enabled=true;spark.sql.shuffle.partitions=200;AuthFlow=2

Authentication

Microsoft JDBC Driver for Microsoft Fabric Data Engineering では、Microsoft Entra ID (旧称 Azure Active Directory) を使用した複数の認証方法がサポートされています。 認証は、接続文字列の AuthFlow パラメーターを使用して構成されます。

認証フロー

AuthFlow 認証方法 使用事例
0 Azure CLI 資格情報 Azure CLI を使用した開発
1 クライアント資格情報 (サービス プリンシパル) 自動/サービス間認証
2 対話型ブラウザー 対話型ユーザー認証 (既定)
3 SPN サービス プリンシパル認証
4 証明書ベース 証明書ベースのサービス プリンシパル認証
5 アクセストークン 事前に取得されたアクセス トークン

対話型ブラウザー認証

最適な用途: 開発と対話型アプリケーション

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=2;" +  // Interactive browser authentication
             "AuthTenantID=<tenant-id>;" +  // Optional
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

パラメーター:

  • AuthFlow=2: 対話型ブラウザー認証を指定します
  • AuthTenantID (省略可能): Azure テナント ID
  • AuthClientID (省略可能): アプリケーション (クライアント) ID

動作:

  • ユーザー認証用のブラウザー ウィンドウを開きます
  • 資格情報は、有効期限が切れるまで後続の接続用にキャッシュされます
  • シングル ユーザー アプリケーションに適しています

クライアント資格情報認証

最適な用途は: 自動化されたサービスとバックグラウンド ジョブ

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=1;" +  // Client credentials authentication
             "AuthClientID=<client-id>;" +
             "AuthClientSecret=<client-secret>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

必須パラメーター:

  • AuthFlow=1: クライアント資格情報認証を指定します
  • AuthClientID: Microsoft Entra ID からのアプリケーション (クライアント) ID
  • AuthClientSecret: Microsoft Entra ID からのクライアント シークレット
  • AuthTenantID: Azure テナント ID

ベスト プラクティス:

  • シークレットを安全に格納する (Azure Key Vault、環境変数)
  • 可能な場合はマネージド ID を使用する
  • シークレットを定期的にローテーションする

証明書ベースの認証

最適な対象: 証明書ベースの認証を必要とするエンタープライズ アプリケーション

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=4;" +  // Certificate-based authentication
             "AuthClientID=<client-id>;" +
             "AuthCertificatePath=/path/to/certificate.pfx;" +
             "AuthCertificatePassword=<certificate-password>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

必須パラメーター:

  • AuthFlow=4: 証明書ベースの認証を指定します
  • AuthClientID: アプリケーション (クライアント) ID
  • AuthCertificatePath: PFX/PKCS12 証明書ファイルへのパス
  • AuthCertificatePassword: 証明書パスワード
  • AuthTenantID: Azure テナント ID

サービス プリンシパル認証

最適な用途: ヘッドレス環境とリモート セッション

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=3;" +  // Device code authentication
             "AuthClientID=<client-id>;" +
             "AuthTenantID=<tenant-id>;" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

動作:

  • 本体にデバイス コードと URL を表示します
  • ユーザーが URL にアクセスし、コードを入力する
  • ユーザー検証後に認証が完了する

アクセス トークン認証

最適な用途: カスタム認証シナリオ

// Acquire token through custom mechanism
String accessToken = acquireTokenFromCustomSource();

String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=5;" +  // Access token authentication
             "AuthAccessToken=" + accessToken + ";" +
             "LogLevel=INFO";

Connection conn = DriverManager.getConnection(url);

認証キャッシュ

ドライバーは、パフォーマンスを向上させるために認証トークンを自動的にキャッシュします。

// Enable/disable caching (enabled by default)
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
             "FabricWorkspaceID=<workspace-id>;" +
             "FabricLakehouseID=<lakehouse-id>;" +
             "AuthFlow=2;" +
             "AuthEnableCaching=true;" +  // Enable token caching
             "AuthCacheTTLMS=3600000";    // Cache TTL: 1 hour

Connection conn = DriverManager.getConnection(url);

構成パラメーター

必須のパラメーター

これらのパラメーターは、すべての接続文字列に存在する必要があります。

パラメーター タイプ Description Example
FabricWorkspaceID UUID(ユニバーサリー・ユニーク・アイデンティファイア) Microsoft Fabric ワークスペース識別子 <workspace-id>
FabricLakehouseID UUID(ユニバーサリー・ユニーク・アイデンティファイア) Microsoft Fabric Lakehouse 識別子 <lakehouse-id>
AuthFlow 整数 認証フローの種類 (0 から 5) 2

省略可能なパラメーター

API バージョンの構成

パラメーター タイプ 既定値 Description
FabricVersion v1 Microsoft Fabric API のバージョン
LivyApiVersion 2023-12-01 Livy API バージョン

環境の構成

パラメーター タイプ 既定値 Description
FabricEnvironmentID UUID(ユニバーサリー・ユニーク・アイデンティファイア) None Spark セッションの環境項目を参照するためのファブリック環境識別子

Spark 構成

セッション リソースの構成

最適なパフォーマンスを得られるように Spark セッション リソースを構成します。

パラメーター タイプ 既定値 Description Example
DriverCores 整数 Spark の既定値 ドライバーの CPU コア数 4
DriverMemory Spark の既定値 ドライバーのメモリ割り当て 4g
ExecutorCores 整数 Spark の既定値 Executor あたりの CPU コア数 4
ExecutorMemory Spark の既定値 Executor あたりのメモリ割り当て 8g
NumExecutors 整数 Spark の既定値 Executor の数 2

Example:

DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2

カスタム Spark セッションのプロパティ

プレフィックス spark. を持つパラメーターは、Spark セッションに自動的に適用されます。

Spark 構成の例:

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.shuffle.partitions=200
spark.sql.autoBroadcastJoinThreshold=10485760
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.executor.memoryOverhead=1g

ネイティブ実行エンジン (NEE):

spark.nee.enabled=true

完全な例:

jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<guid>;FabricLakehouseID=<guid>;DriverMemory=4g;ExecutorMemory=8g;NumExecutors=2;spark.sql.adaptive.enabled=true;spark.nee.enabled=true;AuthFlow=2

HTTP 接続プールの構成

最適なネットワーク パフォーマンスを得られるように HTTP 接続プールを構成します。

パラメーター タイプ 既定値 Description
HttpMaxTotalConnections 整数 100 HTTP 接続の最大数
HttpMaxConnectionsPerRoute 整数 50 ルートあたりの最大接続数
HttpConnectionTimeoutInSeconds 整数 30 接続タイムアウト
HttpSocketTimeoutInSeconds 整数 60 ソケットの読み取りタイムアウト
HttpReadTimeoutInSeconds 整数 60 HTTP 読み取りタイムアウト
HttpConnectionRequestTimeoutSeconds 整数 30 プールからの接続要求のタイムアウト
HttpEnableKeepAlive ブール値 true HTTP キープアライブを有効にする
HttpKeepAliveTimeoutSeconds 整数 60 キープアライブ タイムアウト
HttpFollowRedirects ブール値 true HTTP リダイレクトに従う
HttpUseAsyncIO ブール値 false 非同期 HTTP I/O を使用する

Example:

HttpMaxTotalConnections=200;HttpMaxConnectionsPerRoute=100;HttpConnectionTimeoutInSeconds=60

プロキシの構成

エンタープライズ環境の HTTP および SOCKS プロキシ設定を構成します。

パラメーター タイプ 既定値 Description
UseProxy ブール値 false プロキシを有効にする
ProxyTransport http プロキシ トランスポートの種類 (http/tcp)
ProxyHost None プロキシホスト名
ProxyPort 整数 None プロキシ ポート
ProxyAuthEnabled ブール値 false プロキシ認証を有効にする
ProxyUsername None プロキシ認証のユーザー名
ProxyPassword None プロキシ認証パスワード
ProxyAuthScheme basic 認証スキーム (basic/digest/ntlm)
ProxySocksVersion 整数 5 SOCKS バージョン (4/5)

HTTP プロキシの例:

UseProxy=true;ProxyTransport=http;ProxyHost=proxy.company.com;ProxyPort=8080;ProxyAuthEnabled=true;ProxyUsername=user;ProxyPassword=pass

SOCKS プロキシの例:

UseProxy=true;ProxyTransport=tcp;ProxyHost=socks.company.com;ProxyPort=1080;ProxySocksVersion=5

ログ記録の構成

パラメーター タイプ 既定値 Description
LogLevel INFO ログ レベル: TRACE、DEBUG、INFO、WARN、ERROR

Example:

LogLevel=DEBUG

既定のログの場所:

${user.home}/.microsoft/livy-jdbc-driver/driver.log

カスタム ログ構成: クラスパスでカスタム log4j2.xml または logback.xml ファイルを使用します。


使用例

基本接続

import java.sql.*;

public class BasicConnectionExample {
    public static void main(String[] args) {
        String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
                     "FabricWorkspaceID=<workspace-id>;" +
                     "FabricLakehouseID=<lakehouse-id>;" +
                     "AuthFlow=2";
        
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("Connected successfully!");
            System.out.println("Database: " + conn.getMetaData().getDatabaseProductName());
            System.out.println("Driver: " + conn.getMetaData().getDriverName());
            System.out.println("Driver Version: " + conn.getMetaData().getDriverVersion());
        } catch (SQLException e) {
            System.err.println("Connection failed: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

クエリの実行

単純なクエリ

public void executeSimpleQuery(Connection conn) throws SQLException {
    String sql = "SELECT current_timestamp() as now";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        if (rs.next()) {
            Timestamp now = rs.getTimestamp("now");
            System.out.println("Current timestamp: " + now);
        }
    }
}

フィルターを使用したクエリ

public void executeQueryWithFilter(Connection conn) throws SQLException {
    String sql = "SELECT * FROM sales WHERE amount > 1000 ORDER BY amount DESC";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        while (rs.next()) {
            int id = rs.getInt("id");
            double amount = rs.getDouble("amount");
            Date date = rs.getDate("sale_date");
            
            System.out.printf("ID: %d, Amount: %.2f, Date: %s%n", 
                            id, amount, date);
        }
    }
}

制限付きクエリ

public void executeQueryWithLimit(Connection conn) throws SQLException {
    String sql = "SELECT * FROM customers LIMIT 10";
    
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        
        // Print column names
        for (int i = 1; i <= columnCount; i++) {
            System.out.print(metaData.getColumnName(i) + "\t");
        }
        System.out.println();
        
        // Print rows
        while (rs.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(rs.getString(i) + "\t");
            }
            System.out.println();
        }
    }
}

結果セットの操作

public void navigateResultSet(Connection conn) throws SQLException {
    String sql = "SELECT id, name, amount FROM orders";
    
    try (Statement stmt = conn.createStatement(
            ResultSet.TYPE_SCROLL_INSENSITIVE,
            ResultSet.CONCUR_READ_ONLY);
         ResultSet rs = stmt.executeQuery(sql)) {
        
        // Move to first row
        if (rs.first()) {
            System.out.println("First row: " + rs.getString("name"));
        }
        
        // Move to last row
        if (rs.last()) {
            System.out.println("Last row: " + rs.getString("name"));
            System.out.println("Total rows: " + rs.getRow());
        }
        
        // Move to specific row
        if (rs.absolute(5)) {
            System.out.println("Row 5: " + rs.getString("name"));
        }
    }
}

大規模な結果セットの処理

public void processLargeResultSet(Connection conn) throws SQLException {
    String sql = "SELECT * FROM large_table";
    
    try (Statement stmt = conn.createStatement()) {
        // Set fetch size for efficient memory usage
        stmt.setFetchSize(1000);
        
        try (ResultSet rs = stmt.executeQuery(sql)) {
            int rowCount = 0;
            while (rs.next()) {
                // Process row
                processRow(rs);
                rowCount++;
                
                if (rowCount % 10000 == 0) {
                    System.out.println("Processed " + rowCount + " rows");
                }
            }
            System.out.println("Total rows processed: " + rowCount);
        }
    }
}

private void processRow(ResultSet rs) throws SQLException {
    // Process individual row
}

準備されたステートメントの使用

public void usePreparedStatement(Connection conn) throws SQLException {
    String sql = "SELECT * FROM products WHERE category = ? AND price > ?";
    
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        // Set parameters
        pstmt.setString(1, "Electronics");
        pstmt.setDouble(2, 100.0);
        
        try (ResultSet rs = pstmt.executeQuery()) {
            while (rs.next()) {
                String name = rs.getString("name");
                double price = rs.getDouble("price");
                System.out.printf("Product: %s, Price: $%.2f%n", name, price);
            }
        }
    }
}

バッチ操作

public void executeBatchInsert(Connection conn) throws SQLException {
    String sql = "INSERT INTO logs (timestamp, level, message) VALUES (?, ?, ?)";
    
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        conn.setAutoCommit(false);  // Disable auto-commit for batch
        
        // Add multiple statements to batch
        for (int i = 0; i < 1000; i++) {
            pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
            pstmt.setString(2, "INFO");
            pstmt.setString(3, "Log message " + i);
            pstmt.addBatch();
            
            // Execute batch every 100 statements
            if (i % 100 == 0) {
                pstmt.executeBatch();
                pstmt.clearBatch();
            }
        }
        
        // Execute remaining statements
        pstmt.executeBatch();
        conn.commit();
        
        System.out.println("Batch insert completed successfully");
    } catch (SQLException e) {
        conn.rollback();
        throw e;
    } finally {
        conn.setAutoCommit(true);
    }
}

データ型マッピング

ドライバーは、Spark SQL データ型を JDBC SQL 型と Java 型にマップします。

Spark SQL の種類 JDBC SQL 型 Java 型 注記
BOOLEAN BOOLEAN Boolean
BYTE TINYINT Byte
SHORT SMALLINT Short
INT INTEGER Integer
LONG BIGINT Long
FLOAT FLOAT Float
DOUBLE DOUBLE Double
DECIMAL DECIMAL BigDecimal 精度とスケールの保持
STRING VARCHAR String
VARCHAR(n) VARCHAR String
CHAR(n) CHAR String
BINARY BINARY byte[]
DATE DATE java.sql.Date
TIMESTAMP TIMESTAMP java.sql.Timestamp
ARRAY VARCHAR String JSON としてシリアル化
MAP VARCHAR String JSON としてシリアル化
STRUCT VARCHAR String JSON としてシリアル化