Partilhar via


Utilizar um computador portátil para aceder a uma instância de base de dados

Importante

Esta funcionalidade está na Pré-visualização Pública nas seguintes regiões: westus, westus2, eastus, eastus2, centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiasoutheastasiauksouth.

Esta página contém exemplos de código que mostram como acessar sua instância de banco de dados Lakebase por meio de blocos de anotações do Azure Databricks e executar consultas usando Python e Scala.

Os exemplos abrangem diferentes estratégias de conexão para atender a diferentes casos de uso:

  • Conexão única: usada para scripts simples em que uma única conexão de banco de dados é aberta, usada e fechada.
  • Pool de conexões: usado para cargas de trabalho de alta simultaneidade, onde um pool de conexões reutilizáveis é mantido.
  • Token OAuth M2M rotativo: utiliza tokens OAuth de curta duração e renovados automaticamente para autenticação.

Os exemplos a seguir geram programaticamente credenciais seguras. Evite colocar credenciais diretamente em um bloco de anotações. O Databricks recomenda o uso de um dos seguintes métodos seguros:

  • Armazene senhas do Postgres em segredos do Azure Databricks.
  • Gere tokens OAuth usando M2M OAuth.

Antes de começar

Certifique-se de atender aos seguintes requisitos antes de acessar sua instância de banco de dados:

  • Você tem uma função Postgres correspondente para efetuar login na instância do banco de dados. Consulte Gerenciar funções do Postgres.
  • Sua função Postgres recebe as permissões necessárias para acessar o banco de dados, esquema ou tabela.
  • Você pode autenticar-se à instância do banco de dados. Se você precisar obter manualmente um token OAuth para sua instância de banco de dados, consulte Autenticar em uma instância de banco de dados.

Advertência

Se você estiver usando link privado, precisará usar um único cluster de usuário.

Python

O SDK Python do Azure Databricks pode ser usado para obter um token OAuth para uma respetiva instância de banco de dados.

Conecte-se à sua instância de banco de dados a partir de um bloco de anotações do Azure Databricks usando as seguintes bibliotecas Python:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Pré-requisitos

Antes de executar os seguintes exemplos de código, atualize o SDK Databricks para Python para a versão 0.61.0 ou superior e depois reinicie o Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Os exemplos de código demonstram uma única conexão e o uso de um pool de conexões. Para obter mais informações sobre como obter a instância e as credenciais do banco de dados programaticamente, consulte como obter um token OAuth usando o Python SDK.

Conexão única

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Pool de conexões

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

O exemplo de código demonstra o uso de um pool de conexões com um M2M OAuth rotativo. Ele usa generate_database_credential(). Para obter mais informações sobre como obter a instância e as credenciais do banco de dados programaticamente, consulte como obter um token OAuth usando o Python SDK.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Os exemplos de código demonstram uma única conexão e o uso de um pool de conexões com um token OAuth M2M rotativo. Para obter mais informações sobre como obter a instância e as credenciais do banco de dados programaticamente, consulte como obter um token OAuth usando o Python SDK.

Conexão única

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Pool de conexões e rotação de OAuth M2M

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Os exemplos de código mostram como obter programaticamente a instância e as credenciais do banco de dados e como se conectar a uma instância de banco de dados usando uma única conexão ou um pool de conexões.

Etapa 1: Usar o SDK Java do Azure Databricks para obter um token OAuth

Para obter detalhes sobre como obter a instância e as credenciais do banco de dados programaticamente, consulte como obter um token OAuth usando o Java SDK.

Etapa 2: Conectar-se a uma instância de banco de dados

Conexão única

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Pool de conexões

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()