注
この機能はプレビュー段階にあります。
JDBC (Java Database Connectivity) は、クライアント アプリケーションがデータベースやビッグ データ プラットフォームのデータに接続して操作できるようにする、広く採用されている標準です。
Microsoft JDBC Driver for Fabric Data Engineering を使用すると、JDBC 標準の信頼性とシンプルさを備えた Microsoft Fabric の Spark ワークロードの接続、クエリ、管理を行うことができます。 Microsoft Fabric の Livy API 上に構築されたドライバーは、Java アプリケーションと BI ツールに安全で柔軟な Spark SQL 接続を提供します。 この統合により、ノートブックまたは Spark ジョブ定義の成果物を個別に作成しなくても、Spark コードを直接送信して実行できます。
主な機能
- JDBC 4.2 準拠: JDBC 4.2 仕様の完全な実装
- Microsoft Entra ID 認証: 対話型、クライアント資格情報、証明書ベースの認証を含む複数の認証フロー
- エンタープライズ接続プール: ヘルスモニタリングと自動リカバリ機能を備えた組み込みの接続プール
- Spark SQL ネイティブ クエリのサポート: 変換なしで Spark SQL ステートメントを直接実行する
- 包括的なデータ型のサポート: 複合型 (ARRAY、MAP、STRUCT) を含むすべての Spark SQL データ型のサポート
- 非同期の結果セットプリフェッチ: パフォーマンス向上のためのバックグラウンド データ読み込み
- サーキット ブレーカー パターン: 自動再試行による連鎖障害に対する保護
- 自動再接続: 接続エラー時の透過的なセッション復旧
- プロキシのサポート: エンタープライズ環境向けの HTTP および SOCKS プロキシの構成
[前提条件]
Microsoft JDBC Driver for Microsoft Fabric Data Engineering を使用する前に、次のことを確認してください。
- Java Development Kit (JDK): バージョン 11 以降 (Java 21 を推奨)
- Microsoft Fabric Access: Microsoft Fabric ワークスペースへのアクセス
- Azure Entra ID 資格情報: 認証に適した資格情報
- ワークスペース ID とレイクハウス ID: Fabric ワークスペースとレイクハウスの GUID 識別子
ダウンロードとインストール
Microsoft JDBC Driver for Microsoft Fabric Data Engineering バージョン 1.0.0 はパブリック プレビュー バージョンであり、Java 11、17、21 をサポートしています。 Java 接続のサポートは継続的に改善されており、Microsoft JDBC ドライバーの最新バージョンを使用することをお勧めします。
- Microsoft JDBC Driver for Microsoft Fabric Data Engineering (zip) のダウンロード
- Microsoft JDBC Driver for Microsoft Fabric Data Engineering (tar) のダウンロード
- 上記のリンクから zip または tar ファイルをダウンロードします。
- ダウンロードしたファイルを抽出して、ドライバー JAR ファイルにアクセスします。
- JRE バージョンに一致する JAR ファイルを選択します。
- Java 11 の場合:
ms-sparksql-jdbc-1.0.0.jre11.jar - Java 17 の場合:
ms-sparksql-jdbc-1.0.0.jre17.jar - Java 21 の場合:
ms-sparksql-jdbc-1.0.0.jre21.jar
- Java 11 の場合:
- 選択した JAR ファイルをアプリケーションのクラスパスに追加します。
- JDBC クライアントの場合は、JDBC ドライバー クラスを構成します。
com.microsoft.spark.livy.jdbc.LivyDriver
クイック スタートの例
この例では、Microsoft Fabric に接続し、Microsoft JDBC Driver for Microsoft Fabric Data Engineering を使用してクエリを実行する方法を示します。 このコードを実行する前に、前提条件が満たされ、ドライバーがインストールされていることを確認してください。
import java.sql.*;
public class QuickStartExample {
public static void main(String[] args) {
// Connection string with required parameters
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" + // Interactive browser authentication
"LogLevel=INFO";
try (Connection conn = DriverManager.getConnection(url)) {
// Execute a simple query
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 'Hello from Fabric!' as message")) {
if (rs.next()) {
System.out.println(rs.getString("message"));
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
接続文字列の形式
基本的な接続文字列
Microsoft JDBC Driver for Microsoft Fabric Data Engineering では、次の接続文字列形式が使用されます。
jdbc:fabricspark://<hostname>[:<port>][;<parameter1>=<value1>;<parameter2>=<value2>;...]
接続文字列コンポーネント
| コンポーネント | Description | Example |
|---|---|---|
| プロトコル | JDBC URL プロトコル識別子 | jdbc:fabricspark:// |
| ホスト名 | Microsoft Fabric エンドポイントのホスト名 | api.fabric.microsoft.com |
| ポート | オプションのポート番号 (既定値: 443) | :443 |
| パラメーター | セミコロンで区切られたキーと値のペア | FabricWorkspaceID=<guid> |
接続文字列の例
基本接続 (対話型認証)
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;AuthFlow=2
Spark リソース構成を使用して
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2;AuthFlow=2
Spark セッションプロパティを用いて
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<workspace-id>;FabricLakehouseID=<lakehouse-id>;spark.sql.adaptive.enabled=true;spark.sql.shuffle.partitions=200;AuthFlow=2
Authentication
Microsoft JDBC Driver for Microsoft Fabric Data Engineering では、Microsoft Entra ID (旧称 Azure Active Directory) を使用した複数の認証方法がサポートされています。 認証は、接続文字列の AuthFlow パラメーターを使用して構成されます。
認証フロー
| AuthFlow | 認証方法 | 使用事例 |
|---|---|---|
| 0 | Azure CLI 資格情報 | Azure CLI を使用した開発 |
| 1 | クライアント資格情報 (サービス プリンシパル) | 自動/サービス間認証 |
| 2 | 対話型ブラウザー | 対話型ユーザー認証 (既定) |
| 3 | SPN | サービス プリンシパル認証 |
| 4 | 証明書ベース | 証明書ベースのサービス プリンシパル認証 |
| 5 | アクセストークン | 事前に取得されたアクセス トークン |
対話型ブラウザー認証
最適な用途: 開発と対話型アプリケーション
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" + // Interactive browser authentication
"AuthTenantID=<tenant-id>;" + // Optional
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
パラメーター:
-
AuthFlow=2: 対話型ブラウザー認証を指定します -
AuthTenantID(省略可能): Azure テナント ID -
AuthClientID(省略可能): アプリケーション (クライアント) ID
動作:
- ユーザー認証用のブラウザー ウィンドウを開きます
- 資格情報は、有効期限が切れるまで後続の接続用にキャッシュされます
- シングル ユーザー アプリケーションに適しています
クライアント資格情報認証
最適な用途は: 自動化されたサービスとバックグラウンド ジョブ
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=1;" + // Client credentials authentication
"AuthClientID=<client-id>;" +
"AuthClientSecret=<client-secret>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
必須パラメーター:
-
AuthFlow=1: クライアント資格情報認証を指定します -
AuthClientID: Microsoft Entra ID からのアプリケーション (クライアント) ID -
AuthClientSecret: Microsoft Entra ID からのクライアント シークレット -
AuthTenantID: Azure テナント ID
ベスト プラクティス:
- シークレットを安全に格納する (Azure Key Vault、環境変数)
- 可能な場合はマネージド ID を使用する
- シークレットを定期的にローテーションする
証明書ベースの認証
最適な対象: 証明書ベースの認証を必要とするエンタープライズ アプリケーション
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=4;" + // Certificate-based authentication
"AuthClientID=<client-id>;" +
"AuthCertificatePath=/path/to/certificate.pfx;" +
"AuthCertificatePassword=<certificate-password>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
必須パラメーター:
-
AuthFlow=4: 証明書ベースの認証を指定します -
AuthClientID: アプリケーション (クライアント) ID -
AuthCertificatePath: PFX/PKCS12 証明書ファイルへのパス -
AuthCertificatePassword: 証明書パスワード -
AuthTenantID: Azure テナント ID
サービス プリンシパル認証
最適な用途: ヘッドレス環境とリモート セッション
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=3;" + // Device code authentication
"AuthClientID=<client-id>;" +
"AuthTenantID=<tenant-id>;" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
動作:
- 本体にデバイス コードと URL を表示します
- ユーザーが URL にアクセスし、コードを入力する
- ユーザー検証後に認証が完了する
アクセス トークン認証
最適な用途: カスタム認証シナリオ
// Acquire token through custom mechanism
String accessToken = acquireTokenFromCustomSource();
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=5;" + // Access token authentication
"AuthAccessToken=" + accessToken + ";" +
"LogLevel=INFO";
Connection conn = DriverManager.getConnection(url);
認証キャッシュ
ドライバーは、パフォーマンスを向上させるために認証トークンを自動的にキャッシュします。
// Enable/disable caching (enabled by default)
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2;" +
"AuthEnableCaching=true;" + // Enable token caching
"AuthCacheTTLMS=3600000"; // Cache TTL: 1 hour
Connection conn = DriverManager.getConnection(url);
構成パラメーター
必須のパラメーター
これらのパラメーターは、すべての接続文字列に存在する必要があります。
| パラメーター | タイプ | Description | Example |
|---|---|---|---|
FabricWorkspaceID |
UUID(ユニバーサリー・ユニーク・アイデンティファイア) | Microsoft Fabric ワークスペース識別子 | <workspace-id> |
FabricLakehouseID |
UUID(ユニバーサリー・ユニーク・アイデンティファイア) | Microsoft Fabric Lakehouse 識別子 | <lakehouse-id> |
AuthFlow |
整数 | 認証フローの種類 (0 から 5) | 2 |
省略可能なパラメーター
API バージョンの構成
| パラメーター | タイプ | 既定値 | Description |
|---|---|---|---|
FabricVersion |
糸 | v1 |
Microsoft Fabric API のバージョン |
LivyApiVersion |
糸 | 2023-12-01 |
Livy API バージョン |
環境の構成
| パラメーター | タイプ | 既定値 | Description |
|---|---|---|---|
FabricEnvironmentID |
UUID(ユニバーサリー・ユニーク・アイデンティファイア) | None | Spark セッションの環境項目を参照するためのファブリック環境識別子 |
Spark 構成
セッション リソースの構成
最適なパフォーマンスを得られるように Spark セッション リソースを構成します。
| パラメーター | タイプ | 既定値 | Description | Example |
|---|---|---|---|---|
DriverCores |
整数 | Spark の既定値 | ドライバーの CPU コア数 | 4 |
DriverMemory |
糸 | Spark の既定値 | ドライバーのメモリ割り当て | 4g |
ExecutorCores |
整数 | Spark の既定値 | Executor あたりの CPU コア数 | 4 |
ExecutorMemory |
糸 | Spark の既定値 | Executor あたりのメモリ割り当て | 8g |
NumExecutors |
整数 | Spark の既定値 | Executor の数 | 2 |
Example:
DriverCores=4;DriverMemory=4g;ExecutorCores=4;ExecutorMemory=8g;NumExecutors=2
カスタム Spark セッションのプロパティ
プレフィックス spark. を持つパラメーターは、Spark セッションに自動的に適用されます。
Spark 構成の例:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.shuffle.partitions=200
spark.sql.autoBroadcastJoinThreshold=10485760
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.executor.memoryOverhead=1g
ネイティブ実行エンジン (NEE):
spark.nee.enabled=true
完全な例:
jdbc:fabricspark://api.fabric.microsoft.com;FabricWorkspaceID=<guid>;FabricLakehouseID=<guid>;DriverMemory=4g;ExecutorMemory=8g;NumExecutors=2;spark.sql.adaptive.enabled=true;spark.nee.enabled=true;AuthFlow=2
HTTP 接続プールの構成
最適なネットワーク パフォーマンスを得られるように HTTP 接続プールを構成します。
| パラメーター | タイプ | 既定値 | Description |
|---|---|---|---|
HttpMaxTotalConnections |
整数 | 100 | HTTP 接続の最大数 |
HttpMaxConnectionsPerRoute |
整数 | 50 | ルートあたりの最大接続数 |
HttpConnectionTimeoutInSeconds |
整数 | 30 | 接続タイムアウト |
HttpSocketTimeoutInSeconds |
整数 | 60 | ソケットの読み取りタイムアウト |
HttpReadTimeoutInSeconds |
整数 | 60 | HTTP 読み取りタイムアウト |
HttpConnectionRequestTimeoutSeconds |
整数 | 30 | プールからの接続要求のタイムアウト |
HttpEnableKeepAlive |
ブール値 | true | HTTP キープアライブを有効にする |
HttpKeepAliveTimeoutSeconds |
整数 | 60 | キープアライブ タイムアウト |
HttpFollowRedirects |
ブール値 | true | HTTP リダイレクトに従う |
HttpUseAsyncIO |
ブール値 | false | 非同期 HTTP I/O を使用する |
Example:
HttpMaxTotalConnections=200;HttpMaxConnectionsPerRoute=100;HttpConnectionTimeoutInSeconds=60
プロキシの構成
エンタープライズ環境の HTTP および SOCKS プロキシ設定を構成します。
| パラメーター | タイプ | 既定値 | Description |
|---|---|---|---|
UseProxy |
ブール値 | false | プロキシを有効にする |
ProxyTransport |
糸 | http |
プロキシ トランスポートの種類 (http/tcp) |
ProxyHost |
糸 | None | プロキシホスト名 |
ProxyPort |
整数 | None | プロキシ ポート |
ProxyAuthEnabled |
ブール値 | false | プロキシ認証を有効にする |
ProxyUsername |
糸 | None | プロキシ認証のユーザー名 |
ProxyPassword |
糸 | None | プロキシ認証パスワード |
ProxyAuthScheme |
糸 | basic |
認証スキーム (basic/digest/ntlm) |
ProxySocksVersion |
整数 | 5 | SOCKS バージョン (4/5) |
HTTP プロキシの例:
UseProxy=true;ProxyTransport=http;ProxyHost=proxy.company.com;ProxyPort=8080;ProxyAuthEnabled=true;ProxyUsername=user;ProxyPassword=pass
SOCKS プロキシの例:
UseProxy=true;ProxyTransport=tcp;ProxyHost=socks.company.com;ProxyPort=1080;ProxySocksVersion=5
ログ記録の構成
| パラメーター | タイプ | 既定値 | Description |
|---|---|---|---|
LogLevel |
糸 | INFO |
ログ レベル: TRACE、DEBUG、INFO、WARN、ERROR |
Example:
LogLevel=DEBUG
既定のログの場所:
${user.home}/.microsoft/livy-jdbc-driver/driver.log
カスタム ログ構成: クラスパスでカスタム log4j2.xml または logback.xml ファイルを使用します。
使用例
基本接続
import java.sql.*;
public class BasicConnectionExample {
public static void main(String[] args) {
String url = "jdbc:fabricspark://api.fabric.microsoft.com;" +
"FabricWorkspaceID=<workspace-id>;" +
"FabricLakehouseID=<lakehouse-id>;" +
"AuthFlow=2";
try (Connection conn = DriverManager.getConnection(url)) {
System.out.println("Connected successfully!");
System.out.println("Database: " + conn.getMetaData().getDatabaseProductName());
System.out.println("Driver: " + conn.getMetaData().getDriverName());
System.out.println("Driver Version: " + conn.getMetaData().getDriverVersion());
} catch (SQLException e) {
System.err.println("Connection failed: " + e.getMessage());
e.printStackTrace();
}
}
}
クエリの実行
単純なクエリ
public void executeSimpleQuery(Connection conn) throws SQLException {
String sql = "SELECT current_timestamp() as now";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
Timestamp now = rs.getTimestamp("now");
System.out.println("Current timestamp: " + now);
}
}
}
フィルターを使用したクエリ
public void executeQueryWithFilter(Connection conn) throws SQLException {
String sql = "SELECT * FROM sales WHERE amount > 1000 ORDER BY amount DESC";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
while (rs.next()) {
int id = rs.getInt("id");
double amount = rs.getDouble("amount");
Date date = rs.getDate("sale_date");
System.out.printf("ID: %d, Amount: %.2f, Date: %s%n",
id, amount, date);
}
}
}
制限付きクエリ
public void executeQueryWithLimit(Connection conn) throws SQLException {
String sql = "SELECT * FROM customers LIMIT 10";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// Print column names
for (int i = 1; i <= columnCount; i++) {
System.out.print(metaData.getColumnName(i) + "\t");
}
System.out.println();
// Print rows
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + "\t");
}
System.out.println();
}
}
}
結果セットの操作
結果セットの移動
public void navigateResultSet(Connection conn) throws SQLException {
String sql = "SELECT id, name, amount FROM orders";
try (Statement stmt = conn.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
ResultSet rs = stmt.executeQuery(sql)) {
// Move to first row
if (rs.first()) {
System.out.println("First row: " + rs.getString("name"));
}
// Move to last row
if (rs.last()) {
System.out.println("Last row: " + rs.getString("name"));
System.out.println("Total rows: " + rs.getRow());
}
// Move to specific row
if (rs.absolute(5)) {
System.out.println("Row 5: " + rs.getString("name"));
}
}
}
大規模な結果セットの処理
public void processLargeResultSet(Connection conn) throws SQLException {
String sql = "SELECT * FROM large_table";
try (Statement stmt = conn.createStatement()) {
// Set fetch size for efficient memory usage
stmt.setFetchSize(1000);
try (ResultSet rs = stmt.executeQuery(sql)) {
int rowCount = 0;
while (rs.next()) {
// Process row
processRow(rs);
rowCount++;
if (rowCount % 10000 == 0) {
System.out.println("Processed " + rowCount + " rows");
}
}
System.out.println("Total rows processed: " + rowCount);
}
}
}
private void processRow(ResultSet rs) throws SQLException {
// Process individual row
}
準備されたステートメントの使用
public void usePreparedStatement(Connection conn) throws SQLException {
String sql = "SELECT * FROM products WHERE category = ? AND price > ?";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// Set parameters
pstmt.setString(1, "Electronics");
pstmt.setDouble(2, 100.0);
try (ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String name = rs.getString("name");
double price = rs.getDouble("price");
System.out.printf("Product: %s, Price: $%.2f%n", name, price);
}
}
}
}
バッチ操作
public void executeBatchInsert(Connection conn) throws SQLException {
String sql = "INSERT INTO logs (timestamp, level, message) VALUES (?, ?, ?)";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
conn.setAutoCommit(false); // Disable auto-commit for batch
// Add multiple statements to batch
for (int i = 0; i < 1000; i++) {
pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
pstmt.setString(2, "INFO");
pstmt.setString(3, "Log message " + i);
pstmt.addBatch();
// Execute batch every 100 statements
if (i % 100 == 0) {
pstmt.executeBatch();
pstmt.clearBatch();
}
}
// Execute remaining statements
pstmt.executeBatch();
conn.commit();
System.out.println("Batch insert completed successfully");
} catch (SQLException e) {
conn.rollback();
throw e;
} finally {
conn.setAutoCommit(true);
}
}
データ型マッピング
ドライバーは、Spark SQL データ型を JDBC SQL 型と Java 型にマップします。
| Spark SQL の種類 | JDBC SQL 型 | Java 型 | 注記 |
|---|---|---|---|
BOOLEAN |
BOOLEAN |
Boolean |
|
BYTE |
TINYINT |
Byte |
|
SHORT |
SMALLINT |
Short |
|
INT |
INTEGER |
Integer |
|
LONG |
BIGINT |
Long |
|
FLOAT |
FLOAT |
Float |
|
DOUBLE |
DOUBLE |
Double |
|
DECIMAL |
DECIMAL |
BigDecimal |
精度とスケールの保持 |
STRING |
VARCHAR |
String |
|
VARCHAR(n) |
VARCHAR |
String |
|
CHAR(n) |
CHAR |
String |
|
BINARY |
BINARY |
byte[] |
|
DATE |
DATE |
java.sql.Date |
|
TIMESTAMP |
TIMESTAMP |
java.sql.Timestamp |
|
ARRAY |
VARCHAR |
String |
JSON としてシリアル化 |
MAP |
VARCHAR |
String |
JSON としてシリアル化 |
STRUCT |
VARCHAR |
String |
JSON としてシリアル化 |