処理アプリケーションをスケーリングする

完了

イベント処理アプリケーションをスケーリングするには、アプリケーションの複数のインスタンスを実行し、負荷を分散させることができます。 以前のバージョンでは、 EventProcessorHost を使用すると、受信時にプログラムの複数のインスタンスとチェックポイント イベント間の負荷を分散することができました。 新しいバージョン (5.0 以降) では、 EventProcessorClient (.NET と Java)、または EventHubConsumerClient (Python と JavaScript) でも同じことができます。

Event Hubs をスケーリングするための鍵となるのは、パーティション分割されたコンシューマーのアイデアです。 競合コンシューマー パターンとは異なり、パーティション分割されたコンシューマー パターンでは、競合のボトルネックを取り除き、エンド ツー エンドの並列処理を容易にすることで、高いスケールを実現できます。

サンプル シナリオ

シナリオの例として、10 万件の家を監視するホーム セキュリティ企業を考えてみましょう。 この会社では、各家庭に設置された動体検知器、ドアや窓の開閉センサー、ガラス破損検知器などのさまざまなセンサーから常にデータを取得しています。 この会社では、住民がほぼリアルタイムで自宅の様子を監視できる Web サイトを開設しています。

各センサーにより、データがイベント ハブにプッシュされます。 イベント ハブは、16 個のパーティションで構成されます。 使用側には、これらのイベントを読み取り、統合し、集計をストレージ BLOB にダンプし、ユーザーフレンドリな Web ページに投影できるメカニズムが必要です。

分散環境でコンシューマーを設計する場合、シナリオでは次の要件を処理する必要があります。

  • 規模: 複数のコンシューマーを作成し、各コンシューマーがいくつかの Event Hubs パーティションからの読み取りの所有権を取得します。
  • 負荷分散: コンシューマーを動的に増減します。 たとえば、新しいセンサーの種類 (たとえば、一酸化炭素検知器) が各家庭に追加されると、イベントの数が増加します。 その場合は、オペレーター (人間) がコンシューマー インスタンスの数を増やします。 すると、コンシューマーのプールにより、それ自体が所有するパーティションの数を再調整して、新しく追加されたコンシューマーと負荷を共有することができます。
  • 障害発生時のシームレスな再開: コンシューマー (コンシューマー A) が失敗した場合 (たとえば、コンシューマーをホストしている仮想マシンが突然クラッシュした場合)、他のコンシューマーは コンシューマー A が所有するパーティションを取得して続行できます。 また、 チェックポイント または オフセットと呼ばれる継続ポイントは、 コンシューマー A が失敗した正確な時点、またはその少し前にある必要があります。
  • イベントを使用する: 前の 3 つのポイントではコンシューマーの管理を扱いましたが、イベントを使用して役に立つ処理を行うコードが必要です。 たとえば、イベントを集計し、BLOB ストレージにアップロードするなどです。

イベント プロセッサまたはコンシューマー クライアント

これらの要件を満たすために独自のソリューションを構築する必要はありません。 この機能は、Azure Event Hubs SDK によって提供されます。 .NET または Java SDK では、イベント プロセッサ クライアント (EventProcessorClient) を使用し、Python と JavaScript SDK で EventHubConsumerClient を使用します。

ほとんどの運用シナリオでは、イベントの読み取りと処理にイベント プロセッサ クライアントを使用することをお勧めします。 イベント プロセッサ クライアントは、特定のイベント ハブ用にコンシューマー グループのコンテキスト内で協調的に動作できます。 クライアントは、インスタンスがそのグループに対して使用可能または使用不可能になると、自動的に作業の配布と分散を管理します。

パーティションの所有権の追跡

通常、イベント プロセッサ インスタンスは、1 つまたは複数のパーティションからのイベントを所有および処理します。 パーティションの所有権は、イベント ハブとコンシューマー グループの組み合わせに関連付けられているすべてのアクティブなイベント プロセッサ インスタンス間で均等に分散されます。

各イベント プロセッサには一意識別子が与えられ、チェックポイント ストアのエントリを追加または更新することで、パーティションの所有権を要求します。 すべてのイベント プロセッサ インスタンスは、このストアと定期的に通信して、自身の処理状態を更新するとともに、他のアクティブなインスタンスについて確認します。 このデータは、アクティブなプロセッサ間で負荷を分散するために使用されます。

メッセージを受信する

イベント プロセッサを作成するときは、イベントとエラーを処理する関数を指定します。 イベントを処理する関数を呼び出すたびに、特定のパーティションから 1 つのイベントが配信されます。 このイベントの処理はユーザーが行う必要があります。 コンシューマーによってすべてのメッセージが 1 回以上処理されることを確認する場合は、再試行ロジックを含む独自のコードを作成する必要があります。 ただし、有害メッセージについて注意してください。

これは迅速に済ませることをお勧めします。 つまり、できる限り最小限の処理に留めます。 ストレージへの書き込みとルーティングを行う必要がある場合、2 つのコンシューマー グループを使用して 2 つのイベント プロセッサを所有することをお勧めします。

チェックポイント処理

チェックポイント処理 は、イベント プロセッサがパーティション内で最後に正常に処理されたイベントの位置をマークまたはコミットするプロセスです。 通常、チェックポイントのマーク付けはイベントを処理する関数内で実行され、コンシューマー グループ内のパーティションごとに発生します。

イベント プロセッサがパーティションから切断されると、別のインスタンスが、そのコンシューマー グループ内のそのパーティションの最後のプロセッサによって以前にコミットされたチェックポイントからパーティションの処理を再開できます。 プロセッサは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、イベント プロセッサがダウンしたときに回復性をもたらすことができます。 このチェックポイント処理から低いオフセットを指定することで、古いデータに戻る可能性があります。

スレッドの安全性とプロセッサのインスタンス

既定では、イベントを処理する関数は、特定のパーティションに対して順番に呼び出されます。 後続のイベントと同じパーティションからのこの関数に対する呼び出しは、メッセージ ポンプが他のスレッドのバックグラウンドで引き続き実行されるため、バックグラウンドでキューに配置されます。 異なるパーティションからのイベントを同時に処理でき、パーティション間でアクセスされるすべての共有状態を同期する必要があることに注意してください。