Delen via


Aangepaste gegevensbronnen van PySpark

Aangepaste pySpark-gegevensbronnen worden gemaakt met behulp van de Python DataSource-API (PySpark), waarmee u kunt lezen uit aangepaste gegevensbronnen en schrijven naar aangepaste gegevenssinks in Apache Spark met behulp van Python. U kunt aangepaste gegevensbronnen van PySpark gebruiken om aangepaste verbindingen met gegevenssystemen te definiëren en extra functionaliteit te implementeren om herbruikbare gegevensbronnen uit te bouwen.

Notitie

Voor aangepaste pySpark-gegevensbronnen is Databricks Runtime 15.4 LTS en hoger of serverloze omgevingsversie 2 vereist.

Gegevensbronklasse

PySpark DataSource is een basisklasse die methoden biedt voor het maken van gegevenslezers en schrijvers.

De subklasse van de gegevensbron implementeren

Afhankelijk van uw gebruiksscenario moet het volgende worden geïmplementeerd door een subklasse om een gegevensbron leesbaar, beschrijfbaar of beide te maken:

Eigenschap of methode Beschrijving
name Vereist. De naam van de gegevensbron
schema Vereist. Het schema van de gegevensbron die moet worden gelezen of geschreven
reader() Moet een DataSourceReader retourneren om de gegevensbron leesbaar te maken (batch)
writer() Om de gegevenssink schrijfbaar te maken, moet een DataSourceWriter worden geretourneerd (batch).
streamReader() of simpleStreamReader() Moet een DataSourceStreamReader retourneren om de gegevensstroom leesbaar te maken (streaming)
streamWriter() Moet een DataSourceStreamWriter retourneren om de gegevensstroom beschrijfbaar te maken (streaming)

Notitie

De door de gebruiker gedefinieerdeDataSource, DataSourceReader, DataSourceWriteren DataSourceStreamReaderDataSourceStreamWriterhun methoden moeten serialiseerbaar zijn. Met andere woorden, ze moeten een woordenboek of een genest woordenboek zijn die een primitief type bevatten.

De gegevensbron registreren

Nadat u de interface hebt geïmplementeerd, moet u deze registreren en kunt u deze laden of anderszins gebruiken, zoals wordt weergegeven in het volgende voorbeeld:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Voorbeeld 1: Een PySpark-gegevensbron maken voor batchquery

Als u de mogelijkheden van PySpark DataSource-lezer wilt demonstreren, maakt u een gegevensbron waarmee voorbeeldgegevens worden gegenereerd met behulp van het faker Python-pakket. Raadpleeg de faker voor meer informatie over .

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De voorbeeldgegevensbron definiëren

Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en lezer. De reader() methode moet worden gedefinieerd om te lezen uit een gegevensbron in een batchquery.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Stap 2: De lezer voor een batchquery implementeren

Implementeer vervolgens de lezerlogica om voorbeeldgegevens te genereren. Gebruik de geïnstalleerde faker-bibliotheek om elk veld in het schema te vullen.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Stap 3: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. De FakeDataSource heeft standaard drie rijen en het schema bevat de volgende string velden: name, date, zipcode, state. In het volgende voorbeeld wordt de voorbeeldgegevensbron geregistreerd, geladen en uitgevoerd met de standaardinstellingen:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Alleen string velden worden ondersteund, maar u kunt een schema opgeven met velden die overeenkomen met faker de velden van pakketproviders om willekeurige gegevens te genereren voor testen en ontwikkelen. In het volgende voorbeeld wordt de gegevensbron geladen met name en company velden:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Als u de gegevensbron wilt laden met een aangepast aantal rijen, geeft u de numRows optie op. In het volgende voorbeeld worden vijf rijen opgegeven:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Voorbeeld 2: Een PySpark GitHub DataSource maken met behulp van varianten

Om het gebruik van varianten in een PySpark DataSource te demonstreren, wordt in dit voorbeeld een gegevensbron gemaakt die pull-aanvragen van GitHub leest.

Notitie

Varianten worden ondersteund met aangepaste PySpark-gegevensbronnen in Databricks Runtime 17.1 en hoger.

Zie Queryvariantgegevens voor meer informatie over varianten.

Stap 1: De GitHub-gegevensbron definiëren

Definieer eerst uw nieuwe PySpark GitHub DataSource als subklasse met DataSource een naam, schema en methode reader(). Het schema bevat de volgende velden: id, title, user, created_at, . updated_at Het user veld wordt gedefinieerd als een variant.

import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
    @classmethod
    def name(self):
        return "githubVariant"
    def schema(self):
        return "id int, title string, user variant, created_at string, updated_at string"
    def reader(self, schema):
        return GithubVariantPullRequestReader(self.options)

Stap 2: De lezer implementeren om pull-aanvragen op te halen

Implementeer vervolgens de lezerlogica om pull-aanvragen op te halen uit de opgegeven GitHub-opslagplaats.

class GithubVariantPullRequestReader(DataSourceReader):
    def __init__(self, options):
        self.token = options.get("token")
        self.repo = options.get("path")
        if self.repo is None:
            raise Exception(f"Must specify a repo in `.load()` method.")
        # Every value in this `self.options` dictionary is a string.
        self.num_rows = int(options.get("numRows", 10))

    def read(self, partition):
        header = {
            "Accept": "application/vnd.github+json",
        }
        if self.token is not None:
            header["Authorization"] = f"Bearer {self.token}"
        url = f"https://api.github.com/repos/{self.repo}/pulls"
        response = requests.get(url, headers=header)
        response.raise_for_status()
        prs = response.json()
        for pr in prs[:self.num_rows]:
            yield Row(
                id = pr.get("number"),
                title = pr.get("title"),
                user = VariantVal.parseJson(json.dumps(pr.get("user"))),
                created_at = pr.get("created_at"),
                updated_at = pr.get("updated_at")
            )

Stap 3: de gegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. In het volgende voorbeeld wordt de gegevensbron geregistreerd, vervolgens geladen, en worden drie rijen met de PR-gegevens van de GitHub-opslagplaats uitgevoerd.

spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id      | title                                               | user                | created_at           | updated_at           |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
|   51293 |[SPARK-52586][SQL] Introduce AnyTimeType             |  {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
|   51292 |[WIP][PYTHON] Arrow UDF for aggregation              |  {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
|   51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback |  {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

Voorbeeld 3: PySpark DataSource maken voor het streamen van lezen en schrijven

Maak een voorbeeldgegevensbron waarmee twee rijen in elke microbatch worden gegenereerd met behulp van het faker Python-pakket om pySpark DataSource-streamlezer- en schrijfmogelijkheden te demonstreren. Raadpleeg de faker voor meer informatie over .

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De voorbeeldgegevensbron definiëren

Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en methoden streamReader() en streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Stap 2: De streamlezer implementeren

Implementeer vervolgens de voorbeeldlezer voor streaminggegevens waarmee twee rijen in elke microbatch worden gegenereerd. U kunt in plaats daarvan implementeren DataSourceStreamReaderof als de gegevensbron een lage doorvoer heeft en geen partitionering vereist, kunt u in plaats daarvan implementeren SimpleDataSourceStreamReader . simpleStreamReader() of streamReader() moet worden geïmplementeerd en simpleStreamReader() wordt alleen aangeroepen wanneer streamReader() niet is geïmplementeerd.

DataSourceStreamReader-implementatie

Het streamReader exemplaar heeft een gehele offset die met 2 toeneemt in elke microbatch, geïmplementeerd via de DataSourceStreamReader interface.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementatie van SimpleDataSourceStreamReader

Het SimpleStreamReader exemplaar is hetzelfde als het FakeStreamReader exemplaar dat twee rijen in elke batch genereert, maar geïmplementeerd met de SimpleDataSourceStreamReader interface zonder partitionering.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Stap 3: De stream writer implementeren

Implementeer nu de streamingschrijver. Deze streaminggegevensschrijver schrijft de metagegevens van elke microbatch naar een lokaal pad.

from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data and then returns the commit message for that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Stap 4: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. Nadat deze is geregistreerd, kunt u deze gebruiken in streamingquery's als bron of sink door een korte naam of volledige naam door te geven aan format(). In het volgende voorbeeld wordt eerst de gegevensbron geregistreerd, waarna een query wordt gestart die leest uit de gegevensbron in het voorbeeld en uitvoer naar de console genereert.

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

U kunt ook in het volgende voorbeeld de voorbeeldstroom als sink gebruiken en een uitvoerpad opgeven:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Probleemoplossing

Als de uitvoer de volgende fout is, biedt uw berekening geen ondersteuning voor aangepaste PySpark-gegevensbronnen. U moet Databricks Runtime 15.2 of hoger gebruiken.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000