Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions sharedb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ It gives you the backend building blocks that usually sit around an OT engine:
- server-side rebase of concurrent operations with `Transform`
- subscription to committed updates (`Subscribe`)

The current implementation includes an in-memory `Store`, which is a good fit for demos, single-node services, prototypes, and custom wrappers.
The current implementation includes an in-memory server for demos/tests plus pluggable backends, lockers, and publishers (including Redis adapters) for multi-process deployments.

## Who should use it?

Expand All @@ -23,6 +23,8 @@ Use `jsonot/sharedb` when you want to:
- keep the document authoritative on the server
- accept client operations against an older version and rebase them automatically
- wrap OT logic with a small backend API instead of designing every primitive from scratch
- expose operation history for reconnect/catch-up flows
- make client retries idempotent with `source` + `seq` / `OpID` metadata

## Quick start

Expand All @@ -39,11 +41,11 @@ import (

func main() {
ctx := context.Background()
store := sharedb.NewStore()
server := sharedb.NewMemoryServer()

_, _ = store.CreateDocument(ctx, "doc-1", json.RawMessage(`{"counter":0}`))
_, _ = server.CreateDocument(ctx, "doc-1", json.RawMessage(`{"counter":0}`))

result, _ := store.Submit(
result, _ := server.Submit(
ctx,
"doc-1",
0,
Expand Down Expand Up @@ -73,6 +75,8 @@ flowchart LR
- `CreateDocument(ctx, documentID, initial)`: create a document at version `0`
- `GetSnapshot(ctx, documentID)`: get the latest snapshot
- `Submit(ctx, documentID, baseVersion, operation, source)`: submit an operation
- `SubmitWithRequest(ctx, req)`: submit an operation with optional `OpID` / `Source` + `Sequence` idempotency metadata
- `GetOperations(ctx, documentID, fromVersion, toVersion)`: fetch committed operation history for versions `(fromVersion, toVersion]`
- `Subscribe(ctx, documentID, buffer)`: subscribe to commit events

## How this relates to ShareDB
Expand All @@ -82,6 +86,8 @@ flowchart LR
- snapshot + version management
- submit by version
- OT rebase on the server
- operation history retrieval for reconnect/catch-up
- idempotent client retries with `OpID` (`source` + `seq`)
- event subscription

That makes it a good choice when you want ShareDB-style ideas with a smaller, Go-native surface area.
Expand All @@ -91,7 +97,7 @@ That makes it a good choice when you want ShareDB-style ideas with a smaller, Go
1. `go get github.com/edocevol/jsonot/sharedb`
2. create a document with `CreateDocument`
3. submit an operation with `Submit`
4. read snapshots or subscribe to committed updates
4. use `GetOperations` to catch up missed versions after reconnects, or `Subscribe` for live committed updates

## FAQ

Expand All @@ -103,13 +109,23 @@ It can be, if your goal is to build a Go-native backend with ShareDB-style conce

Yes. When `baseVersion < currentVersion`, the server transforms the submitted operation against missing history before applying it.

### How should clients retry after a lost acknowledgement?

Use `SubmitWithRequest` and set a stable `OpID` (or the compatibility `Source` and monotonically increasing `Sequence` fields). If the same operation identity is submitted again with the same operation, the server returns `Duplicate: true`, returns the current snapshot, and does not apply the operation a second time. Reusing the same sequence for a different operation returns `ErrDuplicateSequenceConflict`.

### How does a reconnecting client catch up?

Call `GetSnapshot` to learn the current version, or call `GetOperations(ctx, docID, lastSeenVersion, currentVersion)` to fetch the committed ops that produced versions `(lastSeenVersion, currentVersion]`.

### Can I use this in production?

The in-memory store is primarily aimed at demos and small services. For production, you will usually add persistence, isolation, auth, and operational controls on top.

## Notes

- `Submit` requires `baseVersion` to be in `[0, currentVersion]`
- `SubmitWithRequest` deduplicates only when an operation identity is supplied via `OpID` or both `Source` and a positive `Sequence`
- `GetOperations` returns ops whose produced versions are in `(fromVersion, toVersion]`; each `OpRecord` includes `BaseVersion`, original `SubmittedOp`, transformed committed `Op`, and operation identity
- when `baseVersion < currentVersion`, the server transforms the submitted operation against the missing history range
- subscription delivery is non-blocking; slow consumers may drop events unless you add a durable queue upstream

Expand Down
20 changes: 16 additions & 4 deletions sharedb/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import (
"encoding/json"
)

// OpID identifies a client operation. It is similar to ShareDB's source+seq
// pair and CRDT actor+sequence IDs: Source names the client/session and
// Sequence is monotonically increasing within that source.
type OpID struct {
Source string `json:"source,omitempty"`
Sequence int `json:"seq,omitempty"`
}

// DocRecord holds the latest document snapshot stored in the backend.
type DocRecord struct {
DocumentID string `json:"documentId"`
Expand All @@ -14,10 +22,14 @@ type DocRecord struct {

// OpRecord holds one committed operation entry in the op log.
type OpRecord struct {
DocumentID string `json:"documentId"`
Version int `json:"version"` // version this op produced (1-based)
Source string `json:"source,omitempty"`
Op json.RawMessage `json:"op"`
DocumentID string `json:"documentId"`
Version int `json:"version"` // version this op produced (1-based)
BaseVersion int `json:"baseVersion"` // client version this op was based on before transform
ID OpID `json:"id,omitempty"`
Source string `json:"source,omitempty"` // deprecated: use ID.Source
Sequence int `json:"seq,omitempty"` // deprecated: use ID.Sequence
SubmittedOp json.RawMessage `json:"submittedOp,omitempty"` // original client op before transform
Op json.RawMessage `json:"op"`
}

// Backend abstracts all durable storage for documents and op history.
Expand Down
26 changes: 23 additions & 3 deletions sharedb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,16 @@ func (b *MemoryBackend) AppendOp(_ context.Context, record OpRecord) error {
return ErrDocumentNotFound
}

d.ops = append(d.ops, record)
d.ops = append(d.ops, OpRecord{
DocumentID: record.DocumentID,
Version: record.Version,
BaseVersion: record.BaseVersion,
ID: record.ID,
Source: record.Source,
Sequence: record.Sequence,
SubmittedOp: append(json.RawMessage(nil), record.SubmittedOp...),
Op: append(json.RawMessage(nil), record.Op...),
})
return nil
}

Expand All @@ -110,7 +119,18 @@ func (b *MemoryBackend) GetOps(_ context.Context, docID string, fromVersion, toV

slice := d.ops[fromVersion:toVersion]
result := make([]OpRecord, len(slice))
copy(result, slice)
for i, rec := range slice {
result[i] = OpRecord{
DocumentID: rec.DocumentID,
Version: rec.Version,
BaseVersion: rec.BaseVersion,
ID: rec.ID,
Source: rec.Source,
Sequence: rec.Sequence,
SubmittedOp: append(json.RawMessage(nil), rec.SubmittedOp...),
Op: append(json.RawMessage(nil), rec.Op...),
}
}
return result, nil
}

Expand Down Expand Up @@ -173,7 +193,7 @@ func (p *MemoryPublisher) Publish(_ context.Context, event Event) {
continue
}
select {
case sub.ch <- event:
case sub.ch <- cloneEvent(event):
default:
// slow subscriber — drop event
}
Expand Down
2 changes: 1 addition & 1 deletion sharedb/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (p *RedisPublisher) listen() {
continue
}
select {
case s.ch <- event:
case s.ch <- cloneEvent(event):
default:
}
}
Expand Down
Loading