読者です 読者をやめる 読者になる 読者になる

StreamInsightでクエリテンプレートを使ってデータを出力する

注意

この記事は諸般の理由から、悲しい結論になります。「StreamInsightのデータ入出力で複雑な前処理を扱いたい」という方以外は次にアップする記事を参照してください。

クエリテンプレートって何?

StreamInsightでは入力データをOutputAdapterに渡す際にLINQで記述したクエリロジックによるフィルターをかませることができます。この際のクエリロジックのことを「クエリテンプレート」と呼んでいます。

クエリテンプレート登録の流れ

大まかな流れは以下のようになります。

InputAdpterのオブジェクトを生成 -> LINQでフィルター -> OutputAdapterにバインド

1. InputAdapterのオブジェクトを生成

AdapterFactoryDataType は適切に定義してあるとします。
FactoryとConfigを特定のメソッドに渡すことで、InputAdapterのオブジェクト(実際にはCepStreamオブジェクト)を生成することができます。

var inputStream = CepStream<SampleDataType>("streamName",
                                            typeof(SampleAdapterFactory),
                                            new SampleAdapterConfig { hoge = "fuga" },
                                            EventShape.Point);

最後の引数で渡している EventShape.Point はAdapterのイベントタイプと合わせておく必要があります。

2. LINQでフィルター

ここは普通にLINQを記述すればおkです。実際にどのような記述が可能なのかについては こちらのページ を参照してください。

var filteredStream = from s in inputStream
                     where s.name = "ito"
                     select s;

ここでは「名前が "ito" の人のみを抽出する」フィルターを記述しています。

3. OutputAdapterにバインド

filteredStreamをOutputAdapterにバインドすればクエリテンプレートの登録は完了です。

var query = filteredStream.toQuery(app, "queryName", "queryDescription",
                                   typeof(SampleOutputAdapterFactory),
                                   new SampleOutputAdapterConfig { foo = "bar" },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

StreamEventOrderにはFullyOrdered(期間イベントの開始時間順に出力)とChainOrdered(期間イベントの終了時間順に出力)がありますが、PointEventを扱っている場合はあまり問題になりません。

で、最後に

Queryを開始することで各Adapterからの入出力が開始されます。

query.Start();

ちなみに

これまで何回かに分けてブログを書いてきたInput/OutputAdapterによるデータフローの制御ですが、 StreamInsight 2.1 では「レガシーモデル」として扱われています。 なんということでしょう…。

これまでは気がつかずにStreamInsight 2.0のドキュメントを見ていたわけです。(白目)

もちろんInput/OutputAdapterを使った方法も複雑な前処理・後処理が必要な場合は有効ですが、次回からはより新しい IEnumerable/IObservableクラスを用いたStreamInsightのストリーム処理を見ていきます。