diff --git a/sharedb/README.md b/sharedb/README.md index 55c8790..ca89f1a 100644 --- a/sharedb/README.md +++ b/sharedb/README.md @@ -13,7 +13,7 @@ It gives you the backend building blocks that usually sit around an OT engine: - server-side rebase of concurrent operations with `Transform` - subscription to committed updates (`Subscribe`) -The current implementation includes an in-memory `Store`, which is a good fit for demos, single-node services, prototypes, and custom wrappers. +The current implementation includes an in-memory server for demos/tests plus pluggable backends, lockers, and publishers (including Redis adapters) for multi-process deployments. ## Who should use it? @@ -23,6 +23,8 @@ Use `jsonot/sharedb` when you want to: - keep the document authoritative on the server - accept client operations against an older version and rebase them automatically - wrap OT logic with a small backend API instead of designing every primitive from scratch +- expose operation history for reconnect/catch-up flows +- make client retries idempotent with `source` + `seq` / `OpID` metadata ## Quick start @@ -39,11 +41,11 @@ import ( func main() { ctx := context.Background() -store := sharedb.NewStore() +server := sharedb.NewMemoryServer() -_, _ = store.CreateDocument(ctx, "doc-1", json.RawMessage(`{"counter":0}`)) +_, _ = server.CreateDocument(ctx, "doc-1", json.RawMessage(`{"counter":0}`)) -result, _ := store.Submit( +result, _ := server.Submit( ctx, "doc-1", 0, @@ -73,6 +75,8 @@ flowchart LR - `CreateDocument(ctx, documentID, initial)`: create a document at version `0` - `GetSnapshot(ctx, documentID)`: get the latest snapshot - `Submit(ctx, documentID, baseVersion, operation, source)`: submit an operation +- `SubmitWithRequest(ctx, req)`: submit an operation with optional `OpID` / `Source` + `Sequence` idempotency metadata +- `GetOperations(ctx, documentID, fromVersion, toVersion)`: fetch committed operation history for versions `(fromVersion, toVersion]` - `Subscribe(ctx, documentID, buffer)`: subscribe to commit events ## How this relates to ShareDB @@ -82,6 +86,8 @@ flowchart LR - snapshot + version management - submit by version - OT rebase on the server +- operation history retrieval for reconnect/catch-up +- idempotent client retries with `OpID` (`source` + `seq`) - event subscription That makes it a good choice when you want ShareDB-style ideas with a smaller, Go-native surface area. @@ -91,7 +97,7 @@ That makes it a good choice when you want ShareDB-style ideas with a smaller, Go 1. `go get github.com/edocevol/jsonot/sharedb` 2. create a document with `CreateDocument` 3. submit an operation with `Submit` -4. read snapshots or subscribe to committed updates +4. use `GetOperations` to catch up missed versions after reconnects, or `Subscribe` for live committed updates ## FAQ @@ -103,6 +109,14 @@ It can be, if your goal is to build a Go-native backend with ShareDB-style conce Yes. When `baseVersion < currentVersion`, the server transforms the submitted operation against missing history before applying it. +### How should clients retry after a lost acknowledgement? + +Use `SubmitWithRequest` and set a stable `OpID` (or the compatibility `Source` and monotonically increasing `Sequence` fields). If the same operation identity is submitted again with the same operation, the server returns `Duplicate: true`, returns the current snapshot, and does not apply the operation a second time. Reusing the same sequence for a different operation returns `ErrDuplicateSequenceConflict`. + +### How does a reconnecting client catch up? + +Call `GetSnapshot` to learn the current version, or call `GetOperations(ctx, docID, lastSeenVersion, currentVersion)` to fetch the committed ops that produced versions `(lastSeenVersion, currentVersion]`. + ### Can I use this in production? The in-memory store is primarily aimed at demos and small services. For production, you will usually add persistence, isolation, auth, and operational controls on top. @@ -110,6 +124,8 @@ The in-memory store is primarily aimed at demos and small services. For producti ## Notes - `Submit` requires `baseVersion` to be in `[0, currentVersion]` +- `SubmitWithRequest` deduplicates only when an operation identity is supplied via `OpID` or both `Source` and a positive `Sequence` +- `GetOperations` returns ops whose produced versions are in `(fromVersion, toVersion]`; each `OpRecord` includes `BaseVersion`, original `SubmittedOp`, transformed committed `Op`, and operation identity - when `baseVersion < currentVersion`, the server transforms the submitted operation against the missing history range - subscription delivery is non-blocking; slow consumers may drop events unless you add a durable queue upstream diff --git a/sharedb/backend.go b/sharedb/backend.go index 1c43feb..4855fe9 100644 --- a/sharedb/backend.go +++ b/sharedb/backend.go @@ -5,6 +5,14 @@ import ( "encoding/json" ) +// OpID identifies a client operation. It is similar to ShareDB's source+seq +// pair and CRDT actor+sequence IDs: Source names the client/session and +// Sequence is monotonically increasing within that source. +type OpID struct { + Source string `json:"source,omitempty"` + Sequence int `json:"seq,omitempty"` +} + // DocRecord holds the latest document snapshot stored in the backend. type DocRecord struct { DocumentID string `json:"documentId"` @@ -14,10 +22,14 @@ type DocRecord struct { // OpRecord holds one committed operation entry in the op log. type OpRecord struct { - DocumentID string `json:"documentId"` - Version int `json:"version"` // version this op produced (1-based) - Source string `json:"source,omitempty"` - Op json.RawMessage `json:"op"` + DocumentID string `json:"documentId"` + Version int `json:"version"` // version this op produced (1-based) + BaseVersion int `json:"baseVersion"` // client version this op was based on before transform + ID OpID `json:"id,omitempty"` + Source string `json:"source,omitempty"` // deprecated: use ID.Source + Sequence int `json:"seq,omitempty"` // deprecated: use ID.Sequence + SubmittedOp json.RawMessage `json:"submittedOp,omitempty"` // original client op before transform + Op json.RawMessage `json:"op"` } // Backend abstracts all durable storage for documents and op history. diff --git a/sharedb/memory.go b/sharedb/memory.go index 9d020b2..fa4558c 100644 --- a/sharedb/memory.go +++ b/sharedb/memory.go @@ -84,7 +84,16 @@ func (b *MemoryBackend) AppendOp(_ context.Context, record OpRecord) error { return ErrDocumentNotFound } - d.ops = append(d.ops, record) + d.ops = append(d.ops, OpRecord{ + DocumentID: record.DocumentID, + Version: record.Version, + BaseVersion: record.BaseVersion, + ID: record.ID, + Source: record.Source, + Sequence: record.Sequence, + SubmittedOp: append(json.RawMessage(nil), record.SubmittedOp...), + Op: append(json.RawMessage(nil), record.Op...), + }) return nil } @@ -110,7 +119,18 @@ func (b *MemoryBackend) GetOps(_ context.Context, docID string, fromVersion, toV slice := d.ops[fromVersion:toVersion] result := make([]OpRecord, len(slice)) - copy(result, slice) + for i, rec := range slice { + result[i] = OpRecord{ + DocumentID: rec.DocumentID, + Version: rec.Version, + BaseVersion: rec.BaseVersion, + ID: rec.ID, + Source: rec.Source, + Sequence: rec.Sequence, + SubmittedOp: append(json.RawMessage(nil), rec.SubmittedOp...), + Op: append(json.RawMessage(nil), rec.Op...), + } + } return result, nil } @@ -173,7 +193,7 @@ func (p *MemoryPublisher) Publish(_ context.Context, event Event) { continue } select { - case sub.ch <- event: + case sub.ch <- cloneEvent(event): default: // slow subscriber — drop event } diff --git a/sharedb/redis.go b/sharedb/redis.go index f29041f..f342ac0 100644 --- a/sharedb/redis.go +++ b/sharedb/redis.go @@ -292,7 +292,7 @@ func (p *RedisPublisher) listen() { continue } select { - case s.ch <- event: + case s.ch <- cloneEvent(event): default: } } diff --git a/sharedb/server.go b/sharedb/server.go index a67b347..f669325 100644 --- a/sharedb/server.go +++ b/sharedb/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "github.com/edocevol/jsonot" ) @@ -16,6 +17,8 @@ var ( ErrDocumentExists = errors.New("sharedb: document already exists") // ErrInvalidVersion means the supplied base version is out of range. ErrInvalidVersion = errors.New("sharedb: invalid version") + // ErrDuplicateSequenceConflict means a client reused Source+Sequence for a different operation. + ErrDuplicateSequenceConflict = errors.New("sharedb: duplicate source sequence conflicts with original operation") ) // Snapshot is the latest immutable view of a document. @@ -29,11 +32,26 @@ type Snapshot struct { type Event struct { DocumentID string `json:"documentId"` Version int `json:"version"` - Source string `json:"source,omitempty"` + ID OpID `json:"id,omitempty"` + Source string `json:"source,omitempty"` // deprecated: use ID.Source + Sequence int `json:"seq,omitempty"` // deprecated: use ID.Sequence Operation json.RawMessage `json:"op"` Document json.RawMessage `json:"document"` } +// SubmitRequest is the structured form of a client operation submission. +// Source+Sequence are optional, but when both are supplied the server treats +// retries of the same client sequence as idempotent and returns the already +// committed result without applying the operation again. +type SubmitRequest struct { + DocumentID string `json:"documentId"` + BaseVersion int `json:"baseVersion"` + Operation json.RawMessage `json:"op"` + ID OpID `json:"id,omitempty"` + Source string `json:"source,omitempty"` // deprecated: use ID.Source + Sequence int `json:"seq,omitempty"` // deprecated: use ID.Sequence +} + // SubmitResult is returned by Server.Submit. type SubmitResult struct { // Version is the new document version after the op was committed. @@ -41,6 +59,9 @@ type SubmitResult struct { // Rebased is true when the op was transformed against concurrent ops // before being applied (i.e. baseVersion < server version at submit time). Rebased bool `json:"rebased"` + // Duplicate is true when SubmitWithRequest detected a retry for the same + // Source+Sequence and did not apply the operation again. + Duplicate bool `json:"duplicate,omitempty"` // Operation is the (possibly transformed) op that was actually applied. Operation json.RawMessage `json:"op"` // Document is the document state after the op. @@ -146,24 +167,58 @@ func (s *Server) Submit( rawOperation json.RawMessage, source string, ) (SubmitResult, error) { + return s.SubmitWithRequest(ctx, SubmitRequest{ + DocumentID: documentID, + BaseVersion: baseVersion, + Operation: rawOperation, + Source: source, + }) +} + +// SubmitWithRequest accepts a structured client operation submission. In +// addition to Submit's versioned rebase semantics, Source+Sequence provide +// ShareDB-style idempotency for clients that retry after transport failures. +func (s *Server) SubmitWithRequest(ctx context.Context, req SubmitRequest) (SubmitResult, error) { + opID := req.opID() // Step 1: acquire lock - unlock, err := s.locker.Lock(ctx, documentID) + unlock, err := s.locker.Lock(ctx, req.DocumentID) if err != nil { return SubmitResult{}, err } defer unlock() // Step 2: read current state - rec, err := s.backend.GetDoc(ctx, documentID) + rec, err := s.backend.GetDoc(ctx, req.DocumentID) if err != nil { return SubmitResult{}, err } - if baseVersion < 0 || baseVersion > rec.Version { - return SubmitResult{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, baseVersion) + if req.BaseVersion < 0 || req.BaseVersion > rec.Version { + return SubmitResult{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, req.BaseVersion) + } + + if opID.Source != "" && opID.Sequence > 0 && rec.Version > 0 { + ops, err := s.backend.GetOps(ctx, req.DocumentID, 0, rec.Version) + if err != nil { + return SubmitResult{}, err + } + for _, opRec := range ops { + recID := opRec.opID() + if recID.Source == opID.Source && recID.Sequence == opID.Sequence { + if len(opRec.SubmittedOp) > 0 && !jsonRawEqual(req.Operation, opRec.SubmittedOp) { + return SubmitResult{}, ErrDuplicateSequenceConflict + } + return SubmitResult{ + Version: rec.Version, + Duplicate: true, + Operation: append(json.RawMessage(nil), opRec.Op...), + Document: append(json.RawMessage(nil), rec.Doc...), + }, nil + } + } } - op, err := s.parseOperation(rawOperation) + op, err := s.parseOperation(req.Operation) if err != nil { return SubmitResult{}, err } @@ -171,8 +226,8 @@ func (s *Server) Submit( // Step 3: transform against concurrent ops transformed := op rebased := false - if baseVersion < rec.Version { - concurrentOps, err := s.backend.GetOps(ctx, documentID, baseVersion, rec.Version) + if req.BaseVersion < rec.Version { + concurrentOps, err := s.backend.GetOps(ctx, req.DocumentID, req.BaseVersion, rec.Version) if err != nil { return SubmitResult{}, err } @@ -216,17 +271,21 @@ func (s *Server) Submit( // Step 5: persist snapshot + op log if err := s.backend.SaveDoc(ctx, DocRecord{ - DocumentID: documentID, + DocumentID: req.DocumentID, Version: newVersion, Doc: newDoc, }); err != nil { return SubmitResult{}, err } if err := s.backend.AppendOp(ctx, OpRecord{ - DocumentID: documentID, - Version: newVersion, - Source: source, - Op: serializedOp, + DocumentID: req.DocumentID, + Version: newVersion, + BaseVersion: req.BaseVersion, + ID: opID, + Source: opID.Source, + Sequence: opID.Sequence, + SubmittedOp: append(json.RawMessage(nil), req.Operation...), + Op: serializedOp, }); err != nil { return SubmitResult{}, err } @@ -240,9 +299,11 @@ func (s *Server) Submit( // Step 6: publish event (lock already released via defer, but publish while we have data) s.pub.Publish(ctx, Event{ - DocumentID: documentID, + DocumentID: req.DocumentID, Version: newVersion, - Source: source, + ID: opID, + Source: opID.Source, + Sequence: opID.Sequence, Operation: append(json.RawMessage(nil), serializedOp...), Document: append(json.RawMessage(nil), newDoc...), }) @@ -261,6 +322,125 @@ func (s *Server) Subscribe(ctx context.Context, documentID string, buffer int) ( return s.pub.Subscribe(ctx, documentID, buffer) } +// GetOperations returns committed operation records that produced versions in +// [fromVersion+1, toVersion]. It is useful for client catch-up, audit logs, and +// reconnect flows that need ShareDB-style op history. +func (s *Server) GetOperations(ctx context.Context, documentID string, fromVersion, toVersion int) ([]OpRecord, error) { + rec, err := s.backend.GetDoc(ctx, documentID) + if err != nil { + return nil, err + } + if fromVersion < 0 || toVersion < fromVersion || toVersion > rec.Version { + return nil, fmt.Errorf("%w: expected 0-%d range, got [%d, %d]", ErrInvalidVersion, rec.Version, fromVersion, toVersion) + } + ops, err := s.backend.GetOps(ctx, documentID, fromVersion, toVersion) + if err != nil { + return nil, err + } + if len(ops) != toVersion-fromVersion { + return nil, fmt.Errorf("sharedb: incomplete operation history for %s: got %d ops, want %d", documentID, len(ops), toVersion-fromVersion) + } + return ops, nil +} + +func (r SubmitRequest) opID() OpID { + id := r.ID + if id.Source == "" { + id.Source = r.Source + } + if id.Sequence == 0 { + id.Sequence = r.Sequence + } + return id +} + +func (r OpRecord) opID() OpID { + id := r.ID + if id.Source == "" { + id.Source = r.Source + } + if id.Sequence == 0 { + id.Sequence = r.Sequence + } + return id +} + +func jsonRawEqual(a, b json.RawMessage) bool { + var av, bv any + if err := json.Unmarshal(a, &av); err != nil { + return false + } + if err := json.Unmarshal(b, &bv); err != nil { + return false + } + return reflect.DeepEqual(av, bv) +} + +func cloneEvent(event Event) Event { + cloned := event + cloned.Operation = append(json.RawMessage(nil), event.Operation...) + cloned.Document = append(json.RawMessage(nil), event.Document...) + return cloned +} + +func (id OpID) isZero() bool { + return id.Source == "" && id.Sequence == 0 +} + +func (e Event) MarshalJSON() ([]byte, error) { + type eventJSON struct { + DocumentID string `json:"documentId"` + Version int `json:"version"` + ID *OpID `json:"id,omitempty"` + Source string `json:"source,omitempty"` + Sequence int `json:"seq,omitempty"` + Operation json.RawMessage `json:"op"` + Document json.RawMessage `json:"document"` + } + var id *OpID + if !e.ID.isZero() { + idValue := e.ID + id = &idValue + } + return json.Marshal(eventJSON{ + DocumentID: e.DocumentID, + Version: e.Version, + ID: id, + Source: e.Source, + Sequence: e.Sequence, + Operation: e.Operation, + Document: e.Document, + }) +} + +func (r OpRecord) MarshalJSON() ([]byte, error) { + type opRecordJSON struct { + DocumentID string `json:"documentId"` + Version int `json:"version"` + BaseVersion int `json:"baseVersion"` + ID *OpID `json:"id,omitempty"` + Source string `json:"source,omitempty"` + Sequence int `json:"seq,omitempty"` + SubmittedOp json.RawMessage `json:"submittedOp,omitempty"` + Op json.RawMessage `json:"op"` + } + var id *OpID + if !r.ID.isZero() { + idValue := r.ID + id = &idValue + } + return json.Marshal(opRecordJSON{ + DocumentID: r.DocumentID, + Version: r.Version, + BaseVersion: r.BaseVersion, + ID: id, + Source: r.Source, + Sequence: r.Sequence, + SubmittedOp: r.SubmittedOp, + Op: r.Op, + }) +} + func (s *Server) parseOperation(raw json.RawMessage) (*jsonot.Operation, error) { payload := raw if len(payload) == 0 { diff --git a/sharedb/store_test.go b/sharedb/store_test.go index 7e89d42..78f15e0 100644 --- a/sharedb/store_test.go +++ b/sharedb/store_test.go @@ -3,9 +3,23 @@ package sharedb import ( "context" "encoding/json" + "reflect" + "strings" "testing" ) +func sameJSON(t *testing.T, got, want json.RawMessage) bool { + t.Helper() + var gotValue, wantValue any + if err := json.Unmarshal(got, &gotValue); err != nil { + t.Fatalf("got invalid JSON %q: %v", got, err) + } + if err := json.Unmarshal(want, &wantValue); err != nil { + t.Fatalf("want invalid JSON %q: %v", want, err) + } + return reflect.DeepEqual(gotValue, wantValue) +} + func TestStoreSequentialAndRebasedSubmit(t *testing.T) { ctx := context.Background() store := NewMemoryServer() @@ -104,3 +118,222 @@ func TestStoreInvalidVersion(t *testing.T) { t.Fatalf("expected invalid version error") } } + +func TestStoreGetOperationsReturnsCommittedHistoryRange(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + _, err := store.CreateDocument(ctx, "doc-history", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + first := json.RawMessage(`[{"p":["counter"],"na":1}]`) + second := json.RawMessage(`[{"p":["counter"],"na":2}]`) + third := json.RawMessage(`[{"p":["counter"],"na":3}]`) + + if _, err := store.SubmitWithRequest(ctx, SubmitRequest{DocumentID: "doc-history", BaseVersion: 0, Operation: first, Source: "a", Sequence: 1}); err != nil { + t.Fatalf("submit first failed: %v", err) + } + if _, err := store.SubmitWithRequest(ctx, SubmitRequest{DocumentID: "doc-history", BaseVersion: 1, Operation: second, Source: "b", Sequence: 2}); err != nil { + t.Fatalf("submit second failed: %v", err) + } + if _, err := store.SubmitWithRequest(ctx, SubmitRequest{DocumentID: "doc-history", BaseVersion: 2, Operation: third, Source: "c", Sequence: 3}); err != nil { + t.Fatalf("submit third failed: %v", err) + } + + ops, err := store.GetOperations(ctx, "doc-history", 1, 3) + if err != nil { + t.Fatalf("get operations failed: %v", err) + } + if len(ops) != 2 { + t.Fatalf("unexpected operation count: got %d want 2", len(ops)) + } + if ops[0].Version != 2 || ops[0].BaseVersion != 1 || ops[0].Source != "b" || ops[0].ID.Source != "b" || ops[0].ID.Sequence != 2 || !sameJSON(t, ops[0].Op, second) || !sameJSON(t, ops[0].SubmittedOp, second) { + t.Fatalf("unexpected first history op: %+v", ops[0]) + } + if ops[1].Version != 3 || ops[1].BaseVersion != 2 || ops[1].Source != "c" || ops[1].ID.Source != "c" || ops[1].ID.Sequence != 3 || !sameJSON(t, ops[1].Op, third) || !sameJSON(t, ops[1].SubmittedOp, third) { + t.Fatalf("unexpected second history op: %+v", ops[1]) + } +} + +func TestSubmitWithRequestDeduplicatesClientSequence(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + _, err := store.CreateDocument(ctx, "doc-dedupe", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + req := SubmitRequest{ + DocumentID: "doc-dedupe", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "client-a", + Sequence: 7, + } + + first, err := store.SubmitWithRequest(ctx, req) + if err != nil { + t.Fatalf("first submit failed: %v", err) + } + if first.Version != 1 || first.Duplicate { + t.Fatalf("unexpected first result: %+v", first) + } + + if _, err := store.Submit(ctx, "doc-dedupe", 1, json.RawMessage(`[{"p":["counter"],"na":2}]`), "client-b"); err != nil { + t.Fatalf("interleaved submit failed: %v", err) + } + + retry, err := store.SubmitWithRequest(ctx, req) + if err != nil { + t.Fatalf("retry submit failed: %v", err) + } + if retry.Version != 2 || !retry.Duplicate { + t.Fatalf("unexpected retry result: %+v", retry) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-dedupe") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + if got := string(snapshot.Document); got != `{"counter":3}` { + t.Fatalf("duplicate submit should not apply twice: got %s", got) + } + if !sameJSON(t, retry.Document, snapshot.Document) { + t.Fatalf("duplicate result should return the current snapshot document") + } +} + +func TestSubmitWithRequestRejectsSequenceReuseWithDifferentOperation(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + _, err := store.CreateDocument(ctx, "doc-seq-conflict", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + first := SubmitRequest{ + DocumentID: "doc-seq-conflict", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "client-a", + Sequence: 7, + } + if _, err := store.SubmitWithRequest(ctx, first); err != nil { + t.Fatalf("first submit failed: %v", err) + } + + conflict := first + conflict.Operation = json.RawMessage(`[{"p":["counter"],"na":2}]`) + _, err = store.SubmitWithRequest(ctx, conflict) + if err == nil { + t.Fatalf("expected sequence conflict error") + } +} + +func TestSubmitWithRequestPublishesSequence(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + _, err := store.CreateDocument(ctx, "doc-event-seq", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + events, cancel, err := store.Subscribe(ctx, "doc-event-seq", 1) + if err != nil { + t.Fatalf("subscribe failed: %v", err) + } + defer cancel() + + _, err = store.SubmitWithRequest(ctx, SubmitRequest{ + DocumentID: "doc-event-seq", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "client-a", + Sequence: 42, + }) + if err != nil { + t.Fatalf("submit failed: %v", err) + } + + select { + case event := <-events: + if event.Source != "client-a" || event.Sequence != 42 || event.ID.Source != "client-a" || event.ID.Sequence != 42 { + t.Fatalf("unexpected event identity: %+v", event) + } + default: + t.Fatalf("expected one event") + } +} + +func TestGetOperationsRejectsInvalidRanges(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + _, err := store.CreateDocument(ctx, "doc-invalid-range", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + if _, err := store.Submit(ctx, "doc-invalid-range", 0, json.RawMessage(`[{"p":["counter"],"na":1}]`), "client-a"); err != nil { + t.Fatalf("submit failed: %v", err) + } + + cases := []struct{ from, to int }{{-1, 0}, {1, 0}, {0, 2}} + for _, tc := range cases { + if _, err := store.GetOperations(ctx, "doc-invalid-range", tc.from, tc.to); err == nil { + t.Fatalf("expected invalid range error for [%d,%d]", tc.from, tc.to) + } + } + + ops, err := store.GetOperations(ctx, "doc-invalid-range", 1, 1) + if err != nil { + t.Fatalf("empty range should be valid: %v", err) + } + if len(ops) != 0 { + t.Fatalf("empty range returned %d ops", len(ops)) + } +} + +func TestIdentityOmittedFromJSONWhenUnset(t *testing.T) { + eventRaw, err := json.Marshal(Event{DocumentID: "doc", Version: 1, Operation: json.RawMessage(`[]`), Document: json.RawMessage(`{}`)}) + if err != nil { + t.Fatalf("marshal event failed: %v", err) + } + if strings.Contains(string(eventRaw), `"id"`) || strings.Contains(string(eventRaw), `"seq"`) || strings.Contains(string(eventRaw), `"source"`) { + t.Fatalf("unset event identity should be omitted, got %s", eventRaw) + } + + recordRaw, err := json.Marshal(OpRecord{DocumentID: "doc", Version: 1, Op: json.RawMessage(`[]`)}) + if err != nil { + t.Fatalf("marshal op record failed: %v", err) + } + if strings.Contains(string(recordRaw), `"id"`) || strings.Contains(string(recordRaw), `"seq"`) || strings.Contains(string(recordRaw), `"source"`) { + t.Fatalf("unset op identity should be omitted, got %s", recordRaw) + } +} + +func TestMemoryPublisherDeliversIndependentEventPayloads(t *testing.T) { + ctx := context.Background() + pub := NewMemoryPublisher() + left, cancelLeft, err := pub.Subscribe(ctx, "doc-pub", 1) + if err != nil { + t.Fatalf("subscribe left failed: %v", err) + } + defer cancelLeft() + right, cancelRight, err := pub.Subscribe(ctx, "doc-pub", 1) + if err != nil { + t.Fatalf("subscribe right failed: %v", err) + } + defer cancelRight() + + pub.Publish(ctx, Event{DocumentID: "doc-pub", Version: 1, Operation: json.RawMessage(`[{"p":["x"],"na":1}]`), Document: json.RawMessage(`{"x":1}`)}) + leftEvent := <-left + leftEvent.Operation[0] = 'X' + leftEvent.Document[0] = 'X' + rightEvent := <-right + if string(rightEvent.Operation) != `[{"p":["x"],"na":1}]` || string(rightEvent.Document) != `{"x":1}` { + t.Fatalf("subscriber payloads should be independent, got op=%s doc=%s", rightEvent.Operation, rightEvent.Document) + } +}