Apache Flink 是一種架構和分散式處理引擎,可用於透過未繫結和已繫結資料流進行具狀態計算。
Flink 連接器是可在任何 Flink 叢集上執行的開放原始碼專案。 它會實作用於從 Flink 叢集移動資料的資料接收器。 使用 Apache Flink 的連接器,您可以建置以資料驅動案例為目標的快速且可調整的應用程式,例如機器學習服務 (ML)、擷取-轉換-載入 (ETL) 和 Log Analytics。
在本文中,您將瞭解如何使用 Flink 連接器將資料從 Flink 傳送至資料表。 您可以建立資料表和資料對應、直接 Flink 以將資料傳送至資料表,然後驗證結果。
必要條件
- Azure 資料總管叢集和資料庫。 在 Microsoft Fabric 即時智慧中建立叢集和資料庫或 KQL 資料庫。
- 資料庫中的目標資料表。 請參閱在 Azure 資料總管 中建立資料表或在即時智慧中建立資料表
- Apache Flink 叢集。 建立叢集。
- Maven 3.x
取得 Flink 連接器
對於使用 Maven 管理相依性的 Flink 專案,請將 Flink Connector Core Sink For Azure Data Explorer 新增為相依性來整合:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
針對未使用 Maven 來管理相依性的專案,請複製 Azure Data Explorer Connector for Apache Flink 的存放庫,並在本機建置。 此方法可讓您使用命令 mvn clean install -DskipTests,手動將連接器新增至本機 Maven 存放庫。
您可以從 Flink 進行驗證,以使用 Microsoft Entra ID 應用程式或受控識別。
此服務主體將是連接器用來在 Kusto 資料表中寫入您的資料的識別。 您稍後會授與此服務主體存取 Kusto 資源的權限。
透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。
az login選擇用來託管主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。
az account set --subscription YOUR_SUBSCRIPTION_GUID建立服務主體。 在此範例中,服務主體稱為
my-service-principal。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}從傳回的 JSON 資料中,複製
appId、password和tenant供日後使用。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
您已建立您的 Microsoft Entra 應用程式和服務主體。
授與資料庫上的應用程式使用者權限:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')授與應用程式資料表的擷取者或系統管理員權限。 所需的權限取決於所選的資料寫入方法。 擷取者權限足夠用於 SinkV2,而 WriteAndSink 則需要系統管理員權限。
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
如需授權的詳細資訊,請參閱 Kusto 角色型存取控制。
從 Flink 寫入資料
若要從 Flink 寫入資料:
匯入所需的選項:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;使用您的應用程式或受控識別進行驗證。
針對應用程式驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();針對受控識別驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
設定接收器參數,例如資料庫和資料表:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();您可以新增更多選項,如下表所述:
選項 描述 預設值 IngestionMappingRef 參考現有的擷取對應。 FlushImmediately 立即排清資料,並可能導致效能問題。 不建議使用此方法。 BatchIntervalMs 控制資料排清的頻率。 30 秒 BatchSize 設定排清前緩衝記錄的批次大小。 1,000 筆記錄 ClientBatchSizeLimit 指定擷取前的彙總資料大小 (MB)。 300 MB PollForIngestionStatus 如果為 true,連接器會在資料排清之後輪詢擷取狀態。 false DeliveryGuarantee 決定傳遞保證語意。 若要達到確切一次語意,請使用 WriteAheadSink。 AT_LEAST_ONCE 使用下列其中一種方法寫入串流資料:
- SinkV2:這是一種無狀態選項,可在檢查點上排清資料,確保至少一次一致性。 我們建議使用此選項來進行大量資料擷取。
- WriteAheadSink:此方法會將資料發出至 KustoSink。 它已與 Flink 的檢查點系統整合,並提供確切一次保證。 資料會儲存在 AbstractStateBackend 中,而且只有在檢查點完成之後才提交。
下列範例會使用 SinkV2。 若要使用 WriteAheadSink,請使用
buildWriteAheadSink方法,而不是build:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
完整程式碼應該類似以下:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
驗證正在擷取資料
設定連線之後,資料就會傳送至您的資料表。 您可以執行 KQL 查詢來驗證資料是否已擷取。
執行下列查詢,以驗證資料已擷取至資料表:
<MyTable> | count執行下列查詢以檢視資料:
<MyTable> | take 100