Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page explique comment tester le code qui utilise le pilote ODBC Databricks.
Utilisez n’importe quelle infrastructure de test pour les langages compatibles ODBC. Les exemples suivants utilisent pyodbc, pytest et unittest.mock pour tester les connexions de pilotes ODBC. Ce code est basé sur l’exemple dans Connecter Python et pyodbc à Azure Databricks.
Fonctions d’assistance
Le helpers.py fichier contient des fonctions utilitaires pour l’utilisation des connexions ODBC :
-
connect_to_dsn: ouvre une connexion à une ressource de calcul Azure Databricks. -
get_cursor_from_connection: obtient un curseur pour l’exécution de requêtes. -
select_from_nyctaxi_trips: interroge le nombre spécifié de lignes à partir desamples.nyctaxi.trips. -
print_rows: imprime dans la console le contenu de l'ensemble de résultats.
# helpers.py
from pyodbc import connect, Connection, Cursor
def connect_to_dsn(
connstring: str,
autocommit: bool
) -> Connection:
connection = connect(
connstring,
autocommit = autocommit
)
return connection
def get_cursor_from_connection(
connection: Connection
) -> Cursor:
cursor = connection.cursor()
return cursor
def select_from_nyctaxi_trips(
cursor: Cursor,
num_rows: int
) -> Cursor:
select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
return select_cursor
def print_rows(cursor: Cursor):
for row in cursor.fetchall():
print(row)
Classe Main
Le main.py fichier appelle les fonctions d’assistance pour se connecter et interroger des données :
# main.py
from helpers import *
connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
cursor = get_cursor_from_connection(
connection = connection)
select_cursor = select_from_nyctaxi_trips(
cursor = cursor,
num_rows = 2
)
print_rows(
cursor = select_cursor
)
Tests unitaires avec simulation
Le test_helpers.py fichier utilise pytest et unittest.mock pour tester la select_from_nyctaxi_trips fonction. La simulation des connexions de base de données, sans utiliser de ressources de calcul réelles, permet d'exécuter les tests en quelques secondes sans affecter vos espaces de travail Azure Databricks.
# test_helpers.py
from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime
@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
mock_connection.return_value.getinfo.return_value = "Spark SQL"
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"
@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_cursor.return_value.rowcount = -1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_cursor = get_cursor_from_connection(
connection = mock_connection
)
assert mock_cursor.rowcount == -1
@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.arraysize = 1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
assert mock_select_cursor.arraysize == 1
@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
]
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
print_rows(
cursor = mock_select_cursor
)
captured = capsys.readouterr()
assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
"(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"
Étant donné que select_from_nyctaxi_trips ne fait qu'exécuter une instruction SELECT, la simulation n’est pas strictement nécessaire ici. Toutefois, le mock est particulièrement utile lorsque vous testez les fonctions qui modifient des données (INSERT INTO, UPDATE, DELETE FROM), car vous pouvez exécuter des tests à plusieurs reprises sans affecter l’état de la table.