From 178a6dd2ee1b8c9537a18d555f7d0b207e7f9e00 Mon Sep 17 00:00:00 2001 From: edocevol Date: Thu, 30 Apr 2026 14:33:14 +0800 Subject: [PATCH 1/2] feat(sharedb): add bounded stale resync and websocket submit protocol --- sharedb/reconnect.go | 13 +- sharedb/server.go | 258 ++++++++++++++++- sharedb/store_test.go | 547 ++++++++++++++++++++++++++++++++++++ sharedb/ws_protocol.go | 191 ++++++++++++- sharedb/ws_protocol_test.go | 412 +++++++++++++++++++++++++++ sharedb/ws_transport.go | 8 + 6 files changed, 1420 insertions(+), 9 deletions(-) diff --git a/sharedb/reconnect.go b/sharedb/reconnect.go index 44af479..729b0ac 100644 --- a/sharedb/reconnect.go +++ b/sharedb/reconnect.go @@ -7,10 +7,14 @@ import ( // ReplayResult is a transport-friendly mailbox replay decision. type ReplayResult struct { - SessionID string `json:"sessionId"` - Envelopes []Envelope `json:"envelopes"` - LastAcked string `json:"lastAcked,omitempty"` - RequiresResync bool `json:"requiresResync,omitempty"` + SessionID string `json:"sessionId"` + Envelopes []Envelope `json:"envelopes"` + LastAcked string `json:"lastAcked,omitempty"` + RequiresResync bool `json:"requiresResync,omitempty"` + Reason StaleReason `json:"reason,omitempty"` + CurrentVersion int `json:"currentVersion,omitempty"` + MinSupportedVersion int `json:"minSupportedVersion,omitempty"` + MaxRebaseGap int `json:"maxRebaseGap,omitempty"` } // ReplaySessionMailbox replays envelopes for a session after the provided cursor. @@ -27,6 +31,7 @@ func ReplaySessionMailbox(ctx context.Context, store MailboxStore, sessionID, af SessionID: sessionID, LastAcked: lastAcked, RequiresResync: true, + Reason: StaleReasonReplayCursorNotFound, }, nil } return ReplayResult{}, err diff --git a/sharedb/server.go b/sharedb/server.go index 6a30706..e4f1dc0 100644 --- a/sharedb/server.go +++ b/sharedb/server.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "reflect" + "sync" "github.com/edocevol/jsonot" ) @@ -19,6 +20,8 @@ var ( ErrInvalidVersion = errors.New("sharedb: invalid version") // ErrDuplicateSequenceConflict means a client reused Source+Sequence for a different operation. ErrDuplicateSequenceConflict = errors.New("sharedb: duplicate source sequence conflicts with original operation") + // ErrStaleResyncRequired means the client is too far behind for bounded server-side rebase. + ErrStaleResyncRequired = errors.New("sharedb: stale submit requires resync") ) // Snapshot is the latest immutable view of a document. @@ -81,6 +84,37 @@ type SubmitResult struct { // SubmitHandler handles a structured submit request. type SubmitHandler func(context.Context, SubmitRequest) (SubmitResult, error) +// StaleReason classifies why the server requires a client resync instead of +// continuing with replay or server-side OT rebase. +type StaleReason string + +const ( + StaleReasonVersionBehindWindow StaleReason = "version_behind_window" + StaleReasonReplayCursorNotFound StaleReason = "replay_cursor_not_found" +) + +// StaleSubmitError describes a submit that the server rejected because the +// client's base version fell outside the configured bounded rebase window. +type StaleSubmitError struct { + Reason StaleReason `json:"reason,omitempty"` + DocumentID string `json:"documentId,omitempty"` + BaseVersion int `json:"baseVersion,omitempty"` + CurrentVersion int `json:"currentVersion,omitempty"` + MinSupportedVersion int `json:"minSupportedVersion,omitempty"` + MaxRebaseGap int `json:"maxRebaseGap,omitempty"` +} + +func (e *StaleSubmitError) Error() string { + if e == nil { + return ErrStaleResyncRequired.Error() + } + return fmt.Sprintf("%s: reason=%s document=%s base=%d current=%d minSupported=%d maxRebaseGap=%d", ErrStaleResyncRequired.Error(), e.Reason, e.DocumentID, e.BaseVersion, e.CurrentVersion, e.MinSupportedVersion, e.MaxRebaseGap) +} + +func (e *StaleSubmitError) Unwrap() error { + return ErrStaleResyncRequired +} + // SubmitMiddleware wraps submit handling so callers can validate, reject, // enrich, or observe submit requests/results. type SubmitMiddleware func(SubmitHandler) SubmitHandler @@ -106,6 +140,14 @@ func WithSubmitMiddleware(middleware ...SubmitMiddleware) ServerOption { } } +// WithMaxRebaseGap bounds how many committed versions a stale submit may be +// rebased across on the server. A value <= 0 disables the limit. +func WithMaxRebaseGap(maxGap int) ServerOption { + return func(s *Server) { + s.maxRebaseGap = maxGap + } +} + // Server is the central coordinator for collaborative editing. // // Version number flow @@ -124,6 +166,9 @@ type Server struct { pub Publisher ot *jsonot.JSONOperationTransformer + parsedOps parsedOpCache + maxRebaseGap int + submitMiddleware []SubmitMiddleware submitHandler SubmitHandler } @@ -138,6 +183,9 @@ func NewServer(backend Backend, locker Locker, opts ...ServerOption) *Server { backend: backend, locker: locker, ot: jsonot.NewJSONOperationTransformer(), + parsedOps: parsedOpCache{ + docs: make(map[string]map[int]*jsonot.Operation), + }, } for _, opt := range opts { opt(s) @@ -184,6 +232,27 @@ func (s *Server) GetSnapshot(ctx context.Context, documentID string) (Snapshot, }, nil } +// GetSnapshotAt reconstructs the document snapshot at a historical version by +// applying inverse OT operations from the current snapshot backwards. +func (s *Server) GetSnapshotAt(ctx context.Context, documentID string, version int) (Snapshot, error) { + rec, err := s.backend.GetDoc(ctx, documentID) + if err != nil { + return Snapshot{}, err + } + if version < 0 || version > rec.Version { + return Snapshot{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, version) + } + if version == rec.Version { + return Snapshot{DocumentID: documentID, Version: rec.Version, Document: append(json.RawMessage(nil), rec.Doc...)}, nil + } + + document, err := s.reconstructSnapshotAt(ctx, documentID, rec.Doc, version, rec.Version) + if err != nil { + return Snapshot{}, err + } + return Snapshot{DocumentID: documentID, Version: version, Document: document}, nil +} + // Submit accepts a client operation based on baseVersion. // // Centralized version protocol: @@ -265,13 +334,24 @@ func (s *Server) submitCore(ctx context.Context, req SubmitRequest) (SubmitResul transformed := op rebased := false if req.BaseVersion < rec.Version { + gap := rec.Version - req.BaseVersion + if s.maxRebaseGap > 0 && gap > s.maxRebaseGap { + return SubmitResult{}, &StaleSubmitError{ + Reason: StaleReasonVersionBehindWindow, + DocumentID: req.DocumentID, + BaseVersion: req.BaseVersion, + CurrentVersion: rec.Version, + MinSupportedVersion: rec.Version - s.maxRebaseGap, + MaxRebaseGap: s.maxRebaseGap, + } + } concurrentOps, err := s.backend.GetOps(ctx, req.DocumentID, req.BaseVersion, rec.Version) if err != nil { return SubmitResult{}, err } for _, opRec := range concurrentOps { - concurrent, err := s.parseOperation(opRec.Op) + concurrent, err := s.getOrParseCommittedOperation(opRec) if err != nil { return SubmitResult{}, fmt.Errorf("failed to parse concurrent op at version %d: %w", opRec.Version, err) } @@ -331,6 +411,7 @@ func (s *Server) submitCore(ctx context.Context, req SubmitRequest) (SubmitResul Operation: serializedOp, Document: newDoc, } + s.cacheCommittedOperation(OpRecord{DocumentID: req.DocumentID, Version: newVersion, Op: serializedOp}, transformed) // Step 6: publish event (lock already released via defer, but publish while we have data) s.pub.Publish(ctx, Event{ @@ -388,6 +469,7 @@ func (s *Server) DeleteDocument(ctx context.Context, documentID string, baseVers if err := s.backend.DeleteDoc(ctx, documentID); err != nil { return err } + s.parsedOps.DeleteDocument(documentID) s.pub.Publish(ctx, Event{ Type: EventTypeDelete, @@ -399,6 +481,42 @@ func (s *Server) DeleteDocument(ctx context.Context, documentID string, baseVers return nil } +// RollbackToVersion computes an inverse OT operation from the current version +// back to targetVersion and commits it as a new versioned operation. +func (s *Server) RollbackToVersion(ctx context.Context, documentID string, targetVersion int, source string) (SubmitResult, error) { + unlock, err := s.locker.Lock(ctx, documentID) + if err != nil { + return SubmitResult{}, err + } + defer unlock() + + rec, err := s.backend.GetDoc(ctx, documentID) + if err != nil { + return SubmitResult{}, err + } + if targetVersion < 0 || targetVersion > rec.Version { + return SubmitResult{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, targetVersion) + } + if targetVersion == rec.Version { + return SubmitResult{Version: rec.Version, Operation: json.RawMessage("[]"), Document: append(json.RawMessage(nil), rec.Doc...)}, nil + } + + rollbackOp, rollbackDoc, err := s.buildRollback(ctx, documentID, rec.Doc, targetVersion, rec.Version) + if err != nil { + return SubmitResult{}, err + } + serializedOp := append(json.RawMessage(nil), rollbackOp.ToValue().RawMessage()...) + newVersion := rec.Version + 1 + if err := s.backend.CommitOp(ctx, DocRecord{DocumentID: documentID, Version: newVersion, Doc: rollbackDoc}, OpRecord{DocumentID: documentID, Version: newVersion, BaseVersion: rec.Version, Source: source, SubmittedOp: serializedOp, Op: serializedOp}); err != nil { + return SubmitResult{}, err + } + s.cacheCommittedOperation(OpRecord{DocumentID: documentID, Version: newVersion, Op: serializedOp}, rollbackOp) + + result := SubmitResult{Version: newVersion, Operation: serializedOp, Document: append(json.RawMessage(nil), rollbackDoc...)} + s.pub.Publish(ctx, Event{Type: EventTypeOp, DocumentID: documentID, Version: newVersion, Source: source, Operation: append(json.RawMessage(nil), serializedOp...), Document: append(json.RawMessage(nil), rollbackDoc...)}) + return result, nil +} + // GetOperations returns committed operation records that produced versions in // [fromVersion+1, toVersion]. It is useful for client catch-up, audit logs, and // reconnect flows that need ShareDB-style op history. @@ -520,6 +638,144 @@ func (r OpRecord) MarshalJSON() ([]byte, error) { }) } +func (s *Server) reconstructSnapshotAt(ctx context.Context, documentID string, currentDocument json.RawMessage, targetVersion, currentVersion int) (json.RawMessage, error) { + ops, err := s.GetOperations(ctx, documentID, targetVersion, currentVersion) + if err != nil { + return nil, err + } + docValue, err := jsonot.UnmarshalValue(currentDocument) + if err != nil { + return nil, err + } + for i := len(ops) - 1; i >= 0; i-- { + inverse, err := s.invertCommittedOperation(ops[i]) + if err != nil { + return nil, err + } + applied := s.ot.Apply(ctx, docValue, inverse) + if applied.IsError() { + return nil, applied.Error() + } + docValue = applied.MustGet() + } + return append(json.RawMessage(nil), docValue.RawMessage()...), nil +} + +func (s *Server) buildRollback(ctx context.Context, documentID string, currentDocument json.RawMessage, targetVersion, currentVersion int) (*jsonot.Operation, json.RawMessage, error) { + ops, err := s.GetOperations(ctx, documentID, targetVersion, currentVersion) + if err != nil { + return nil, nil, err + } + docValue, err := jsonot.UnmarshalValue(currentDocument) + if err != nil { + return nil, nil, err + } + rollback := jsonot.NewOperation([]*jsonot.OperationComponent{}) + for i := len(ops) - 1; i >= 0; i-- { + inverse, err := s.invertCommittedOperation(ops[i]) + if err != nil { + return nil, nil, err + } + rollback.Compose(inverse) + applied := s.ot.Apply(ctx, docValue, inverse) + if applied.IsError() { + return nil, nil, applied.Error() + } + docValue = applied.MustGet() + } + if err := rollback.Validation(); err != nil { + return nil, nil, err + } + return rollback, append(json.RawMessage(nil), docValue.RawMessage()...), nil +} + +func (s *Server) invertCommittedOperation(opRec OpRecord) (*jsonot.Operation, error) { + op, err := s.getOrParseCommittedOperation(opRec) + if err != nil { + return nil, err + } + components := op.Array() + inverted := jsonot.NewOperation([]*jsonot.OperationComponent{}) + for i := len(components) - 1; i >= 0; i-- { + inverse := components[i].Invert() + if inverse.IsError() { + return nil, inverse.Error() + } + inverted.Append(inverse.MustGet()) + } + return s.parseOperation(append(json.RawMessage(nil), inverted.ToValue().RawMessage()...)) +} + +func (s *Server) getOrParseCommittedOperation(opRec OpRecord) (*jsonot.Operation, error) { + if op, ok := s.parsedOps.Get(opRec.DocumentID, opRec.Version); ok { + return op, nil + } + op, err := s.parseOperation(opRec.Op) + if err != nil { + return nil, err + } + s.cacheCommittedOperation(opRec, op) + return cloneOperation(op), nil +} + +func (s *Server) cacheCommittedOperation(opRec OpRecord, op *jsonot.Operation) { + if opRec.DocumentID == "" || opRec.Version <= 0 || op == nil { + return + } + s.parsedOps.Store(opRec.DocumentID, opRec.Version, op) +} + +func cloneOperation(op *jsonot.Operation) *jsonot.Operation { + if op == nil { + return nil + } + components := op.Array() + clones := make([]*jsonot.OperationComponent, 0, len(components)) + for _, component := range components { + clones = append(clones, component.Clone()) + } + return jsonot.NewOperation(clones) +} + +type parsedOpCache struct { + mu sync.RWMutex + docs map[string]map[int]*jsonot.Operation +} + +func (c *parsedOpCache) Get(documentID string, version int) (*jsonot.Operation, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + versions, ok := c.docs[documentID] + if !ok { + return nil, false + } + op, ok := versions[version] + if !ok { + return nil, false + } + return cloneOperation(op), true +} + +func (c *parsedOpCache) Store(documentID string, version int, op *jsonot.Operation) { + c.mu.Lock() + defer c.mu.Unlock() + if c.docs == nil { + c.docs = make(map[string]map[int]*jsonot.Operation) + } + versions, ok := c.docs[documentID] + if !ok { + versions = make(map[int]*jsonot.Operation) + c.docs[documentID] = versions + } + versions[version] = cloneOperation(op) +} + +func (c *parsedOpCache) DeleteDocument(documentID string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.docs, documentID) +} + func (s *Server) parseOperation(raw json.RawMessage) (*jsonot.Operation, error) { payload := raw if len(payload) == 0 { diff --git a/sharedb/store_test.go b/sharedb/store_test.go index c291fe2..557660d 100644 --- a/sharedb/store_test.go +++ b/sharedb/store_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "reflect" "strings" "testing" @@ -113,6 +114,403 @@ func TestStoreSequentialAndRebasedSubmit(t *testing.T) { } } +func TestStorePageBlocksConcurrentInsertions(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + base := pageDocument( + block("welcome-block", "欢迎来到 jsonot + BlockNote 协同编辑示例"), + ) + _, err := store.CreateDocument(ctx, "doc-page-blocks", base) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + leftOp := json.RawMessage(`[{"p":["blocks",1],"li":{"id":"left-block","type":"paragraph","content":[{"type":"text","text":"左侧新增段落","styles":{}}]}}]`) + rightOp := json.RawMessage(`[{"p":["blocks",1],"li":{"id":"right-block","type":"paragraph","content":[{"type":"text","text":"右侧新增段落","styles":{}}]}}]`) + + leftResult, err := store.Submit(ctx, "doc-page-blocks", 0, leftOp, "left-client") + if err != nil { + t.Fatalf("submit left failed: %v", err) + } + if leftResult.Version != 1 || leftResult.Rebased { + t.Fatalf("unexpected left submit result: %+v", leftResult) + } + + rightResult, err := store.Submit(ctx, "doc-page-blocks", 0, rightOp, "right-client") + if err != nil { + t.Fatalf("submit right failed: %v", err) + } + if rightResult.Version != 2 || !rightResult.Rebased { + t.Fatalf("unexpected right submit result: %+v", rightResult) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-page-blocks") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + + assertPageBlockOrder(t, snapshot.Document, "welcome-block", "right-block", "left-block") + assertPageBlockText(t, snapshot.Document, "left-block", "左侧新增段落") + assertPageBlockText(t, snapshot.Document, "right-block", "右侧新增段落") +} + +func TestStorePageBlocksConcurrentInsertAndModify(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + base := pageDocument( + block("welcome-block", "欢迎来到 jsonot + BlockNote 协同编辑示例"), + ) + _, err := store.CreateDocument(ctx, "doc-page-mixed", base) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + insertOp := json.RawMessage(`[{"p":["blocks",1],"li":{"id":"appendix-block","type":"paragraph","content":[{"type":"text","text":"追加说明","styles":{}}]}}]`) + modifyOp := json.RawMessage(`[{"p":["blocks",0,"content",0,"text"],"od":"欢迎来到 jsonot + BlockNote 协同编辑示例","oi":"欢迎来到 jsonot + BlockNote 协同编辑示例(已修改)"}]`) + + insertResult, err := store.Submit(ctx, "doc-page-mixed", 0, insertOp, "insert-client") + if err != nil { + t.Fatalf("submit insert failed: %v", err) + } + if insertResult.Version != 1 || insertResult.Rebased { + t.Fatalf("unexpected insert submit result: %+v", insertResult) + } + + modifyResult, err := store.Submit(ctx, "doc-page-mixed", 0, modifyOp, "modify-client") + if err != nil { + t.Fatalf("submit modify failed: %v", err) + } + if modifyResult.Version != 2 || !modifyResult.Rebased { + t.Fatalf("unexpected modify submit result: %+v", modifyResult) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-page-mixed") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + + assertPageBlockOrder(t, snapshot.Document, "welcome-block", "appendix-block") + assertPageBlockText(t, snapshot.Document, "welcome-block", "欢迎来到 jsonot + BlockNote 协同编辑示例(已修改)") + assertPageBlockText(t, snapshot.Document, "appendix-block", "追加说明") +} + +func TestStorePageBlocksConcurrentModifySameBlock(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + base := pageDocument( + block("welcome-block", "hello"), + ) + _, err := store.CreateDocument(ctx, "doc-page-same-block", base) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + leftOp := json.RawMessage(`[{"p":["blocks",0,"content",0,"text"],"od":"hello","oi":"hello left"}]`) + rightOp := json.RawMessage(`[{"p":["blocks",0,"content",0,"text"],"od":"hello","oi":"hello right"}]`) + + if _, err := store.Submit(ctx, "doc-page-same-block", 0, leftOp, "left-client"); err != nil { + t.Fatalf("submit left failed: %v", err) + } + rightResult, err := store.Submit(ctx, "doc-page-same-block", 0, rightOp, "right-client") + if err != nil { + t.Fatalf("submit right failed: %v", err) + } + if !rightResult.Rebased || !sameJSON(t, rightResult.Operation, json.RawMessage(`[{"p":["blocks",0,"content",0,"text"],"od":"hello left","oi":"hello right"}]`)) { + t.Fatalf("unexpected rebased right operation: %+v", rightResult) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-page-same-block") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + assertPageBlockText(t, snapshot.Document, "welcome-block", "hello right") +} + +func TestSubmitRebasesAcrossMultipleMissedOps(t *testing.T) { + ctx := context.Background() + backend := &countingBackend{MemoryBackend: NewMemoryBackend()} + store := NewServer(backend, NewMemoryLocker()) + + _, err := store.CreateDocument(ctx, "doc-stale-gap", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + for i, delta := range []int{1, 2, 3} { + if _, err := store.Submit(ctx, "doc-stale-gap", i, json.RawMessage(fmt.Sprintf(`[{"p":["counter"],"na":%d}]`, delta)), fmt.Sprintf("writer-%d", i+1)); err != nil { + t.Fatalf("seed submit %d failed: %v", i+1, err) + } + } + + result, err := store.Submit(ctx, "doc-stale-gap", 0, json.RawMessage(`[{"p":["counter"],"na":10}]`), "stale-client") + if err != nil { + t.Fatalf("submit stale op failed: %v", err) + } + if !result.Rebased || result.Version != 4 { + t.Fatalf("unexpected stale submit result: %+v", result) + } + if backend.getOpsCalls != 1 || backend.lastGetOpsFrom != 0 || backend.lastGetOpsTo != 3 { + t.Fatalf("unexpected GetOps usage: calls=%d from=%d to=%d", backend.getOpsCalls, backend.lastGetOpsFrom, backend.lastGetOpsTo) + } + if !sameJSON(t, result.Operation, json.RawMessage(`[{"p":["counter"],"na":10}]`)) { + t.Fatalf("unexpected transformed op: %s", result.Operation) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-stale-gap") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + if got := string(snapshot.Document); got != `{"counter":16}` { + t.Fatalf("unexpected final document: %s", snapshot.Document) + } + + ops, err := store.GetOperations(ctx, "doc-stale-gap", 3, 4) + if err != nil { + t.Fatalf("get operations failed: %v", err) + } + if len(ops) != 1 { + t.Fatalf("unexpected op count: got %d want 1", len(ops)) + } + if ops[0].BaseVersion != 0 || !sameJSON(t, ops[0].SubmittedOp, json.RawMessage(`[{"p":["counter"],"na":10}]`)) || !sameJSON(t, ops[0].Op, result.Operation) { + t.Fatalf("unexpected stored stale op record: %+v", ops[0]) + } +} + +func TestSubmitStaleDeleteBecomesNoopAfterConcurrentDelete(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + + base := pageDocument( + block("welcome-block", "hello"), + block("obsolete-block", "bye"), + ) + _, err := store.CreateDocument(ctx, "doc-stale-noop", base) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + + deleteOp := json.RawMessage(`[{"p":["blocks",1],"ld":{"id":"obsolete-block","type":"paragraph","content":[{"type":"text","text":"bye","styles":{}}]}}]`) + if _, err := store.Submit(ctx, "doc-stale-noop", 0, deleteOp, "deleter-a"); err != nil { + t.Fatalf("first delete failed: %v", err) + } + + result, err := store.Submit(ctx, "doc-stale-noop", 0, deleteOp, "deleter-b") + if err != nil { + t.Fatalf("second stale delete failed: %v", err) + } + if !result.Rebased || result.Version != 1 || !sameJSON(t, result.Operation, json.RawMessage(`[]`)) { + t.Fatalf("unexpected noop stale delete result: %+v", result) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-stale-noop") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + assertPageBlockOrder(t, snapshot.Document, "welcome-block") + + ops, err := store.GetOperations(ctx, "doc-stale-noop", 0, 1) + if err != nil { + t.Fatalf("get operations failed: %v", err) + } + if len(ops) != 1 { + t.Fatalf("noop stale delete should not append op, got %d committed ops", len(ops)) + } +} + +func TestSubmitRejectsStaleBasePastRebaseWindow(t *testing.T) { + ctx := context.Background() + backend := &countingBackend{MemoryBackend: NewMemoryBackend()} + store := NewServer(backend, NewMemoryLocker(), WithMaxRebaseGap(2)) + + _, err := store.CreateDocument(ctx, "doc-stale-window", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + for i := 0; i < 3; i++ { + if _, err := store.Submit(ctx, "doc-stale-window", i, json.RawMessage(`[{"p":["counter"],"na":1}]`), fmt.Sprintf("writer-%d", i+1)); err != nil { + t.Fatalf("seed submit %d failed: %v", i+1, err) + } + } + + _, err = store.Submit(ctx, "doc-stale-window", 0, json.RawMessage(`[{"p":["counter"],"na":10}]`), "stale-client") + if err == nil { + t.Fatalf("expected stale submit to require resync") + } + if !errors.Is(err, ErrStaleResyncRequired) { + t.Fatalf("expected ErrStaleResyncRequired, got %v", err) + } + var staleErr *StaleSubmitError + if !errors.As(err, &staleErr) { + t.Fatalf("expected StaleSubmitError, got %T", err) + } + if staleErr.BaseVersion != 0 || staleErr.CurrentVersion != 3 || staleErr.MinSupportedVersion != 1 || staleErr.MaxRebaseGap != 2 { + t.Fatalf("unexpected stale error details: %+v", staleErr) + } + if backend.getOpsCalls != 0 { + t.Fatalf("GetOps should not run when rebase window is exceeded, got %d calls", backend.getOpsCalls) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-stale-window") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + if got := string(snapshot.Document); got != `{"counter":3}` { + t.Fatalf("unexpected document after rejected stale submit: %s", snapshot.Document) + } + ops, err := store.GetOperations(ctx, "doc-stale-window", 0, 3) + if err != nil { + t.Fatalf("get operations failed: %v", err) + } + if len(ops) != 3 { + t.Fatalf("rejected stale submit should not append op, got %d committed ops", len(ops)) + } +} + +func TestSubmitAllowsStaleBaseAtRebaseWindowBoundary(t *testing.T) { + ctx := context.Background() + backend := &countingBackend{MemoryBackend: NewMemoryBackend()} + store := NewServer(backend, NewMemoryLocker(), WithMaxRebaseGap(2)) + + _, err := store.CreateDocument(ctx, "doc-stale-boundary", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + for i := 0; i < 2; i++ { + if _, err := store.Submit(ctx, "doc-stale-boundary", i, json.RawMessage(`[{"p":["counter"],"na":1}]`), fmt.Sprintf("writer-%d", i+1)); err != nil { + t.Fatalf("seed submit %d failed: %v", i+1, err) + } + } + + result, err := store.Submit(ctx, "doc-stale-boundary", 0, json.RawMessage(`[{"p":["counter"],"na":10}]`), "stale-client") + if err != nil { + t.Fatalf("boundary stale submit failed: %v", err) + } + if !result.Rebased || result.Version != 3 { + t.Fatalf("unexpected boundary stale submit result: %+v", result) + } + if backend.getOpsCalls != 1 || backend.lastGetOpsFrom != 0 || backend.lastGetOpsTo != 2 { + t.Fatalf("unexpected GetOps usage at boundary: calls=%d from=%d to=%d", backend.getOpsCalls, backend.lastGetOpsFrom, backend.lastGetOpsTo) + } +} + +func TestStaleSubmitErrorExposesReason(t *testing.T) { + err := &StaleSubmitError{Reason: StaleReasonVersionBehindWindow, DocumentID: "doc-1", BaseVersion: 1, CurrentVersion: 5, MinSupportedVersion: 3, MaxRebaseGap: 2} + if err.Reason != StaleReasonVersionBehindWindow { + t.Fatalf("unexpected stale reason: got %q want %q", err.Reason, StaleReasonVersionBehindWindow) + } +} + +func TestResyncRequiredMessageIncludesReasonAndVersionMetadata(t *testing.T) { + ctx := context.Background() + mailbox := NewMemoryMailboxStore() + if err := mailbox.Append(ctx, testEnvelope("env-1", "sess-1", 1)); err != nil { + t.Fatalf("append envelope failed: %v", err) + } + + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 4) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + if err := sendConnectHandshake(ctx, conn, mailbox, ConnectRequest{ + Type: MessageTypeConnect, + SessionID: "sess-1", + DocumentID: "doc-1", + LastEnvelopeID: "env-missing", + }); err != nil { + t.Fatalf("send connect handshake failed: %v", err) + } + + var connected ConnectedMessage + readClientJSON(t, clientWS, &connected) + var resync ResyncRequiredMessage + readClientJSON(t, clientWS, &resync) + if resync.Type != MessageTypeResyncRequired { + t.Fatalf("unexpected resync type: %s", resync.Type) + } + if resync.Reason != StaleReasonReplayCursorNotFound { + t.Fatalf("unexpected resync reason: got %q want %q", resync.Reason, StaleReasonReplayCursorNotFound) + } + if resync.CurrentVersion != 0 || resync.MinSupportedVersion != 0 || resync.MaxRebaseGap != 0 { + t.Fatalf("unexpected resync metadata: %+v", resync) + } +} + +func TestGetSnapshotAtReturnsHistoricalVersion(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + _, err := store.CreateDocument(ctx, "doc-snapshot-at", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + for version, delta := range []int{1, 2, 3} { + op := json.RawMessage(fmt.Sprintf(`[{"p":["counter"],"na":%d}]`, delta)) + if _, err := store.Submit(ctx, "doc-snapshot-at", version, op, fmt.Sprintf("writer-%d", version+1)); err != nil { + t.Fatalf("seed submit %d failed: %v", version+1, err) + } + } + + snapshot, err := store.GetSnapshotAt(ctx, "doc-snapshot-at", 2) + if err != nil { + t.Fatalf("GetSnapshotAt failed: %v", err) + } + if snapshot.Version != 2 { + t.Fatalf("unexpected snapshot version: got %d want 2", snapshot.Version) + } + if got := string(snapshot.Document); got != `{"counter":3}` { + t.Fatalf("unexpected historical snapshot: %s", snapshot.Document) + } +} + +func TestRollbackToVersionCommitsInverseOperation(t *testing.T) { + ctx := context.Background() + store := NewMemoryServer() + _, err := store.CreateDocument(ctx, "doc-rollback", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + if _, err := store.Submit(ctx, "doc-rollback", 0, json.RawMessage(`[{"p":["counter"],"na":1}]`), "writer-1"); err != nil { + t.Fatalf("submit first failed: %v", err) + } + if _, err := store.Submit(ctx, "doc-rollback", 1, json.RawMessage(`[{"p":["counter"],"na":2}]`), "writer-2"); err != nil { + t.Fatalf("submit second failed: %v", err) + } + + result, err := store.RollbackToVersion(ctx, "doc-rollback", 1, "rollback-bot") + if err != nil { + t.Fatalf("RollbackToVersion failed: %v", err) + } + if result.Version != 3 { + t.Fatalf("unexpected rollback version: got %d want 3", result.Version) + } + if !sameJSON(t, result.Operation, json.RawMessage(`[{"p":["counter"],"na":-2}]`)) { + t.Fatalf("unexpected rollback operation: %s", result.Operation) + } + if got := string(result.Document); got != `{"counter":1}` { + t.Fatalf("unexpected rollback document: %s", result.Document) + } + + snapshot, err := store.GetSnapshot(ctx, "doc-rollback") + if err != nil { + t.Fatalf("get snapshot failed: %v", err) + } + if got := string(snapshot.Document); got != `{"counter":1}` { + t.Fatalf("unexpected current snapshot after rollback: %s", snapshot.Document) + } + + historical, err := store.GetSnapshotAt(ctx, "doc-rollback", 1) + if err != nil { + t.Fatalf("GetSnapshotAt after rollback failed: %v", err) + } + if !sameJSON(t, historical.Document, snapshot.Document) { + t.Fatalf("rollback target snapshot mismatch") + } +} + func TestStoreSubscribe(t *testing.T) { ctx := context.Background() store := NewMemoryServer() @@ -580,3 +978,152 @@ func TestMemoryPublisherDeliversIndependentEventPayloads(t *testing.T) { t.Fatalf("subscriber payloads should be independent, got op=%s doc=%s", rightEvent.Operation, rightEvent.Document) } } + +func BenchmarkSubmitStaleBaseGap(b *testing.B) { + for _, gap := range []int{1, 10, 100, 1000} { + b.Run(fmt.Sprintf("counter-gap-%d", gap), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ctx := context.Background() + store := NewMemoryServer() + docID := fmt.Sprintf("doc-counter-%d", i) + if _, err := store.CreateDocument(ctx, docID, json.RawMessage(`{"counter":0}`)); err != nil { + b.Fatalf("create document failed: %v", err) + } + for version := 0; version < gap; version++ { + op := json.RawMessage(`[{"p":["counter"],"na":1}]`) + if _, err := store.Submit(ctx, docID, version, op, fmt.Sprintf("writer-%d", version)); err != nil { + b.Fatalf("seed submit failed: %v", err) + } + } + b.StartTimer() + _, err := store.Submit(ctx, docID, 0, json.RawMessage(`[{"p":["counter"],"na":10}]`), "stale-client") + b.StopTimer() + if err != nil { + b.Fatalf("stale submit failed: %v", err) + } + } + }) + + b.Run(fmt.Sprintf("page-block-gap-%d", gap), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ctx := context.Background() + store := NewMemoryServer() + docID := fmt.Sprintf("doc-page-%d", i) + if _, err := store.CreateDocument(ctx, docID, pageDocument(block("welcome-block", "hello"))); err != nil { + b.Fatalf("create document failed: %v", err) + } + for version := 0; version < gap; version++ { + op := json.RawMessage(fmt.Sprintf(`[{"p":["blocks",1],"li":{"id":"seed-%d","type":"paragraph","content":[{"type":"text","text":"seed-%d","styles":{}}]}}]`, version, version)) + if _, err := store.Submit(ctx, docID, version, op, fmt.Sprintf("writer-%d", version)); err != nil { + b.Fatalf("seed submit failed: %v", err) + } + } + staleOp := json.RawMessage(`[{"p":["blocks",0,"content",0,"text"],"od":"hello","oi":"hello stale"}]`) + b.StartTimer() + _, err := store.Submit(ctx, docID, 0, staleOp, "stale-client") + b.StopTimer() + if err != nil { + b.Fatalf("stale submit failed: %v", err) + } + } + }) + } +} + +type countingBackend struct { + *MemoryBackend + getOpsCalls int + lastGetOpsFrom int + lastGetOpsTo int +} + +func (b *countingBackend) GetOps(ctx context.Context, docID string, fromVersion, toVersion int) ([]OpRecord, error) { + b.getOpsCalls++ + b.lastGetOpsFrom = fromVersion + b.lastGetOpsTo = toVersion + return b.MemoryBackend.GetOps(ctx, docID, fromVersion, toVersion) +} + +func pageDocument(blocks ...map[string]any) json.RawMessage { + payload, err := json.Marshal(map[string]any{"blocks": blocks}) + if err != nil { + panic(err) + } + return payload +} + +func block(id, text string) map[string]any { + return map[string]any{ + "id": id, + "type": "paragraph", + "content": []any{ + map[string]any{ + "type": "text", + "text": text, + "styles": map[string]any{}, + }, + }, + } +} + +func assertPageBlockOrder(t *testing.T, document json.RawMessage, wantIDs ...string) { + t.Helper() + got := extractPageBlockIDs(t, document) + if !reflect.DeepEqual(got, wantIDs) { + t.Fatalf("unexpected block order: got %v want %v", got, wantIDs) + } +} + +func assertPageBlockText(t *testing.T, document json.RawMessage, blockID, want string) { + t.Helper() + got := extractPageBlockTexts(t, document) + if got[blockID] != want { + t.Fatalf("unexpected block text for %s: got %q want %q", blockID, got[blockID], want) + } +} + +func extractPageBlockIDs(t *testing.T, document json.RawMessage) []string { + t.Helper() + type pageBlock struct { + ID string `json:"id"` + } + type page struct { + Blocks []pageBlock `json:"blocks"` + } + var decoded page + if err := json.Unmarshal(document, &decoded); err != nil { + t.Fatalf("unmarshal page document failed: %v", err) + } + ids := make([]string, 0, len(decoded.Blocks)) + for _, blk := range decoded.Blocks { + ids = append(ids, blk.ID) + } + return ids +} + +func extractPageBlockTexts(t *testing.T, document json.RawMessage) map[string]string { + t.Helper() + type textNode struct { + Text string `json:"text"` + } + type pageBlock struct { + ID string `json:"id"` + Content []textNode `json:"content"` + } + type page struct { + Blocks []pageBlock `json:"blocks"` + } + var decoded page + if err := json.Unmarshal(document, &decoded); err != nil { + t.Fatalf("unmarshal page document failed: %v", err) + } + texts := make(map[string]string, len(decoded.Blocks)) + for _, blk := range decoded.Blocks { + if len(blk.Content) > 0 { + texts[blk.ID] = blk.Content[0].Text + } + } + return texts +} diff --git a/sharedb/ws_protocol.go b/sharedb/ws_protocol.go index d984eb1..72ca9cb 100644 --- a/sharedb/ws_protocol.go +++ b/sharedb/ws_protocol.go @@ -16,6 +16,9 @@ const ( MessageTypeResyncRequired = "resync_required" MessageTypeEvent = "event" MessageTypeAckEnvelope = "ack_envelope" + MessageTypeSubmit = "submit" + MessageTypeSubmitAck = "submit_ack" + MessageTypeSubmitError = "submit_error" ) const defaultReplayLimit = 256 @@ -47,10 +50,14 @@ type ReplayMessage struct { // ResyncRequiredMessage tells the client mailbox replay cannot continue from its cursor. type ResyncRequiredMessage struct { - Type string `json:"type"` - SessionID string `json:"sessionId"` - DocumentID string `json:"documentId"` - LastAckedEnvelopeID string `json:"lastAckedEnvelopeId,omitempty"` + Type string `json:"type"` + SessionID string `json:"sessionId"` + DocumentID string `json:"documentId"` + LastAckedEnvelopeID string `json:"lastAckedEnvelopeId,omitempty"` + Reason StaleReason `json:"reason,omitempty"` + CurrentVersion int `json:"currentVersion,omitempty"` + MinSupportedVersion int `json:"minSupportedVersion,omitempty"` + MaxRebaseGap int `json:"maxRebaseGap,omitempty"` } // EventMessage delivers one live session envelope over the websocket transport. @@ -68,6 +75,47 @@ type AckEnvelopeRequest struct { EnvelopeID string `json:"envelopeId"` } +// SubmitMessage carries one structured document submit over websocket transport. +type SubmitMessage struct { + Type string `json:"type"` + SessionID string `json:"sessionId"` + DocumentID string `json:"documentId"` + Request SubmitRequest `json:"request"` +} + +// SubmitAckMessage acknowledges a successful submit and includes the replayable envelope. +type SubmitAckMessage struct { + Type string `json:"type"` + SessionID string `json:"sessionId"` + DocumentID string `json:"documentId"` + Result SubmitResult `json:"result"` + Envelope *Envelope `json:"envelope,omitempty"` +} + +// SubmitErrorCode classifies websocket submit failures. +type SubmitErrorCode string + +const ( + SubmitErrorCodeStaleResyncRequired SubmitErrorCode = "stale_resync_required" + SubmitErrorCodeInvalidVersion SubmitErrorCode = "invalid_version" + SubmitErrorCodeDuplicateSequenceConflict SubmitErrorCode = "duplicate_sequence_conflict" + SubmitErrorCodeInvalidClientMessage SubmitErrorCode = "invalid_client_message" + SubmitErrorCodeUnknown SubmitErrorCode = "unknown" +) + +// SubmitErrorMessage surfaces a structured submit failure to websocket clients. +type SubmitErrorMessage struct { + Type string `json:"type"` + SessionID string `json:"sessionId"` + DocumentID string `json:"documentId"` + Code SubmitErrorCode `json:"code"` + Reason StaleReason `json:"reason,omitempty"` + CurrentVersion int `json:"currentVersion,omitempty"` + MinSupportedVersion int `json:"minSupportedVersion,omitempty"` + MaxRebaseGap int `json:"maxRebaseGap,omitempty"` + Message string `json:"message,omitempty"` +} + // HandleConnect registers the websocket connection for the session and sends replay state. func HandleConnect(ctx context.Context, conn *ClientConn, sessions *SessionManager, mailbox MailboxStore, req ConnectRequest) error { if err := sendConnectHandshake(ctx, conn, mailbox, req); err != nil { @@ -138,6 +186,10 @@ func sendConnectHandshake(ctx context.Context, conn *ClientConn, mailbox Mailbox SessionID: req.SessionID, DocumentID: req.DocumentID, LastAckedEnvelopeID: replay.LastAcked, + Reason: replay.Reason, + CurrentVersion: replay.CurrentVersion, + MinSupportedVersion: replay.MinSupportedVersion, + MaxRebaseGap: replay.MaxRebaseGap, }) } if len(replay.Envelopes) == 0 { @@ -165,6 +217,82 @@ func validateAckEnvelopeRequest(req AckEnvelopeRequest) error { return nil } +func validateSubmitMessage(msg SubmitMessage) error { + if msg.Type != MessageTypeSubmit || msg.SessionID == "" || msg.DocumentID == "" { + return ErrInvalidClientMessage + } + if msg.Request.DocumentID == "" { + msg.Request.DocumentID = msg.DocumentID + } + if msg.Request.DocumentID != msg.DocumentID { + return ErrInvalidClientMessage + } + return nil +} + +func canonicalizeSubmitRequest(msg SubmitMessage) SubmitRequest { + req := msg.Request + req.DocumentID = msg.DocumentID + req.Source = msg.SessionID + if req.ID.Source != "" || req.ID.Sequence > 0 { + req.ID.Source = msg.SessionID + } + return req +} + +func submitAckEnvelope(sessionID, documentID string, version int, gen EnvelopeIDGenerator) (Envelope, error) { + if gen == nil { + return Envelope{}, ErrInvalidClientMessage + } + id := gen() + if id == "" { + return Envelope{}, ErrInvalidEnvelope + } + return Envelope{ + ID: id, + SessionID: sessionID, + DocumentID: documentID, + Kind: EnvelopeKindAck, + Version: version, + }, nil +} + +func submitErrorMessage(sessionID, documentID string, err error) SubmitErrorMessage { + msg := SubmitErrorMessage{ + Type: MessageTypeSubmitError, + SessionID: sessionID, + DocumentID: documentID, + Code: SubmitErrorCodeUnknown, + Message: "submit failed", + } + var staleErr *StaleSubmitError + if errors.As(err, &staleErr) { + msg.Code = SubmitErrorCodeStaleResyncRequired + msg.Reason = staleErr.Reason + msg.CurrentVersion = staleErr.CurrentVersion + msg.MinSupportedVersion = staleErr.MinSupportedVersion + msg.MaxRebaseGap = staleErr.MaxRebaseGap + msg.Message = "resync required" + return msg + } + if errors.Is(err, ErrInvalidVersion) { + msg.Code = SubmitErrorCodeInvalidVersion + msg.Message = "invalid document version" + return msg + } + if errors.Is(err, ErrDuplicateSequenceConflict) { + msg.Code = SubmitErrorCodeDuplicateSequenceConflict + msg.Message = "duplicate submit sequence conflict" + return msg + } + if errors.Is(err, ErrInvalidClientMessage) { + msg.Code = SubmitErrorCodeInvalidClientMessage + msg.Message = "invalid client message" + return msg + } + return msg +} + // HandleAckEnvelope advances the session mailbox ack cursor through the provided envelope. func HandleAckEnvelope(ctx context.Context, mailbox MailboxStore, req AckEnvelopeRequest) error { if err := validateAckEnvelopeRequest(req); err != nil { @@ -176,6 +304,42 @@ func HandleAckEnvelope(ctx context.Context, mailbox MailboxStore, req AckEnvelop return mailbox.AckThrough(ctx, req.SessionID, req.EnvelopeID) } +// HandleSubmit executes one structured submit and sends either submit_ack or submit_error. +func HandleSubmit(ctx context.Context, conn *ClientConn, server *Server, mailbox MailboxStore, msg SubmitMessage, gen EnvelopeIDGenerator) error { + if err := validateSubmitMessage(msg); err != nil { + return err + } + if conn == nil || server == nil || mailbox == nil || gen == nil { + return ErrInvalidClientMessage + } + req := canonicalizeSubmitRequest(msg) + result, err := server.SubmitWithRequest(ctx, req) + if err != nil { + if sendErr := conn.sendJSON(submitErrorMessage(msg.SessionID, msg.DocumentID, err)); sendErr != nil { + return sendErr + } + return nil + } + ack := SubmitAckMessage{ + Type: MessageTypeSubmitAck, + SessionID: msg.SessionID, + DocumentID: msg.DocumentID, + Result: result, + } + if result.Duplicate { + return conn.sendJSON(ack) + } + env, err := submitAckEnvelope(msg.SessionID, msg.DocumentID, result.Version, gen) + if err != nil { + return err + } + if err := mailbox.Append(ctx, env); err != nil { + return err + } + ack.Envelope = &env + return conn.sendJSON(ack) +} + // AttachLiveSession subscribes the active websocket connection to live document events. func AttachLiveSession(ctx context.Context, conn *ClientConn, server *Server, mailbox MailboxStore, sessionID, documentID string, gen EnvelopeIDGenerator) error { if conn == nil || server == nil || mailbox == nil || gen == nil || sessionID == "" || documentID == "" || conn.Context().Err() != nil { @@ -220,6 +384,9 @@ func startLiveRelay(conn *ClientConn, mailbox MailboxStore, sessionID, documentI if event.Version <= skipThroughVersion { continue } + if event.Source == sessionID { + continue + } env, err := EnvelopeForSession(sessionID, event, gen) if err != nil { conn.Close() @@ -261,6 +428,22 @@ func NewSessionInboundHandler(server *Server, mailbox MailboxStore, sessions *Se return ErrInvalidClientMessage } return HandleConnectAndAttachLive(ctx, conn, server, sessions, mailbox, req, gen) + case MessageTypeSubmit: + var msg SubmitMessage + if err := json.Unmarshal(payload, &msg); err != nil { + return ErrInvalidClientMessage + } + if sessions == nil { + return ErrInvalidClientMessage + } + active, ok := sessions.Active(msg.SessionID) + if !ok || active != conn { + return ErrInvalidClientMessage + } + if !conn.hasLiveAttachment(msg.SessionID, msg.DocumentID) { + return ErrInvalidClientMessage + } + return HandleSubmit(ctx, conn, server, mailbox, msg, gen) case MessageTypeAckEnvelope: var req AckEnvelopeRequest if err := json.Unmarshal(payload, &req); err != nil { diff --git a/sharedb/ws_protocol_test.go b/sharedb/ws_protocol_test.go index ba5d468..b851694 100644 --- a/sharedb/ws_protocol_test.go +++ b/sharedb/ws_protocol_test.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "errors" + "os" "testing" + "time" "github.com/gorilla/websocket" ) @@ -300,6 +302,416 @@ func TestHandleAckEnvelopeReturnsEnvelopeNotFoundForUnknownCursor(t *testing.T) } } +func TestHandleSubmitSendsSubmitAckAndAppendsMailboxEnvelope(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker(), WithMaxRebaseGap(2)) + _, err := store.CreateDocument(ctx, "doc-submit", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-submit", + DocumentID: "doc-submit", + Request: SubmitRequest{ + DocumentID: "doc-submit", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "client-a", + Sequence: 1, + }, + }, func() string { return "env-submit-1" }) + if err != nil { + t.Fatalf("handle submit failed: %v", err) + } + + var ack SubmitAckMessage + readClientJSON(t, clientWS, &ack) + if ack.Type != MessageTypeSubmitAck || ack.SessionID != "sess-submit" || ack.DocumentID != "doc-submit" { + t.Fatalf("unexpected submit ack envelope: %+v", ack) + } + if ack.Result.Version != 1 || ack.Result.Rebased || ack.Envelope.ID != "env-submit-1" || ack.Envelope.Version != 1 { + t.Fatalf("unexpected submit ack payload: %+v", ack) + } + + replayed, err := mailbox.ReplayAfter(ctx, "sess-submit", "", 0) + if err != nil { + t.Fatalf("replay mailbox failed: %v", err) + } + if len(replayed) != 1 || replayed[0].ID != "env-submit-1" || replayed[0].Version != 1 { + t.Fatalf("unexpected mailbox replay result: %+v", replayed) + } +} + +func TestHandleSubmitSendsSubmitErrorForStaleResyncRequired(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker(), WithMaxRebaseGap(1)) + _, err := store.CreateDocument(ctx, "doc-submit-stale", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + for i := 0; i < 2; i++ { + if _, err := store.Submit(ctx, "doc-submit-stale", i, json.RawMessage(`[{"p":["counter"],"na":1}]`), "writer"); err != nil { + t.Fatalf("seed submit %d failed: %v", i+1, err) + } + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-submit", + DocumentID: "doc-submit-stale", + Request: SubmitRequest{ + DocumentID: "doc-submit-stale", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":10}]`), + Source: "stale-client", + Sequence: 9, + }, + }, func() string { return "env-submit-stale" }) + if err != nil { + t.Fatalf("handle submit should return nil after sending protocol error, got %v", err) + } + + var msg SubmitErrorMessage + readClientJSON(t, clientWS, &msg) + if msg.Type != MessageTypeSubmitError || msg.Reason != StaleReasonVersionBehindWindow { + t.Fatalf("unexpected submit error payload: %+v", msg) + } + if msg.CurrentVersion != 2 || msg.MinSupportedVersion != 1 || msg.MaxRebaseGap != 1 { + t.Fatalf("unexpected submit error metadata: %+v", msg) + } +} + +func TestSessionInboundHandlerRoutesSubmitMessage(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker(), WithMaxRebaseGap(2)) + _, err := store.CreateDocument(ctx, "doc-dispatch-submit", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + sessions := NewSessionManager() + if err := sessions.Register("sess-dispatch", conn); err != nil { + t.Fatalf("register session failed: %v", err) + } + if err := AttachLiveSession(ctx, conn, store, mailbox, "sess-dispatch", "doc-dispatch-submit", func() string { return "env-live-unused" }); err != nil { + t.Fatalf("attach live session failed: %v", err) + } + handler := NewSessionInboundHandler(store, mailbox, sessions, func() string { return "env-dispatch-1" }) + payload := []byte(`{"type":"submit","sessionId":"sess-dispatch","documentId":"doc-dispatch-submit","request":{"documentId":"doc-dispatch-submit","baseVersion":0,"op":[{"p":["counter"],"na":2}],"source":"client-a","seq":2}}`) + if err := handler(ctx, conn, payload); err != nil { + t.Fatalf("dispatcher submit failed: %v", err) + } + + var ack SubmitAckMessage + readClientJSON(t, clientWS, &ack) + if ack.Type != MessageTypeSubmitAck || ack.Result.Version != 1 || ack.Envelope == nil || ack.Envelope.ID != "env-dispatch-1" { + t.Fatalf("unexpected dispatcher submit ack: %+v", ack) + } +} + +func TestSessionInboundHandlerRejectsSubmitFromNonActiveSession(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker(), WithMaxRebaseGap(2)) + _, err := store.CreateDocument(ctx, "doc-dispatch-reject", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + handler := NewSessionInboundHandler(store, mailbox, NewSessionManager(), func() string { return "env-unused" }) + payload := []byte(`{"type":"submit","sessionId":"sess-other","documentId":"doc-dispatch-reject","request":{"documentId":"doc-dispatch-reject","baseVersion":0,"op":[{"p":["counter"],"na":2}],"source":"client-a","seq":2}}`) + if err := handler(ctx, conn, payload); !errors.Is(err, ErrInvalidClientMessage) { + t.Fatalf("expected invalid client message, got %v", err) + } +} + +func TestSessionInboundHandlerRejectsSubmitForUnattachedDocument(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker(), WithMaxRebaseGap(2)) + _, err := store.CreateDocument(ctx, "doc-dispatch-a", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document A failed: %v", err) + } + _, err = store.CreateDocument(ctx, "doc-dispatch-b", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document B failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + sessions := NewSessionManager() + if err := sessions.Register("sess-attach", conn); err != nil { + t.Fatalf("register session failed: %v", err) + } + if err := AttachLiveSession(ctx, conn, store, mailbox, "sess-attach", "doc-dispatch-a", func() string { return "env-a" }); err != nil { + t.Fatalf("attach live session failed: %v", err) + } + handler := NewSessionInboundHandler(store, mailbox, sessions, func() string { return "env-unused" }) + payload := []byte(`{"type":"submit","sessionId":"sess-attach","documentId":"doc-dispatch-b","request":{"documentId":"doc-dispatch-b","baseVersion":0,"op":[{"p":["counter"],"na":2}],"source":"client-a","seq":2}}`) + if err := handler(ctx, conn, payload); !errors.Is(err, ErrInvalidClientMessage) { + t.Fatalf("expected unattached-document submit to be rejected, got %v", err) + } +} + +func TestHandleSubmitSkipsSyntheticEnvelopeForDuplicateRetry(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker()) + _, err := store.CreateDocument(ctx, "doc-submit-dup-ack", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + first := SubmitRequest{ + DocumentID: "doc-submit-dup-ack", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "sess-submit", + Sequence: 3, + } + if _, err := store.SubmitWithRequest(ctx, first); err != nil { + t.Fatalf("seed submit failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-submit", + DocumentID: "doc-submit-dup-ack", + Request: first, + }, func() string { return "env-unused" }) + if err != nil { + t.Fatalf("handle duplicate submit failed: %v", err) + } + var ack SubmitAckMessage + readClientJSON(t, clientWS, &ack) + if !ack.Result.Duplicate { + t.Fatalf("expected duplicate submit ack, got %+v", ack) + } + if ack.Envelope != nil { + t.Fatalf("duplicate retry should not emit synthetic or durable envelope: %+v", ack) + } + if replayed, err := mailbox.ReplayAfter(ctx, "sess-submit", "", 0); err != nil { + t.Fatalf("replay mailbox failed: %v", err) + } else if len(replayed) != 0 { + t.Fatalf("duplicate retry should not append mailbox envelope: %+v", replayed) + } +} + +func TestHandleSubmitCanonicalizesSourceToSessionID(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker()) + _, err := store.CreateDocument(ctx, "doc-submit-source", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-canonical", + DocumentID: "doc-submit-source", + Request: SubmitRequest{ + DocumentID: "doc-submit-source", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "spoofed-source", + Sequence: 11, + }, + }, func() string { return "env-canonical-1" }) + if err != nil { + t.Fatalf("handle submit failed: %v", err) + } + var ack SubmitAckMessage + readClientJSON(t, clientWS, &ack) + ops, err := store.GetOperations(ctx, "doc-submit-source", 0, 1) + if err != nil { + t.Fatalf("get operations failed: %v", err) + } + if len(ops) != 1 || ops[0].Source != "sess-canonical" { + t.Fatalf("expected committed source to be canonicalized to session ID, got %+v", ops) + } + if ack.Envelope == nil || ack.Envelope.Kind != EnvelopeKindAck || ack.Envelope.SessionID != "sess-canonical" { + t.Fatalf("expected ack envelope to be session-bound ack envelope, got %+v", ack) + } +} + +func TestHandleSubmitOnLiveAttachmentSendsAckWithoutSelfEvent(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker()) + _, err := store.CreateDocument(ctx, "doc-submit-live", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + if err := AttachLiveSession(ctx, conn, store, mailbox, "sess-live-submit", "doc-submit-live", func() string { return "env-live-event" }); err != nil { + t.Fatalf("attach live session failed: %v", err) + } + if err := HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-live-submit", + DocumentID: "doc-submit-live", + Request: SubmitRequest{ + DocumentID: "doc-submit-live", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "spoofed-source", + Sequence: 1, + }, + }, func() string { return "env-submit-ack" }); err != nil { + t.Fatalf("handle submit failed: %v", err) + } + var ack SubmitAckMessage + readClientJSON(t, clientWS, &ack) + if ack.Type != MessageTypeSubmitAck || ack.Envelope == nil || ack.Envelope.Kind != EnvelopeKindAck { + t.Fatalf("expected ack with durable ack envelope for live submit, got %+v", ack) + } + _ = clientWS.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + _, _, err = clientWS.ReadMessage() + if !errors.Is(err, os.ErrDeadlineExceeded) && !websocket.IsUnexpectedCloseError(err) { + if nerr, ok := err.(interface{ Timeout() bool }); !ok || !nerr.Timeout() { + t.Fatalf("expected no self event after ack, got err=%v", err) + } + } +} + +func TestHandleSubmitSendsSubmitErrorForInvalidVersion(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker()) + _, err := store.CreateDocument(ctx, "doc-submit-invalid-version", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-submit", + DocumentID: "doc-submit-invalid-version", + Request: SubmitRequest{ + DocumentID: "doc-submit-invalid-version", + BaseVersion: 9, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "client-a", + Sequence: 1, + }, + }, func() string { return "env-unused" }) + if err != nil { + t.Fatalf("handle submit should return nil after sending protocol error, got %v", err) + } + + var msg SubmitErrorMessage + readClientJSON(t, clientWS, &msg) + if msg.Type != MessageTypeSubmitError || msg.Code != SubmitErrorCodeInvalidVersion { + t.Fatalf("unexpected invalid-version submit error payload: %+v", msg) + } +} + +func TestHandleSubmitSendsSubmitErrorForDuplicateSequenceConflict(t *testing.T) { + ctx := context.Background() + store := NewServer(NewMemoryBackend(), NewMemoryLocker()) + _, err := store.CreateDocument(ctx, "doc-submit-dup", json.RawMessage(`{"counter":0}`)) + if err != nil { + t.Fatalf("create document failed: %v", err) + } + first := SubmitRequest{ + DocumentID: "doc-submit-dup", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":1}]`), + Source: "sess-submit", + Sequence: 7, + } + if _, err := store.SubmitWithRequest(ctx, first); err != nil { + t.Fatalf("seed submit failed: %v", err) + } + mailbox := NewMemoryMailboxStore() + serverWS, clientWS := newWebSocketPair(t) + defer clientWS.Close() + conn := NewClientConn(serverWS, 8) + conn.Start(nil) + defer conn.Wait() + defer conn.Close() + + err = HandleSubmit(ctx, conn, store, mailbox, SubmitMessage{ + Type: MessageTypeSubmit, + SessionID: "sess-submit", + DocumentID: "doc-submit-dup", + Request: SubmitRequest{ + DocumentID: "doc-submit-dup", + BaseVersion: 0, + Operation: json.RawMessage(`[{"p":["counter"],"na":2}]`), + Source: "client-a", + Sequence: 7, + }, + }, func() string { return "env-unused" }) + if err != nil { + t.Fatalf("handle submit should return nil after sending protocol error, got %v", err) + } + + var msg SubmitErrorMessage + readClientJSON(t, clientWS, &msg) + if msg.Type != MessageTypeSubmitError || msg.Code != SubmitErrorCodeDuplicateSequenceConflict { + t.Fatalf("unexpected duplicate-sequence submit error payload: %+v", msg) + } +} + func readClientJSON(t *testing.T, conn *websocket.Conn, target any) { t.Helper() _, payload, err := conn.ReadMessage() diff --git a/sharedb/ws_transport.go b/sharedb/ws_transport.go index 9988b4f..2217c4c 100644 --- a/sharedb/ws_transport.go +++ b/sharedb/ws_transport.go @@ -147,6 +147,14 @@ func (c *ClientConn) claimLiveAttachment(sessionID, documentID string) bool { return true } +func (c *ClientConn) hasLiveAttachment(sessionID, documentID string) bool { + key := sessionID + "\x00" + documentID + c.liveMu.Lock() + defer c.liveMu.Unlock() + _, exists := c.liveKeys[key] + return exists +} + func (c *ClientConn) readLoop(handler InboundHandler) { for { _, payload, err := c.conn.ReadMessage() From 2d211f84f90d7d9b9d31aac97c95cd99a853f861 Mon Sep 17 00:00:00 2001 From: edocevol Date: Thu, 30 Apr 2026 15:14:50 +0800 Subject: [PATCH 2/2] refactor(sharedb): rename historical snapshot and rollback helpers --- sharedb/server.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sharedb/server.go b/sharedb/server.go index e4f1dc0..a1cc9da 100644 --- a/sharedb/server.go +++ b/sharedb/server.go @@ -246,7 +246,7 @@ func (s *Server) GetSnapshotAt(ctx context.Context, documentID string, version i return Snapshot{DocumentID: documentID, Version: rec.Version, Document: append(json.RawMessage(nil), rec.Doc...)}, nil } - document, err := s.reconstructSnapshotAt(ctx, documentID, rec.Doc, version, rec.Version) + document, err := s.documentAtVersion(ctx, rec, version) if err != nil { return Snapshot{}, err } @@ -501,7 +501,7 @@ func (s *Server) RollbackToVersion(ctx context.Context, documentID string, targe return SubmitResult{Version: rec.Version, Operation: json.RawMessage("[]"), Document: append(json.RawMessage(nil), rec.Doc...)}, nil } - rollbackOp, rollbackDoc, err := s.buildRollback(ctx, documentID, rec.Doc, targetVersion, rec.Version) + rollbackOp, rollbackDoc, err := s.rollbackToVersionOperation(ctx, rec, targetVersion) if err != nil { return SubmitResult{}, err } @@ -638,17 +638,17 @@ func (r OpRecord) MarshalJSON() ([]byte, error) { }) } -func (s *Server) reconstructSnapshotAt(ctx context.Context, documentID string, currentDocument json.RawMessage, targetVersion, currentVersion int) (json.RawMessage, error) { - ops, err := s.GetOperations(ctx, documentID, targetVersion, currentVersion) +func (s *Server) documentAtVersion(ctx context.Context, rec DocRecord, targetVersion int) (json.RawMessage, error) { + ops, err := s.GetOperations(ctx, rec.DocumentID, targetVersion, rec.Version) if err != nil { return nil, err } - docValue, err := jsonot.UnmarshalValue(currentDocument) + docValue, err := jsonot.UnmarshalValue(rec.Doc) if err != nil { return nil, err } for i := len(ops) - 1; i >= 0; i-- { - inverse, err := s.invertCommittedOperation(ops[i]) + inverse, err := s.inverseOperation(ops[i]) if err != nil { return nil, err } @@ -661,18 +661,18 @@ func (s *Server) reconstructSnapshotAt(ctx context.Context, documentID string, c return append(json.RawMessage(nil), docValue.RawMessage()...), nil } -func (s *Server) buildRollback(ctx context.Context, documentID string, currentDocument json.RawMessage, targetVersion, currentVersion int) (*jsonot.Operation, json.RawMessage, error) { - ops, err := s.GetOperations(ctx, documentID, targetVersion, currentVersion) +func (s *Server) rollbackToVersionOperation(ctx context.Context, rec DocRecord, targetVersion int) (*jsonot.Operation, json.RawMessage, error) { + ops, err := s.GetOperations(ctx, rec.DocumentID, targetVersion, rec.Version) if err != nil { return nil, nil, err } - docValue, err := jsonot.UnmarshalValue(currentDocument) + docValue, err := jsonot.UnmarshalValue(rec.Doc) if err != nil { return nil, nil, err } rollback := jsonot.NewOperation([]*jsonot.OperationComponent{}) for i := len(ops) - 1; i >= 0; i-- { - inverse, err := s.invertCommittedOperation(ops[i]) + inverse, err := s.inverseOperation(ops[i]) if err != nil { return nil, nil, err } @@ -689,7 +689,7 @@ func (s *Server) buildRollback(ctx context.Context, documentID string, currentDo return rollback, append(json.RawMessage(nil), docValue.RawMessage()...), nil } -func (s *Server) invertCommittedOperation(opRec OpRecord) (*jsonot.Operation, error) { +func (s *Server) inverseOperation(opRec OpRecord) (*jsonot.Operation, error) { op, err := s.getOrParseCommittedOperation(opRec) if err != nil { return nil, err