Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Van toepassing op:
Databricks SQL
Databricks Runtime 13.3 LTS en hoger
Retourneert een tabel met records die zijn gelezen uit Kinesis uit een of meer streams.
Syntaxis
read_kinesis ( { parameter => value } [, ...] )
Argumenten
read_kinesis vereist de aanroep met benoemde parameters.
Het enige vereiste argument is streamName. Alle andere argumenten zijn optioneel.
De beschrijvingen van de argumenten zijn hier kort. Zie de Amazon Kinesis-documentatie voor meer informatie.
Er zijn meerdere manieren om verbinding te maken en te verifiëren met AWS.
De aanbevolen methode is om een Databricks-servicereferentie te maken en op te geven met behulp van de serviceCredential optie.
U kunt ook verifiëren met awsAccessKey en awsSecretKey.
Deze opties kunnen worden opgegeven in de functieargumenten met behulp van de secret functie, handmatig in de argumenten worden ingesteld of geconfigureerd als omgevingsvariabelen zoals hieronder wordt aangegeven.
roleArn
roleExternalID, roleSessionNamekan ook worden gebruikt voor verificatie met AWS met behulp van exemplaarprofielen.
Als geen van deze is opgegeven, wordt de standaardketen van de AWS-provider gebruikt.
| Kenmerk | Typologie | Beschrijving |
|---|---|---|
streamName |
STRING |
Vereiste, door komma's gescheiden lijst van een of meer kinesisstromen. |
serviceCredential |
STRING |
De naam van uw Databricks-servicereferentie. |
awsAccessKey |
STRING |
De AWS-toegangssleutel, indien van toepassing. Kan ook worden opgegeven via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_ACCESS_KEY_ID) en een bestand met referentieprofielen. |
awsSecretKey |
STRING |
De geheime sleutel die overeenkomt met de toegangssleutel. Kan worden opgegeven in de argumenten of via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_SECRET_KEY of AWS_SECRET_ACCESS_KEY) en een bestand met referentiesprofielen. |
roleArn |
STRING |
Amazon-resource-naam van de rol die je moet aannemen bij het openen van Kinesis. |
roleExternalId |
STRING |
Wordt gebruikt bij het delegeren van toegang tot het AWS-account. |
roleSessionName |
STRING |
Naam van AWS-rolsessie. |
stsEndpoint |
STRING |
Een eindpunt voor het aanvragen van referenties voor tijdelijke toegang. |
region |
STRING |
Regio voor de streams die moeten worden opgegeven. De standaardwaarde is de lokaal opgeloste regio. |
endpoint |
STRING |
regionaal eindpunt voor Kinesis-gegevensstromen. De standaardwaarde is de lokaal opgeloste regio. |
initialPosition |
STRING |
Beginpositie voor lezen vanuit de stroom. Een van de volgende: 'laatste' (standaard), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Een van: 'polling' (standaard) of 'EFO' (enhanced-fan-out). |
consumerName |
STRING |
De naam van de consument. Alle consumentennamen worden voorafgegaan door 'databricks_'. De standaardwaarde is een lege tekenreeks. |
registerConsumerTimeoutInterval |
STRING |
de maximale wachttijd om te wachten totdat de Kinesis EFO-consument is geregistreerd bij de Kinesis-stream voordat er een fout optreedt. De standaardwaarde is '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true om de EFO-consument bij het stoppen van query's uit te schrijven. Standaard is false. |
deregisterConsumerTimeoutInterval |
STRING |
De maximale wachttijd om te wachten totdat de Kinesis EFO-consument is afgemeld bij de Kinesis-stream voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'. |
consumerRefreshInterval |
STRING |
Het interval waarmee de consument wordt gecontroleerd en vernieuwd. De standaardwaarde is '300s'. |
De volgende argumenten worden gebruikt voor het beheren van de leesdoorvoer en latentie voor Kinesis:
| Kenmerk | Typologie | Beschrijving |
|---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Optioneel, met een standaardwaarde van 10.000 records die per API-aanvraag naar Kinesis moeten worden gelezen. |
maxFetchRate |
STRING |
Hoe snel u gegevens per shard kunt vooraf fetcheren. Een waarde tussen '1,0' en '2.0' die wordt gemeten in MB/s. De standaardwaarde is '1.0'. |
minFetchPeriod |
STRING |
De maximale wachttijd tussen opeenvolgende prefetch-acties. De standaardwaarde is 400 ms. |
maxFetchDuration |
STRING |
De maximale duur voor het bufferen van vooraf geladen nieuwe gegevens. De standaardwaarde is '10s'. |
fetchBufferSize |
STRING |
De hoeveelheid gegevens voor de volgende trigger. De standaardwaarde is '20 gb'. |
shardsPerTask |
INTEGER (>0) |
Het aantal Kinesis-shards dat parallel per Spark-taak moet worden voorgehaald. De standaard is 5. |
shardFetchinterval |
STRING |
Hoe vaak moet er worden gecontroleerd op resharding. De standaardwaarde is '1s'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
De drempel waarop automatische samenvoeging plaatsvindt. De standaardwaarde is 10.000.000. |
coalesce |
BOOLEAN |
true om voorgesorteerde aanvragen samen te voegen. De standaardwaarde is true. |
coalesceBinSize |
INTEGER (>0) |
De geschatte blokgrootte na het samenvoegen. De standaardwaarde is 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true om de Kinesis-client die in de cache is opgeslagen, opnieuw te gebruiken. De standaardwaarde is true behalve op een PE-cluster. |
clientRetries |
INTEGER (>0) |
Het aantal herhalingen in het herhalingsscenario. De standaard is 5. |
Retouren
Een tabel met Kinesis-records met het volgende schema:
| Naam | Gegevenstype | Null-waarde toegestaan | Standaard | Beschrijving |
|---|---|---|---|---|
partitionKey |
STRING |
Nee | Een sleutel die wordt gebruikt voor het distribueren van gegevens tussen de shards van een stream. Alle gegevensrecords met dezelfde partitiesleutel worden gelezen uit dezelfde shard. | |
data |
BINARY |
Nee | De gegevenslading van de kinesis-gegevens, Base64 gecodeerd. | |
stream |
STRING |
Nee | De naam van de stream waaruit de gegevens zijn gelezen. | |
shardId |
STRING |
Nee | Een unieke id voor de shard waaruit de gegevens zijn gelezen. | |
sequenceNumber |
BIGINT |
Nee | De unieke identificatie van het record binnen zijn shard. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nee | De geschatte tijd dat de record in de stream is ingevoegd. |
De kolommen (stream, shardId, sequenceNumber) vormen een primaire sleutel.
Voorbeelden
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');