共用方式為


read_pubsub 串流數據表值函式

適用於:勾選「是」 Databricks SQL 勾選「是」 Databricks Runtime 13.3 LTS 及更高版本

傳回一個數據表,其中包含從 Pub/Sub 主題讀取的記錄。 僅支援串流查詢。

語法

read_pubsub( { parameter => value } [, ...])

引數

read_pubsub 需要具名參數調用

唯一必要的自變數是 subscriptionIdprojectIdtopicId。 所有其他參數都是選擇性的。

如需完整的自變數描述,請參閱 設定發佈/子串流讀取的選項。

Databricks 建議在提供授權選項時使用秘密。 請參閱 secret 函式

如需設定 Pub/Sub 存取權的詳細資訊,請參閱 設定 Pub/Sub 的存取權。

參數 類型 描述
subscriptionId STRING 必要,指派給 Pub/Sub 訂用帳戶的唯一標識碼。
projectId STRING 必需的 Google Cloud 專案 ID,與 Pub/Sub 主題相關聯的項目 ID。
topicId STRING 必要,要訂閱的 Pub/Sub 主題識別碼或名稱。
clientEmail STRING 與服務帳戶關聯用於驗證的電子郵件地址。
clientId STRING 與服務帳戶相關聯的用戶端標識碼以進行驗證。
privateKeyId STRING 與服務帳戶相關聯的私鑰標識碼。
privateKey STRING 與服務帳戶相關聯的私鑰,用於驗證。

從 Pub/Sub 讀取時,這些參數會用於進一步微調:

參數 類型 描述
numFetchPartitions STRING 選擇性,具有預設的執行程序數目。 從訂用帳戶擷取記錄的平行 Spark 工作數目。
deleteSubscriptionOnStreamStop BOOLEAN 選擇性,預設 false為 。 如果設定為 true,當串流作業結束時,會刪除傳遞至串流的訂閱。
maxBytesPerTrigger STRING 在每個觸發的微批次期間,待處理的批次大小有一個軟性限制。 預設值為 『none』。
maxRecordsPerFetch STRING 在處理記錄之前,設定每個工作需要擷取的記錄數目。 預設值為 『1000』。
maxFetchPeriod STRING 處理記錄之前,每個工作需要的擷取時間。 預設值為 『10s』。

退貨

具有下列架構的 Pub/Sub 記錄數據表。 屬性數據行可以是 Null,但所有其他數據行都不是 Null。

名稱 資料類型 可空 標準 描述
messageId STRING Pub/Sub 訊息的唯一標識符。
payload BINARY Pub/Sub 訊息的內容。
attributes STRING 是的 代表 Pub/Sub 訊息屬性的索引鍵/值組。 這是 json 編碼的字串。
publishTimestampInMillis BIGINT 訊息發佈時的時間戳,以毫秒為單位。
sequenceNumber BIGINT 其分片內記錄的唯一識別碼。

範例

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
        privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

現在需要從 testing.streaming_table 查詢數據,以便進一步分析。

錯誤的查詢:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetchLimit => '1000001'
);