Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,57 @@ impl ActorContext {
self.config().max_queue_size
}

/// Removes all messages from the queue and resets the size counter.
pub async fn reset(&self) -> Result<()> {
self.ensure_initialized().await?;

// Hold the metadata lock across the entire reset. enqueue_message holds
// this same lock across its KV writes, so holding it here serializes
// concurrent enqueues behind the reset. Without it, a message written
// between the list and the delete escapes deletion yet has its metadata
// overwritten by the reset, orphaning the message.
let mut metadata = self.0.queue_metadata.lock().await;

// List and delete all message keys.
let entries = self.list_message_entries().await?;
if !entries.is_empty() {
let keys: Vec<Vec<u8>> = entries.iter().map(|(k, _)| k.clone()).collect();
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
self.0
.kv
.batch_delete(&key_refs)
.await
.context("delete all queue messages")?;
}

// Empty the queue but preserve next_id so post-reset messages never
// reuse an id that a pre-reset completion handle still holds. Reusing an
// id would let a stale completion delete or resolve a new message.
metadata.size = 0;
let encoded_metadata =
encode_queue_metadata(&metadata).context("encode reset queue metadata")?;
self.0
.kv
.put(&QUEUE_METADATA_KEY, &encoded_metadata)
.await
.context("persist reset queue metadata")?;

// Drop all completion waiters.
self.0.queue_completion_waiters.clear_async().await;

drop(metadata);

// Update metrics, notify the inspector, and wake any parked consumers so
// they re-evaluate the now-empty queue. Without the wake, a consumer
// parked in next()/next_batch() with no timeout would hang until the
// next enqueue. This mirrors the notify in the enqueue path.
self.0.metrics.set_queue_depth(0);
self.notify_inspector_update(0);
self.0.queue_notify.notify_waiters();

Ok(())
}

pub(crate) fn configure_queue(&self, config: ActorConfig) {
*self.0.queue_config.lock() = config;
}
Expand Down
24 changes: 24 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,30 @@ impl RegistryDispatcher {
};
json_http_response(StatusCode::OK, &payload)
}
(http::Method::DELETE, "/inspector/queue") => {
match instance.ctx.queue().reset().await {
Ok(_) => json_http_response(StatusCode::OK, &json!({})),
Err(error) => Err(error).context("reset inspector queue"),
}
}
(http::Method::POST, "/inspector/queue") => {
let body: InspectorEnqueueBody = match parse_json_body(request) {
Ok(body) => body,
Err(response) => return Ok(Some(response)),
};
let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?;
match instance.ctx.queue().send(&body.name, &cbor_body).await {
Ok(message) => json_http_response(
StatusCode::OK,
&json!({
"id": message.id.to_string(),
"name": message.name,
"createdAtMs": message.created_at,
}),
),
Err(error) => Err(error).context("enqueue inspector queue message"),
}
}
(http::Method::GET, "/inspector/workflow-history") => self
.inspector_workflow_history(instance)
.await
Expand Down
7 changes: 7 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ struct InspectorWorkflowReplayBody {
entry_id: Option<String>,
}

#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct InspectorEnqueueBody {
name: String,
body: Option<JsonValue>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct InspectorQueueMessageJson {
Expand Down
1 change: 1 addition & 0 deletions rivetkit-typescript/packages/rivetkit-napi/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ export declare class Queue {
tryNext(options?: JsQueueTryNextOptions | undefined | null): QueueMessage | null
tryNextBatch(options?: JsQueueTryNextBatchOptions | undefined | null): Array<QueueMessage>
maxSize(): number
reset(): Promise<void>
inspectMessages(): Promise<Array<JsQueueInspectMessage>>
}
export declare class QueueMessage {
Expand Down
5 changes: 5 additions & 0 deletions rivetkit-typescript/packages/rivetkit-napi/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ impl Queue {
self.inner.max_size()
}

#[napi]
pub async fn reset(&self) -> napi::Result<()> {
self.inner.reset().await.map_err(napi_anyhow_error)
}

#[napi]
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
self.inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,10 @@ export class NapiCoreRuntime implements CoreRuntime {
return await asNativeActorContext(ctx).queue().inspectMessages();
}

async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
await asNativeActorContext(ctx).queue().reset();
}

actorScheduleAfter(
ctx: ActorContextHandle,
durationMs: number,
Expand Down
21 changes: 21 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { VirtualWebSocket } from "@rivetkit/virtual-websocket";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/src/registry/native.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import {
ACTOR_CONTEXT_INTERNAL_SYMBOL,
CONN_STATE_MANAGER_SYMBOL,
Expand Down Expand Up @@ -3666,6 +3666,27 @@
messages,
});
}
if (
url.pathname === "/inspector/queue" &&
jsRequest.method === "DELETE"
) {
await runtime.actorQueueReset(ctx);
return jsonResponse({});
}
if (
url.pathname === "/inspector/queue" &&
jsRequest.method === "POST"
) {
const body = await jsRequest.json() as { name?: string; body?: unknown };
const name = body.name ?? "";
const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue);
const message = await runtime.actorQueueSend(ctx, name, cbor);
return jsonResponse({
id: message.id().toString(),
name: message.name(),
createdAtMs: message.createdAt(),
});
}
if (
url.pathname === "/inspector/traces" &&
jsRequest.method === "GET"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ export interface CoreRuntime {
actorQueueInspectMessages(
ctx: ActorContextHandle,
): Promise<RuntimeQueueInspectMessage[]>;
actorQueueReset(ctx: ActorContextHandle): Promise<void>;

actorScheduleAfter(
ctx: ActorContextHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ export class WasmCoreRuntime implements CoreRuntime {
return await callHandleAsync(queue, "inspectMessages");
}

async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
const queue = childHandle(asWasmActorContext(ctx), "queue");
await callHandleAsync(queue, "reset");
}

actorScheduleAfter(
ctx: ActorContextHandle,
durationMs: number | bigint,
Expand Down
Loading