Databricks には、Unity カタログ ボリュームを使用した一括インジェスト機能が用意されています。これにより、ユーザーは CSV ファイルなどのローカル ファイルとの間でデータセットを転送できます。 「Unity Catalog ボリュームとは」を参照してください。
この記事では、 Databricks JDBC Driver バージョン 3 以降を使用して、ボリューム内のファイルを管理する方法と、ボリュームとの間でストリームの読み取りと書き込みを行う方法について説明します。
ボリューム操作を有効にする
Unity カタログ ボリューム操作を有効にするには、接続プロパティ VolumeOperationAllowedLocalPaths を、ボリューム操作で許可されるローカル パスのコンマ区切りの一覧に設定します。
その他の機能のプロパティを参照してください
この機能を使用するには、Unity カタログを有効にする必要があります。 同様の機能は、Databricks UI を使用して使用できます。 「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
Unity カタログ インジェスト コマンドは SQL ステートメントです。 次の例は、PUT、 GET、および REMOVE 操作を示しています。
ローカル ファイルをアップロードする
/tmp/test.csvとしてローカル ファイル /Volumes/main/default/e2etests/file1.csvを Unity カタログ ボリューム パスにアップロードするには、PUT 操作を使用します。
PUT '/tmp/test.csv' INTO '/Volumes/main/default/e2etests/file1.csv' OVERWRITE
ファイルをダウンロードする
Unity カタログのボリューム パス /Volumes/main/default/e2etests/file1.csv からローカル ファイル /tmp/test.csvにファイルをダウンロードするには、 GET 操作を使用します。
GET '/Volumes/main/default/e2etests/file1.csv' TO '/tmp/test.csv'
ファイルを削除する
Unity カタログ のボリューム パスが /Volumes/main/default/e2etests/file1.csvファイルを削除するには、 REMOVE 操作を使用します。
REMOVE '/Volumes/main/default/e2etests/file1.csv'
ストリームを使用したデータの読み取り/書き込み
JDBC ドライバーは、インターフェイス IDatabricksVolumeClientを提供することで、Unity カタログ ボリュームとの間でデータの読み取りと書き込みを行うためのストリーミングをサポートします。 使用可能な API については、 IDatabricksVolumeClient リファレンスを参照 してください。
IDatabricksVolumeClientは、DatabricksVolumeClientFactory ファクトリ ユーティリティを使用して初期化できます。
import com.databricks.jdbc.api.impl.volume.DatabricksVolumeClientFactory;
import com.databricks.jdbc.api.volume.IDatabricksVolumeClient;
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
ストリームからボリュームにファイルを書き込む
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
volumeClient.putObject(catalog, schema, volume, objectPath, inputStream, contentLength, true /* overwrite */);
ボリューム ファイルをストリームとして読み取る
import org.apache.http.entity.InputStreamEntity;
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
InputStreamEntity volumeDataStream = volumeClient.getObject(catalog, schema, volume, objectPath);