obzenflow-topology is an opinionated, WASM-friendly crate for representing and validating flow/pipeline graphs. It was extracted from ObzenFlow so the native backend and the browser UI can share the exact same graph rules.
- Shared contract: same
Topology+TopologyErroron server and UI. - Validation at three levels: structural, semantic, and reachability.
- Cycle-aware: supports multi-stage feedback loops and retry/backflow edges (self-cycles rejected).
- Fast queries: cached upstream/downstream adjacency lists.
- Deterministic: stable fingerprinting for caching and UI diffing.
- Type-safe IDs: phantom-typed ULID
StageIdviaobzenflow-idkit(no RNG required by default).
ObzenFlow is full-stack Rust: a native backend (obzenflow) plus a WASM UI (obzenflow-studio). Both sides need to answer the same questions about a flow graph:
- “Is this wiring valid?”
- “What feeds into this stage?”
- “Is this stage part of a feedback loop?”
- “Can every stage reach a sink?”
Rather than duplicating logic across languages/targets, this crate compiles into both and becomes the single source of truth.
obzenflow (native) <---- JSON (stages/edges) ----> obzenflow-studio (wasm)
\ /
\----------- obzenflow-topology ---------/
Topology::new(...) performs full validation up front. For UI/editor workflows you can opt into structural-only construction and validate later.
Validation is split into:
- Structural: edge endpoints exist, no duplicates, no self-cycles, single connected component.
- Semantic: enforces legal connections based on
StageType→StageRoleandEdgeKind(|>vs<|). - Reachability: requires at least one Producer and one Consumer, and every stage is on some producer → consumer path.
Once built, you get cheap queries:
upstream_stages/downstream_stagesedges()and stage metadata lookupis_in_cycle(SCC-based)scc_id(stage_id)andscc_members(scc_id)for SCC partition queriesmetrics()andtopology_fingerprint()
The crate includes simple port/shape types (PortId, Shape) that UIs can use as building blocks when turning graph structure into visuals.
Non-goal: This crate does not execute pipelines; it’s a value type + validation/query layer.
Serde support is included by default for round-tripping stage/edge data through JSON:
[dependencies]
obzenflow-topology = "0.3"The same dependency works for wasm32-unknown-unknown (no RNG required).
use obzenflow_topology::{DirectedEdge, EdgeKind, StageId, StageInfo, StageType, Topology};
let source: StageId = "01ARZ3NDEKTSV4RRFFQ69G5FAV".parse().unwrap();
let transform: StageId = "01ARZ3NDEKTSV4RRFFQ69G5FAW".parse().unwrap();
let sink: StageId = "01ARZ3NDEKTSV4RRFFQ69G5FAX".parse().unwrap();
let stages = vec![
StageInfo::new(source, "source", StageType::FiniteSource),
StageInfo::new(transform, "transform", StageType::Transform),
StageInfo::new(sink, "sink", StageType::Sink),
];
let edges = vec![
DirectedEdge::new(source, transform, EdgeKind::Forward),
DirectedEdge::new(transform, sink, EdgeKind::Forward),
];
// Full validation happens here (structural + semantic + reachability).
let topology = Topology::new(stages, edges).unwrap();
// Queries are cheap (adjacency lists are cached).
let upstream_of_sink = topology.upstream_stages(sink);
let in_cycle = topology.is_in_cycle(transform);
let fingerprint = topology.topology_fingerprint();
let metrics = topology.metrics();If you’re building an interactive editor, you often want to accept “draft” graphs and validate on demand:
use obzenflow_topology::{Topology, ValidationLevel};
// `stages`/`edges` as in the Quick start example.
let draft = Topology::new_unvalidated(stages, edges).unwrap();
// Validate later (semantic only, or full).
draft.validate_with_level(ValidationLevel::Semantic).unwrap();ValidationLevel:
Structural: endpoints, duplicates, self-cycles, disconnected componentsSemantic: structural +(StageRole, EdgeKind)connection rulesFull: semantic + reachability invariants (sources/sinks, producer→sink paths)
ObzenFlow allows multi-stage cycles for feedback loops and retry patterns. Cycles are represented explicitly with EdgeKind::Backward (<|) edges; self-cycles are rejected.
Semantic rules are intentionally restrictive (high level):
- Forward (
|>): Producer/Processor → Processor/Consumer - Backward (
<|): Consumer/Processor → Processor
Use topology.is_in_cycle(stage) when you need to render or reason about feedback loops. For finer-grained cycle awareness, scc_id(stage) returns the SCC a stage belongs to, and scc_members(scc_id) returns the full member set. SCC identifiers are ULID-based and derived deterministically from the minimum StageId in each component, so they are stable across constructions of the same topology.
Topology IDs are ULIDs, wrapped in a phantom type for safety:
StageIdisobzenflow_idkit::Id<Stage>(a phantom-typedulid::Ulid).- This crate depends on
obzenflow-idkitwithout itsgenfeature, so it does not require an RNG. - In practice, IDs usually come from your domain layer (backend) or from parsing API payloads (UI).
If your application wants to generate IDs, do it in the app crate:
[dependencies]
obzenflow-idkit = { version = "0.2", features = ["gen", "serde"] }
getrandom = { version = "0.2", features = ["js"] } # browser wasm onlyThis crate is intentionally “boring” and predictable:
- Stores stages in a
HashMap<StageId, StageInfo>and edges in aVec<DirectedEdge>. - Builds cached adjacency lists (
HashMap<StageId, HashSet<StageId>>) for both downstream and upstream traversal. - Uses Tarjan SCC to compute cycle membership and SCC partition identity in
O(V + E). Each SCC'sSccIdis derived from the minimumStageIdin its member set. - Structural validation is a single pass over edges plus a connectivity check (
O(V + E)). - Full validation adds reachability checks to ensure every stage is on a producer → consumer path.
topology_fingerprint()sorts IDs/edges by raw ULID bytes to produce a stableu64across runs/targets.
Keep tests deterministic by synthesizing StageIds from a counter:
use std::sync::atomic::{AtomicU64, Ordering};
use obzenflow_topology::StageId;
static CTR: AtomicU64 = AtomicU64::new(0);
fn next_stage_id() -> StageId {
let n = CTR.fetch_add(1, Ordering::Relaxed);
let mut bytes = [0u8; 16];
bytes[8..].copy_from_slice(&n.to_be_bytes());
StageId::from_bytes(bytes)
}cargo test- Changelog:
CHANGELOG.md - Contributing:
CONTRIBUTING.md
- Code of Conduct:
CODE_OF_CONDUCT.md - Security:
SECURITY.md - Trademarks:
TRADEMARKS.md
Dual-licensed under MIT OR Apache-2.0.