Partager via


Tester le pilote ODBC Databricks (Simba)

Cette page explique comment tester le code qui utilise le pilote ODBC Databricks.

Utilisez n’importe quelle infrastructure de test pour les langages compatibles ODBC. Les exemples suivants utilisent pyodbc, pytest et unittest.mock pour tester les connexions de pilotes ODBC. Ce code est basé sur l’exemple dans Connecter Python et pyodbc à Azure Databricks.

Fonctions d’assistance

Le helpers.py fichier contient des fonctions utilitaires pour l’utilisation des connexions ODBC :

  • connect_to_dsn: ouvre une connexion à une ressource de calcul Azure Databricks.
  • get_cursor_from_connection: obtient un curseur pour l’exécution de requêtes.
  • select_from_nyctaxi_trips: interroge le nombre spécifié de lignes à partir de samples.nyctaxi.trips.
  • print_rows : imprime dans la console le contenu de l'ensemble de résultats.
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

Classe Main

Le main.py fichier appelle les fonctions d’assistance pour se connecter et interroger des données :

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

Tests unitaires avec simulation

Le test_helpers.py fichier utilise pytest et unittest.mock pour tester la select_from_nyctaxi_trips fonction. La simulation des connexions de base de données, sans utiliser de ressources de calcul réelles, permet d'exécuter les tests en quelques secondes sans affecter vos espaces de travail Azure Databricks.

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

Étant donné que select_from_nyctaxi_trips ne fait qu'exécuter une instruction SELECT, la simulation n’est pas strictement nécessaire ici. Toutefois, le mock est particulièrement utile lorsque vous testez les fonctions qui modifient des données (INSERT INTO, UPDATE, DELETE FROM), car vous pouvez exécuter des tests à plusieurs reprises sans affecter l’état de la table.