読者です 読者をやめる 読者になる 読者になる

StreamInsightでOutputAdapterを作ってみる

streaminsight

さて、前回の記事ではStreamInsightの入力アダプター(InputAdapter)を実装することでアダプターの構成や開発の流れを確認しました。

今回は出力アダプター(OutputAdapter)を見ていきます。

OutputAdapterの構成

  • OutputAdapter Class
  • OutputAdapterFactory Class
  • OutputAdapterConfig Class

基本的にInputAdapterの構成と同じです。OutputAdapter, OutputAdapterFactoryは型の有り/無し、回復性サポートの有る/無し、イベントの種類に応じて適切な基本クラスを継承して作成することになります。

OutputAdapterFactoryクラス

以下の様な命名規約にしたがって定義されている基本クラスを継承します。

I[回復性サポート有り/無し][型指定の有り/無し]OutputAdapterFactory

例えば回復性サポートありの型指定有りなOutputAdapterFactoryを作成する場合は、以下のようになります。

public class SampleOutputAdapterFactory : IHighWarterMarkTypedOutputAdapterFactory<SampleOutputAdapterConfig> {}

OutputAdapterFactoryはInputAdapterFactoryと同じように Create, Dispose メソッドを実装する必要があります。 Create メソッドではイベントの種類に応じて適切なOutputAdapterを生成し、 Dispose メソッドでは生成したAdapterの停止処理を行います。

OutputAdapterクラス

以下の様な命名規約にしたがって定義されている基本クラスを継承します。

[型指定の有り/無し][イベントの種類]Adapter

例えば型指定有りのエッジイベントを扱うOutputAdapterを作成する場合は、以下のようになります。

public class SampleOutputAdapter<T> : TypedEdgeOutputAdapter<T> {}

Start, Resume, Disposeなどのメソッドを実装することでアダプターが開始した時、再開するとき、アダプターを削除するときの処理などを定義することができます。 StartResume メソッドの中では Dequeue メソッドを呼び出すことで、InputAdapterによってEnqueueされたPayloadを取り出すことができます。

[Enqueue|Dequeue]OperationResult

そうそう、この前までわかっていなかったのですが、Adapterクラスの Queue, Dequeue はそれぞれ EnqueueOperationResult, DequeueOperationResultオブジェクトを返します。

これらはEnqueue, Dequeueの結果を表しているわけですが、DequeueではAdapterが Suspended, Stoppingの場合に DequeueOperationResult.Empty を返しますので、この値をチェックすることで不要になったAdapterを停止したり、後処理的を実行したりすることができるようです。なにげに便利ですね。

アダプターの状態管理についてはおいおい勉強していくこととして、次回はInputAdapterから入力されたストリームをOutputAdapterに渡すための、LINQによるクエリテンプレートを見ていきます。

そういえば

「作ってみる」と言いながらOutputAdapterを一行も作っていませんでした。自分でデバッグするときは出力が簡単にわかるといいので、Sampleに含まれているConsolePointOutputクラスを流用しています。

実はStreamInsightにはストリームを記録・再生したり、ブレークポイントを設置してデバッグできるようなツールが付属しているようなので、そちらも今後試していきます。