Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,38 @@ Meri's [notes from first reading the code](https://leaflet.pub/3abc7a5c-0790-4a4

To compile and run locally: `cargo r -- --otel server -D did:web:localhost`. For testing, you can also pass `--unsafe-auth-token token123`, which enables an authentication bypass, so you don't need a JWT.


## Connection throttling for high-risk endpoints

`leaf-server` now applies lightweight per-actor throttling to these socket events:

- `stream/create`
- `stream/subscribe_events`

Actor identity is derived from authenticated DID when available. Anonymous connections (including local/dev sessions that skip JWTs) are grouped under a per-socket fallback actor key.

When running locally with `--unsafe-auth-token`, requests are attributed to the server DID, so throttling is still enforced and observable in logs/traces.

Self-hosters can tune the limits via CLI flags or env vars:

- `--throttle-window-secs` / `THROTTLE_WINDOW_SECS`
- `--stream-create-limit-per-window` / `STREAM_CREATE_LIMIT_PER_WINDOW`
- `--stream-subscribe-limit-per-window` / `STREAM_SUBSCRIBE_LIMIT_PER_WINDOW`
- `--max-active-subscriptions-per-actor` / `MAX_ACTIVE_SUBSCRIPTIONS_PER_ACTOR`

## Dockerfile build

The Dockerfile for `leaf-server` uses statically linked C code that is compiled for x86 architectures, which is also what the server is running. On x86 machines we can run `docker build -t leaf-server:x86_64 .`

For local dev on arm64 machines, we can pass in a build arg: `docker build --build-arg TARGET=aarch64-unknown-linux-musl -t leaf-server:arm64 .`

Then in either case we can run it as usual: `docker run -it --rm -p 5530:5530 -v $(pwd)/data:/data leaf-server`
Then in either case we can run it as usual: `docker run -it --rm -p 5530:5530 -v $(pwd)/data:/data leaf-server`

## Protocol compatibility policy

For this iteration of the wire protocol, compatibility is maintained with additive changes only:

- Existing event names remain unchanged (for example `stream/create`, `stream/info`, and existing socket events).
- Existing required fields keep their current semantics (`streamDid`, `moduleCid`).
- Any newly introduced fields are additive and optional.
- No breaking wire changes are introduced in this iteration.
7 changes: 6 additions & 1 deletion clients/typescript/src/codec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,20 @@ export type ModuleExistsResp = Result<{ moduleExists: boolean }>;

export type StreamCreateArgs = {
moduleCid: CidLink;
clientStamp?: string;
};

export type StreamCreateResp = Result<{ streamDid: Did }>;
export type StreamCreateResp = Result<{
streamDid: Did;
clientStamp?: string;
}>;

export type StreamInfoArgs = {
streamDid: Did;
};
export type StreamInfoResp = Result<{
moduleCid?: CidLinkWrapper;
clientStamp?: string;
}>;

export type StreamUpdateModuleArgs = {
Expand Down
16 changes: 12 additions & 4 deletions clients/typescript/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,19 @@ export class LeafClient {
return resp.Ok.moduleExists;
}

async createStream(moduleCid: string): Promise<{ streamDid: string }> {
async createStream(
moduleCid: string,
clientStamp?: string,
): Promise<{ streamDid: string; clientStamp?: string }> {
const data: Uint8Array = await this.socket.emitWithAck(
"stream/create",
toBinary(
encode({ moduleCid: { $link: moduleCid } } satisfies StreamCreateArgs),
encode(
{
moduleCid: { $link: moduleCid },
clientStamp,
} satisfies StreamCreateArgs,
),
),
);
const resp: StreamCreateResp = decode(fromBinary(data));
Expand All @@ -202,7 +210,7 @@ export class LeafClient {
return resp.Ok;
}

async streamInfo(streamDid: string): Promise<{ moduleCid?: string }> {
async streamInfo(streamDid: string): Promise<{ moduleCid?: string; clientStamp?: string }> {
const data: Uint8Array = await this.socket.emitWithAck(
"stream/info",
toBinary(
Expand All @@ -213,7 +221,7 @@ export class LeafClient {
if ("Err" in resp) {
throw new Error(resp.Err);
}
return { moduleCid: resp.Ok.moduleCid?.$link };
return { moduleCid: resp.Ok.moduleCid?.$link, clientStamp: resp.Ok.clientStamp };
}

async updateModule(
Expand Down
10 changes: 10 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog

## Unreleased

- API (`stream/create`): Added optional `clientStamp` request/response field as advisory client metadata. `streamDid` remains the canonical, authoritative stream identifier and DID creation flow is unchanged.
- API (`stream/info`): Added optional `clientStamp` response field backed by persisted stream metadata.
- Storage (`streams`): Added nullable `client_stamp` column, migration support for existing databases, and stream info retrieval to preserve backward compatibility for legacy rows.
- Client compatibility: Added serialization compatibility tests so legacy request/response shapes continue to decode when new optional fields are present.
- Reliability (`stream/create`, `stream/subscribe_events`): Added per-actor throttling limits, active-subscription caps, and throttle reset/pruning tests to reduce abusive load without changing happy-path behavior.
- TypeScript client: `LeafClient.createStream` now optionally accepts `clientStamp` and returns it when present.
16 changes: 16 additions & 0 deletions leaf-server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ pub struct ServerArgs {
#[arg(long, env)]
pub unsafe_auth_token: Option<String>,

/// Window size for per-actor throttling on high-risk socket endpoints.
#[arg(long, env, default_value_t = 30)]
pub throttle_window_secs: u64,

/// Maximum `stream/create` attempts allowed per actor inside one throttle window.
#[arg(long, env, default_value_t = 8)]
pub stream_create_limit_per_window: u32,

/// Maximum `stream/subscribe_events` attempts allowed per actor inside one throttle window.
#[arg(long, env, default_value_t = 20)]
pub stream_subscribe_limit_per_window: u32,

/// Maximum active `stream/subscribe_events` subscriptions per actor.
#[arg(long, env, default_value_t = 64)]
pub max_active_subscriptions_per_actor: u32,

#[clap(flatten)]
pub backup_config: S3BackupConfigArgs,
}
Expand Down
Loading