概要
- flowライブラリ はアプリケーションロジックとデプロイメントの関心事を厳密に分離
- step-fn によるロジック提供、プロセスランチャーで実行
- チャネル通信やライフサイクル管理 はflowが担当
- プロセス状態やエラー処理 の仕組みを提供
- flow定義とライフサイクル操作 で柔軟なフロー構築が可能
flowライブラリの概要と特徴
- flowライブラリ はアプリケーションロジックとトポロジーや通信、実行環境などのデプロイメントの関心事を厳密に分離
- step-fn (ステップ関数)でロジックを記述し、プロセスランチャーがラップしてループ実行
- チャネル経由のメッセージ送受信やプロセスのライフサイクル管理、モニタリング、エラー処理 をflow側が自動で実施
- step-fn はチャネル操作や状態保持を直接行わず、テストや再利用が容易
- step-fn は4つのarity(describe, init, transition, transform)を持つ
step-fnの詳細
- describe: ステップ関数のパラメータ、入力、出力の静的記述を返却
- 例:
{:params {:size "Max size"} :ins {:in "Input channel"} :outs {:out "Output channel"}}
- 例:
- init: flow定義から渡されたargsを受け取り、初期状態(state)を返却
- transition: ライフサイクル遷移時(start, stop, pause, resume)に呼ばれ、状態を更新
- transform: 入力チャネルからのメッセージごとにループで呼ばれ、新しい状態と出力メッセージを返却
- 出力は複数のチャネルやreportチャネル等に送信可能
- メッセージがnilの場合は出力不可(core.asyncの仕様)
- 例外は全arityでthrow可能、flowが処理
プロセス状態とチャネル管理
- プロセス状態(state) はmap形式で管理
- flow固有のキー
- ::flow/pid: プロセスID
- ::flow/in-ports, ::flow/out-ports: 外部チャネルとの接続情報
- ::flow/input-filter: 入力チャネルのフィルタ
- チャネルの生成や管理はstep-fnのinitで実施、flow本体はチャネルのライフサイクル管理を行わない
step-fnヘルパー
- lift->step*: 1引数関数(コレクション返却)からstep-fn生成(:in, :out)
- lift1->step: 1値返却関数からstep-fn生成(nilの場合は出力なし)
- map->step: describe, init, transition, transformを持つmapからstep-fn生成
プロセスランチャーの作成
- process関数 でstep-fnとオプションmapからプロセスランチャーを作成
- :workload: :mixed, :io, :computeを選択
- :compute-timeout-ms: :compute時のタイムアウト(デフォルト5000ms)
- :workloadによってプロセスループやtransformの実行スレッドが変化
- :io指定時は計算量の多い処理を禁止
- :compute指定時はtransformごとにスレッド実行、タイムアウト管理
- 高度な用途では ProcLauncherプロトコル の直接実装も可能
step-fnのリロードと開発手法
- step-fnをvarで定義し、#'the-fnで参照 することで、REPLでのインタラクティブな再定義が可能
flow定義と作成
- flow定義 は:procs(pid→proc定義map)と:conns(接続定義)から構成
- proc定義: :proc(プロセスランチャー)、:args(init引数)、:chan-opts(チャネルオプション)
- conns: [[from-pid outid] [to-pid inid]]のタプル集合
- 出力が複数接続されている場合、全接続先にメッセージ配信(core.async/mult相当)
- 例:
{:procs {:source-proc {:proc (process #'source-fn) :args {:source-chan in-chan}} :sink-proc {:proc (process #'sink-fn) :args {:sink-chan out-chan}}} :conns [ [[:source-proc :out] [:sink-proc :in]] ]} - create-flow関数 でflowを生成、戻り値はライフサイクル操作やdatafyによる静的情報取得に利用
flowのライフサイクル操作
- start: 全プロセス開始、:report-chanと:error-chanを返却
- stop: 全プロセス停止
- pause/resume: 全プロセス一時停止・再開
- pause-proc/resume-proc: 個別プロセスの一時停止・再開
- ping/ping-proc: 全プロセスまたは指定プロセスの状態確認
- inject: 任意の[pid cid]チャネルにメッセージ注入
- :report-chan: ロギングや通知用途
- :error-chan: 例外やエラーの一元管理
flowモニタリング
- core.async.flow-monitor を利用したフローモニタリング機能を提供