Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d83f8da
feat(hyperswarm): add negentropy sync with sqlite store
Bushstar Feb 13, 2026
1a2f8fa
enable fallback to legacy on no capabilities
Bushstar Feb 13, 2026
d2a6492
remove sync concurrency options
Bushstar Feb 13, 2026
43934ae
only rebuild on change
Bushstar Feb 13, 2026
e761f4e
rebuild in background
Bushstar Feb 13, 2026
777a329
add windowed sync
Bushstar Feb 13, 2026
b7cf454
Merge branch 'main' into bush/hypr-rbsr
Bushstar Feb 13, 2026
626881a
chore: update generated OpenAPI docs
actions-user Feb 13, 2026
c0609c8
suppress lint warnings
Bushstar Feb 13, 2026
16cbace
lint: single return statement
Bushstar Feb 13, 2026
3ed6df8
improve coverage
Bushstar Feb 13, 2026
5a05aa8
resolve lint errors
Bushstar Feb 13, 2026
755e8a3
more coverage
Bushstar Feb 13, 2026
fbcaa03
add private swarm env var
Bushstar Feb 19, 2026
297266f
fix macos start script compatibility
Bushstar Feb 19, 2026
033b31b
add new env vars to docker
Bushstar Feb 19, 2026
c23d1b1
omit acceptedHashes from logging
Bushstar Feb 19, 2026
929a828
only sync when sync mode is known
Bushstar Feb 19, 2026
0a95944
update test env script
Bushstar Feb 20, 2026
f6db9fa
Merge branch 'main' into bush/hypr-rbsr
Bushstar Feb 20, 2026
fbba814
add frame default
Bushstar Feb 20, 2026
4e43fee
remove hyperdht env var
Bushstar Feb 20, 2026
ca702fd
add negentrophy toggle
Bushstar Feb 23, 2026
c8161c4
store time in seconds to match negentrophy
Bushstar Feb 23, 2026
50f5165
wait for shutdown to complete
Bushstar Feb 23, 2026
a5670e3
replace unix epoch with mdip epoch
Bushstar Feb 23, 2026
1f7bb67
clamp older dates to the mdip epoch
Bushstar Feb 23, 2026
301577f
add hypr config tests
Bushstar Feb 23, 2026
e7b09e1
full sync on connect
Bushstar Feb 24, 2026
271a538
resync from gatekeeper when drift over one percent
Bushstar Feb 24, 2026
003bf81
fix lint error
Bushstar Feb 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions doc/gatekeeper-api.json
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,13 @@
"type": "integer",
"description": "Number of events that failed validation (bad signature, size limit, etc.)."
},
"rejectedIndices": {
"type": "array",
"description": "Zero-based indexes of rejected events in the original submitted batch order (for importDIDs this is `dids.flat()` order).",
"items": {
"type": "integer"
}
},
"total": {
"type": "integer",
"description": "Total number of events in the queue after this import."
Expand Down Expand Up @@ -1439,6 +1446,13 @@
"type": "integer",
"description": "Number of events that failed validation."
},
"rejectedIndices": {
"type": "array",
"description": "Zero-based indexes of rejected events in the original submitted batch order.",
"items": {
"type": "integer"
}
},
"total": {
"type": "integer",
"description": "The total event queue size after this import."
Expand Down Expand Up @@ -1786,6 +1800,13 @@
"pending": {
"type": "integer",
"description": "Number of events still left in the queue after processing."
},
"acceptedHashes": {
"type": "array",
"description": "Lower-case signature hashes of events accepted during this processing run (added or merged).",
"items": {
"type": "string"
}
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ services:
- KC_NODE_NAME=${KC_NODE_NAME}
- KC_MDIP_PROTOCOL=${KC_MDIP_PROTOCOL}
- KC_HYPR_EXPORT_INTERVAL=${KC_HYPR_EXPORT_INTERVAL}
- KC_HYPR_NEGENTROPY_ENABLE=${KC_HYPR_NEGENTROPY_ENABLE}
- KC_HYPR_NEGENTROPY_FRAME_SIZE_LIMIT=${KC_HYPR_NEGENTROPY_FRAME_SIZE_LIMIT}
- KC_HYPR_NEGENTROPY_WINDOW_DAYS=${KC_HYPR_NEGENTROPY_WINDOW_DAYS}
- KC_HYPR_NEGENTROPY_MAX_RECORDS_PER_WINDOW=${KC_HYPR_NEGENTROPY_MAX_RECORDS_PER_WINDOW}
- KC_HYPR_NEGENTROPY_MAX_ROUNDS_PER_SESSION=${KC_HYPR_NEGENTROPY_MAX_ROUNDS_PER_SESSION}
- KC_HYPR_NEGENTROPY_INTERVAL=${KC_HYPR_NEGENTROPY_INTERVAL}
- KC_HYPR_LEGACY_SYNC_ENABLE=${KC_HYPR_LEGACY_SYNC_ENABLE}
- KC_LOG_LEVEL=${KC_LOG_LEVEL}
volumes:
- ./data:/app/hyperswarm/data
user: "${KC_UID}:${KC_GID}"
depends_on:
- gatekeeper
Expand Down
6 changes: 6 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const config = {
'^pino$': '<rootDir>/tests/common/pino.mock.ts',
'^\\.\\/typeGuards\\.js$': '<rootDir>/packages/keymaster/src/db/typeGuards.ts',
'^\\.\\/db\\/typeGuards\\.js$': '<rootDir>/packages/keymaster/src/db/typeGuards.ts',
'^\\.\\/sync-mapping\\.js$': '<rootDir>/services/mediators/hyperswarm/src/sync-mapping.ts',
'^\\.\\/sync-persistence\\.js$': '<rootDir>/services/mediators/hyperswarm/src/sync-persistence.ts',
'^\\.\\/abstract-json\\.js$': '<rootDir>/packages/gatekeeper/src/db/abstract-json.ts',
'^\\.\\/cipher-base\\.js$': '<rootDir>/packages/cipher/src/cipher-base.ts',
'^\\.\\/encryption\\.js$': '<rootDir>/packages/keymaster/src/encryption.ts',
Expand All @@ -40,6 +42,10 @@ const config = {
"/node_modules/",
"/kc-app/",
"/client/"
],
coveragePathIgnorePatterns: [
"/node_modules/",
"/services/mediators/hyperswarm/src/negentropy/Negentropy\\.cjs$",
]
};

Expand Down
30 changes: 24 additions & 6 deletions packages/gatekeeper/src/gatekeeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ export default class Gatekeeper implements GatekeeperInterface {
let added = 0;
let merged = 0;
let rejected = 0;
const acceptedHashes: string[] = [];

this.eventsQueue = [];

Expand All @@ -1036,10 +1037,16 @@ export default class Gatekeeper implements GatekeeperInterface {

if (status === ImportStatus.ADDED) {
added += 1;
if (event.operation.signature?.hash) {
acceptedHashes.push(event.operation.signature.hash.toLowerCase());
}
this.log.debug(`import ${i}/${total}: added event for ${event.did}`);
}
else if (status === ImportStatus.MERGED) {
merged += 1;
if (event.operation.signature?.hash) {
acceptedHashes.push(event.operation.signature.hash.toLowerCase());
}
this.log.debug(`import ${i}/${total}: merged event for ${event.did}`);
}
else if (status === ImportStatus.REJECTED) {
Expand All @@ -1054,7 +1061,7 @@ export default class Gatekeeper implements GatekeeperInterface {
event = tempQueue.shift();
}

return { added, merged, rejected };
return { added, merged, rejected, acceptedHashes };
}

async processEvents(): Promise<ProcessEventsResult> {
Expand All @@ -1066,6 +1073,7 @@ export default class Gatekeeper implements GatekeeperInterface {
let merged = 0;
let rejected = 0;
let done = false;
const acceptedHashes = new Set<string>();

try {
this.isProcessingEvents = true;
Expand All @@ -1076,6 +1084,9 @@ export default class Gatekeeper implements GatekeeperInterface {
added += response.added;
merged += response.merged;
rejected += response.rejected;
for (const hash of response.acceptedHashes) {
acceptedHashes.add(hash);
}

done = (response.added === 0 && response.merged === 0);
}
Expand All @@ -1089,7 +1100,13 @@ export default class Gatekeeper implements GatekeeperInterface {
}

const pending = this.eventsQueue.length;
const response = { added, merged, rejected, pending };
const response = {
added,
merged,
rejected,
pending,
acceptedHashes: Array.from(acceptedHashes),
};

this.log.debug(`processEvents: ${JSON.stringify(response)}`);

Expand Down Expand Up @@ -1181,8 +1198,8 @@ export default class Gatekeeper implements GatekeeperInterface {
}

let queued = 0;
let rejected = 0;
let processed = 0;
const rejectedIndices: number[] = [];

for (let i = 0; i < batch.length; i++) {
const event = batch[i];
Expand All @@ -1200,15 +1217,16 @@ export default class Gatekeeper implements GatekeeperInterface {
}
}
else {
rejected += 1;
rejectedIndices.push(i);
}
}

return {
queued,
processed,
rejected,
total: this.eventsQueue.length
rejected: rejectedIndices.length,
total: this.eventsQueue.length,
rejectedIndices,
};
}

Expand Down
3 changes: 3 additions & 0 deletions packages/gatekeeper/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface ImportBatchResult {
processed: number;
rejected: number;
total: number;
rejectedIndices: number[];
}

export interface ProcessEventsResult {
Expand All @@ -21,6 +22,7 @@ export interface ProcessEventsResult {
merged?: number;
rejected?: number;
pending?: number;
acceptedHashes?: string[];
}

export interface VerifyDbResult {
Expand Down Expand Up @@ -109,6 +111,7 @@ export interface ImportEventsResult {
added: number;
merged: number;
rejected: number;
acceptedHashes: string[];
}

export interface GatekeeperClientOptions {
Expand Down
9 changes: 8 additions & 1 deletion sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ KC_KEYMASTER_URL=http://localhost:4226
KC_SEARCH_URL=http://localhost:4002

# Hyperswarm
KC_HYPR_EXPORT_INTERVAL=2
KC_HYPR_EXPORT_INTERVAL=2 # Seconds between export-loop ticks. integer >= 1.
KC_MDIP_PROTOCOL=/MDIP/v1.0-public
KC_HYPR_LEGACY_SYNC_ENABLE=true # Enables legacy sync for peers without negentropy. true|false.
KC_HYPR_NEGENTROPY_ENABLE=true # Enables negentropy sync protocol. true|false.
KC_HYPR_NEGENTROPY_INTERVAL=300 # Seconds between retry attempts for peers not yet fully synced. integer >= 1.
KC_HYPR_NEGENTROPY_WINDOW_DAYS=30 # Reconciliation window size in days for full-sync chunking. integer >= 1.
KC_HYPR_NEGENTROPY_FRAME_SIZE_LIMIT=128 # Negentropy frame limit in KB. valid values are 0 (unlimited) or >= 4.
KC_HYPR_NEGENTROPY_MAX_RECORDS_PER_WINDOW=25000 # Maximum records loaded per reconciliation window. integer >= 1.
KC_HYPR_NEGENTROPY_MAX_ROUNDS_PER_SESSION=64 # Maximum negentropy rounds per sync session. integer >= 1.

# Bitcoin mediator
KC_BTC_HOST=localhost
Expand Down
15 changes: 15 additions & 0 deletions services/gatekeeper/server/src/gatekeeper-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,11 @@ v1router.post('/dids/export', async (req, res) => {
* rejected:
* type: integer
* description: Number of events that failed validation (bad signature, size limit, etc.).
* rejectedIndices:
* type: array
* description: Zero-based indexes of rejected events in the original submitted batch order (for importDIDs this is `dids.flat()` order).
* items:
* type: integer
* total:
* type: integer
* description: Total number of events in the queue after this import.
Expand Down Expand Up @@ -1290,6 +1295,11 @@ v1router.post('/batch/export', async (req, res) => {
* rejected:
* type: integer
* description: Number of events that failed validation.
* rejectedIndices:
* type: array
* description: Zero-based indexes of rejected events in the original submitted batch order.
* items:
* type: integer
* total:
* type: integer
* description: The total event queue size after this import.
Expand Down Expand Up @@ -1608,6 +1618,11 @@ v1router.get('/db/verify', async (req, res) => {
* pending:
* type: integer
* description: Number of events still left in the queue after processing.
* acceptedHashes:
* type: array
* description: Lower-case signature hashes of events accepted during this processing run (added or merged).
* items:
* type: string
* 500:
* description: Internal Server Error.
* content:
Expand Down
73 changes: 58 additions & 15 deletions services/mediators/hyperswarm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,71 @@

The Hyperswarm mediator is responsible for distributing unconfirmed MDIP operations to the network and for organizing an IPFS peer network for file-sharing.

When a node gets a new connection, it sends the connection a `sync` message and the connection replies with a series of `batch` messages containing all the operations in the connection's DID database. The nodes imports these operations into its Gatekeeper. The Gatekeeper will add any new operations it hasn't seen before, merge any operations it has already seen, and reject invalid operations.
The mediator supports two synchronization modes:

While running the mediator will poll the Gatekeeper's hyperswarm queue for new operations, and relay them to all of its connections with a `queue` message.
When a node receives a `queue` message it will import the operations like during a `batch` but also relay the message to all of its connections, distributing the new operations with a "gossip protocol".
- `negentropy` mode (preferred): full-history windowed sync on connect, with periodic retry only until the peer reaches a completed sync, using `neg_open`/`neg_msg`/`ops_req`/`ops_push`/`neg_close`.
- `legacy` mode (compatibility): classic `sync` -> full-history `batch` transfer (`shareDb`).

Realtime propagation is always handled by the Gatekeeper queue gossip path:
- mediator polls `gatekeeper.getQueue('hyperswarm')`
- relays queue operations with a `queue` message
- peers import and further relay `queue` messages

This keeps low latency for new operations while negentropy handles catch-up.

## Sync mode behavior

| peer mode | connect-time behavior | periodic behavior | queue gossip |
| --- | --- | --- | --- |
| `negentropy` | negotiate + run full-history windowed session | periodic retry until sync completes, then stop | enabled |
| `legacy` | `sync` + `shareDb` full-history export | n/a | enabled |

`shareDb` is intentionally retained for backward compatibility and can be disabled once compatibility validation is complete.

## Observability

The mediator emits periodic structured sync metrics in `connectionLoop` including:
- session mode selection counts (`legacy` vs `negentropy`) and fallback rate
- negentropy rounds and have/need totals
- ops requested/pushed sent and received
- gatekeeper apply/reject totals
- bytes sent/received
- session duration aggregates
- queue delay aggregates (from operation `signature.signed` to relay/import time)

## Environment variables

| variable | default | description |
| ------------------------- | ---------------------- | ----------------------------- |
| `KC_GATEKEEPER_URL` | http://localhost:4224 | MDIP gatekeeper service URL |
| `KC_KEYMASTER_URL` | http://localhost:4226 | MDIP keymaster service URL |
| `KC_IPFS_URL` | http://localhost:5001/api/v0 | IPFS RPC URL |
| `KC_IPFS_ENABLE` | true | Enable IPFS + Keymaster peering integration |
| `KC_NODE_ID ` | (no default) | Keymaster node agent name |
| `KC_NODE_NAME` | anon | Human-readable name for the node |
| `KC_MDIP_PROTOCOL` | /MDIP/v1.0-public | MDIP network topic to join |
| `KC_HYPR_EXPORT_INTERVAL` | 2 | Seconds between export cycles |
| `KC_LOG_LEVEL` | info | Log level: `debug`, `info`, `warn`, `error` |
| variable | default | description |
| ------------------------- |------------------------------| ----------------------------- |
| `KC_GATEKEEPER_URL` | http://localhost:4224 | MDIP gatekeeper service URL |
| `KC_KEYMASTER_URL` | http://localhost:4226 | MDIP keymaster service URL |
| `KC_IPFS_URL` | http://localhost:5001/api/v0 | IPFS RPC URL |
| `KC_IPFS_ENABLE` | true | Enable IPFS + Keymaster peering integration |
| `KC_NODE_ID ` | (no default) | Keymaster node agent name |
| `KC_NODE_NAME` | anon | Human-readable name for the node |
| `KC_MDIP_PROTOCOL` | /MDIP/v1.0-public | MDIP network topic to join |
| `KC_HYPR_EXPORT_INTERVAL` | 2 | Seconds between export cycles |
| `KC_HYPR_NEGENTROPY_FRAME_SIZE_LIMIT` | 0 | Negentropy frame-size limit in KB (0 or >= 4) |
| `KC_HYPR_NEGENTROPY_WINDOW_DAYS` | 30 | Reconciliation window size in days for full-sync chunking |
| `KC_HYPR_NEGENTROPY_MAX_RECORDS_PER_WINDOW` | 25000 | Maximum operations loaded into a single window adapter |
| `KC_HYPR_NEGENTROPY_MAX_ROUNDS_PER_SESSION` | 64 | Maximum negentropy rounds per window session |
| `KC_HYPR_NEGENTROPY_INTERVAL` | 300 | Seconds between retry attempts for peers not yet fully synced |
| `KC_HYPR_LEGACY_SYNC_ENABLE` | true | Allow legacy `sync`/`shareDb` compatibility path |
| `KC_LOG_LEVEL` | info | Log level: `debug`, `info`, `warn`, `error` |

Negentropy session concurrency is currently fixed at one active session per node.

## IPFS disabled mode

Set `KC_IPFS_ENABLE=false` to run the mediator without IPFS or Keymaster integration. In this mode:
- operations still sync and relay over Hyperswarm (batch/queue/sync/ping)
- operations still sync and relay over Hyperswarm (queue + negentropy; legacy sync if enabled)
- IPFS peering is disabled and node IPFS info is not published
- `KC_NODE_ID` is not required because Keymaster is not used

## Sync Store Scaffolding

The mediator now includes a sync-store abstraction in `src/db/` with:
- `SqliteOperationSyncStore` for persistent ordered storage
- `InMemoryOperationSyncStore` for tests

The SQLite implementation uses a fixed data path under `data/hyperswarm` (relative to the mediator working directory), with an index on `(ts, id)` to use SQLite's native B-tree ordering for range queries.
Loading
Loading