다음을 통해 공유


Databricks ODBC 드라이버 테스트(Simba)

이 페이지에서는 Databricks ODBC 드라이버를 사용하는 코드를 테스트하는 방법을 설명합니다.

ODBC 호환 언어에 대한 테스트 프레임워크를 사용합니다. 다음 예제에서는 pyodbc, pytestunittest.mock 을 사용하여 ODBC 드라이버 연결을 테스트합니다. 이 코드는 Python 및 pyodbc를 Azure Databricks에 연결하는 예제를 기반으로 합니다.

도우미 함수

파일에는 helpers.py ODBC 연결 작업을 위한 유틸리티 함수가 포함되어 있습니다.

  • connect_to_dsn: Azure Databricks 컴퓨팅 리소스에 대한 연결을 엽니다.
  • get_cursor_from_connection: 쿼리를 실행하기 위한 커서를 가져옵니다.
  • select_from_nyctaxi_trips: samples.nyctaxi.trips에서 지정된 수의 행을 쿼리합니다.
  • print_rows: 결과 집합 콘텐츠를 콘솔에 인쇄합니다.
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

Main 클래스

이 파일은 main.py 도우미 함수를 호출하여 데이터를 연결하고 쿼리합니다.

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

모의를 사용하여 단위 테스트

이 파일은 test_helpers.py pytest 및 unittest.mock을 사용하여 select_from_nyctaxi_trips 함수를 테스트합니다. 모의 작업은 실제 컴퓨팅 리소스를 사용하지 않고 데이터베이스 연결을 시뮬레이션하므로 Azure Databricks 작업 영역에 영향을 주지 않고 몇 초 만에 테스트가 실행됩니다.

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

select_from_nyctaxi_tripsSELECT 문장만 실행하기 때문에, 이 경우 모의 작업은 꼭 필요하지 않습니다. 그러나 모의 작업은 테이블 상태에 영향을 주지 않고 반복적으로 테스트를 실행할 수 있으므로 데이터를 수정하는 함수(INSERT INTO, UPDATE, DELETE FROM)를 테스트할 때 특히 유용합니다.