Azure 数据资源管理器是一个完全托管的高性能大数据分析平台,可便于近实时分析大量数据。
ADX 可帮助用户分析来自流式处理应用程序、网站、IoT 设备等大量数据。将 Apache Flink 与 ADX 集成可帮助你处理实时数据并在 ADX 中分析这些数据。
先决条件
- AKS 上的 HDInsight 上创建 Apache Flink 群集
- 创建 Azure 数据浏览器
在 Flink 中将 Azure 数据资源管理器用作接收器的步骤
根据需要创建包含数据库 和表的 ADX。
在 Kusto 中添加托管身份的引入器权限。
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')运行一个示例程序,定义 Kusto 群集 URI(统一资源标识符)、数据库、托管标识以及需要写入的表。
克隆 flink-connector-kusto 项目:https://github.com/Azure/flink-connector-kusto.git
使用以下命令在 ADX 中创建表
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)使用正确的 Kusto 群集 URI、数据库和使用的托管标识更新FlinkKustoSinkSample.java文件。
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)稍后使用“mvn clean package”来构建项目。
在“sample-java/target”文件夹下找到名为“samples-java-1.0-SNAPSHOT-shaded.jar”的 JAR 文件,然后在 Flink UI 中上传此 JAR 文件并提交作业。
查询 Kusto 表以验证输出
将数据从 Flink 写入 Kusto 表不会延迟。
参考
- Apache Flink 网站
- Apache、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation(ASF)的商标。