Databricks 提供了使用 Unity Catalog 卷的批量匯入功能,允許用戶將數據集從本地文件(如 CSV 文件)傳輸進出。 請參閱什麼是 Unity Catalog 磁碟區?。
本文說明如何使用 Databricks JDBC 驅動程式(版本 3 及以上)管理卷內檔案,以及讀寫卷內的串流。
啟用音量操作
若要啟用 Unity Catalog 磁碟區作業,請將連接屬性 VolumeOperationAllowedLocalPaths 設定為允許磁碟區作業的本機路徑的逗號分隔清單。 參見 其他功能屬性
必須啟用 Unity 目錄才能使用此功能。 您可以使用 Databricks UI 來取得類似的功能。 請參閱 將檔案上傳至 Unity 目錄磁碟區。
Unity 目錄擷取命令是 SQL 語句。 下列範例示範 PUT、GET和 REMOVE 作業。
上傳本機檔案
若要將本機檔案 /tmp/test.csv 上傳至 Unity Catalog 卷路徑,將其命名為 /Volumes/main/default/e2etests/file1.csv,請使用 PUT 作業:
PUT '/tmp/test.csv' INTO '/Volumes/main/default/e2etests/file1.csv' OVERWRITE
下載檔案
若要將檔案從 Unity 目錄磁碟區路徑 /Volumes/main/default/e2etests/file1.csv 下載到本機檔案 /tmp/test.csv,請使用 GET 作業:
GET '/Volumes/main/default/e2etests/file1.csv' TO '/tmp/test.csv'
刪除檔案
若要刪除具有 Unity 目錄磁碟區路徑 /Volumes/main/default/e2etests/file1.csv的檔案,請使用 REMOVE 作業:
REMOVE '/Volumes/main/default/e2etests/file1.csv'
使用流讀寫資料
JDBC 驅動程式藉由提供 介面 IDatabricksVolumeClient,支援串流以讀取和寫入 Unity 目錄磁碟區的數據。 請參考 IDatabricksVolumeClient 參考資料 以獲取可用的 API。
IDatabricksVolumeClient 可以使用 DatabricksVolumeClientFactory 工廠工具初始化。
import com.databricks.jdbc.api.impl.volume.DatabricksVolumeClientFactory;
import com.databricks.jdbc.api.volume.IDatabricksVolumeClient;
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
從串流將檔案寫入到一個磁碟區中
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
volumeClient.putObject(catalog, schema, volume, objectPath, inputStream, contentLength, true /* overwrite */);
以數據流的形式讀取磁碟區檔案
import org.apache.http.entity.InputStreamEntity;
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
InputStreamEntity volumeDataStream = volumeClient.getObject(catalog, schema, volume, objectPath);