Skip to content

Feature: trigger() に coalesce: 'queue' を追加してほしい #184

@coji

Description

@coji

概要

`trigger()` / `batchTrigger()` の `coalesce` オプションに `'queue'` を追加してほしい。同一 `concurrencyKey` の run が走行中の場合、後続 trigger を drop せず queue して、前の run 完了後に実行する セマンティクス。

現在は `coalesce?: 'skip'` のみで、後続は coalesced(drop)されてしまう。

ユースケース:crawl → process パイプラインの取りこぼし防止

私のプロジェクト upflow は GitHub から PR をフェッチして集計するアプリで、典型的な fan-out → barrier 構造になっている:

```
[crawl job] (raw データを fetch して store) → [process job] (raw を解析して集計テーブル更新)
```

現状の設計

  • crawl は per-org concurrency key (`crawl:${orgId}`) で serialize、定期実行(毎時)
  • crawl の最後で process を trigger する。process は `process:${orgId}` + `coalesce: 'skip'`

これは「crawl が連続で走っても process は 1 回で十分」という意図で skip にしている。

困っているシナリオ

ユーザがリポジトリを追加した直後に初回 crawl を自動 trigger する機能 (coji/upflow#274, coji/upflow#284) を入れた。設計上 2 つの選択肢がある:

(A) per-repo concurrency key で trigger (`crawl:${orgId}:${repoId}`)

  • ✅ 追加 repo の初回 crawl は確実に走る
  • ❌ 定期 crawl と並走しうる → 両方の crawl が完了時に process を trigger するが、後勝ちが skip され drop される
  • 結果: 一方の crawl が fetch した PR が次サイクルまで未処理(最大 1 時間遅延)

(B) org-wide concurrency key で trigger (`crawl:${orgId}` + `coalesce: 'skip'`)

  • ✅ 二重 crawl が起きない
  • ❌ 定期 crawl 走行中の repo 追加は trigger 自体が drop され、初回 crawl が走らない
  • 結果: ユーザが追加した repo のデータが最大 1 時間出ない(機能の目的そのものが達成されない)

どちらも narrow window だが症状は出る。データロスではなく遅延だが、UX は悪い。

`coalesce: 'queue'` があると解決する

  • crawl 側を per-repo key のままにし、process trigger を `coalesce: 'queue'` にする
  • 並走した crawl がそれぞれ完了時に process を trigger しても、前の process が走っていれば後続が queue されて必ず実行される
  • どちらの crawl が fetch した PR も漏れなく process される
  • 「fan-out(並列に走る生産者)→ barrier(順次に走る消費者)」という汎用パターンに自然にハマる

期待するセマンティクス

```ts
await durably.jobs.process.trigger(
{ organizationId: orgId },
{
concurrencyKey: `process:${orgId}`,
coalesce: 'queue', // ← new
},
)
```

  • 'skip' (現状): 同 key で pending/running があれば新規 trigger を coalesce(drop、existing run の disposition を 'coalesced' で返す)
  • 'queue' (新規): 同 key で pending/running があれば新規 run を queue 状態で作成し、前の run 完了後に lease されるようにする
  • queue 中の run に対してさらに同 key の trigger が来た場合の挙動: `'skip'` と同じく queue 中の run に coalesce(つまり queue は最大 1 個まで)にすると無限に積み上がらず使いやすい。idempotency と組み合わせやすい

別案として `coalesce: { strategy: 'queue', maxQueued: 1 }` のような形で設定可能にしてもいい。

補足

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions