From 2a78cf0263da8505225c2277a6d9974ede653406 Mon Sep 17 00:00:00 2001 From: ABCxFF <79597906+abcxff@users.noreply.github.com> Date: Tue, 19 May 2026 20:13:44 +0000 Subject: [PATCH] feat(inspector): insert and clear actor queue --- .../packages/rivetkit-core/src/actor/queue.rs | 51 +++++++++++++++++++ .../rivetkit-core/src/registry/inspector.rs | 24 +++++++++ .../rivetkit-core/src/registry/mod.rs | 7 +++ .../packages/rivetkit-napi/index.d.ts | 1 + .../packages/rivetkit-napi/src/queue.rs | 5 ++ .../rivetkit/src/registry/napi-runtime.ts | 4 ++ .../packages/rivetkit/src/registry/native.ts | 21 ++++++++ .../packages/rivetkit/src/registry/runtime.ts | 1 + .../rivetkit/src/registry/wasm-runtime.ts | 5 ++ 9 files changed, 119 insertions(+) diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs index a6a4533318..a533a0548b 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs @@ -535,6 +535,57 @@ impl ActorContext { self.config().max_queue_size } + /// Removes all messages from the queue and resets the size counter. + pub async fn reset(&self) -> Result<()> { + self.ensure_initialized().await?; + + // Hold the metadata lock across the entire reset. enqueue_message holds + // this same lock across its KV writes, so holding it here serializes + // concurrent enqueues behind the reset. Without it, a message written + // between the list and the delete escapes deletion yet has its metadata + // overwritten by the reset, orphaning the message. + let mut metadata = self.0.queue_metadata.lock().await; + + // List and delete all message keys. + let entries = self.list_message_entries().await?; + if !entries.is_empty() { + let keys: Vec> = entries.iter().map(|(k, _)| k.clone()).collect(); + let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect(); + self.0 + .kv + .batch_delete(&key_refs) + .await + .context("delete all queue messages")?; + } + + // Empty the queue but preserve next_id so post-reset messages never + // reuse an id that a pre-reset completion handle still holds. Reusing an + // id would let a stale completion delete or resolve a new message. + metadata.size = 0; + let encoded_metadata = + encode_queue_metadata(&metadata).context("encode reset queue metadata")?; + self.0 + .kv + .put(&QUEUE_METADATA_KEY, &encoded_metadata) + .await + .context("persist reset queue metadata")?; + + // Drop all completion waiters. + self.0.queue_completion_waiters.clear_async().await; + + drop(metadata); + + // Update metrics, notify the inspector, and wake any parked consumers so + // they re-evaluate the now-empty queue. Without the wake, a consumer + // parked in next()/next_batch() with no timeout would hang until the + // next enqueue. This mirrors the notify in the enqueue path. + self.0.metrics.set_queue_depth(0); + self.notify_inspector_update(0); + self.0.queue_notify.notify_waiters(); + + Ok(()) + } + pub(crate) fn configure_queue(&self, config: ActorConfig) { *self.0.queue_config.lock() = config; } diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs index ad36660a91..a6b48186fc 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs @@ -173,6 +173,30 @@ impl RegistryDispatcher { }; json_http_response(StatusCode::OK, &payload) } + (http::Method::DELETE, "/inspector/queue") => { + match instance.ctx.queue().reset().await { + Ok(_) => json_http_response(StatusCode::OK, &json!({})), + Err(error) => Err(error).context("reset inspector queue"), + } + } + (http::Method::POST, "/inspector/queue") => { + let body: InspectorEnqueueBody = match parse_json_body(request) { + Ok(body) => body, + Err(response) => return Ok(Some(response)), + }; + let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?; + match instance.ctx.queue().send(&body.name, &cbor_body).await { + Ok(message) => json_http_response( + StatusCode::OK, + &json!({ + "id": message.id.to_string(), + "name": message.name, + "createdAtMs": message.created_at, + }), + ), + Err(error) => Err(error).context("enqueue inspector queue message"), + } + } (http::Method::GET, "/inspector/workflow-history") => self .inspector_workflow_history(instance) .await diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index a9c23bf95f..c2223c5237 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -271,6 +271,13 @@ struct InspectorWorkflowReplayBody { entry_id: Option, } +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +struct InspectorEnqueueBody { + name: String, + body: Option, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct InspectorQueueMessageJson { diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index 6d99414903..7c1358c12a 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -328,6 +328,7 @@ export declare class Queue { tryNext(options?: JsQueueTryNextOptions | undefined | null): QueueMessage | null tryNextBatch(options?: JsQueueTryNextBatchOptions | undefined | null): Array maxSize(): number + reset(): Promise inspectMessages(): Promise> } export declare class QueueMessage { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs index f75927f0a7..2d53c83e9f 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs @@ -216,6 +216,11 @@ impl Queue { self.inner.max_size() } + #[napi] + pub async fn reset(&self) -> napi::Result<()> { + self.inner.reset().await.map_err(napi_anyhow_error) + } + #[napi] pub async fn inspect_messages(&self) -> napi::Result> { self.inner diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index 50bb1dec0b..cf6bb594cd 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -745,6 +745,10 @@ export class NapiCoreRuntime implements CoreRuntime { return await asNativeActorContext(ctx).queue().inspectMessages(); } + async actorQueueReset(ctx: ActorContextHandle): Promise { + await asNativeActorContext(ctx).queue().reset(); + } + actorScheduleAfter( ctx: ActorContextHandle, durationMs: number, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 3c94653463..9482fcd616 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -3666,6 +3666,27 @@ export function buildNativeFactory( messages, }); } + if ( + url.pathname === "/inspector/queue" && + jsRequest.method === "DELETE" + ) { + await runtime.actorQueueReset(ctx); + return jsonResponse({}); + } + if ( + url.pathname === "/inspector/queue" && + jsRequest.method === "POST" + ) { + const body = await jsRequest.json() as { name?: string; body?: unknown }; + const name = body.name ?? ""; + const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue); + const message = await runtime.actorQueueSend(ctx, name, cbor); + return jsonResponse({ + id: message.id().toString(), + name: message.name(), + createdAtMs: message.createdAt(), + }); + } if ( url.pathname === "/inspector/traces" && jsRequest.method === "GET" diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index b8dca80bb8..c08ee6592d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -542,6 +542,7 @@ export interface CoreRuntime { actorQueueInspectMessages( ctx: ActorContextHandle, ): Promise; + actorQueueReset(ctx: ActorContextHandle): Promise; actorScheduleAfter( ctx: ActorContextHandle, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index 2c829495db..beaa498ebf 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -840,6 +840,11 @@ export class WasmCoreRuntime implements CoreRuntime { return await callHandleAsync(queue, "inspectMessages"); } + async actorQueueReset(ctx: ActorContextHandle): Promise { + const queue = childHandle(asWasmActorContext(ctx), "queue"); + await callHandleAsync(queue, "reset"); + } + actorScheduleAfter( ctx: ActorContextHandle, durationMs: number | bigint,