世界を動かす技術を、日本語で。

Clojure非同期フローガイド

概要

  • flowライブラリ はアプリケーションロジックとデプロイメントの関心事を厳密に分離
  • step-fn によるロジック提供、プロセスランチャーで実行
  • チャネル通信やライフサイクル管理 はflowが担当
  • プロセス状態やエラー処理 の仕組みを提供
  • flow定義とライフサイクル操作 で柔軟なフロー構築が可能

flowライブラリの概要と特徴

  • flowライブラリ はアプリケーションロジックとトポロジーや通信、実行環境などのデプロイメントの関心事を厳密に分離
  • step-fn (ステップ関数)でロジックを記述し、プロセスランチャーがラップしてループ実行
  • チャネル経由のメッセージ送受信やプロセスのライフサイクル管理、モニタリング、エラー処理 をflow側が自動で実施
  • step-fn はチャネル操作や状態保持を直接行わず、テストや再利用が容易
  • step-fn は4つのarity(describe, init, transition, transform)を持つ

step-fnの詳細

  • describe: ステップ関数のパラメータ、入力、出力の静的記述を返却
    • 例:
      {:params {:size "Max size"}
       :ins {:in "Input channel"}
       :outs {:out "Output channel"}}
      
  • init: flow定義から渡されたargsを受け取り、初期状態(state)を返却
  • transition: ライフサイクル遷移時(start, stop, pause, resume)に呼ばれ、状態を更新
  • transform: 入力チャネルからのメッセージごとにループで呼ばれ、新しい状態と出力メッセージを返却
    • 出力は複数のチャネルやreportチャネル等に送信可能
    • メッセージがnilの場合は出力不可(core.asyncの仕様)
    • 例外は全arityでthrow可能、flowが処理

プロセス状態とチャネル管理

  • プロセス状態(state) はmap形式で管理
  • flow固有のキー
    • ::flow/pid: プロセスID
    • ::flow/in-ports, ::flow/out-ports: 外部チャネルとの接続情報
    • ::flow/input-filter: 入力チャネルのフィルタ
  • チャネルの生成や管理はstep-fnのinitで実施、flow本体はチャネルのライフサイクル管理を行わない

step-fnヘルパー

  • lift->step*: 1引数関数(コレクション返却)からstep-fn生成(:in, :out)
  • lift1->step: 1値返却関数からstep-fn生成(nilの場合は出力なし)
  • map->step: describe, init, transition, transformを持つmapからstep-fn生成

プロセスランチャーの作成

  • process関数 でstep-fnとオプションmapからプロセスランチャーを作成
    • :workload: :mixed, :io, :computeを選択
    • :compute-timeout-ms: :compute時のタイムアウト(デフォルト5000ms)
  • :workloadによってプロセスループやtransformの実行スレッドが変化
    • :io指定時は計算量の多い処理を禁止
    • :compute指定時はtransformごとにスレッド実行、タイムアウト管理
  • 高度な用途では ProcLauncherプロトコル の直接実装も可能

step-fnのリロードと開発手法

  • step-fnをvarで定義し、#'the-fnで参照 することで、REPLでのインタラクティブな再定義が可能

flow定義と作成

  • flow定義 は:procs(pid→proc定義map)と:conns(接続定義)から構成
    • proc定義: :proc(プロセスランチャー)、:args(init引数)、:chan-opts(チャネルオプション)
    • conns: [[from-pid outid] [to-pid inid]]のタプル集合
    • 出力が複数接続されている場合、全接続先にメッセージ配信(core.async/mult相当)
  • 例:
    {:procs {:source-proc {:proc (process #'source-fn) :args {:source-chan in-chan}}
             :sink-proc   {:proc (process #'sink-fn)   :args {:sink-chan out-chan}}}
     :conns [ [[:source-proc :out] [:sink-proc :in]] ]}
    
  • create-flow関数 でflowを生成、戻り値はライフサイクル操作やdatafyによる静的情報取得に利用

flowのライフサイクル操作

  • start: 全プロセス開始、:report-chanと:error-chanを返却
  • stop: 全プロセス停止
  • pause/resume: 全プロセス一時停止・再開
  • pause-proc/resume-proc: 個別プロセスの一時停止・再開
  • ping/ping-proc: 全プロセスまたは指定プロセスの状態確認
  • inject: 任意の[pid cid]チャネルにメッセージ注入
  • :report-chan: ロギングや通知用途
  • :error-chan: 例外やエラーの一元管理

flowモニタリング

  • core.async.flow-monitor を利用したフローモニタリング機能を提供

Hackerたちの意見

Clojureってまだ人気あるのかな?そうであってほしいけど、最近HNではあんまりClojureの話を見ないな。

:はい #{もちろんそうだよ}

言語自体はまだアップデートされてるし、1、2ヶ月前に新しいメジャーリリースも出たばかりだよ。ここ5年くらいはちょっと停滞気味に感じるけど、時々新しい面白いプロジェクトが出てくるのも見るし。今日もClojureの上に実装されたAIとML用の関係プログラミング言語Dyna3についての記事を見たよ。Strange Loopカンファレンスが懐かしいな。あそこでClojureの話題がたくさん生まれたと思う。Clojure Westとか他のイベントもまあまあ良かったけど、Strange Loopのトークの質は本当に素晴らしかった。Clojure特化のカンファレンスではなかったけど、あのエレガンスに焦点を当てた感じはあんまり見ないし、主催者はたしかClojureの王子みたいな人だったと思う。今でもこの言語を楽しんでるし、私のプロジェクトは全部ちゃんとビルドして動いてるよ。プラットフォームに対する一番の不満は3Dグラフィックスかな。これは全体的にJVMの問題だけどね。

Clojureはすごく生き生きしてるし、楽しいと思うよ。私はClojure初心者だけど、すごく楽しく学んでる。アクティブに開発されてる素晴らしいツールがたくさんあるし(babashkaは最近の開発者生活で最高のものだよ!)小中規模のコミュニティは学ぶには最高だよ。コミュニティの大物たちもすぐにSlackでつながれるし、みんなすごく熱心だしね。

Clojureユーザーは以前よりも増えてるし、チームもアクティブで、私の知る限りでは以前よりも大きくなってるよ。物事は成熟してきてるし、5年前に聞いたときほどの盛り上がりはないけどね。

今日、XTDB¹(Clojure中心のバイテンポラルDB)について調べていて、Blazor²みたいなバッテリー付きのWebAssemblyフレームワークを探して同じ質問をしてたんだ。

  1. https://xtdb.com/
  2. https://dotnet.microsoft.com/en-us/apps/aspnet/web-apps/blaz...

まだ使ってるよ。1年前に一番の不満点がやっと解決されたんだ。それは、Javaの関数インターフェースにバニラClojureのラムダが使えなかったこと。だから、そのインターフェースを再定義しなきゃいけなくて、すごく面倒だった。今は、インターフェースに@FunctionalInterface属性がちゃんと付いてれば、問題なく動くよ。全部のプロジェクトが@FunctionalInterfaceを使ってるわけじゃないけど、いくつかの場所に追加しようとしてるんだ。[1] [2] [3] それで、Clojureをもっといろんなところで使えるようになったよ。

  1. https://github.com/LMAX-Exchange/disruptor/pull/492
  2. https://github.com/apache/kafka/pull/19234
  3. https://github.com/apache/kafka/pull/19366

他の人も言ってるけど、Clojureはまだ現役だよ。しばらくClojureから離れてた人には、Babashkaをチェックしてみてほしい!Clojureで書かれたbashスクリプトみたいな感じだよ。めっちゃ楽しいから! https://babashka.org/

Makeはまだ現役なの?そうであってほしいけど、最近HNではあまりMakeの活動を見かけないな。

Clojureって、話すよりも実際に作ることを好む人たちを引き寄せる気がする。マーケティングにはあまり良くないけど、プロジェクトの長寿には期待できそう。リッチがバイオリンに例えたときはちょっと気取ってるなと思ったけど、「バイオリンってまだ人気なの?」って聞く人は少ないよね。

Clojureが退屈な技術なのは驚かないね。ゆっくり進化して安定性(つまり後方互換性)に大きく焦点を当ててるから。ちなみに、ここ数年でコアチームが拡大したし、今夏にはClojureの背後にあるNuBankが初の「Clojure Developer Advocate」を発表した。彼らの役割は「既存のClojureコミュニティをサポートし、アウトリーチや開発を通じてコミュニティを成長させる方法に焦点を当てること」だって。^1 [1]: https://building.nubank.com/clojure-developer-advocate-nuban... 編集: 言い回し。

これを試してみようと思ってたんだ。私の理解では、構造化された並行性を得るための宣言的な方法みたいだね。私はcore.asyncチャネルを使って並行性を管理してるコードベースで働いてるから、エラーハンドリングにはすごく気を使う必要があるんだ。新しいスレッドを立ち上げるときは、自分でエラーを再スローするためのカスタムな仕組みを作らないといけないし、チャネルを閉じたりする必要もある。core.async.flowは、これを宣言的にやる方法のように見える。core.async自体がJVM上のバーチャルスレッドの先駆けだったように、私はcore.async.flowを今後の[0]構造化並行性JEPのClojure版だと思ってる。安定したらその下でそれを使うのか気になるな。core.asyncgoマクロからバーチャルスレッドをディスパッチする方向に移行する予定だし。[0]https://openjdk.org/jeps/453

仕事のプロジェクトでバーチャルスレッドを見てたんだけど、既存のコードをバーチャルスレッドで動かすのが結構難しいことがわかった。例えば、クラスの初期化がスレッドを固定しちゃうから、標準的なJavaのやり方(内部クラスの静的インスタンスを使う)で定義したシングルトンがサスペンドすると、プログラムが永遠にハングしちゃう。非同期処理を色なしで実現するためにすごく頑張ったから、メソッド呼び出しがサスペンドするかどうかもわからない。これはネットワーク呼び出しが原因の場合の既知の問題で、ワークアラウンドもあるけど、特定のアプリケーション(ウェブサーバー)には役立つ。プログラムがハングしてた理由を理解するのにもかなり手間がかかったし、Goの並行性の使いやすさにはまだまだ遠いな(全てのスレッドがバーチャルスレッドで、ハングしたら自動的にパニックになる)。

core.asyncをStructured Concurrencyに構造化するのは現実的でも賢明でもないと思う。Structured Concurrencyはflowと同じような問題に取り組もうとしてるけど、アプローチが違うからね(データフロースタイルの並行性に近い)。

LISPはクールだと思うし、もっと使いたいけど、JVMのツールチェーンやデバッグを学ぶ気は全然ないんだ。Racketもあるけど、Clojureのエコシステムはもう小さいし。

Leiningenとdeps.ednは、Javaのプロジェクト管理の厄介さから少し守ってくれるよ。Node.jsみたいな感じがするけど、依存関係はMaven Centralから取ってくるんだ。デバッグやプロファイリングはまだちょっとJavaベースだし、それはイライラすることもあるけど、慣れちゃうよ。個人的には、価値があると思う。Clojureはすごく快適な言語で、こんなに速いのはおかしいくらいだし、core.asyncはメッセージングにロジックを変換できれば本当に素晴らしい並行性フレームワークだよ。Haskellスタイルのトランザクショナルメモリもあるし、Clojureでは多くの問題が少し楽になるんだ。

それはよくある誤解だね。JVMツールチェーンは、他の言語エコシステムの地獄に比べたらずっと良いよ。例えば、Mavenは信頼性が高くて、堅牢だしね。唯一の解決されていない問題は、2つのライブラリやフレームワークが異なる互換性のないバージョンの依存関係を要求する場合だけど、他のエコシステムもそれを簡単に解決してるとは思えない。デバッグに関しては特に最先端だよ。YourKitや一般的なIDEに含まれているデバッガーを見てみて。どのツールも派手なVisual 2025の美学はないけど、信頼性があって、来年も同じように動くから安心だよ。

Clojureは、CLがダメだったところでLispを教えてくれた。SchemeとClojureはLisp-1として学ぶのに最適だね。スタートアップ時間が速いSBCLに切り替えたよ。今はLisp-2ももっと快適に感じる。

面白いな。実は、非同期呼び出しができないことが気になって、Clojureをプロジェクトから外そうかと思ってたんだ。JavaScriptやC#の非同期に慣れすぎちゃって、シンプルに非同期呼び出しを構造化する方法がない環境で作業したくないんだよね。必ずしもasync/awaitである必要はないけど、無視するんじゃなくて、もう少しその問題に目を向けてほしいな。

ClojureのためのAsync/await: https://github.com/xadecimal/async-style

それはClojureの大きな強みの一つ(非同期&並行処理)だよね。新しいJVMのバーチャルスレッドのおかげで、状況はこれまで以上に良くなってる。core.asyncがバーチャルスレッドに移行するのはまだ進行中みたいだけど、最小限の調整で90%の利点はすでに得られてるよ、今の最新バージョンでもね。

Clojureでは、ほぼすべての非同期コーディングのバリエーションをアラカルトで手に入れられるよ。

その結論に至った理由が気になるな。確か、並行性を管理しやすくするのがClojureの主要な目的の一つとしてずっと言われてたと思う。デザインの基本でもあるし、全てのコアデータ構造が不変であることが大きな動機だったりする。STMやアトム、エージェントは最初からあったし、1.1でフューチャーやプロミスが追加されたんだよね。core.asyncは2013年からだし、promesaやmanifoldみたいな人気のサードパーティライブラリももう10年近く前からある。flowは特に複雑なアプリケーションでcore.asyncを扱いやすくすることを約束してるけど、基本的なプライミティブは新しいわけじゃないし、JavaScriptより使いにくいとも思わないな。

これ、クールだね。基本的にはスレッドのグループを作って、それを一つのユニットとして扱えるってことだよね。TrioはPythonの非同期関数で似たようなこと(構造化された並行性)をやってるよ。これはOSスレッドなの、それともグリーンスレッドなの?

OSスレッドでもグリーンスレッドでも、選べるよ。

私にとって、core.asyncの本当の「クリック」瞬間は、コールバックを置き換えることだけじゃなかったんだ。ビジネスプロセス全体を通信チャネルのシステムとしてモデル化できるって気づいたとき、システム設計に対する考え方が完全に変わったよ。

この考え方についてもっと学ぶためのリソースを見つけたことある?

エリクサーのアクターみたいな感じ?

これは本当に興味深いけど、core.async.flowのトポロジーが不変だってことがまだ理解できてないんだ。ほとんどの問題は固定トポロジーでは解決できない気がする。理論的には、値が入れ替わるようにフローを入れ替えることもできると思うけど、このライブラリはそういう使い方を想定してるのかな?それに、この場合、非空のチャネルバッファはどうなるんだろう。ほかの意見も聞きたいな。

Flowは長期間安定したトポロジーを持つプロセス向けに設計されてる。リッチは実行中のトポロジーを「パッチ」する方法を考えてるけど、並行性の問題があってかなり難しい。これが追加されるかどうかはわからないな。フロートポロジーが固定されていても、フローコンポーネントが他の可変リソースを使って単なるコーディネーターとして機能するのは全然問題ない。例えば、外部の動的スレッドプールにデータを送信して、チャネルを通じてコールバックを受け取るプロセスを持つこともできる。

Clojure(Script)は成熟した、良い意味で退屈な言語とエコシステムだね。新しいサブカルチャーや方言、実装が生まれたし、ClojureCLR、Babashka、nbb、Jank、Janet、Fennel、Joker、Basilisp、Hy、Clojerlなど、他にもいくつか忘れたものがあるかも。これらはこのリストにあるかもしれないね: https://github.com/chr15m/awesome-clojure-likes 私たちはそれを使ってクライアント向けのアプリを作ってるし、もうすぐ自社製品もClojureとClojureScriptで作る予定だよ。

最近、c.a.flowを使ってS3からファイルを読み込んでデータを少し加工してデータベースに書き込むプログラムを作ったんだけど、コードがすごくわかりやすくなった。挑戦だったのは、- バックプレッシャーをうまく設定すること。入力チャネルのバッファを正しく調整する必要があって、考えるのに少し時間がかかったし、私の場合は本番環境でいくつか失敗もした。- flow monitor(フローのデバッグ用UI)が大きなプロセス状態を扱えないから、私のプログラムにはほとんど役に立たなかった。- フロー全体を理解すること(特定のプロセスは孤立して理解しやすいけど)。例えば、「このプロセスがしばらくメッセージを受け取ってないのはなぜ?」って質問に答えるのが難しかった。

フィードバックありがとう!フローモニターは今、プロセスの状態にフィルターをサポートしてるよ(それに関しては、フロー自体にもすぐに追加される予定)。モニターを使えたなら、チャンネルのバッファ状態が表示されるけど、それだけじゃ値が流れない理由を推測するには不十分だったかな?