本页包含的示例演示如何使用 Databricks JDBC 驱动程序版本 3 及更高版本运行查询。
注释
Databricks JDBC 驱动程序对于参数化语句,参数数量的限制为 256 个。
示例:运行查询
以下示例演示如何使用 Databricks JDBC 驱动程序 通过 Azure Databricks 计算资源运行 Databricks SQL 查询。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;
public class DatabricksJDBCExample {
public static void main(String[] args) {
Class.forName("com.databricks.client.jdbc.Driver");
// Set JDBC URL properties
String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
Properties connectionProperties = new Properties();
connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
connectionProperties.put("ssl", "1");
// Set authentication properties (personal access token)
connectionProperties.put("AuthMech", "3");
connectionProperties.put("user", "token");
connectionProperties.put("password", "12345678901234667890abcdabcd");
// Set logging properties
connectionProperties.put("logPath", "logs/myapplication.log");
// Establish connection and execute query
try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM samples.nyctaxi.trips")) {
// Get metadata and column names
ResultSetMetaData metaData = resultSet.getMetaData();
String[] columns = new String[metaData.getColumnCount()];
for (int i = 0; i < columns.length; i++) {
columns[i] = metaData.getColumnName(i + 1);
}
// Process and print the result set
while (resultSet.next()) {
System.out.print("Row " + resultSet.getRow() + "=[");
for (int i = 0; i < columns.length; i++) {
if (i != 0) {
System.out.print(", ");
}
System.out.print(columns[i] + "='" + resultSet.getObject(i + 1) + "'");
}
System.out.println("]");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
示例:异步运行查询
以下示例演示如何使用 Databricks JDBC 驱动程序 运行和处理异步 Databricks SQL 查询。
有关 API 参考,请参阅 Databricks JDBC 驱动程序的 Java API 参考。
启动语句的异步执行:
Statement statement = conn.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);
ResultSet result = dbStatement.executeAsync(sql);
IDatabricksResultSet asyncResult = result.unwrap(IDatabricksResultSet.class);
IExecutionStatus asyncStatus = asyncResult.getExecutionStatus();
long startTime = System.currentTimeMillis();
while ((asyncStatus.getExecutionState() == ExecutionState.RUNNING | asyncStatus.getExecutionState() == ExecutionState.PENDING) || (startTime + timeout < System.currentTimeMillis())) {
Thread.sleep(1000); // Sleep for 1000 ms
asyncResult = dbStatement.getExecutionResult().unwrap(IDatabricksResultSet.class);
asyncStatus = asyncResult.getExecutionStatus();
}
if (asyncStatus.getExecutionStatus() == ExecutionState.RUNNING | ExecutionState.PENDING) {
dbStatement.cancel();
}
if (asyncStatus.getExecutionStatus() == ExecutionState.SUCCEEDED) {
// process result set
}
if (asyncStatus.getExecutionStatus() == ExecutionState.FAILED) {
String sqlState = asyncStatus.getSqlState();
String errorMessage = asyncStatus.getErrorMessage();
// log error code and message
}
在单独的线程中处理一个语句:
Statement statement = conn1.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);
ResultSet asyncResult = dbStatement.executeAsync(sql);
IDatabricksResultSet drs = asyncResult.unwrap(IDatabricksResultSet.class);
String statementId = drs.getStatementId();
ExecutionState state = drs.getExecutionStatus().getExecutionState();
while (state != ExecutionState.SUCCEEDED) {
Thread.sleep(sleepInterval);
asyncResult = dbStatement.getExecutionResult();
state = asyncResult.unwrap(IDatabricksResultSet.class).getExecutionStatus().getExecutionState();
}
// In another thread
IDatabricksConnection dbConn2 = conn2.unwrap(IDatabricksConnection.class);
IDatabricksStatement asyncStatementHandle = dbConn2.getStatement(statementId).unwrap(IDatabricksStatement.class);
IDatabricksResultSet asyncResultHandle = asyncStatementHandle.getExecutionResult().unwrap(IDatabricksResultSet.class);
// Cancel if needed
if (asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.PENDING | asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.RUNNING) {
asyncStatementHandle.cancel();
}
使用连接 ID 关闭连接:
// Get connection-Id from existing connection
String connectionId = conn.unwrap(IDatabricksConnection.class).getConnectionId();
// Close the connection from other thread using same JDBC Url and connection properties and connection-Id retrieved from above
com.databricks.client.jdbc.Driver.getInstance().closeConnection(jdbcUrl, properties, connectionId);
示例:查询地理空间数据
以下示例演示如何使用 Databricks JDBC 驱动程序查询和检索地理空间数据类型。 若要将地理空间数据检索为结构化 Java 对象,请启用 EnableComplexDatatypeSupport 和 EnableGeoSpatialSupport 连接属性。
有关地理空间数据类型和函数的详细信息,请参阅 ST 地理空间函数。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;
import com.databricks.jdbc.api.IGeometry;
import com.databricks.jdbc.api.IGeography;
public class GeospatialExample {
public static void main(String[] args) {
// Set JDBC URL properties
String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
Properties connectionProperties = new Properties();
connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
connectionProperties.put("ssl", "1");
// Set authentication properties (personal access token)
connectionProperties.put("AuthMech", "3");
connectionProperties.put("user", "token");
connectionProperties.put("password", "12345678901234667890abcdabcd");
// Enable geospatial support
connectionProperties.put("EnableComplexDatatypeSupport", "1");
connectionProperties.put("EnableGeoSpatialSupport", "1");
// Establish connection and execute geospatial query
try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(
"SELECT ST_Point(1.0, 2.0) as point, " +
"ST_GeogFromText('POINT(-122.4194 37.7749)') as location")) {
while (rs.next()) {
// Retrieve GEOMETRY object
IGeometry point = (IGeometry) rs.getObject("point");
System.out.println("Point WKT: " + point.getWKT());
System.out.println("Point SRID: " + point.getSRID());
// Retrieve GEOGRAPHY object
IGeography location = (IGeography) rs.getObject("location");
System.out.println("Location WKT: " + location.getWKT());
System.out.println("Location SRID: " + location.getSRID());
// Metadata
ResultSetMetaData meta = rs.getMetaData();
System.out.println("Column 1 type: " + meta.getColumnTypeName(1)); // GEOMETRY
System.out.println("Column 1 class: " + meta.getColumnClassName(1)); // com.databricks.jdbc.api.IGeometry
System.out.println("Column 2 type: " + meta.getColumnTypeName(2)); // GEOGRAPHY
System.out.println("Column 2 class: " + meta.getColumnClassName(2)); // com.databricks.jdbc.api.IGeography
}
} catch (Exception e) {
e.printStackTrace();
}
}
}