Partilhar via


Fontes de dados personalizadas do PySpark

As fontes de dados personalizadas do PySpark são criadas usando a API DataSource Python (PySpark), que permite ler fontes de dados personalizadas e gravar em coletores de dados personalizados no Apache Spark usando Python. Pode usar fontes de dados personalizadas do PySpark para definir ligações personalizadas a sistemas de dados e implementar funcionalidades adicionais para construir fontes de dados reutilizáveis.

Nota

As fontes de dados personalizadas do PySpark exigem o Databricks Runtime 15.4 LTS e superior, ou o ambiente sem servidor versão 2.

DataSource classe

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.

Implementar a subclasse da fonte de dados

Dependendo do seu caso de uso, o seguinte deve ser implementado por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambas:

Propriedade ou Método Descrição
name Obrigatório. O nome da fonte de dados
schema Obrigatório. O esquema da fonte de dados a ser lida ou gravada
reader() Deve retornar um DataSourceReader para tornar a fonte de dados legível (lote)
writer() Deve retornar a DataSourceWriter para tornar o coletor de dados passível de escrita (processamento em lote)
streamReader() ou simpleStreamReader() Deve retornar um DataSourceStreamReader para tornar o fluxo de dados acessível (em streaming)
streamWriter() Deve retornar a DataSourceStreamWriter para tornar o fluxo de dados passível de escrita (transmissão em fluxo contínuo)

Nota

Os DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter definidos pelo utilizador, e os seus métodos, devem ser serializáveis. Ou seja, devem ser um dicionário ou um dicionário aninhado que contenha um tipo primitivo.

Registar a fonte de dados

Depois de implementar a interface, você deve registrá-lo, então você pode carregá-lo ou usá-lo de outra forma, como mostrado no exemplo a seguir:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemplo 1: Criar uma fonte de dados PySpark para consulta em lote

Para demonstrar os recursos do leitor PySpark DataSource, crie uma fonte de dados que gere dados de exemplo usando o faker pacote Python. Para mais informações sobre faker, consulte a documentação do Faker.

Instale o faker pacote usando o seguinte comando:

%pip install faker

Etapa 1: Definir o exemplo DataSource

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e leitor. O reader() método deve ser definido para ler a partir de uma fonte de dados numa consulta por lotes.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Etapa 2: Implementar o leitor para uma consulta em lote

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca instalada faker para preencher cada campo no esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Etapa 3: Registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Por padrão, o FakeDataSource tem três linhas e o esquema inclui estes string campos: name, date, zipcode, state. O exemplo a seguir regista, carrega e produz a saída da fonte de dados de exemplo com as predefinições.

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Somente string campos são suportados, mas você pode especificar um esquema com quaisquer campos que correspondam aos faker campos dos provedores de pacotes para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com name e company campos:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para carregar a fonte de dados com um número personalizado de linhas, especifique a numRows opção. O exemplo a seguir especifica 5 linhas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exemplo 2: Criar uma fonte de dados do PySpark GitHub usando variantes

Para demonstrar o uso de variantes em um PySpark DataSource, este exemplo cria uma fonte de dados que lê solicitações pull do GitHub.

Nota

As variantes são suportadas com fontes de dados personalizadas do PySpark no Databricks Runtime 17.1 e superior.

Para obter informações sobre variantes, consulte Consultar dados de variantes.

Etapa 1: Definir a fonte de dados do GitHub

Primeiro, defina seu novo PySpark GitHub DataSource como uma subclasse de DataSource com um nome, esquema e método reader(). O esquema inclui estes campos: id, title, user, created_at, updated_at. O user campo é definido como uma variante.

import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
    @classmethod
    def name(self):
        return "githubVariant"
    def schema(self):
        return "id int, title string, user variant, created_at string, updated_at string"
    def reader(self, schema):
        return GithubVariantPullRequestReader(self.options)

Etapa 2: Implementar o leitor para recuperar solicitações pull

Em seguida, implemente a lógica do leitor para recuperar solicitações pull do repositório GitHub especificado.

class GithubVariantPullRequestReader(DataSourceReader):
    def __init__(self, options):
        self.token = options.get("token")
        self.repo = options.get("path")
        if self.repo is None:
            raise Exception(f"Must specify a repo in `.load()` method.")

    def read(self, partition):
        header = {
            "Accept": "application/vnd.github+json",
        }
        if self.token is not None:
            header["Authorization"] = f"Bearer {self.token}"
        url = f"https://api.github.com/repos/{self.repo}/pulls"
        response = requests.get(url)
        response.raise_for_status()
        prs = response.json()
        for pr in prs:
            yield Row(
                id = pr.get("number"),
                title = pr.get("title"),
                user = VariantVal.parseJson(json.dumps(pr.get("user"))),
                created_at = pr.get("created_at"),
                updated_at = pr.get("updated_at")
            )

Etapa 3: Registrar e usar a fonte de dados

Para usar a fonte de dados, registre-a. O exemplo a seguir registra, carrega a fonte de dados e gera três linhas dos dados PR do repositório GitHub:

spark.dataSource.register(GithubVariantDataSource)
spark.read.format("github_variant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id      | title                                               | user                | created_at           | updated_at           |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
|   51293 |[SPARK-52586][SQL] Introduce AnyTimeType             |  {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
|   51292 |[WIP][PYTHON] Arrow UDF for aggregation              |  {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
|   51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback |  {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

Exemplo 3: Criar PySpark DataSource para streaming de leitura e gravação

Para demonstrar os recursos de leitor e gravador de fluxo do PySpark DataSource, crie uma fonte de dados de exemplo que gere duas linhas em cada microlote usando o faker pacote Python. Para mais informações sobre faker, consulte a documentação do Faker.

Instale o faker pacote usando o seguinte comando:

%pip install faker

Etapa 1: Definir o exemplo DataSource

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e métodos streamReader() e streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Etapa 2: Implementar o leitor de fluxo

Em seguida, implemente o exemplo de leitor de dados de streaming que gera duas linhas em cada microlote. Você pode implementar DataSourceStreamReader, ou, se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, pode implementar SimpleDataSourceStreamReader. Ou simpleStreamReader()streamReader() deve ser implementado, e simpleStreamReader() só é invocado quando streamReader() não é implementado.

Implementação de DataSourceStreamReader

A instância streamReader tem um deslocamento inteiro que aumenta a 2 em cada microlote, implementada pela interface DataSourceStreamReader.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementação do SimpleDataSourceStreamReader

A instância SimpleStreamReader é a mesma que a instância FakeStreamReader que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader sem particionamento.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Etapa 3: Implementar o gravador de fluxo

Agora implemente o gravador de streaming. Este gravador de dados de streaming grava as informações de metadados de cada microlote em um caminho local.

from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data and then returns the commit message for that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Etapa 4: Registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Depois de ser registado, pode utilizá-lo em consultas de streaming como origem ou destino, passando um nome curto ou um nome completo para format(). O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê a partir da fonte de dados de exemplo e envia para o console:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, o exemplo a seguir usa o fluxo de exemplo como um coletor e especifica um caminho de saída:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Resolução de Problemas

Se a saída for o seguinte erro, sua computação não suporta fontes de dados personalizadas do PySpark. Você deve usar o Databricks Runtime 15.2 ou superior.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000