Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bubus-rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "bubus-rust"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4", "v5", "v7", "serde"] }
futures = { version = "0.3", features = ["executor", "thread-pool"] }
event-listener = "5"
parking_lot = "0.12"

[dev-dependencies]
40 changes: 40 additions & 0 deletions bubus-rust/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# bubus-rust

Idiomatic Rust implementation of `bubus`, matching the Python/TypeScript event JSON surface and execution semantics as closely as possible.

## Current scope

Implemented core features:
- Base event model and event result model with serde JSON compatibility
- Async event bus with queueing and queue-jump behavior
- Event concurrency: `global-serial`, `bus-serial`, `parallel`
- Handler concurrency: `serial`, `parallel`
- Handler completion strategies: `all`, `first`
- Event path tracking and pending bus count

Not yet implemented in this crate revision:
- Bridges
- Middlewares (hook points are left in code comments)

## Quickstart

```rust
use bubus_rust::{base_event, event_bus};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Quickstart example won't compile: event_bus::new(...) and base_event::new(...) are not free functions. new is an associated function on EventBus and BaseEvent respectively. The imports and constructor calls need to reference the struct types directly.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/README.md, line 22:

<comment>Quickstart example won't compile: `event_bus::new(...)` and `base_event::new(...)` are not free functions. `new` is an associated function on `EventBus` and `BaseEvent` respectively. The imports and constructor calls need to reference the struct types directly.</comment>

<file context>
@@ -0,0 +1,40 @@
+## Quickstart
+
+```rust
+use bubus_rust::{base_event, event_bus};
+use futures::executor::block_on;
+use serde_json::{Map, json};
</file context>

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Quickstart example won't compile: event_bus::new(...) and base_event::new(...) are not free functions. new is an associated function on EventBus and BaseEvent respectively. The imports and constructor calls need to reference the struct types directly.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/README.md, line 22:

<comment>Quickstart example won't compile: `event_bus::new(...)` and `base_event::new(...)` are not free functions. `new` is an associated function on `EventBus` and `BaseEvent` respectively. The imports and constructor calls need to reference the struct types directly.</comment>

<file context>
@@ -0,0 +1,40 @@
+## Quickstart
+
+```rust
+use bubus_rust::{base_event, event_bus};
+use futures::executor::block_on;
+use serde_json::{Map, json};
</file context>
Fix with Cubic

use futures::executor::block_on;
use serde_json::{Map, json};

let bus = event_bus::new(Some("MainBus".to_string()));
bus.on("UserLoginEvent", "handle_login", |event| async move {
Ok(json!({"ok": true, "event_id": event.inner.lock().event_id}))
});

let mut payload = Map::new();
payload.insert("username".to_string(), json!("alice"));
let event = base_event::new("UserLoginEvent", payload);
bus.emit(event.clone());

block_on(async {
event.wait_completed().await;
println!("{}", event.to_json_value());
});
```
126 changes: 126 additions & 0 deletions bubus-rust/src/base_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::{collections::HashMap, sync::Arc};

use event_listener::Event;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};

use crate::{
event_result::EventResult,
id::uuid_v7_string,
types::{
EventConcurrencyMode, EventHandlerCompletionMode, EventHandlerConcurrencyMode, EventStatus,
},
};

pub fn now_iso() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let dur = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
format!("{}.{:09}Z", dur.as_secs(), dur.subsec_nanos())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Bug: now_iso() does not produce ISO 8601 timestamps. It formats raw epoch seconds and nanoseconds (e.g. "1708444800.123456789Z"), not a proper ISO 8601 datetime (e.g. "2024-02-20T12:00:00.123456789Z"). This breaks cross-language compatibility with the Python/TS implementations which use real ISO datetime strings.

Consider using the chrono crate (or time crate) — e.g. chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/src/base_event.rs, line 21:

<comment>Bug: `now_iso()` does not produce ISO 8601 timestamps. It formats raw epoch seconds and nanoseconds (e.g. `"1708444800.123456789Z"`), not a proper ISO 8601 datetime (e.g. `"2024-02-20T12:00:00.123456789Z"`). This breaks cross-language compatibility with the Python/TS implementations which use real ISO datetime strings.

Consider using the `chrono` crate (or `time` crate) — e.g. `chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)`.</comment>

<file context>
@@ -0,0 +1,126 @@
+    let dur = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default();
+    format!("{}.{:09}Z", dur.as_secs(), dur.subsec_nanos())
+}
+
</file context>

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Bug: now_iso() does not produce ISO 8601 timestamps. It formats raw epoch seconds and nanoseconds (e.g. "1708444800.123456789Z"), not a proper ISO 8601 datetime (e.g. "2024-02-20T12:00:00.123456789Z"). This breaks cross-language compatibility with the Python/TS implementations which use real ISO datetime strings.

Consider using the chrono crate (or time crate) — e.g. chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/src/base_event.rs, line 21:

<comment>Bug: `now_iso()` does not produce ISO 8601 timestamps. It formats raw epoch seconds and nanoseconds (e.g. `"1708444800.123456789Z"`), not a proper ISO 8601 datetime (e.g. `"2024-02-20T12:00:00.123456789Z"`). This breaks cross-language compatibility with the Python/TS implementations which use real ISO datetime strings.

Consider using the `chrono` crate (or `time` crate) — e.g. `chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)`.</comment>

<file context>
@@ -0,0 +1,126 @@
+    let dur = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default();
+    format!("{}.{:09}Z", dur.as_secs(), dur.subsec_nanos())
+}
+
</file context>
Fix with Cubic

}

#[derive(Clone, Serialize, Deserialize)]
pub struct BaseEventData {
pub event_type: String,
pub event_version: String,
pub event_timeout: Option<f64>,
pub event_slow_timeout: Option<f64>,
pub event_concurrency: Option<EventConcurrencyMode>,
pub event_handler_timeout: Option<f64>,
pub event_handler_slow_timeout: Option<f64>,
pub event_handler_concurrency: Option<EventHandlerConcurrencyMode>,
pub event_handler_completion: Option<EventHandlerCompletionMode>,
pub event_result_type: Option<Value>,
pub event_id: String,
pub event_path: Vec<String>,
pub event_parent_id: Option<String>,
pub event_emitted_by_handler_id: Option<String>,
pub event_pending_bus_count: usize,
pub event_created_at: String,
pub event_status: EventStatus,
pub event_started_at: Option<String>,
pub event_completed_at: Option<String>,
pub event_results: HashMap<String, EventResult>,
#[serde(flatten)]
pub payload: Map<String, Value>,
}

pub struct BaseEvent {
pub inner: Mutex<BaseEventData>,
pub completed: Event,
}

impl BaseEvent {
pub fn new(event_type: impl Into<String>, payload: Map<String, Value>) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(BaseEventData {
event_type: event_type.into(),
event_version: "0.0.1".to_string(),
event_timeout: None,
event_slow_timeout: None,
event_concurrency: None,
event_handler_timeout: None,
event_handler_slow_timeout: None,
event_handler_concurrency: None,
event_handler_completion: None,
event_result_type: None,
event_id: uuid_v7_string(),
event_path: vec![],
event_parent_id: None,
event_emitted_by_handler_id: None,
event_pending_bus_count: 0,
event_created_at: now_iso(),
event_status: EventStatus::Pending,
event_started_at: None,
event_completed_at: None,
event_results: HashMap::new(),
payload,
}),
completed: Event::new(),
})
}

pub async fn wait_completed(self: &Arc<Self>) {
loop {
let listener = self.completed.listen();
{
let event = self.inner.lock();
if event.event_status == EventStatus::Completed {
return;
}
}
listener.await;
}
}

pub fn mark_started(&self) {
let mut event = self.inner.lock();
if event.event_started_at.is_none() {
event.event_started_at = Some(now_iso());
}
event.event_status = EventStatus::Started;
}

pub fn mark_completed(&self) {
let mut event = self.inner.lock();
event.event_status = EventStatus::Completed;
if event.event_completed_at.is_none() {
event.event_completed_at = Some(now_iso());
}
self.completed.notify(usize::MAX);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Custom agent: Make sure concurrency options work correctly and consistently

mark_completed() calls self.completed.notify(usize::MAX) while still holding the parking_lot::Mutex guard. All woken tasks in wait_completed() will immediately contend on this lock, causing unnecessary contention in high-throughput scenarios. Drop the guard before notifying to avoid the thundering-herd contention window.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/src/base_event.rs, line 112:

<comment>`mark_completed()` calls `self.completed.notify(usize::MAX)` while still holding the `parking_lot::Mutex` guard. All woken tasks in `wait_completed()` will immediately contend on this lock, causing unnecessary contention in high-throughput scenarios. Drop the guard before notifying to avoid the thundering-herd contention window.</comment>

<file context>
@@ -0,0 +1,126 @@
+        if event.event_completed_at.is_none() {
+            event.event_completed_at = Some(now_iso());
+        }
+        self.completed.notify(usize::MAX);
+    }
+
</file context>

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Custom agent: Make sure concurrency options work correctly and consistently

mark_completed() calls self.completed.notify(usize::MAX) while still holding the parking_lot::Mutex guard. All woken tasks in wait_completed() will immediately contend on this lock, causing unnecessary contention in high-throughput scenarios. Drop the guard before notifying to avoid the thundering-herd contention window.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-rust/src/base_event.rs, line 112:

<comment>`mark_completed()` calls `self.completed.notify(usize::MAX)` while still holding the `parking_lot::Mutex` guard. All woken tasks in `wait_completed()` will immediately contend on this lock, causing unnecessary contention in high-throughput scenarios. Drop the guard before notifying to avoid the thundering-herd contention window.</comment>

<file context>
@@ -0,0 +1,126 @@
+        if event.event_completed_at.is_none() {
+            event.event_completed_at = Some(now_iso());
+        }
+        self.completed.notify(usize::MAX);
+    }
+
</file context>
Fix with Cubic

}

pub fn to_json_value(&self) -> Value {
serde_json::to_value(&*self.inner.lock()).unwrap_or(Value::Null)
}

pub fn from_json_value(value: Value) -> Arc<Self> {
let parsed: BaseEventData = serde_json::from_value(value).expect("invalid base_event json");
Arc::new(Self {
inner: Mutex::new(parsed),
completed: Event::new(),
})
}
}
Loading
Loading