Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Aangepaste pySpark-gegevensbronnen worden gemaakt met behulp van de Python DataSource-API (PySpark), waarmee u kunt lezen uit aangepaste gegevensbronnen en schrijven naar aangepaste gegevenssinks in Apache Spark met behulp van Python. U kunt aangepaste gegevensbronnen van PySpark gebruiken om aangepaste verbindingen met gegevenssystemen te definiëren en extra functionaliteit te implementeren om herbruikbare gegevensbronnen uit te bouwen.
Notitie
Voor aangepaste pySpark-gegevensbronnen is Databricks Runtime 15.4 LTS en hoger of serverloze omgevingsversie 2 vereist.
Gegevensbronklasse
PySpark DataSource is een basisklasse die methoden biedt voor het maken van gegevenslezers en schrijvers.
De subklasse van de gegevensbron implementeren
Afhankelijk van uw gebruiksscenario moet het volgende worden geïmplementeerd door een subklasse om een gegevensbron leesbaar, beschrijfbaar of beide te maken:
| Eigenschap of methode | Beschrijving |
|---|---|
name |
Vereist. De naam van de gegevensbron |
schema |
Vereist. Het schema van de gegevensbron die moet worden gelezen of geschreven |
reader() |
Moet een DataSourceReader retourneren om de gegevensbron leesbaar te maken (batch) |
writer() |
Om de gegevenssink schrijfbaar te maken, moet een DataSourceWriter worden geretourneerd (batch). |
streamReader() of simpleStreamReader() |
Moet een DataSourceStreamReader retourneren om de gegevensstroom leesbaar te maken (streaming) |
streamWriter() |
Moet een DataSourceStreamWriter retourneren om de gegevensstroom beschrijfbaar te maken (streaming) |
Notitie
De door de gebruiker gedefinieerdeDataSource, DataSourceReader, DataSourceWriteren DataSourceStreamReaderDataSourceStreamWriterhun methoden moeten serialiseerbaar zijn. Met andere woorden, ze moeten een woordenboek of een genest woordenboek zijn die een primitief type bevatten.
De gegevensbron registreren
Nadat u de interface hebt geïmplementeerd, moet u deze registreren en kunt u deze laden of anderszins gebruiken, zoals wordt weergegeven in het volgende voorbeeld:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Voorbeeld 1: Een PySpark-gegevensbron maken voor batchquery
Als u de mogelijkheden van PySpark DataSource-lezer wilt demonstreren, maakt u een gegevensbron waarmee voorbeeldgegevens worden gegenereerd met behulp van het faker Python-pakket. Raadpleeg de faker voor meer informatie over .
Installeer het faker pakket met behulp van de volgende opdracht:
%pip install faker
Stap 1: De voorbeeldgegevensbron definiëren
Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en lezer. De reader() methode moet worden gedefinieerd om te lezen uit een gegevensbron in een batchquery.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Stap 2: De lezer voor een batchquery implementeren
Implementeer vervolgens de lezerlogica om voorbeeldgegevens te genereren. Gebruik de geïnstalleerde faker-bibliotheek om elk veld in het schema te vullen.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Stap 3: De voorbeeldgegevensbron registreren en gebruiken
Als u de gegevensbron wilt gebruiken, moet u deze registreren. De FakeDataSource heeft standaard drie rijen en het schema bevat de volgende string velden: name, date, zipcode, state. In het volgende voorbeeld wordt de voorbeeldgegevensbron geregistreerd, geladen en uitgevoerd met de standaardinstellingen:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Alleen string velden worden ondersteund, maar u kunt een schema opgeven met velden die overeenkomen met faker de velden van pakketproviders om willekeurige gegevens te genereren voor testen en ontwikkelen. In het volgende voorbeeld wordt de gegevensbron geladen met name en company velden:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Als u de gegevensbron wilt laden met een aangepast aantal rijen, geeft u de numRows optie op. In het volgende voorbeeld worden vijf rijen opgegeven:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Voorbeeld 2: Een PySpark GitHub DataSource maken met behulp van varianten
Om het gebruik van varianten in een PySpark DataSource te demonstreren, wordt in dit voorbeeld een gegevensbron gemaakt die pull-aanvragen van GitHub leest.
Notitie
Varianten worden ondersteund met aangepaste PySpark-gegevensbronnen in Databricks Runtime 17.1 en hoger.
Zie Queryvariantgegevens voor meer informatie over varianten.
Stap 1: De GitHub-gegevensbron definiëren
Definieer eerst uw nieuwe PySpark GitHub DataSource als subklasse met DataSource een naam, schema en methode reader(). Het schema bevat de volgende velden: id, title, user, created_at, . updated_at Het user veld wordt gedefinieerd als een variant.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Stap 2: De lezer implementeren om pull-aanvragen op te halen
Implementeer vervolgens de lezerlogica om pull-aanvragen op te halen uit de opgegeven GitHub-opslagplaats.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
Stap 3: de gegevensbron registreren en gebruiken
Als u de gegevensbron wilt gebruiken, moet u deze registreren. In het volgende voorbeeld wordt de gegevensbron geregistreerd, vervolgens geladen, en worden drie rijen met de PR-gegevens van de GitHub-opslagplaats uitgevoerd.
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Voorbeeld 3: PySpark DataSource maken voor het streamen van lezen en schrijven
Maak een voorbeeldgegevensbron waarmee twee rijen in elke microbatch worden gegenereerd met behulp van het faker Python-pakket om pySpark DataSource-streamlezer- en schrijfmogelijkheden te demonstreren. Raadpleeg de faker voor meer informatie over .
Installeer het faker pakket met behulp van de volgende opdracht:
%pip install faker
Stap 1: De voorbeeldgegevensbron definiëren
Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en methoden streamReader() en streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Stap 2: De streamlezer implementeren
Implementeer vervolgens de voorbeeldlezer voor streaminggegevens waarmee twee rijen in elke microbatch worden gegenereerd. U kunt in plaats daarvan implementeren DataSourceStreamReaderof als de gegevensbron een lage doorvoer heeft en geen partitionering vereist, kunt u in plaats daarvan implementeren SimpleDataSourceStreamReader .
simpleStreamReader() of streamReader() moet worden geïmplementeerd en simpleStreamReader() wordt alleen aangeroepen wanneer streamReader() niet is geïmplementeerd.
DataSourceStreamReader-implementatie
Het streamReader exemplaar heeft een gehele offset die met 2 toeneemt in elke microbatch, geïmplementeerd via de DataSourceStreamReader interface.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementatie van SimpleDataSourceStreamReader
Het SimpleStreamReader exemplaar is hetzelfde als het FakeStreamReader exemplaar dat twee rijen in elke batch genereert, maar geïmplementeerd met de SimpleDataSourceStreamReader interface zonder partitionering.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Stap 3: De stream writer implementeren
Implementeer nu de streamingschrijver. Deze streaminggegevensschrijver schrijft de metagegevens van elke microbatch naar een lokaal pad.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Stap 4: De voorbeeldgegevensbron registreren en gebruiken
Als u de gegevensbron wilt gebruiken, moet u deze registreren. Nadat deze is geregistreerd, kunt u deze gebruiken in streamingquery's als bron of sink door een korte naam of volledige naam door te geven aan format(). In het volgende voorbeeld wordt eerst de gegevensbron geregistreerd, waarna een query wordt gestart die leest uit de gegevensbron in het voorbeeld en uitvoer naar de console genereert.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
U kunt ook in het volgende voorbeeld de voorbeeldstroom als sink gebruiken en een uitvoerpad opgeven:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Probleemoplossing
Als de uitvoer de volgende fout is, biedt uw berekening geen ondersteuning voor aangepaste PySpark-gegevensbronnen. U moet Databricks Runtime 15.2 of hoger gebruiken.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000