Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions sharedb/reconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (

// ReplayResult is a transport-friendly mailbox replay decision.
type ReplayResult struct {
SessionID string `json:"sessionId"`
Envelopes []Envelope `json:"envelopes"`
LastAcked string `json:"lastAcked,omitempty"`
RequiresResync bool `json:"requiresResync,omitempty"`
SessionID string `json:"sessionId"`
Envelopes []Envelope `json:"envelopes"`
LastAcked string `json:"lastAcked,omitempty"`
RequiresResync bool `json:"requiresResync,omitempty"`
Reason StaleReason `json:"reason,omitempty"`
CurrentVersion int `json:"currentVersion,omitempty"`
MinSupportedVersion int `json:"minSupportedVersion,omitempty"`
MaxRebaseGap int `json:"maxRebaseGap,omitempty"`
}

// ReplaySessionMailbox replays envelopes for a session after the provided cursor.
Expand All @@ -27,6 +31,7 @@ func ReplaySessionMailbox(ctx context.Context, store MailboxStore, sessionID, af
SessionID: sessionID,
LastAcked: lastAcked,
RequiresResync: true,
Reason: StaleReasonReplayCursorNotFound,
}, nil
}
return ReplayResult{}, err
Expand Down
258 changes: 257 additions & 1 deletion sharedb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"

"github.com/edocevol/jsonot"
)
Expand All @@ -19,6 +20,8 @@ var (
ErrInvalidVersion = errors.New("sharedb: invalid version")
// ErrDuplicateSequenceConflict means a client reused Source+Sequence for a different operation.
ErrDuplicateSequenceConflict = errors.New("sharedb: duplicate source sequence conflicts with original operation")
// ErrStaleResyncRequired means the client is too far behind for bounded server-side rebase.
ErrStaleResyncRequired = errors.New("sharedb: stale submit requires resync")
)

// Snapshot is the latest immutable view of a document.
Expand Down Expand Up @@ -81,6 +84,37 @@ type SubmitResult struct {
// SubmitHandler handles a structured submit request.
type SubmitHandler func(context.Context, SubmitRequest) (SubmitResult, error)

// StaleReason classifies why the server requires a client resync instead of
// continuing with replay or server-side OT rebase.
type StaleReason string

const (
StaleReasonVersionBehindWindow StaleReason = "version_behind_window"
StaleReasonReplayCursorNotFound StaleReason = "replay_cursor_not_found"
)

// StaleSubmitError describes a submit that the server rejected because the
// client's base version fell outside the configured bounded rebase window.
type StaleSubmitError struct {
Reason StaleReason `json:"reason,omitempty"`
DocumentID string `json:"documentId,omitempty"`
BaseVersion int `json:"baseVersion,omitempty"`
CurrentVersion int `json:"currentVersion,omitempty"`
MinSupportedVersion int `json:"minSupportedVersion,omitempty"`
MaxRebaseGap int `json:"maxRebaseGap,omitempty"`
}

func (e *StaleSubmitError) Error() string {
if e == nil {
return ErrStaleResyncRequired.Error()
}
return fmt.Sprintf("%s: reason=%s document=%s base=%d current=%d minSupported=%d maxRebaseGap=%d", ErrStaleResyncRequired.Error(), e.Reason, e.DocumentID, e.BaseVersion, e.CurrentVersion, e.MinSupportedVersion, e.MaxRebaseGap)
}

func (e *StaleSubmitError) Unwrap() error {
return ErrStaleResyncRequired
}

// SubmitMiddleware wraps submit handling so callers can validate, reject,
// enrich, or observe submit requests/results.
type SubmitMiddleware func(SubmitHandler) SubmitHandler
Expand All @@ -106,6 +140,14 @@ func WithSubmitMiddleware(middleware ...SubmitMiddleware) ServerOption {
}
}

// WithMaxRebaseGap bounds how many committed versions a stale submit may be
// rebased across on the server. A value <= 0 disables the limit.
func WithMaxRebaseGap(maxGap int) ServerOption {
return func(s *Server) {
s.maxRebaseGap = maxGap
}
}

// Server is the central coordinator for collaborative editing.
//
// Version number flow
Expand All @@ -124,6 +166,9 @@ type Server struct {
pub Publisher
ot *jsonot.JSONOperationTransformer

parsedOps parsedOpCache
maxRebaseGap int

submitMiddleware []SubmitMiddleware
submitHandler SubmitHandler
}
Expand All @@ -138,6 +183,9 @@ func NewServer(backend Backend, locker Locker, opts ...ServerOption) *Server {
backend: backend,
locker: locker,
ot: jsonot.NewJSONOperationTransformer(),
parsedOps: parsedOpCache{
docs: make(map[string]map[int]*jsonot.Operation),
},
}
for _, opt := range opts {
opt(s)
Expand Down Expand Up @@ -184,6 +232,27 @@ func (s *Server) GetSnapshot(ctx context.Context, documentID string) (Snapshot,
}, nil
}

// GetSnapshotAt reconstructs the document snapshot at a historical version by
// applying inverse OT operations from the current snapshot backwards.
func (s *Server) GetSnapshotAt(ctx context.Context, documentID string, version int) (Snapshot, error) {
rec, err := s.backend.GetDoc(ctx, documentID)
if err != nil {
return Snapshot{}, err
}
if version < 0 || version > rec.Version {
return Snapshot{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, version)
}
if version == rec.Version {
return Snapshot{DocumentID: documentID, Version: rec.Version, Document: append(json.RawMessage(nil), rec.Doc...)}, nil
}

document, err := s.documentAtVersion(ctx, rec, version)
if err != nil {
return Snapshot{}, err
}
return Snapshot{DocumentID: documentID, Version: version, Document: document}, nil
}

// Submit accepts a client operation based on baseVersion.
//
// Centralized version protocol:
Expand Down Expand Up @@ -265,13 +334,24 @@ func (s *Server) submitCore(ctx context.Context, req SubmitRequest) (SubmitResul
transformed := op
rebased := false
if req.BaseVersion < rec.Version {
gap := rec.Version - req.BaseVersion
if s.maxRebaseGap > 0 && gap > s.maxRebaseGap {
return SubmitResult{}, &StaleSubmitError{
Reason: StaleReasonVersionBehindWindow,
DocumentID: req.DocumentID,
BaseVersion: req.BaseVersion,
CurrentVersion: rec.Version,
MinSupportedVersion: rec.Version - s.maxRebaseGap,
MaxRebaseGap: s.maxRebaseGap,
}
}
concurrentOps, err := s.backend.GetOps(ctx, req.DocumentID, req.BaseVersion, rec.Version)
if err != nil {
return SubmitResult{}, err
}

for _, opRec := range concurrentOps {
concurrent, err := s.parseOperation(opRec.Op)
concurrent, err := s.getOrParseCommittedOperation(opRec)
if err != nil {
return SubmitResult{}, fmt.Errorf("failed to parse concurrent op at version %d: %w", opRec.Version, err)
}
Expand Down Expand Up @@ -331,6 +411,7 @@ func (s *Server) submitCore(ctx context.Context, req SubmitRequest) (SubmitResul
Operation: serializedOp,
Document: newDoc,
}
s.cacheCommittedOperation(OpRecord{DocumentID: req.DocumentID, Version: newVersion, Op: serializedOp}, transformed)

// Step 6: publish event (lock already released via defer, but publish while we have data)
s.pub.Publish(ctx, Event{
Expand Down Expand Up @@ -388,6 +469,7 @@ func (s *Server) DeleteDocument(ctx context.Context, documentID string, baseVers
if err := s.backend.DeleteDoc(ctx, documentID); err != nil {
return err
}
s.parsedOps.DeleteDocument(documentID)

s.pub.Publish(ctx, Event{
Type: EventTypeDelete,
Expand All @@ -399,6 +481,42 @@ func (s *Server) DeleteDocument(ctx context.Context, documentID string, baseVers
return nil
}

// RollbackToVersion computes an inverse OT operation from the current version
// back to targetVersion and commits it as a new versioned operation.
func (s *Server) RollbackToVersion(ctx context.Context, documentID string, targetVersion int, source string) (SubmitResult, error) {
unlock, err := s.locker.Lock(ctx, documentID)
if err != nil {
return SubmitResult{}, err
}
defer unlock()

rec, err := s.backend.GetDoc(ctx, documentID)
if err != nil {
return SubmitResult{}, err
}
if targetVersion < 0 || targetVersion > rec.Version {
return SubmitResult{}, fmt.Errorf("%w: expected 0-%d, got %d", ErrInvalidVersion, rec.Version, targetVersion)
}
if targetVersion == rec.Version {
return SubmitResult{Version: rec.Version, Operation: json.RawMessage("[]"), Document: append(json.RawMessage(nil), rec.Doc...)}, nil
}

rollbackOp, rollbackDoc, err := s.rollbackToVersionOperation(ctx, rec, targetVersion)
if err != nil {
return SubmitResult{}, err
}
serializedOp := append(json.RawMessage(nil), rollbackOp.ToValue().RawMessage()...)
newVersion := rec.Version + 1
if err := s.backend.CommitOp(ctx, DocRecord{DocumentID: documentID, Version: newVersion, Doc: rollbackDoc}, OpRecord{DocumentID: documentID, Version: newVersion, BaseVersion: rec.Version, Source: source, SubmittedOp: serializedOp, Op: serializedOp}); err != nil {
return SubmitResult{}, err
}
s.cacheCommittedOperation(OpRecord{DocumentID: documentID, Version: newVersion, Op: serializedOp}, rollbackOp)

result := SubmitResult{Version: newVersion, Operation: serializedOp, Document: append(json.RawMessage(nil), rollbackDoc...)}
s.pub.Publish(ctx, Event{Type: EventTypeOp, DocumentID: documentID, Version: newVersion, Source: source, Operation: append(json.RawMessage(nil), serializedOp...), Document: append(json.RawMessage(nil), rollbackDoc...)})
return result, nil
}

// GetOperations returns committed operation records that produced versions in
// [fromVersion+1, toVersion]. It is useful for client catch-up, audit logs, and
// reconnect flows that need ShareDB-style op history.
Expand Down Expand Up @@ -520,6 +638,144 @@ func (r OpRecord) MarshalJSON() ([]byte, error) {
})
}

func (s *Server) documentAtVersion(ctx context.Context, rec DocRecord, targetVersion int) (json.RawMessage, error) {
ops, err := s.GetOperations(ctx, rec.DocumentID, targetVersion, rec.Version)
if err != nil {
return nil, err
}
docValue, err := jsonot.UnmarshalValue(rec.Doc)
if err != nil {
return nil, err
}
for i := len(ops) - 1; i >= 0; i-- {
inverse, err := s.inverseOperation(ops[i])
if err != nil {
return nil, err
}
applied := s.ot.Apply(ctx, docValue, inverse)
if applied.IsError() {
return nil, applied.Error()
}
docValue = applied.MustGet()
}
return append(json.RawMessage(nil), docValue.RawMessage()...), nil
}

func (s *Server) rollbackToVersionOperation(ctx context.Context, rec DocRecord, targetVersion int) (*jsonot.Operation, json.RawMessage, error) {
ops, err := s.GetOperations(ctx, rec.DocumentID, targetVersion, rec.Version)
if err != nil {
return nil, nil, err
}
docValue, err := jsonot.UnmarshalValue(rec.Doc)
if err != nil {
return nil, nil, err
}
rollback := jsonot.NewOperation([]*jsonot.OperationComponent{})
for i := len(ops) - 1; i >= 0; i-- {
inverse, err := s.inverseOperation(ops[i])
if err != nil {
return nil, nil, err
}
rollback.Compose(inverse)
applied := s.ot.Apply(ctx, docValue, inverse)
if applied.IsError() {
return nil, nil, applied.Error()
}
docValue = applied.MustGet()
}
if err := rollback.Validation(); err != nil {
return nil, nil, err
}
return rollback, append(json.RawMessage(nil), docValue.RawMessage()...), nil
}

func (s *Server) inverseOperation(opRec OpRecord) (*jsonot.Operation, error) {
op, err := s.getOrParseCommittedOperation(opRec)
if err != nil {
return nil, err
}
components := op.Array()
inverted := jsonot.NewOperation([]*jsonot.OperationComponent{})
for i := len(components) - 1; i >= 0; i-- {
inverse := components[i].Invert()
if inverse.IsError() {
return nil, inverse.Error()
}
inverted.Append(inverse.MustGet())
}
return s.parseOperation(append(json.RawMessage(nil), inverted.ToValue().RawMessage()...))
}

func (s *Server) getOrParseCommittedOperation(opRec OpRecord) (*jsonot.Operation, error) {
if op, ok := s.parsedOps.Get(opRec.DocumentID, opRec.Version); ok {
return op, nil
}
op, err := s.parseOperation(opRec.Op)
if err != nil {
return nil, err
}
s.cacheCommittedOperation(opRec, op)
return cloneOperation(op), nil
}

func (s *Server) cacheCommittedOperation(opRec OpRecord, op *jsonot.Operation) {
if opRec.DocumentID == "" || opRec.Version <= 0 || op == nil {
return
}
s.parsedOps.Store(opRec.DocumentID, opRec.Version, op)
}

func cloneOperation(op *jsonot.Operation) *jsonot.Operation {
if op == nil {
return nil
}
components := op.Array()
clones := make([]*jsonot.OperationComponent, 0, len(components))
for _, component := range components {
clones = append(clones, component.Clone())
}
return jsonot.NewOperation(clones)
}

type parsedOpCache struct {
mu sync.RWMutex
docs map[string]map[int]*jsonot.Operation
}

func (c *parsedOpCache) Get(documentID string, version int) (*jsonot.Operation, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
versions, ok := c.docs[documentID]
if !ok {
return nil, false
}
op, ok := versions[version]
if !ok {
return nil, false
}
return cloneOperation(op), true
}

func (c *parsedOpCache) Store(documentID string, version int, op *jsonot.Operation) {
c.mu.Lock()
defer c.mu.Unlock()
if c.docs == nil {
c.docs = make(map[string]map[int]*jsonot.Operation)
}
versions, ok := c.docs[documentID]
if !ok {
versions = make(map[int]*jsonot.Operation)
c.docs[documentID] = versions
}
versions[version] = cloneOperation(op)
}

func (c *parsedOpCache) DeleteDocument(documentID string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.docs, documentID)
}

func (s *Server) parseOperation(raw json.RawMessage) (*jsonot.Operation, error) {
payload := raw
if len(payload) == 0 {
Expand Down
Loading
Loading