共用方式為


使用筆記本來存取資料庫實例

這很重要

這項功能在下列區域中處於公開預覽狀態:westus、、westus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindia、、 southeastasiauksouth

此頁面包含程式代碼範例,示範如何透過 Azure Databricks Notebook 存取 Lakebase 資料庫實例,並使用 Python 和 Scala 執行查詢。

這些範例涵蓋不同的連線策略,以符合不同的使用案例:

  • 單一連線:用於開啟、使用和關閉單一資料庫連線的簡單腳本。
  • 連線集區:用於高並發工作負載,維護一個可重複使用連線的集區。
  • 輪替 M2M OAuth 令牌:使用短期自動重新整理的 OAuth 令牌進行驗證。

下列範例會以程序設計方式產生安全認證。 避免直接將認證放入筆記本中。 Databricks 建議使用下列其中一個安全方法:

開始之前

在存取資料庫實例之前,請確定您符合下列需求:

  • 您有相應的 Postgres 角色可登入資料庫實例。 請參閱 管理 Postgres 角色
  • Postgres 角色已獲授與存取資料庫、架構或數據表的必要許可權。
  • 您可以向資料庫實例進行驗證。 如果您必須手動取得資料庫執行個體的 OAuth 權杖,請參閱 向資料庫執行個體進行驗證

警告

如果您使用私人連結,則必須使用單一使用者叢集。

Python

Azure Databricks Python SDK 可用來取得個別資料庫實例的 OAuth 令牌。

使用下列 Python 連結庫從 Azure Databricks Notebook 連線到您的資料庫實例:

  • psycopg2
  • psycopg3
  • SQLAlchemy

先決條件

在執行以下程式碼範例前,先將 Databricks SDK for Python 升級到 0.61.0 或更高版本,然後重新啟動 Python。

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

程式代碼範例示範單一連線和使用連接集區。 如需如何以程序設計方式取得資料庫實例和認證的詳細資訊,請參閱 如何使用 Python SDK 取得 OAuth 令牌

單一連線

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

連線池

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

程式代碼範例示範使用循環 M2M OAuth 的連線集區。 它會使用 generate_database_credential()。 如需如何以程序設計方式取得資料庫實例和認證的詳細資訊,請參閱 如何使用 Python SDK 取得 OAuth 令牌

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

程式代碼範例示範單一連線,以及搭配旋轉 M2M OAuth 令牌使用聯機集區。 如需如何以程序設計方式取得資料庫實例和認證的詳細資訊,請參閱 如何使用 Python SDK 取得 OAuth 令牌

單一連線

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

連線池與輪換 M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

程式代碼範例示範如何以程序設計方式取得資料庫實例和認證,以及如何使用單一連接或連接集區連接到資料庫實例。

步驟 1:使用 Azure Databricks Java SDK 取得 OAuth 令牌

如需如何以程序設計方式取得資料庫實例和認證的詳細資訊,請參閱 如何使用 Java SDK 取得 OAuth 令牌

步驟 2:連線到資料庫實例

單一連線

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

連線池

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()