StreamInsightでクエリテンプレートを使ってデータを出力する
注意
この記事は諸般の理由から、悲しい結論になります。「StreamInsightのデータ入出力で複雑な前処理を扱いたい」という方以外は次にアップする記事を参照してください。
クエリテンプレートって何?
StreamInsightでは入力データをOutputAdapterに渡す際にLINQで記述したクエリロジックによるフィルターをかませることができます。この際のクエリロジックのことを「クエリテンプレート」と呼んでいます。
クエリテンプレート登録の流れ
大まかな流れは以下のようになります。
InputAdpterのオブジェクトを生成 -> LINQでフィルター -> OutputAdapterにバインド
1. InputAdapterのオブジェクトを生成
AdapterFactory
や DataType
は適切に定義してあるとします。
FactoryとConfigを特定のメソッドに渡すことで、InputAdapterのオブジェクト(実際にはCepStreamオブジェクト)を生成することができます。
var inputStream = CepStream<SampleDataType>("streamName", typeof(SampleAdapterFactory), new SampleAdapterConfig { hoge = "fuga" }, EventShape.Point);
最後の引数で渡している EventShape.Point
はAdapterのイベントタイプと合わせておく必要があります。
2. LINQでフィルター
ここは普通にLINQを記述すればおkです。実際にどのような記述が可能なのかについては こちらのページ を参照してください。
var filteredStream = from s in inputStream where s.name = "ito" select s;
ここでは「名前が "ito" の人のみを抽出する」フィルターを記述しています。
3. OutputAdapterにバインド
filteredStreamをOutputAdapterにバインドすればクエリテンプレートの登録は完了です。
var query = filteredStream.toQuery(app, "queryName", "queryDescription", typeof(SampleOutputAdapterFactory), new SampleOutputAdapterConfig { foo = "bar" }, EventShape.Point, StreamEventOrder.FullyOrdered);
StreamEventOrderにはFullyOrdered(期間イベントの開始時間順に出力)とChainOrdered(期間イベントの終了時間順に出力)がありますが、PointEventを扱っている場合はあまり問題になりません。
で、最後に
Queryを開始することで各Adapterからの入出力が開始されます。
query.Start();
ちなみに
これまで何回かに分けてブログを書いてきたInput/OutputAdapterによるデータフローの制御ですが、 StreamInsight 2.1 では「レガシーモデル」として扱われています。 なんということでしょう…。
これまでは気がつかずにStreamInsight 2.0のドキュメントを見ていたわけです。(白目)
もちろんInput/OutputAdapterを使った方法も複雑な前処理・後処理が必要な場合は有効ですが、次回からはより新しい IEnumerable/IObservableクラスを用いたStreamInsightのストリーム処理を見ていきます。