Azure Event Hubs の使い方 (プログラミング編)

こんにちは。日山(@hiiyan0402)です。
Microsoft Azure のストリーミング処理基盤サービスこと「Event Hubs」の使い方について、
前回に続き、Event Hubs を利用するプログラム(C#)の実装方法について説明します。

プログラムによる Event Hubs の利用

以下の操作を行うプログラムの実装方法について説明します。

  • イベントの送信処理 (Publish)
  • イベントの受信処理 (Subscribe)

イベントの送信処理 (Publish)

パッケージインストール

パッケージマネージャで Microsoft Azure Service Bus をインストールします。

Install-Package WindowsAzure.ServiceBus

コーディング

https://gist.github.com/hiiyan0402/c6adf41584de0c118826.js

コード解説

イベントを送信するためには、イベントハブの名前と、送信権限を持つアクセスポリシーを含む接続文字列が必要です。アクセスポリシーの作成方法および接続文字列の確認方法については、前回の記事をご参照ください。

イベントデータはバイナリ形式で指定する必要があるため、本サンプルでは文字列をUTF8型式のバイナリに変換して指定しています。

上記コードでイベントの送信を行った場合、各パーティションにラウンドロビン型式で投入されます。 特定パーティションにイベントを送信したい場合は、以下のようにします。

https://gist.github.com/hiiyan0402/cd5f76c22234a18860a1.js

イベントの受信処理 (Subscribe)

イベントの受信方法

イベントの受信方法は以下の2通りのやり方が存在しています。

  • 直接コンシューマ方式で受信
  • EventProcessorHostを用いた受信

直接コンシューマ方式で受信する場合、受信負荷に応じたクライアントへの担当パーティションの振り分けや、オフセットを管理する処理を自前で実装する必要があります。これは非常に実装が大変です。

そこで Microsoft よりそれらを行ってくれるライブラリである EventProcessorHost が提供されています。

基本的にイベントの受信処理は EventProcessorHost を用いて行うべきと考えています。そのため今回は EventProcessorHost を用いたイベント受信処理の実装について説明します。直接コンシューマ方式とEventProcessorHostについては、以下のMSDNをご参照ください。

Event Hub プログラミング ガイド
https://msdn.microsoft.com/ja-jp/library/azure/dn789972.aspx

パッケージインストール

パッケージマネージャで Microsoft Azure Service Bus Event Hub – EventProcessorHost をインストールします。

Install-Package Microsoft.Azure.ServiceBus.EventProcessorHost

コーディング

IEventProcessorを実装したEventProcessorHostクラスを実装します。
https://gist.github.com/hiiyan0402/729133053bbfe74342a5.js

実装したEventProcessorHostを起動&停止する処理を実装します。https://gist.github.com/hiiyan0402/636ecee7489bf6fbb426.js

コード解説

イベントを受信するためには、イベントハブの名前と、受信権限を持つアクセスポリシーを含む接続文字列、
またオフセットの管理をするためのストレージアカウントの接続文字列が必要です。

ストレージアカウントの接続文字列の例

DefaultEndpointsProtocol=https;AccountName={ ストレージアカウント名 };AccountKey={ アクセスキー }

コンシューマグループも指定することができます。上記の例では、既定のコンシューマグループ($Default)を指定していますが、任意のグループ名を指定できます。
ただし、既定のグループ以外を使い場合は、あらかじめ以下のようにグループを作成しておく必要があります。
https://gist.github.com/hiiyan0402/df8fd7670027c639f05b.js

EventProcessorHostインスタンスのRegisterEventProcessorAsyncメソッドを呼び出すことで、EventProcessorHostを起動します。ジェネリックには実装したEventProcessorクラスを指定します。

同様に、UnregisterEventProcessorAsyncメソッドを呼び出すと停止します。ちゃんと停止しないと変な感じで接続を掴んだままになるので、必ず停止するようにしましょう。

まとめ

今回は Azure Event Hubs を利用するためのプログラムの実装方法について説明しました。
次回はコンシューマグループやオフセットなどの要素について説明します。

今回も最後までご覧いただきありがとうございました!