本教學課程說明如何將 Apache Flink 連線至事件中樞,而不需要變更通訊協定用戶端或執行您自己的叢集。 如需有關事件中樞對於 Apache Kafka 取用者通訊協定支援的詳細資訊,請參閱適用於 Apache Kafka 的事件中樞。
在本教學課程中,您會了解如何:
- 建立事件中樞命名空間
- 複製範例專案
- 執行 Flink 生產者
- 執行 Flink 取用者
注意
您可在 GitHub 上取得此範例
必要條件
若要完成本教學課程,請確定您具有下列必要條件:
- 請參閱適用於 Apache Kafka 的事件中樞一文。
- Azure 訂用帳戶。 如果您沒有帳戶,請在開始之前建立 免費帳戶 。
-
Java 開發套件 (JDK) 1.7+
- 在 Ubuntu 上,執行
apt-get install default-jdk來安裝 JDK。 - 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
- 在 Ubuntu 上,執行
-
下載並安裝 Maven 二進位封存檔
- 在 Ubuntu 上,您可以執行
apt-get install maven來安裝 Maven。
- 在 Ubuntu 上,您可以執行
-
Git
- 在 Ubuntu 上,您可以執行
sudo apt-get install git來安裝 Git。
- 在 Ubuntu 上,您可以執行
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送或接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請務必複製事件中樞連接字串以供稍後使用。
複製範例專案
既然您已經有事件中樞連接字串,請複製適用於 Kafka 的 Azure 事件中樞存放庫,並瀏覽至 flink 子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
執行 Flink 生產者
使用提供的 Flink 生產者範例,傳送訊息到事件中樞服務。
提供事件中樞 Kafka 端點
producer.config
更新 bootstrap.servers 中 sasl.jaas.config 和 producer/src/main/resources/producer.config 的值,以使用正確的驗證將生產者導向至事件中樞 Kafka 端點。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行生產者
若要從指令行執行生產者,請產生 JAR,然後從 Maven 內執行 (或使用 Maven 產生 JAR,然後透過將一或多個必要的 Kafka JAR 新增至類別路徑,在 Java 中執行):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
生產者現在會開始將事件傳送到事件中樞 (位於主題 test),並將事件印出至 stdout。
執行 Flink 取用者
使用提供的取用者範例,接收來自事件中樞的訊息。
提供事件中樞 Kafka 端點
consumer.config
更新 bootstrap.servers 中 sasl.jaas.config 和 consumer/src/main/resources/consumer.config 的值,以使用正確的驗證將取用者導向至事件中樞 Kafka 端點。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行取用者
若要從命令列執行消費者,請先產生 JAR 檔,然後在 Maven 中執行(或者使用 Maven 產生 JAR 檔後,將一或多個必要的 Kafka JAR 檔新增至類別路徑,在 Java 中執行):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
如果事件中樞有事件 (例如您的生產者也正在執行),則取用者現在會開始接收來自主題 test 的事件。
如需有關將 Flink 連線至 Kafka 的詳細資訊,請參閱 Flink 的 Kafka 連接器指南 \(英文\)。
下一步
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: