From df3bb7313059e6ce103c3cdecf46972841c2adba Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 13:07:16 -0700 Subject: [PATCH 01/13] feat: send wsID with client requests --- client/auth/config.go | 1 + client/config.go | 46 +++++++++++++++++++++++++++++++++++++++++++ client/go.mod | 1 + client/go.sum | 2 ++ 4 files changed, 50 insertions(+) diff --git a/client/auth/config.go b/client/auth/config.go index 76d241c..bea5925 100644 --- a/client/auth/config.go +++ b/client/auth/config.go @@ -15,6 +15,7 @@ import ( type DefaultsConfig struct { Defaults struct { Server string `toml:"server"` + WsID string `toml:"wsID"` } `toml:"defaults"` } diff --git a/client/config.go b/client/config.go index 77724b3..0920ca7 100644 --- a/client/config.go +++ b/client/config.go @@ -9,12 +9,14 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/google/uuid" ) // .config/sqlrsync/defaults.toml type DefaultsConfig struct { Defaults struct { Server string `toml:"server"` + WsID string `toml:"wsID"` } `toml:"defaults"` } @@ -78,6 +80,14 @@ func LoadDefaultsConfig() (*DefaultsConfig, error) { // Return default config if file doesn't exist config := &DefaultsConfig{} config.Defaults.Server = "wss://sqlrsync.com" + // Generate wsID if it doesn't exist + if err := generateAndSetWsID(config); err != nil { + return nil, fmt.Errorf("failed to generate wsID: %w", err) + } + // Save the new config with wsID + if err := SaveDefaultsConfig(config); err != nil { + return nil, fmt.Errorf("failed to save defaults config with wsID: %w", err) + } return config, nil } return nil, fmt.Errorf("failed to read defaults config file %s: %w", path, err) @@ -93,9 +103,45 @@ func LoadDefaultsConfig() (*DefaultsConfig, error) { config.Defaults.Server = "wss://sqlrsync.com" } + // Generate wsID if it doesn't exist + needsSave := false + if config.Defaults.WsID == "" { + if err := generateAndSetWsID(&config); err != nil { + return nil, fmt.Errorf("failed 
to generate wsID: %w", err) + } + needsSave = true + } + + // Save config if we made changes + if needsSave { + if err := SaveDefaultsConfig(&config); err != nil { + return nil, fmt.Errorf("failed to save defaults config with wsID: %w", err) + } + } + return &config, nil } +// generateAndSetWsID generates a new wsID (UUID + hostname) and sets it in the config +func generateAndSetWsID(config *DefaultsConfig) error { + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to get hostname: %w", err) + } + + config.Defaults.WsID = hostname + ":" + uuid.New().String() + return nil +} + +// GetWsID loads the defaults config and returns the wsID +func GetWsID() (string, error) { + config, err := LoadDefaultsConfig() + if err != nil { + return "", fmt.Errorf("failed to load defaults config: %w", err) + } + return config.Defaults.WsID, nil +} + func SaveDefaultsConfig(config *DefaultsConfig) error { path, err := GetDefaultsPath() if err != nil { diff --git a/client/go.mod b/client/go.mod index 300e31e..5416e7f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -12,6 +12,7 @@ require ( ) require ( + github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/client/go.sum b/client/go.sum index 51cb4ae..5a5051c 100644 --- a/client/go.sum +++ b/client/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 
h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= From 1715b65009574d2d7d39d067ad004678741c06c3 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 13:08:27 -0700 Subject: [PATCH 02/13] Bump version from 0.0.4 to 0.0.5 --- client/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/main.go b/client/main.go index 2183898..c810e7c 100644 --- a/client/main.go +++ b/client/main.go @@ -15,7 +15,7 @@ import ( "github.com/sqlrsync/sqlrsync.com/sync" ) -var VERSION = "0.0.4" +var VERSION = "0.0.5" var ( serverURL string verbose bool From 679cc0570e2c477223842ba23a0279d048a33d84 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:25:53 -0700 Subject: [PATCH 03/13] feat: improve client error reporting from server --- client/remote/client.go | 56 +++++++++++++++++++++++++++++++--- client/subscription/manager.go | 56 ++++++++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/client/remote/client.go b/client/remote/client.go index 8a5f86b..f11094b 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -703,12 +703,58 @@ func (c *Client) Connect() error { conn, response, err := dialer.DialContext(connectCtx, u.String(), headers) if err != nil { - if response != nil && response.Body != nil { - respStr, _ := io.ReadAll(response.Body) - response.Body.Close() - return fmt.Errorf("%s", respStr) + if response != nil { + // Extract detailed error information from the response + statusCode := response.StatusCode + statusText := response.Status + + var respBodyStr string + if response.Body != nil { + respBytes, readErr := io.ReadAll(response.Body) + response.Body.Close() + if readErr == nil { + respBodyStr = 
strings.TrimSpace(string(respBytes)) + } + } + + // Create a clean error message + var errorMsg strings.Builder + errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) + + if respBodyStr != "" { + errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) + } + + return fmt.Errorf("%s", errorMsg.String()) + } + + // Handle cases where response is nil (e.g., network errors, bad handshake) + var errorMsg strings.Builder + errorMsg.WriteString("Failed to connect to WebSocket") + + // Analyze the error type and provide helpful context + errorStr := err.Error() + if strings.Contains(errorStr, "bad handshake") { + errorMsg.WriteString(" - WebSocket handshake failed") + errorMsg.WriteString("\nThis could be due to:") + errorMsg.WriteString("\n• Invalid server URL or endpoint") + errorMsg.WriteString("\n• Server not supporting WebSocket connections") + errorMsg.WriteString("\n• Network connectivity issues") + errorMsg.WriteString("\n• Authentication problems") + } else if strings.Contains(errorStr, "timeout") { + errorMsg.WriteString(" - Connection timeout") + errorMsg.WriteString("\nThe server may be overloaded or unreachable") + } else if strings.Contains(errorStr, "refused") { + errorMsg.WriteString(" - Connection refused") + errorMsg.WriteString("\nThe server may be down or the port may be blocked") + } else if strings.Contains(errorStr, "no such host") { + errorMsg.WriteString(" - DNS resolution failed") + errorMsg.WriteString("\nCheck the server hostname in your configuration") } - return fmt.Errorf("failed to connect to WebSocket: %w", err) + + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) + + return fmt.Errorf("%s", errorMsg.String()) } defer response.Body.Close() diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 304af46..8c4a4df 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" 
"strings" @@ -207,9 +208,60 @@ func (m *Manager) doConnect() error { m.logger.Debug("Dialing WebSocket", zap.String("url", u.String())) - conn, _, err := dialer.DialContext(m.ctx, u.String(), headers) + conn, response, err := dialer.DialContext(m.ctx, u.String(), headers) if err != nil { - return fmt.Errorf("failed to connect to subscription service: %w", err) + if response != nil { + // Extract detailed error information from the response + statusCode := response.StatusCode + statusText := response.Status + + var respBodyStr string + if response.Body != nil { + respBytes, readErr := io.ReadAll(response.Body) + response.Body.Close() + if readErr == nil { + respBodyStr = strings.TrimSpace(string(respBytes)) + } + } + + // Create a clean error message + var errorMsg strings.Builder + errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) + + if respBodyStr != "" { + errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) + } + + return fmt.Errorf("%s", errorMsg.String()) + } + + // Handle cases where response is nil (e.g., network errors, bad handshake) + var errorMsg strings.Builder + errorMsg.WriteString("Failed to connect to subscription service") + + // Analyze the error type and provide helpful context + errorStr := err.Error() + if strings.Contains(errorStr, "bad handshake") { + errorMsg.WriteString(" - WebSocket handshake failed") + errorMsg.WriteString("\nThis could be due to:") + errorMsg.WriteString("\n• Invalid server URL or endpoint") + errorMsg.WriteString("\n• Server not supporting WebSocket connections") + errorMsg.WriteString("\n• Network connectivity issues") + errorMsg.WriteString("\n• Authentication problems") + } else if strings.Contains(errorStr, "timeout") { + errorMsg.WriteString(" - Connection timeout") + errorMsg.WriteString("\nThe server may be overloaded or unreachable") + } else if strings.Contains(errorStr, "refused") { + errorMsg.WriteString(" - Connection refused") + errorMsg.WriteString("\nThe server may be down or 
the port may be blocked") + } else if strings.Contains(errorStr, "no such host") { + errorMsg.WriteString(" - DNS resolution failed") + errorMsg.WriteString("\nCheck the server hostname in your configuration") + } + + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) + + return fmt.Errorf("%s", errorMsg.String()) } m.mu.Lock() From 440064fcbdc397630c4e1efa56f04b0e32b122c3 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:25:53 -0700 Subject: [PATCH 04/13] feat: improve client error reporting from server --- client/remote/client.go | 56 +++++++++++++++++++++++++++++++--- client/subscription/manager.go | 56 ++++++++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/client/remote/client.go b/client/remote/client.go index 8a5f86b..f11094b 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -703,12 +703,58 @@ func (c *Client) Connect() error { conn, response, err := dialer.DialContext(connectCtx, u.String(), headers) if err != nil { - if response != nil && response.Body != nil { - respStr, _ := io.ReadAll(response.Body) - response.Body.Close() - return fmt.Errorf("%s", respStr) + if response != nil { + // Extract detailed error information from the response + statusCode := response.StatusCode + statusText := response.Status + + var respBodyStr string + if response.Body != nil { + respBytes, readErr := io.ReadAll(response.Body) + response.Body.Close() + if readErr == nil { + respBodyStr = strings.TrimSpace(string(respBytes)) + } + } + + // Create a clean error message + var errorMsg strings.Builder + errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) + + if respBodyStr != "" { + errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) + } + + return fmt.Errorf("%s", errorMsg.String()) + } + + // Handle cases where response is nil (e.g., network errors, bad handshake) + var errorMsg strings.Builder + 
errorMsg.WriteString("Failed to connect to WebSocket") + + // Analyze the error type and provide helpful context + errorStr := err.Error() + if strings.Contains(errorStr, "bad handshake") { + errorMsg.WriteString(" - WebSocket handshake failed") + errorMsg.WriteString("\nThis could be due to:") + errorMsg.WriteString("\n• Invalid server URL or endpoint") + errorMsg.WriteString("\n• Server not supporting WebSocket connections") + errorMsg.WriteString("\n• Network connectivity issues") + errorMsg.WriteString("\n• Authentication problems") + } else if strings.Contains(errorStr, "timeout") { + errorMsg.WriteString(" - Connection timeout") + errorMsg.WriteString("\nThe server may be overloaded or unreachable") + } else if strings.Contains(errorStr, "refused") { + errorMsg.WriteString(" - Connection refused") + errorMsg.WriteString("\nThe server may be down or the port may be blocked") + } else if strings.Contains(errorStr, "no such host") { + errorMsg.WriteString(" - DNS resolution failed") + errorMsg.WriteString("\nCheck the server hostname in your configuration") } - return fmt.Errorf("failed to connect to WebSocket: %w", err) + + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) + + return fmt.Errorf("%s", errorMsg.String()) } defer response.Body.Close() diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 304af46..8c4a4df 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "strings" @@ -207,9 +208,60 @@ func (m *Manager) doConnect() error { m.logger.Debug("Dialing WebSocket", zap.String("url", u.String())) - conn, _, err := dialer.DialContext(m.ctx, u.String(), headers) + conn, response, err := dialer.DialContext(m.ctx, u.String(), headers) if err != nil { - return fmt.Errorf("failed to connect to subscription service: %w", err) + if response != nil { + // Extract detailed error information from the response + 
statusCode := response.StatusCode + statusText := response.Status + + var respBodyStr string + if response.Body != nil { + respBytes, readErr := io.ReadAll(response.Body) + response.Body.Close() + if readErr == nil { + respBodyStr = strings.TrimSpace(string(respBytes)) + } + } + + // Create a clean error message + var errorMsg strings.Builder + errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) + + if respBodyStr != "" { + errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) + } + + return fmt.Errorf("%s", errorMsg.String()) + } + + // Handle cases where response is nil (e.g., network errors, bad handshake) + var errorMsg strings.Builder + errorMsg.WriteString("Failed to connect to subscription service") + + // Analyze the error type and provide helpful context + errorStr := err.Error() + if strings.Contains(errorStr, "bad handshake") { + errorMsg.WriteString(" - WebSocket handshake failed") + errorMsg.WriteString("\nThis could be due to:") + errorMsg.WriteString("\n• Invalid server URL or endpoint") + errorMsg.WriteString("\n• Server not supporting WebSocket connections") + errorMsg.WriteString("\n• Network connectivity issues") + errorMsg.WriteString("\n• Authentication problems") + } else if strings.Contains(errorStr, "timeout") { + errorMsg.WriteString(" - Connection timeout") + errorMsg.WriteString("\nThe server may be overloaded or unreachable") + } else if strings.Contains(errorStr, "refused") { + errorMsg.WriteString(" - Connection refused") + errorMsg.WriteString("\nThe server may be down or the port may be blocked") + } else if strings.Contains(errorStr, "no such host") { + errorMsg.WriteString(" - DNS resolution failed") + errorMsg.WriteString("\nCheck the server hostname in your configuration") + } + + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) + + return fmt.Errorf("%s", errorMsg.String()) } m.mu.Lock() From 334faf3a5f114b1eadc6b74b23e44c9689156a62 Mon Sep 17 00:00:00 2001 From: pnwmatt 
<180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 13:08:27 -0700 Subject: [PATCH 05/13] Bump version from 0.0.4 to 0.0.5 --- client/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/main.go b/client/main.go index 2183898..c810e7c 100644 --- a/client/main.go +++ b/client/main.go @@ -15,7 +15,7 @@ import ( "github.com/sqlrsync/sqlrsync.com/sync" ) -var VERSION = "0.0.4" +var VERSION = "0.0.5" var ( serverURL string verbose bool From e675db9f6544d398941a2a64ebc51a056934b25b Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:42:04 -0700 Subject: [PATCH 06/13] fix: bugs with wsid on subscribe connections --- client/go.mod | 2 +- client/main.go | 8 ++++++++ client/remote/client.go | 23 +++++++++++++++-------- client/subscription/manager.go | 31 +++++++++++++++++++------------ client/sync/coordinator.go | 8 ++++++-- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/client/go.mod b/client/go.mod index 5416e7f..d224737 100644 --- a/client/go.mod +++ b/client/go.mod @@ -5,6 +5,7 @@ go 1.24.5 require ( github.com/BurntSushi/toml v1.5.0 github.com/fatih/color v1.18.0 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.0 github.com/spf13/cobra v1.8.0 github.com/sqlrsync/sqlrsync.com/bridge v0.0.0-00010101000000-000000000000 @@ -12,7 +13,6 @@ require ( ) require ( - github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/client/main.go b/client/main.go index c810e7c..cea0782 100644 --- a/client/main.go +++ b/client/main.go @@ -145,6 +145,13 @@ func runSync(cmd *cobra.Command, args []string) error { visibility = 1 } + // Get workspace ID for client identification + wsID, err := GetWsID() + if err != nil { + logger.Warn("Failed to get workspace ID", zap.Error(err)) + wsID = "" // Continue with empty 
wsID + } + // Create sync coordinator coordinator := sync.NewCoordinator(&sync.CoordinatorConfig{ ServerURL: serverURL, @@ -162,6 +169,7 @@ func runSync(cmd *cobra.Command, args []string) error { DryRun: dryRun, Logger: logger, Verbose: verbose, + WsID: wsID, // Add workspace ID }) // Execute the operation diff --git a/client/remote/client.go b/client/remote/client.go index f11094b..f219e5b 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -407,6 +407,7 @@ type Config struct { SendConfigCmd bool // we don't have the version number or remote path LocalHostname string LocalAbsolutePath string + WsID string // Workspace ID for X-ClientID header // Progress tracking ProgressConfig *ProgressConfig @@ -685,6 +686,12 @@ func (c *Client) Connect() error { headers.Set("Authorization", c.config.AuthToken) + if c.config.WsID != "" { + headers.Set("X-ClientID", c.config.WsID) + } else { + c.logger.Fatal("No wsID provided for X-ClientID header") + } + if c.config.LocalHostname != "" { headers.Set("X-LocalHostname", c.config.LocalHostname) } @@ -707,7 +714,7 @@ func (c *Client) Connect() error { // Extract detailed error information from the response statusCode := response.StatusCode statusText := response.Status - + var respBodyStr string if response.Body != nil { respBytes, readErr := io.ReadAll(response.Body) @@ -716,22 +723,22 @@ func (c *Client) Connect() error { respBodyStr = strings.TrimSpace(string(respBytes)) } } - + // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) - + if respBodyStr != "" { errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) } - + return fmt.Errorf("%s", errorMsg.String()) } - + // Handle cases where response is nil (e.g., network errors, bad handshake) var errorMsg strings.Builder errorMsg.WriteString("Failed to connect to WebSocket") - + // Analyze the error type and provide helpful context errorStr := err.Error() if strings.Contains(errorStr, 
"bad handshake") { @@ -751,9 +758,9 @@ func (c *Client) Connect() error { errorMsg.WriteString(" - DNS resolution failed") errorMsg.WriteString("\nCheck the server hostname in your configuration") } - + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) - + return fmt.Errorf("%s", errorMsg.String()) } defer response.Body.Close() diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 8c4a4df..4c7ee19 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -34,12 +34,13 @@ type Message struct { Timestamp time.Time `json:"timestamp"` } -// Config holds subscription manager configuration -type Config struct { +// ManagerConfig holds subscription manager configuration +type ManagerConfig struct { ServerURL string ReplicaPath string AuthToken string ReplicaID string + WsID string // websocket ID for client identification Logger *zap.Logger MaxReconnectAttempts int // Maximum number of reconnect attempts (0 = infinite) InitialReconnectDelay time.Duration // Initial delay before first reconnect @@ -54,7 +55,7 @@ type Config struct { // MaxReconnectDelay is reached. Reconnection attempts continue indefinitely unless // MaxReconnectAttempts is set to a positive value. 
type Manager struct { - config *Config + config *ManagerConfig logger *zap.Logger conn *websocket.Conn ctx context.Context @@ -73,7 +74,7 @@ type Manager struct { } // NewManager creates a new subscription manager -func NewManager(config *Config) *Manager { +func NewManager(config *ManagerConfig) *Manager { ctx, cancel := context.WithCancel(context.Background()) // Set default reconnection parameters if not provided @@ -202,6 +203,12 @@ func (m *Manager) doConnect() error { headers.Set("X-ReplicaID", m.config.ReplicaID) } + if m.config.WsID != "" { + headers.Set("X-ClientID", m.config.WsID) + } else { + m.logger.Fatal("No wsID provided for X-ClientID header") + } + dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, } @@ -214,7 +221,7 @@ func (m *Manager) doConnect() error { // Extract detailed error information from the response statusCode := response.StatusCode statusText := response.Status - + var respBodyStr string if response.Body != nil { respBytes, readErr := io.ReadAll(response.Body) @@ -223,22 +230,22 @@ func (m *Manager) doConnect() error { respBodyStr = strings.TrimSpace(string(respBytes)) } } - + // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) - + if respBodyStr != "" { errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) } - + return fmt.Errorf("%s", errorMsg.String()) } - + // Handle cases where response is nil (e.g., network errors, bad handshake) var errorMsg strings.Builder errorMsg.WriteString("Failed to connect to subscription service") - + // Analyze the error type and provide helpful context errorStr := err.Error() if strings.Contains(errorStr, "bad handshake") { @@ -258,9 +265,9 @@ func (m *Manager) doConnect() error { errorMsg.WriteString(" - DNS resolution failed") errorMsg.WriteString("\nCheck the server hostname in your configuration") } - + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) - + return fmt.Errorf("%s", 
errorMsg.String()) } diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index 18aa1df..f05ff27 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -46,6 +46,7 @@ type CoordinatorConfig struct { DryRun bool Logger *zap.Logger Verbose bool + WsID string // Workspace ID for client identification } // Coordinator manages sync operations and subscriptions @@ -240,11 +241,12 @@ func (c *Coordinator) executeSubscribe() error { } // Create subscription manager with reconnection configuration - c.subManager = subscription.NewManager(&subscription.Config{ + c.subManager = subscription.NewManager(&subscription.ManagerConfig{ ServerURL: authResult.ServerURL, ReplicaPath: authResult.RemotePath, AuthToken: authResult.AccessToken, ReplicaID: authResult.ReplicaID, + WsID: c.config.WsID, Logger: c.logger.Named("subscription"), MaxReconnectAttempts: 20, // Infinite reconnect attempts InitialReconnectDelay: 5 * time.Second, // Start with 5 seconds delay @@ -373,6 +375,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { Version: version, SendConfigCmd: true, SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), + WsID: c.config.WsID, // Add websocket ID //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -499,7 +502,8 @@ func (c *Coordinator) executePush() error { SendConfigCmd: true, SetVisibility: c.config.SetVisibility, CommitMessage: c.config.CommitMessage, - ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), + WsID: c.config.WsID, // Add websocket ID + ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, Format: remote.FormatSimple, From ffaa84353223d64da01fcb646488eb89b01a3820 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:52:47 -0700 
Subject: [PATCH 07/13] send client version --- client/main.go | 3 ++- client/remote/client.go | 3 +++ client/subscription/manager.go | 8 +++----- client/sync/coordinator.go | 6 +++++- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/client/main.go b/client/main.go index cea0782..45e972a 100644 --- a/client/main.go +++ b/client/main.go @@ -169,7 +169,8 @@ func runSync(cmd *cobra.Command, args []string) error { DryRun: dryRun, Logger: logger, Verbose: verbose, - WsID: wsID, // Add workspace ID + WsID: wsID, // Add websocket ID + ClientVersion: VERSION, }) // Execute the operation diff --git a/client/remote/client.go b/client/remote/client.go index f219e5b..0b00cd2 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -402,6 +402,7 @@ type Config struct { InspectionDepth int // How many bytes to inspect (default: 32) PingPong bool AuthToken string + ClientVersion string // version of the client software SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token SendConfigCmd bool // we don't have the version number or remote path @@ -686,6 +687,8 @@ func (c *Client) Connect() error { headers.Set("Authorization", c.config.AuthToken) + headers.Set("X-ClientVersion", c.config.ClientVersion); + if c.config.WsID != "" { headers.Set("X-ClientID", c.config.WsID) } else { diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 4c7ee19..74e45c2 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -41,6 +41,7 @@ type ManagerConfig struct { AuthToken string ReplicaID string WsID string // websocket ID for client identification + ClientVersion string // version of the client software Logger *zap.Logger MaxReconnectAttempts int // Maximum number of reconnect attempts (0 = infinite) InitialReconnectDelay time.Duration // Initial delay before first reconnect @@ -203,11 +204,8 @@ func (m *Manager) doConnect() error { headers.Set("X-ReplicaID", m.config.ReplicaID) } - if m.config.WsID 
!= "" { - headers.Set("X-ClientID", m.config.WsID) - } else { - m.logger.Fatal("No wsID provided for X-ClientID header") - } + headers.Set("X-ClientVersion", m.config.ClientVersion) + headers.Set("X-ClientID", m.config.WsID) dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index f05ff27..4da1af6 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -46,7 +46,8 @@ type CoordinatorConfig struct { DryRun bool Logger *zap.Logger Verbose bool - WsID string // Workspace ID for client identification + WsID string // Websocket ID for client identification + ClientVersion string // version of the client software } // Coordinator manages sync operations and subscriptions @@ -247,6 +248,7 @@ func (c *Coordinator) executeSubscribe() error { AuthToken: authResult.AccessToken, ReplicaID: authResult.ReplicaID, WsID: c.config.WsID, + ClientVersion: c.config.ClientVersion, Logger: c.logger.Named("subscription"), MaxReconnectAttempts: 20, // Infinite reconnect attempts InitialReconnectDelay: 5 * time.Second, // Start with 5 seconds delay @@ -376,6 +378,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { SendConfigCmd: true, SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), WsID: c.config.WsID, // Add websocket ID + ClientVersion: c.config.ClientVersion, //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -503,6 +506,7 @@ func (c *Coordinator) executePush() error { SetVisibility: c.config.SetVisibility, CommitMessage: c.config.CommitMessage, WsID: c.config.WsID, // Add websocket ID + ClientVersion: c.config.ClientVersion, ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, From 4a1c30e337110be5116ae4bc9414fd9db49a17a2 Mon Sep 17 00:00:00 2001 From: pnwmatt 
<180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:54:27 -0700 Subject: [PATCH 08/13] Merge pull request #19 from sqlrsync/showConnectError feat: improve client error reporting from server --- client/AUTO MERGE:.md | 13 ++ client/AUTO-MERGE-STATUS.md | 240 ++++++++++++++++++++++++++++++++ client/IMPLEMENTATION-STATUS.md | 185 ++++++++++++++++++++++++ client/PUSH-Subscription.md | 207 +++++++++++++++++++++++++++ client/SQLDIFF-INTEGRATION.md | 226 ++++++++++++++++++++++++++++++ 5 files changed, 871 insertions(+) create mode 100644 client/AUTO MERGE:.md create mode 100644 client/AUTO-MERGE-STATUS.md create mode 100644 client/IMPLEMENTATION-STATUS.md create mode 100644 client/PUSH-Subscription.md create mode 100644 client/SQLDIFF-INTEGRATION.md diff --git a/client/AUTO MERGE:.md b/client/AUTO MERGE:.md new file mode 100644 index 0000000..c69bfd8 --- /dev/null +++ b/client/AUTO MERGE:.md @@ -0,0 +1,13 @@ +AUTO MERGE: + +If the server responds that there was a newer version that we didn't know about but we've already PUSHed some changes, the server rejects the PUSH and tells us what the latest version is. If the user provides the --merge flag, then we: + +- use LOCAL mode to copy this database to /tmp/sqlrsync-merge-local\*randomstuff +- use PULL mode to grab latest and assert it over that /tmp file +- use sqlrsync --diff to diff the /tmp vs LOCAL and generate a patch file +- if the patch doesn't have any conflicting primary keys, then apply it and push, otherwise send a call to POST serverURL/sapi/notification/account/replicaName/. The server will block this server until a human resolves the conflict. 
+ +Message body for post: +{ type: "merge-conflict", the diff file as base64, the versions impacted, hostname, wsID } + +Here's the source code for https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c diff --git a/client/AUTO-MERGE-STATUS.md b/client/AUTO-MERGE-STATUS.md new file mode 100644 index 0000000..b07870b --- /dev/null +++ b/client/AUTO-MERGE-STATUS.md @@ -0,0 +1,240 @@ +# Auto-Merge Implementation Status + +## ✅ Completed Features + +### 1. CLI Flag + +- `--merge`: Automatically merge changes when server has newer version + +### 2. Version Conflict Detection + +**Location:** `remote/client.go:1279-1297` + +- Detects `ABORT=VERSION_CONFLICT:versionNumber` message from server +- Stores conflict state and latest version number +- Provides methods: + - `HasVersionConflict() bool` - Check if conflict occurred + - `GetLatestVersion() string` - Get server's latest version + - `ResetVersionConflict()` - Clear conflict state + +### 3. Auto-Merge Flow + +**Location:** `sync/coordinator.go:537-557` + +When PUSH fails with version conflict and `--merge` is enabled: + +1. ✅ **Create temp file** - `/tmp/sqlrsync-merge-local-*.sqlite` +2. ✅ **Copy local to temp** - Uses LOCAL mode (`RunDirectSync`) +3. ✅ **PULL latest over temp** - Fetches server version and applies to temp +4. ⚠️ **Generate diff** - Currently stubbed, needs sqldiff integration +5. ⚠️ **Check conflicts** - Currently stubbed, needs primary key conflict detection +6. ✅ **Apply changes** - Copies merged result back to local +7. ✅ **Retry PUSH** - Attempts push again with merged data + +### 4. Conflict Notification + +**Location:** `sync/coordinator.go:1141-1158` + +When conflicts are detected: +- Prepares notification with: + - Type: `"merge-conflict"` + - Diff data (base64 encoded) + - Version information + - Hostname and wsID +- ⚠️ **TODO**: Implement HTTP POST to `$server/sapi/notification/account/$replicaName/` + +## 🚧 TODO: Remaining Work + +### 1. 
SQLDiff Integration + +Need to implement actual diff generation and parsing: + +```go +// Generate diff between temp (latest) and local (our changes) +// Using sqldiff tool or equivalent +diffOutput := runSQLDiff(tempPath, c.config.LocalPath) + +// Parse diff output to extract primary key operations +conflicts := parseDiffForConflicts(diffOutput) +``` + +**Reference:** https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c + +### 2. Primary Key Conflict Detection + +Parse diff output to detect conflicting operations: + +```go +type DiffOperation struct { + Type string // INSERT, UPDATE, DELETE + Table string + PrimaryKey map[string]interface{} +} + +func parseDiffForConflicts(diffOutput []byte) ([]DiffOperation, error) { + // Parse SQL diff output + // Identify operations on same primary keys + // Return conflicting operations +} +``` + +### 3. HTTP Notification Implementation + +Implement the POST request to notify server of conflicts: + +```go +func (c *Coordinator) sendMergeConflictNotification(serverURL, replicaName, version string, diffData []byte) error { + payload := map[string]interface{}{ + "type": "merge-conflict", + "diff": base64.StdEncoding.EncodeToString(diffData), + "versions": []string{c.config.Version, version}, + "hostname": hostname, + "wsID": wsID, + } + + url := fmt.Sprintf("%s/sapi/notification/account/%s/", serverURL, replicaName) + // POST JSON payload +} +``` + +## 📝 Usage Examples + +### Basic Auto-Merge + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --merge +``` + +When pushing and server has newer version: +1. Automatically pulls latest version +2. Merges changes (if no conflicts) +3. 
Retries push with merged data + +### Auto-Merge with Subscription + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --merge +``` + +- Watches for local changes +- Pushes after 10 minutes idle +- Auto-merges if version conflict occurs +- Continues watching after successful merge + +### Expected Behavior + +#### No Conflicts +``` +🔄 Performing PUSH... +⚠️ Version conflict: Server has newer version 42 +🔄 Auto-merge enabled - attempting to merge with server version 42... +📋 Step 1/5: Copying local database to temp file... +📥 Step 2/5: Pulling latest version 42 from server over temp file... +🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... +✅ Step 4/5: No conflicts detected +📝 Step 5/5: Applying changes to local database... +✅ Merge completed successfully +✅ Auto-merge successful - retrying PUSH... +✅ PUSH completed +``` + +#### With Conflicts +``` +🔄 Performing PUSH... +⚠️ Version conflict: Server has newer version 42 +🔄 Auto-merge enabled - attempting to merge with server version 42... +📋 Step 1/5: Copying local database to temp file... +📥 Step 2/5: Pulling latest version 42 from server over temp file... +🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... +❌ Merge conflict detected - server blocking until manual resolution + Server: wss://sqlrsync.com + Replica: namespace/mydb.sqlite + Version: 42 + Hostname: my-laptop + wsID: abc123 +Error: merge conflict requires manual resolution +``` + +## 🏗️ Architecture + +### Flow Diagram + +``` +┌─────────────┐ +│ PUSH Failed │ +│ (version │ +│ conflict) │ +└──────┬──────┘ + │ + ▼ + ┌───────────┐ + │ --merge? 
│ + └─────┬─────┘ + │ yes + ▼ + ┌──────────────────┐ + │ Copy LOCAL │ + │ to /tmp/file │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ PULL latest │ + │ over /tmp/file │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Generate diff │ + │ (sqldiff) │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Check PK │ + │ conflicts? │ + └─────┬────────────┘ + │ + ┌────┴────┐ + │ │ + no yes + │ │ + ▼ ▼ +┌────────┐ ┌──────────────┐ +│ Apply │ │ POST to │ +│ diff │ │ /sapi/notify │ +└───┬────┘ └──────┬───────┘ + │ │ + ▼ ▼ +┌────────┐ ┌──────────────┐ +│ Retry │ │ Error: │ +│ PUSH │ │ Manual fix │ +└────────┘ └──────────────┘ +``` + +### Server Requirements + +The server needs to: + +1. **Detect version conflicts** - When client pushes with old version number +2. **Send ABORT message** - Format: `ABORT=VERSION_CONFLICT:latestVersion` +3. **Handle notification endpoint** - `POST /sapi/notification/account/{replicaName}/` + - Accept JSON payload with conflict details + - Block replica until manual resolution + - Notify account owner of conflict + +### Client State + +During auto-merge, the client maintains: +- Original local database (unchanged until merge succeeds) +- Temp file with server's latest version +- Diff between temp and local +- Conflict detection results +- Version numbers (old and new) + +## 🔐 Security Considerations + +1. **Temp file cleanup** - Always cleaned up via `defer os.Remove(tempPath)` +2. **Authentication** - Uses same auth as regular PULL/PUSH +3. **Version validation** - Server must validate version numbers to prevent replay attacks +4. **Conflict data** - Diff data may contain sensitive information, ensure HTTPS for notifications diff --git a/client/IMPLEMENTATION-STATUS.md b/client/IMPLEMENTATION-STATUS.md new file mode 100644 index 0000000..7a067fe --- /dev/null +++ b/client/IMPLEMENTATION-STATUS.md @@ -0,0 +1,185 @@ +# PUSH Subscription Implementation Status + +## ✅ Completed Features + +### 1. 
CLI Flags + +- `--subscribe`: Enable subscription mode for both PUSH and PULL +- `--waitIdle`: Time to wait for idleness before pushing (e.g., 10m, 1h30m, 3d) - min 10 seconds +- `--maxInterval`: Maximum time between pushes regardless of activity (e.g., 24h, 1w) +- `--minInterval`: Minimum time between subsequent pushes (defaults to 1/2 maxInterval) + +### 2. Duration Parsing + +- Custom duration parser supporting: s, m, h, d (days), w (weeks) +- Does not support months (as specified) +- Validates waitIdle min (10 seconds) and max (24 days) + +### 3. File Watching + +- Uses fsnotify to monitor SQLite database and WAL files +- Detects Write and Create events +- Non-blocking change notifications via channel +- Callback sends SQLRSYNC_CHANGED notifications to server for analytics + +### 4. PUSH Subscription Core Logic + +- **Initial PUSH**: Performs initial PUSH when subscription starts +- **Key type validation**: Checks if PULL key is being used and switches to PULL subscription mode +- **File change detection**: Monitors both .sqlite and .sqlite-wal files +- **Singleton waitIdle timer**: Timer that resets on each file change for another MAX(waitIdle OR lastPush - minInterval). On expiration, performs PUSH and resets sessionStarted=false +- **First write notification**: Sends SQLRSYNC_CHANGED with waitIdle duration, sets sessionStarted=true +- **Subsequent write notifications**: If resend timer isn't running, starts timer for (waitIdle - 10 seconds). When timer fires, resends SQLRSYNC_CHANGED with remaining duration +- **minInterval enforcement**: Ensures minimum spacing between pushes +- **maxInterval enforcement**: Forces push if maxInterval time has elapsed since last push +- **Timer reset on change**: Each file change recalculates and resets the timer + +### 5. 
Error Handling + +- Exponential backoff with retry (5 attempts) +- Delays: 5s, 10s, 20s, 40s, 80s +- Error reporting to server stub (HTTPS POST to $server/sapi/$replicaID) - not yet implemented +- Continues watching even after push failures + +### 6. User Notifications + +- Progress messages for all push operations +- Clear status updates during file watching +- Timer duration logging in verbose mode +- Automatic warning when PULL key is used with --subscribe flag + +### 7. Protocol Messages + +```go +const ( + SQLRSYNC_CONFIG = 0x51 // Config with keys, replicaID, keyType + SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available + SQLRSYNC_KEYREQUEST = 0x53 // Request keys + SQLRSYNC_COMMITMESSAGE = 0x54 // Commit message + SQLRSYNC_CHANGED = 0x57 // Write detected notification with duration +) +``` + +**SQLRSYNC_CHANGED Format:** +- `[0x57][duration: 4 bytes as seconds]` +- Sent on first write with waitIdle duration +- Resent 10 seconds before expiration if there were subsequent writes +- Contains remaining time until push will occur + +### 8. Key Type Detection + +- Server sends `keyType` field in CONFIG message (`"PUSH"` or `"PULL"`) +- Client parses keyType from CONFIG JSON (remote/client.go:1303-1306) +- If PULL key detected with --subscribe flag, client automatically switches to PULL subscription mode +- Prevents misuse of --waitIdle/--maxInterval/--minInterval with PULL keys + +## 🚧 TODO: Remaining Work + +### 1. Error Reporting to Server + +Stub exists at `coordinator.go:912-914`. 
Need to implement: + +```go +func (c *Coordinator) reportErrorToServer(err error) { + // HTTPS POST to $server/sapi/$replicaID + // Include error message + // Only if failures persist for > 5 minutes +} +``` + +## 📝 Usage Examples + +### Basic PUSH subscription + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h +``` + +- Pushes after 10 minutes of idleness +- Forces push at least every 24 hours even with continuous activity +- Sends SQLRSYNC_CHANGED on first write, resends 10s before expiration if subsequent writes + +### Custom intervals + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe \ + --waitIdle 5m --minInterval 10m --maxInterval 1h +``` + +- Timer = MAX(10m - timeSinceLastPush, 5m) +- If last push was 3 min ago: timer = MAX(7m, 5m) = 7m +- If last push was 12 min ago: timer = MAX(-2m, 5m) = 5m + +### Using with PULL key (auto-switches) + +```bash +# This will automatically switch to PULL subscription mode +sqlrsync namespace/mydb.sqlite --subscribe --waitIdle 10m +``` + +- Client detects PULL key from server CONFIG +- Ignores --waitIdle/--maxInterval/--minInterval +- Switches to standard PULL subscription mode + +## 🏗️ Architecture + +### File Structure + +``` +client/ +├── main.go # CLI flags, operation routing +├── sync/ +│ ├── coordinator.go # Main sync orchestration + PUSH subscribe +│ └── duration.go # Duration parsing utilities +├── watcher/ +│ └── watcher.go # File system monitoring +├── remote/ +│ └── client.go # WebSocket client with CHANGED notifications +└── subscription/ + └── manager.go # PULL subscription (existing) +``` + +### Flow for PUSH Subscribe + +1. Parse and validate duration parameters (min 10s for waitIdle) +2. Execute initial PUSH +3. Establish persistent WebSocket connection +4. Check key type from CONFIG - switch to PULL subscription if PULL key detected +5. Start file watcher +6. 
Users specify how long to wait for idleness after the last change (--waitIdle), and a fixed maximum amount of time allowed since the last PUSH (--maxInterval).
The first time the command is run, it will do a normal +PUSH, and then begin monitoring the file for changes. When a change is +detected, it will wait the specified delay time (or auto mode) and then +begin a PUSH operation. If another change is detected during the wait +time, the wait timer is reset UNLESS it has been X hours since the last +PUSH, in which case it will immediately begin a PUSH operation. + +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h + +Wait for 10 minutes of idleness before pushing, but in the event that there's +activity every 8 minutes for multiple days straight, it will at least push +24 hours after the last push. + +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on push subscribe" --subscribe --waitIdle 10m --maxInterval 24h + +With push subscribe the database is checked to see if PRAGMA busy_timeout is +greater than waitIdle. If not, it asks the user what value they want to increase +it to and sets it. + +## Coordination Mode (Optional) + +If --coordinate is specified, the client opts into the coordination system. Each client +configures their own waitIdle and maxInterval values independently - these are NOT shared +between clients. 
--waitIdle, --minInterval, and --maxInterval all allow time durations like 3s, 10m, 1h30m, 2h, 1h45m, 3d, 1w, 4w, etc. They do not support months.

The max for waitIdle is 24 days because busy_timeout is a non-negative integer representing milliseconds, whose maximum is about 24.8 days. The min is 10 seconds.
+ +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on coordinated subscribe" --subscribe --waitIdle 10m --maxInterval 24h --coordinate + +## Split-Brain Handling + +When two clients send SQLRSYNC_CLAIM simultaneously: +- Server accepts the first CLAIM it receives +- Responds to first client with `SQLRSYNC_CLAIM` (no arguments) = acknowledgment +- Broadcasts to all other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using claimer's waitIdle) +- Second client receives the other client's CLAIM instead of acknowledgment +- Second client suppresses its push attempts for that waitIdle duration +- Server ignores subsequent CLAIMs until the current claim's waitIdle expires + +Note: CLAIMS do not block local writes - both databases continue accepting writes. +The coordination only affects when pushes are attempted. + +In addition to --waitIlde, --maxInterval, another flag is + +Areas for Consideration & Additional Scenarios + +1. Error Handling & Recovery + Network failures: What happens if a PUSH fails due to network issues? Should it retry with exponential backoff? + Matt says: Yes, exponential backoff using the existing exponential backoff code. The client will do an HTTPS POST request to $server/sapi/$replicaID with + the error message if it has been more than 5 minutes of failure. + +Partial sync failures: How should the system handle cases where a PUSH partially completes? +Matt says: that'd be handled the same as a network failure: exponential backoff. The client will do an HTTPS POST request to $server/sapi/$replicaID with +the error message 100% of the time. + +Server unavailability: Should the system queue changes locally and sync when the server becomes available? +Matt says: the above network and partial sync cases handle that with existing exponential backoff code. + +2. File System Edge Cases + Temporary files: Should the file watcher ignore temporary files created by editors (e.g., .tmp, .swp files)? 
+ Matt says: yes, both the name of the file and -wal should be monitored. + +Atomic writes: How does it handle applications that write to temporary files and then rename them? +Matt says: The sqlite spec may do that to the local.db-wal file, but that will always trigger a traditional write to the local.db file + +Multiple rapid changes: What about scenarios with hundreds of small changes in quick succession? +Matt says: the --waitIdle code quickly ignores it if it hasn't been 75% of the waitIdle time or --maxInterval since the last PUSH + +File locking: What happens if the SQLite database is locked when trying to PUSH? +Matt says: The sqlrsync client PUSHes without needing a write lock by using a special control table in the sqlite database that has already been built into sqlrsync. + +3. Coordination Protocol Edge Cases + Split-brain scenarios: What if network partitions cause multiple clients to think they have the claim? + Matt says: the server is single threaded and we accept that the first claim the server received will win. + +Client crashes: How is a stale claim cleaned up if a client crashes while holding it? +Matt says: the claim is time-limited to waitIdle, so if a client crashes while holding it, the claim will expire after waitIdle time. + +Clock skew: How does the system handle clients with different system times? +Matt says: figuring in internet latency, since times are relative, other systems might be off by a few seconds. + +Late joiners: What happens when a new client subscribes to a database that already has active claims? 4. Performance & Resource Management +Matt says: The server will send a CLAIM when it connects if one already exists. + +Resource cleanup: How are file watchers cleaned up when subscription ends? +Matt says: These are long running permanent processes that should be added to systemd or similar to ensure they are restarted if they crash. + +Memory usage: For long-running subscriptions, how is memory managed? 
+Matt says: The file watcher uses minimal memory - mostly waiting, and not using any dynamic memory. + +CPU usage: What's the impact of continuous file watching on system resources? 5. User Experience +Matt says: I believe there are hooks into the OS that use no resources until a change is detected. + +Status visibility: How can users see the current subscription status, active claims, etc.? +Matt says: This is handled serverside. + +Graceful shutdown: How should users stop subscription mode? +Matt says: This is already handled by ctrl-c + +Logging: What level of logging should be provided for debugging coordination issues? +Matt says: Verbose logging is already available and can be enabled with the --verbose flag. + +# Specific documentation for the server side + +CLAIMS are sent from the worker to the durable for the account, which makes decisions and coordinates for that since it will be less resource constrained. + +CLAIMS do not block other writes. Both will be saved. + +# Questions for Claude + +1. How do concurrent writes work? + +**Answer**: Based on the specification, here's how concurrent writes are handled: + +## Without --coordinate flag (Default, No Coordination) +- Each client operates independently with no coordination +- The timer logic (`MAX(minInterval - timeSinceLastPush, waitIdle)`) prevents too-frequent pushes from same client +- All clients can push whenever their timers expire +- Server accepts all pushes in order received +- Risk of concurrent pushes and conflicts (accepted risk) +- Server handles conflicts via SQLite rsync page-level merging + +## With --coordinate flag (Optional Coordination) +- **--coordinate is optional** - each client opts in individually +- **Each client configures own waitIdle/maxInterval** - these are NOT shared between clients +- **CLAIMS do not block local writes** - all local databases continue accepting writes + +When Client A (with --coordinate) detects a change: +1. 
First write: + - Sends `SQLRSYNC_CLAIM $waitIdle` to server + - Starts extension timer for `waitIdle - 10 seconds` +2. Subsequent writes: + - Sets flag "had additional writes" + - Does NOT send CLAIM yet +3. When extension timer fires (10s before claim expires): + - If flag set: sends `SQLRSYNC_CLAIM $remainingWaitIdle` + - Where `remainingWaitIdle = waitIdle - (seconds since last write)` + - Starts new timer for `remainingWaitIdle - 10 seconds` + +Server (via durable object) handles CLAIM: +- Accepts first CLAIM for a replica +- Responds to Client A: `SQLRSYNC_CLAIM` (no args) = acknowledgment +- Broadcasts to other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using A's waitIdle) + +Other coordinating clients receiving the CLAIM: +- **Do NOT block local writes** (application continues writing) +- Set a timer for received $waitIdle duration +- Suppress their own PUSH and CLAIM attempts during this window (polite yield) +- After timer expires, can push accumulated changes + +Client A completes its PUSH, then other clients can push their accumulated changes. + +## Conflict Resolution +Since CLAIMS don't block writes and coordination is optional: +1. Client A (coordinating) writes locally + claims +2. Client B (coordinating) writes locally but yields on push +3. Client C (no --coordinate) writes and pushes independently +4. Server accepts pushes in order received +5. Conflicts resolved using SQLite rsync protocol (page-level merging) + +The coordination is about **reducing push collisions** and **giving polite turn-taking** among coordinating clients, not preventing concurrent local writes. Non-coordinating clients can still push anytime. diff --git a/client/SQLDIFF-INTEGRATION.md b/client/SQLDIFF-INTEGRATION.md new file mode 100644 index 0000000..743a990 --- /dev/null +++ b/client/SQLDIFF-INTEGRATION.md @@ -0,0 +1,226 @@ +# SQLDiff Integration + +## Overview + +Created a CGO wrapper to integrate SQLite's sqldiff functionality into the auto-merge feature. 
+ +## Files Created + +### 1. `/bridge/sqldiff_wrapper.h` +C header file defining the interface: +```c +int sqldiff_run(const char *db1, const char *db2, char **result, char **error); +``` + +### 2. `/bridge/sqldiff_wrapper.c` +C implementation that: +- Opens both databases in read-only mode +- Currently returns a placeholder diff +- **TODO**: Integrate full sqldiff.c logic once `sqlite3_stdio.h` is available + +### 3. `/bridge/cgo_sqldiff.go` +Go wrapper providing: + +**Main Function:** +```go +func RunSQLDiff(db1Path, db2Path string) (*DiffResult, error) +``` + +**Data Structures:** +```go +type DiffResult struct { + SQL string // SQL statements to transform db1 to db2 + HasChanges bool // Whether there are any differences + Operations []DiffOperation // Parsed operations from the diff + Conflicts []PrimaryKeyConflict // Detected primary key conflicts +} + +type DiffOperation struct { + Type string // INSERT, UPDATE, DELETE + Table string // Table name + PrimaryKey map[string]interface{} // Primary key values + SQL string // The actual SQL statement +} + +type PrimaryKeyConflict struct { + Table string + PrimaryKey map[string]interface{} + Operation1 string // First operation + Operation2 string // Second operation +} +``` + +**Helper Function:** +```go +func ApplyDiff(dbPath string, diff *DiffResult) error +``` + +## Integration with Auto-Merge + +### Location: `sync/coordinator.go:1112-1156` + +The auto-merge flow now: + +1. **Creates temp file** - Copies local database +2. **Pulls latest** - Gets server's version over temp +3. **Generates diff** - Calls `bridge.RunSQLDiff(tempPath, localPath)` +4. **Checks for changes** - Returns early if databases are identical +5. **Detects conflicts** - Checks for primary key conflicts +6. **Sends notification** - If conflicts exist, notifies server +7. **Applies diff** - If no conflicts, applies changes +8. 
**Retries push** - Pushes merged result + +### Usage in Code + +```go +diffResult, err := bridge.RunSQLDiff(tempPath, c.config.LocalPath) +if err != nil { + return fmt.Errorf("failed to generate diff: %w", err) +} + +if !diffResult.HasChanges { + fmt.Println("✅ No changes detected - databases are identical") + return nil +} + +if len(diffResult.Conflicts) > 0 { + fmt.Printf("❌ Detected %d primary key conflict(s)\n", len(diffResult.Conflicts)) + return c.sendMergeConflictNotification(serverURL, remotePath, latestVersion, []byte(diffResult.SQL)) +} + +if err := bridge.ApplyDiff(tempPath, diffResult); err != nil { + // Fallback to simple copy + c.logger.Warn("Failed to apply diff, using direct copy", zap.Error(err)) +} +``` + +## Current Status + +### ✅ Completed +- CGO wrapper infrastructure +- Go type definitions +- Integration with coordinator +- Error handling and fallbacks +- Placeholder diff generation + +### 🚧 TODO + +#### 1. Full SQLDiff Integration +The `sqldiff.c` file is present but not compiled due to missing `sqlite3_stdio.h`. + +**Next Steps:** +- Obtain `sqlite3_stdio.h` and `sqlite3_stdio.c` from SQLite ext/misc +- Remove `//go:build ignore` tag from `sqldiff.c` +- Modify `sqldiff_wrapper.c` to call actual diff functions +- Update CGO build flags if needed + +#### 2. SQL Parsing (parseSQL) +Currently stubbed in `cgo_sqldiff.go:75-82`. + +**Needs:** +```go +func parseSQL(sql string) []DiffOperation { + // Parse SQL statements + // Extract INSERT/UPDATE/DELETE operations + // Identify table names and primary keys + // Return structured operations +} +``` + +#### 3. Conflict Detection (detectConflicts) +Currently stubbed in `cgo_sqldiff.go:85-92`. 
+ +**Algorithm:** +```go +func detectConflicts(operations []DiffOperation) []PrimaryKeyConflict { + // Group operations by table and primary key + // Find cases where same PK has multiple operations: + // - Multiple UPDATEs on same PK + // - UPDATE + DELETE on same PK + // - INSERT on existing PK + // Return conflicts +} +``` + +#### 4. Diff Application (ApplyDiff) +Currently stubbed in `cgo_sqldiff.go:95-105`. + +**Needs:** +```go +func ApplyDiff(dbPath string, diff *DiffResult) error { + // Open database + // Begin transaction + // Execute each SQL statement from diff + // Commit transaction + // Handle errors with rollback +} +``` + +## Testing Strategy + +### Unit Tests Needed + +1. **TestRunSQLDiff** - Test diff generation + - Identical databases → no changes + - Simple INSERT → detected + - UPDATE operation → detected + - DELETE operation → detected + +2. **TestParseSQL** - Test SQL parsing + - Parse INSERT statements + - Parse UPDATE statements + - Parse DELETE statements + - Extract primary keys correctly + +3. **TestDetectConflicts** - Test conflict detection + - No conflicts case + - UPDATE + UPDATE conflict + - INSERT + INSERT conflict + - UPDATE + DELETE conflict + +4. **TestApplyDiff** - Test diff application + - Apply simple changes + - Rollback on error + - Verify final state + +### Integration Tests Needed + +1. **TestAutoMergeNoConflict** - Full merge flow without conflicts +2. **TestAutoMergeWithConflict** - Merge with conflicts triggers notification +3. 
**TestAutoMergeFallback** - Falls back to copy on apply failure + +## Build Configuration + +The sqldiff.c is currently excluded from build: +```c +//go:build ignore +``` + +Once `sqlite3_stdio.h` is available, remove this tag and ensure: +- CGO can find all required headers +- SQLite library is linked properly +- Build succeeds on all platforms + +## Dependencies + +- SQLite3 library (`-lsqlite3`) +- C standard library +- `sqlite3_stdio.h` (pending) +- `sqlite3_stdio.c` (pending) + +## Error Handling + +The wrapper handles errors at multiple levels: + +1. **C Level** - Opens databases, checks SQLite errors +2. **CGO Level** - Converts C strings/errors to Go +3. **Go Level** - Returns structured errors with context +4. **Coordinator Level** - Falls back to simple copy if diff fails + +## Future Enhancements + +1. **Streaming Diffs** - For very large databases +2. **Incremental Parsing** - Parse SQL as it's generated +3. **Custom Conflict Resolution** - User-defined merge strategies +4. **Diff Caching** - Cache diffs for retry scenarios +5. 
**Progress Reporting** - Show diff generation progress From b5c391a04ab34d46fa8193e5b9360726819bf0a0 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 13:07:16 -0700 Subject: [PATCH 09/13] feat: send wsID with client requests --- client/auth/config.go | 1 + client/config.go | 46 +++++++++++++++++++++++++++++++++++++++++++ client/go.mod | 1 + client/go.sum | 2 ++ 4 files changed, 50 insertions(+) diff --git a/client/auth/config.go b/client/auth/config.go index 76d241c..bea5925 100644 --- a/client/auth/config.go +++ b/client/auth/config.go @@ -15,6 +15,7 @@ import ( type DefaultsConfig struct { Defaults struct { Server string `toml:"server"` + WsID string `toml:"wsID"` } `toml:"defaults"` } diff --git a/client/config.go b/client/config.go index 77724b3..0920ca7 100644 --- a/client/config.go +++ b/client/config.go @@ -9,12 +9,14 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/google/uuid" ) // .config/sqlrsync/defaults.toml type DefaultsConfig struct { Defaults struct { Server string `toml:"server"` + WsID string `toml:"wsID"` } `toml:"defaults"` } @@ -78,6 +80,14 @@ func LoadDefaultsConfig() (*DefaultsConfig, error) { // Return default config if file doesn't exist config := &DefaultsConfig{} config.Defaults.Server = "wss://sqlrsync.com" + // Generate wsID if it doesn't exist + if err := generateAndSetWsID(config); err != nil { + return nil, fmt.Errorf("failed to generate wsID: %w", err) + } + // Save the new config with wsID + if err := SaveDefaultsConfig(config); err != nil { + return nil, fmt.Errorf("failed to save defaults config with wsID: %w", err) + } return config, nil } return nil, fmt.Errorf("failed to read defaults config file %s: %w", path, err) @@ -93,9 +103,45 @@ func LoadDefaultsConfig() (*DefaultsConfig, error) { config.Defaults.Server = "wss://sqlrsync.com" } + // Generate wsID if it doesn't exist + needsSave := false + if config.Defaults.WsID == "" { + if err := 
generateAndSetWsID(&config); err != nil { + return nil, fmt.Errorf("failed to generate wsID: %w", err) + } + needsSave = true + } + + // Save config if we made changes + if needsSave { + if err := SaveDefaultsConfig(&config); err != nil { + return nil, fmt.Errorf("failed to save defaults config with wsID: %w", err) + } + } + return &config, nil } +// generateAndSetWsID generates a new wsID (UUID + hostname) and sets it in the config +func generateAndSetWsID(config *DefaultsConfig) error { + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to get hostname: %w", err) + } + + config.Defaults.WsID = hostname + ":" + uuid.New().String() + return nil +} + +// GetWsID loads the defaults config and returns the wsID +func GetWsID() (string, error) { + config, err := LoadDefaultsConfig() + if err != nil { + return "", fmt.Errorf("failed to load defaults config: %w", err) + } + return config.Defaults.WsID, nil +} + func SaveDefaultsConfig(config *DefaultsConfig) error { path, err := GetDefaultsPath() if err != nil { diff --git a/client/go.mod b/client/go.mod index 300e31e..5416e7f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -12,6 +12,7 @@ require ( ) require ( + github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/client/go.sum b/client/go.sum index 51cb4ae..5a5051c 100644 --- a/client/go.sum +++ b/client/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= From 854c7a8149ceece017eab4bce44c580d4fccf799 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:42:04 -0700 Subject: [PATCH 10/13] fix: bugs with wsid on subscribe connections --- client/go.mod | 2 +- client/main.go | 8 ++++++++ client/remote/client.go | 23 +++++++++++++++-------- client/subscription/manager.go | 31 +++++++++++++++++++------------ client/sync/coordinator.go | 8 ++++++-- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/client/go.mod b/client/go.mod index 5416e7f..d224737 100644 --- a/client/go.mod +++ b/client/go.mod @@ -5,6 +5,7 @@ go 1.24.5 require ( github.com/BurntSushi/toml v1.5.0 github.com/fatih/color v1.18.0 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.0 github.com/spf13/cobra v1.8.0 github.com/sqlrsync/sqlrsync.com/bridge v0.0.0-00010101000000-000000000000 @@ -12,7 +13,6 @@ require ( ) require ( - github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/client/main.go b/client/main.go index c810e7c..cea0782 100644 --- a/client/main.go +++ b/client/main.go @@ -145,6 +145,13 @@ func runSync(cmd *cobra.Command, args []string) error { visibility = 1 } + // Get workspace ID for client identification + wsID, err := GetWsID() + if err != nil { + logger.Warn("Failed to get workspace ID", zap.Error(err)) + wsID = "" // Continue with empty wsID + } + // Create sync coordinator coordinator := sync.NewCoordinator(&sync.CoordinatorConfig{ ServerURL: serverURL, @@ -162,6 +169,7 @@ func runSync(cmd 
*cobra.Command, args []string) error { DryRun: dryRun, Logger: logger, Verbose: verbose, + WsID: wsID, // Add workspace ID }) // Execute the operation diff --git a/client/remote/client.go b/client/remote/client.go index f11094b..f219e5b 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -407,6 +407,7 @@ type Config struct { SendConfigCmd bool // we don't have the version number or remote path LocalHostname string LocalAbsolutePath string + WsID string // Workspace ID for X-ClientID header // Progress tracking ProgressConfig *ProgressConfig @@ -685,6 +686,12 @@ func (c *Client) Connect() error { headers.Set("Authorization", c.config.AuthToken) + if c.config.WsID != "" { + headers.Set("X-ClientID", c.config.WsID) + } else { + c.logger.Fatal("No wsID provided for X-ClientID header") + } + if c.config.LocalHostname != "" { headers.Set("X-LocalHostname", c.config.LocalHostname) } @@ -707,7 +714,7 @@ func (c *Client) Connect() error { // Extract detailed error information from the response statusCode := response.StatusCode statusText := response.Status - + var respBodyStr string if response.Body != nil { respBytes, readErr := io.ReadAll(response.Body) @@ -716,22 +723,22 @@ func (c *Client) Connect() error { respBodyStr = strings.TrimSpace(string(respBytes)) } } - + // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) - + if respBodyStr != "" { errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) } - + return fmt.Errorf("%s", errorMsg.String()) } - + // Handle cases where response is nil (e.g., network errors, bad handshake) var errorMsg strings.Builder errorMsg.WriteString("Failed to connect to WebSocket") - + // Analyze the error type and provide helpful context errorStr := err.Error() if strings.Contains(errorStr, "bad handshake") { @@ -751,9 +758,9 @@ func (c *Client) Connect() error { errorMsg.WriteString(" - DNS resolution failed") errorMsg.WriteString("\nCheck the 
server hostname in your configuration") } - + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) - + return fmt.Errorf("%s", errorMsg.String()) } defer response.Body.Close() diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 8c4a4df..4c7ee19 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -34,12 +34,13 @@ type Message struct { Timestamp time.Time `json:"timestamp"` } -// Config holds subscription manager configuration -type Config struct { +// ManagerConfig holds subscription manager configuration +type ManagerConfig struct { ServerURL string ReplicaPath string AuthToken string ReplicaID string + WsID string // websocket ID for client identification Logger *zap.Logger MaxReconnectAttempts int // Maximum number of reconnect attempts (0 = infinite) InitialReconnectDelay time.Duration // Initial delay before first reconnect @@ -54,7 +55,7 @@ type Config struct { // MaxReconnectDelay is reached. Reconnection attempts continue indefinitely unless // MaxReconnectAttempts is set to a positive value. 
type Manager struct { - config *Config + config *ManagerConfig logger *zap.Logger conn *websocket.Conn ctx context.Context @@ -73,7 +74,7 @@ type Manager struct { } // NewManager creates a new subscription manager -func NewManager(config *Config) *Manager { +func NewManager(config *ManagerConfig) *Manager { ctx, cancel := context.WithCancel(context.Background()) // Set default reconnection parameters if not provided @@ -202,6 +203,12 @@ func (m *Manager) doConnect() error { headers.Set("X-ReplicaID", m.config.ReplicaID) } + if m.config.WsID != "" { + headers.Set("X-ClientID", m.config.WsID) + } else { + m.logger.Fatal("No wsID provided for X-ClientID header") + } + dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, } @@ -214,7 +221,7 @@ func (m *Manager) doConnect() error { // Extract detailed error information from the response statusCode := response.StatusCode statusText := response.Status - + var respBodyStr string if response.Body != nil { respBytes, readErr := io.ReadAll(response.Body) @@ -223,22 +230,22 @@ func (m *Manager) doConnect() error { respBodyStr = strings.TrimSpace(string(respBytes)) } } - + // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) - + if respBodyStr != "" { errorMsg.WriteString(fmt.Sprintf(": %s", respBodyStr)) } - + return fmt.Errorf("%s", errorMsg.String()) } - + // Handle cases where response is nil (e.g., network errors, bad handshake) var errorMsg strings.Builder errorMsg.WriteString("Failed to connect to subscription service") - + // Analyze the error type and provide helpful context errorStr := err.Error() if strings.Contains(errorStr, "bad handshake") { @@ -258,9 +265,9 @@ func (m *Manager) doConnect() error { errorMsg.WriteString(" - DNS resolution failed") errorMsg.WriteString("\nCheck the server hostname in your configuration") } - + errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) - + return fmt.Errorf("%s", 
errorMsg.String()) } diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index 18aa1df..f05ff27 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -46,6 +46,7 @@ type CoordinatorConfig struct { DryRun bool Logger *zap.Logger Verbose bool + WsID string // Workspace ID for client identification } // Coordinator manages sync operations and subscriptions @@ -240,11 +241,12 @@ func (c *Coordinator) executeSubscribe() error { } // Create subscription manager with reconnection configuration - c.subManager = subscription.NewManager(&subscription.Config{ + c.subManager = subscription.NewManager(&subscription.ManagerConfig{ ServerURL: authResult.ServerURL, ReplicaPath: authResult.RemotePath, AuthToken: authResult.AccessToken, ReplicaID: authResult.ReplicaID, + WsID: c.config.WsID, Logger: c.logger.Named("subscription"), MaxReconnectAttempts: 20, // Infinite reconnect attempts InitialReconnectDelay: 5 * time.Second, // Start with 5 seconds delay @@ -373,6 +375,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { Version: version, SendConfigCmd: true, SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), + WsID: c.config.WsID, // Add websocket ID //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -499,7 +502,8 @@ func (c *Coordinator) executePush() error { SendConfigCmd: true, SetVisibility: c.config.SetVisibility, CommitMessage: c.config.CommitMessage, - ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), + WsID: c.config.WsID, // Add websocket ID + ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, Format: remote.FormatSimple, From c947f2694b31bcc5d73235269c8bc18349ff0605 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:52:47 -0700 
Subject: [PATCH 11/13] send client version --- client/main.go | 3 ++- client/remote/client.go | 3 +++ client/subscription/manager.go | 8 +++----- client/sync/coordinator.go | 6 +++++- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/client/main.go b/client/main.go index cea0782..45e972a 100644 --- a/client/main.go +++ b/client/main.go @@ -169,7 +169,8 @@ func runSync(cmd *cobra.Command, args []string) error { DryRun: dryRun, Logger: logger, Verbose: verbose, - WsID: wsID, // Add workspace ID + WsID: wsID, // Add websocket ID + ClientVersion: VERSION, }) // Execute the operation diff --git a/client/remote/client.go b/client/remote/client.go index f219e5b..0b00cd2 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -402,6 +402,7 @@ type Config struct { InspectionDepth int // How many bytes to inspect (default: 32) PingPong bool AuthToken string + ClientVersion string // version of the client software SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token SendConfigCmd bool // we don't have the version number or remote path @@ -686,6 +687,8 @@ func (c *Client) Connect() error { headers.Set("Authorization", c.config.AuthToken) + headers.Set("X-ClientVersion", c.config.ClientVersion); + if c.config.WsID != "" { headers.Set("X-ClientID", c.config.WsID) } else { diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 4c7ee19..74e45c2 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -41,6 +41,7 @@ type ManagerConfig struct { AuthToken string ReplicaID string WsID string // websocket ID for client identification + ClientVersion string // version of the client software Logger *zap.Logger MaxReconnectAttempts int // Maximum number of reconnect attempts (0 = infinite) InitialReconnectDelay time.Duration // Initial delay before first reconnect @@ -203,11 +204,8 @@ func (m *Manager) doConnect() error { headers.Set("X-ReplicaID", m.config.ReplicaID) } - if m.config.WsID 
!= "" { - headers.Set("X-ClientID", m.config.WsID) - } else { - m.logger.Fatal("No wsID provided for X-ClientID header") - } + headers.Set("X-ClientVersion", m.config.ClientVersion) + headers.Set("X-ClientID", m.config.WsID) dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index f05ff27..4da1af6 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -46,7 +46,8 @@ type CoordinatorConfig struct { DryRun bool Logger *zap.Logger Verbose bool - WsID string // Workspace ID for client identification + WsID string // Websocket ID for client identification + ClientVersion string // version of the client software } // Coordinator manages sync operations and subscriptions @@ -247,6 +248,7 @@ func (c *Coordinator) executeSubscribe() error { AuthToken: authResult.AccessToken, ReplicaID: authResult.ReplicaID, WsID: c.config.WsID, + ClientVersion: c.config.ClientVersion, Logger: c.logger.Named("subscription"), MaxReconnectAttempts: 20, // Infinite reconnect attempts InitialReconnectDelay: 5 * time.Second, // Start with 5 seconds delay @@ -376,6 +378,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { SendConfigCmd: true, SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), WsID: c.config.WsID, // Add websocket ID + ClientVersion: c.config.ClientVersion, //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -503,6 +506,7 @@ func (c *Coordinator) executePush() error { SetVisibility: c.config.SetVisibility, CommitMessage: c.config.CommitMessage, WsID: c.config.WsID, // Add websocket ID + ClientVersion: c.config.ClientVersion, ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, From 1fc41eac6f7c118314ddc7734afd529781178ee8 Mon Sep 17 00:00:00 2001 From: pnwmatt 
<180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:54:27 -0700 Subject: [PATCH 12/13] Merge pull request #19 from sqlrsync/showConnectError feat: improve client error reporting from server --- client/AUTO MERGE:.md | 13 ++ client/AUTO-MERGE-STATUS.md | 240 ++++++++++++++++++++++++++++++++ client/IMPLEMENTATION-STATUS.md | 185 ++++++++++++++++++++++++ client/PUSH-Subscription.md | 207 +++++++++++++++++++++++++++ client/SQLDIFF-INTEGRATION.md | 226 ++++++++++++++++++++++++++++++ 5 files changed, 871 insertions(+) create mode 100644 client/AUTO MERGE:.md create mode 100644 client/AUTO-MERGE-STATUS.md create mode 100644 client/IMPLEMENTATION-STATUS.md create mode 100644 client/PUSH-Subscription.md create mode 100644 client/SQLDIFF-INTEGRATION.md diff --git a/client/AUTO MERGE:.md b/client/AUTO MERGE:.md new file mode 100644 index 0000000..c69bfd8 --- /dev/null +++ b/client/AUTO MERGE:.md @@ -0,0 +1,13 @@ +AUTO MERGE: + +If the server responds that there was a newer version that we didn't know about but we've already PUSHed some changes, the server rejects the PUSH and tells us what the latest version is. If the user provides the --merge flag, then we: + +- use LOCAL mode to copy this database to /tmp/sqlrsync-merge-local\*randomstuff +- use PULL mode to grab latest and assert it over that /tmp file +- use sqlrsync --diff to diff the /tmp vs LOCAL and generate a patch file +- if the patch doesn't have any conflicting primary keys, then apply it and push, otherwise send a call to POST serverURL/sapi/notification/account/replicaName/. The server will block this server until a human resolves the conflict. 
+ +Message body for post: +{ type: "merge-conflict", the diff file as base64, the versions impacted, hostname, wsID } + +Here's the source code for https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c diff --git a/client/AUTO-MERGE-STATUS.md b/client/AUTO-MERGE-STATUS.md new file mode 100644 index 0000000..b07870b --- /dev/null +++ b/client/AUTO-MERGE-STATUS.md @@ -0,0 +1,240 @@ +# Auto-Merge Implementation Status + +## ✅ Completed Features + +### 1. CLI Flag + +- `--merge`: Automatically merge changes when server has newer version + +### 2. Version Conflict Detection + +**Location:** `remote/client.go:1279-1297` + +- Detects `ABORT=VERSION_CONFLICT:versionNumber` message from server +- Stores conflict state and latest version number +- Provides methods: + - `HasVersionConflict() bool` - Check if conflict occurred + - `GetLatestVersion() string` - Get server's latest version + - `ResetVersionConflict()` - Clear conflict state + +### 3. Auto-Merge Flow + +**Location:** `sync/coordinator.go:537-557` + +When PUSH fails with version conflict and `--merge` is enabled: + +1. ✅ **Create temp file** - `/tmp/sqlrsync-merge-local-*.sqlite` +2. ✅ **Copy local to temp** - Uses LOCAL mode (`RunDirectSync`) +3. ✅ **PULL latest over temp** - Fetches server version and applies to temp +4. ⚠️ **Generate diff** - Currently stubbed, needs sqldiff integration +5. ⚠️ **Check conflicts** - Currently stubbed, needs primary key conflict detection +6. ✅ **Apply changes** - Copies merged result back to local +7. ✅ **Retry PUSH** - Attempts push again with merged data + +### 4. Conflict Notification + +**Location:** `sync/coordinator.go:1141-1158` + +When conflicts are detected: +- Prepares notification with: + - Type: `"merge-conflict"` + - Diff data (base64 encoded) + - Version information + - Hostname and wsID +- ⚠️ **TODO**: Implement HTTP POST to `$server/sapi/notification/account/$replicaName/` + +## 🚧 TODO: Remaining Work + +### 1. 
SQLDiff Integration + +Need to implement actual diff generation and parsing: + +```go +// Generate diff between temp (latest) and local (our changes) +// Using sqldiff tool or equivalent +diffOutput := runSQLDiff(tempPath, c.config.LocalPath) + +// Parse diff output to extract primary key operations +conflicts := parseDiffForConflicts(diffOutput) +``` + +**Reference:** https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c + +### 2. Primary Key Conflict Detection + +Parse diff output to detect conflicting operations: + +```go +type DiffOperation struct { + Type string // INSERT, UPDATE, DELETE + Table string + PrimaryKey map[string]interface{} +} + +func parseDiffForConflicts(diffOutput []byte) ([]DiffOperation, error) { + // Parse SQL diff output + // Identify operations on same primary keys + // Return conflicting operations +} +``` + +### 3. HTTP Notification Implementation + +Implement the POST request to notify server of conflicts: + +```go +func (c *Coordinator) sendMergeConflictNotification(serverURL, replicaName, version string, diffData []byte) error { + payload := map[string]interface{}{ + "type": "merge-conflict", + "diff": base64.StdEncoding.EncodeToString(diffData), + "versions": []string{c.config.Version, version}, + "hostname": hostname, + "wsID": wsID, + } + + url := fmt.Sprintf("%s/sapi/notification/account/%s/", serverURL, replicaName) + // POST JSON payload +} +``` + +## 📝 Usage Examples + +### Basic Auto-Merge + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --merge +``` + +When pushing and server has newer version: +1. Automatically pulls latest version +2. Merges changes (if no conflicts) +3. 
Retries push with merged data + +### Auto-Merge with Subscription + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --merge +``` + +- Watches for local changes +- Pushes after 10 minutes idle +- Auto-merges if version conflict occurs +- Continues watching after successful merge + +### Expected Behavior + +#### No Conflicts +``` +🔄 Performing PUSH... +⚠️ Version conflict: Server has newer version 42 +🔄 Auto-merge enabled - attempting to merge with server version 42... +📋 Step 1/5: Copying local database to temp file... +📥 Step 2/5: Pulling latest version 42 from server over temp file... +🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... +✅ Step 4/5: No conflicts detected +📝 Step 5/5: Applying changes to local database... +✅ Merge completed successfully +✅ Auto-merge successful - retrying PUSH... +✅ PUSH completed +``` + +#### With Conflicts +``` +🔄 Performing PUSH... +⚠️ Version conflict: Server has newer version 42 +🔄 Auto-merge enabled - attempting to merge with server version 42... +📋 Step 1/5: Copying local database to temp file... +📥 Step 2/5: Pulling latest version 42 from server over temp file... +🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... +❌ Merge conflict detected - server blocking until manual resolution + Server: wss://sqlrsync.com + Replica: namespace/mydb.sqlite + Version: 42 + Hostname: my-laptop + wsID: abc123 +Error: merge conflict requires manual resolution +``` + +## 🏗️ Architecture + +### Flow Diagram + +``` +┌─────────────┐ +│ PUSH Failed │ +│ (version │ +│ conflict) │ +└──────┬──────┘ + │ + ▼ + ┌───────────┐ + │ --merge? 
│ + └─────┬─────┘ + │ yes + ▼ + ┌──────────────────┐ + │ Copy LOCAL │ + │ to /tmp/file │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ PULL latest │ + │ over /tmp/file │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Generate diff │ + │ (sqldiff) │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Check PK │ + │ conflicts? │ + └─────┬────────────┘ + │ + ┌────┴────┐ + │ │ + no yes + │ │ + ▼ ▼ +┌────────┐ ┌──────────────┐ +│ Apply │ │ POST to │ +│ diff │ │ /sapi/notify │ +└───┬────┘ └──────┬───────┘ + │ │ + ▼ ▼ +┌────────┐ ┌──────────────┐ +│ Retry │ │ Error: │ +│ PUSH │ │ Manual fix │ +└────────┘ └──────────────┘ +``` + +### Server Requirements + +The server needs to: + +1. **Detect version conflicts** - When client pushes with old version number +2. **Send ABORT message** - Format: `ABORT=VERSION_CONFLICT:latestVersion` +3. **Handle notification endpoint** - `POST /sapi/notification/account/{replicaName}/` + - Accept JSON payload with conflict details + - Block replica until manual resolution + - Notify account owner of conflict + +### Client State + +During auto-merge, the client maintains: +- Original local database (unchanged until merge succeeds) +- Temp file with server's latest version +- Diff between temp and local +- Conflict detection results +- Version numbers (old and new) + +## 🔐 Security Considerations + +1. **Temp file cleanup** - Always cleaned up via `defer os.Remove(tempPath)` +2. **Authentication** - Uses same auth as regular PULL/PUSH +3. **Version validation** - Server must validate version numbers to prevent replay attacks +4. **Conflict data** - Diff data may contain sensitive information, ensure HTTPS for notifications diff --git a/client/IMPLEMENTATION-STATUS.md b/client/IMPLEMENTATION-STATUS.md new file mode 100644 index 0000000..7a067fe --- /dev/null +++ b/client/IMPLEMENTATION-STATUS.md @@ -0,0 +1,185 @@ +# PUSH Subscription Implementation Status + +## ✅ Completed Features + +### 1. 
CLI Flags + +- `--subscribe`: Enable subscription mode for both PUSH and PULL +- `--waitIdle`: Time to wait for idleness before pushing (e.g., 10m, 1h30m, 3d) - min 10 seconds +- `--maxInterval`: Maximum time between pushes regardless of activity (e.g., 24h, 1w) +- `--minInterval`: Minimum time between subsequent pushes (defaults to 1/2 maxInterval) + +### 2. Duration Parsing + +- Custom duration parser supporting: s, m, h, d (days), w (weeks) +- Does not support months (as specified) +- Validates waitIdle min (10 seconds) and max (24 days) + +### 3. File Watching + +- Uses fsnotify to monitor SQLite database and WAL files +- Detects Write and Create events +- Non-blocking change notifications via channel +- Callback sends SQLRSYNC_CHANGED notifications to server for analytics + +### 4. PUSH Subscription Core Logic + +- **Initial PUSH**: Performs initial PUSH when subscription starts +- **Key type validation**: Checks if PULL key is being used and switches to PULL subscription mode +- **File change detection**: Monitors both .sqlite and .sqlite-wal files +- **Singleton waitIdle timer**: Timer that resets on each file change for another MAX(waitIdle OR lastPush - minInterval). On expiration, performs PUSH and resets sessionStarted=false +- **First write notification**: Sends SQLRSYNC_CHANGED with waitIdle duration, sets sessionStarted=true +- **Subsequent write notifications**: If resend timer isn't running, starts timer for (waitIdle - 10 seconds). When timer fires, resends SQLRSYNC_CHANGED with remaining duration +- **minInterval enforcement**: Ensures minimum spacing between pushes +- **maxInterval enforcement**: Forces push if maxInterval time has elapsed since last push +- **Timer reset on change**: Each file change recalculates and resets the timer + +### 5. 
Error Handling + +- Exponential backoff with retry (5 attempts) +- Delays: 5s, 10s, 20s, 40s, 80s +- Error reporting to server stub (HTTPS POST to $server/sapi/$replicaID) - not yet implemented +- Continues watching even after push failures + +### 6. User Notifications + +- Progress messages for all push operations +- Clear status updates during file watching +- Timer duration logging in verbose mode +- Automatic warning when PULL key is used with --subscribe flag + +### 7. Protocol Messages + +```go +const ( + SQLRSYNC_CONFIG = 0x51 // Config with keys, replicaID, keyType + SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available + SQLRSYNC_KEYREQUEST = 0x53 // Request keys + SQLRSYNC_COMMITMESSAGE = 0x54 // Commit message + SQLRSYNC_CHANGED = 0x57 // Write detected notification with duration +) +``` + +**SQLRSYNC_CHANGED Format:** +- `[0x57][duration: 4 bytes as seconds]` +- Sent on first write with waitIdle duration +- Resent 10 seconds before expiration if there were subsequent writes +- Contains remaining time until push will occur + +### 8. Key Type Detection + +- Server sends `keyType` field in CONFIG message (`"PUSH"` or `"PULL"`) +- Client parses keyType from CONFIG JSON (remote/client.go:1303-1306) +- If PULL key detected with --subscribe flag, client automatically switches to PULL subscription mode +- Prevents misuse of --waitIdle/--maxInterval/--minInterval with PULL keys + +## 🚧 TODO: Remaining Work + +### 1. Error Reporting to Server + +Stub exists at `coordinator.go:912-914`. 
Need to implement: + +```go +func (c *Coordinator) reportErrorToServer(err error) { + // HTTPS POST to $server/sapi/$replicaID + // Include error message + // Only if failures persist for > 5 minutes +} +``` + +## 📝 Usage Examples + +### Basic PUSH subscription + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h +``` + +- Pushes after 10 minutes of idleness +- Forces push at least every 24 hours even with continuous activity +- Sends SQLRSYNC_CHANGED on first write, resends 10s before expiration if subsequent writes + +### Custom intervals + +```bash +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe \ + --waitIdle 5m --minInterval 10m --maxInterval 1h +``` + +- Timer = MAX(10m - timeSinceLastPush, 5m) +- If last push was 3 min ago: timer = MAX(7m, 5m) = 7m +- If last push was 12 min ago: timer = MAX(-2m, 5m) = 5m + +### Using with PULL key (auto-switches) + +```bash +# This will automatically switch to PULL subscription mode +sqlrsync namespace/mydb.sqlite --subscribe --waitIdle 10m +``` + +- Client detects PULL key from server CONFIG +- Ignores --waitIdle/--maxInterval/--minInterval +- Switches to standard PULL subscription mode + +## 🏗️ Architecture + +### File Structure + +``` +client/ +├── main.go # CLI flags, operation routing +├── sync/ +│ ├── coordinator.go # Main sync orchestration + PUSH subscribe +│ └── duration.go # Duration parsing utilities +├── watcher/ +│ └── watcher.go # File system monitoring +├── remote/ +│ └── client.go # WebSocket client with CHANGED notifications +└── subscription/ + └── manager.go # PULL subscription (existing) +``` + +### Flow for PUSH Subscribe + +1. Parse and validate duration parameters (min 10s for waitIdle) +2. Execute initial PUSH +3. Establish persistent WebSocket connection +4. Check key type from CONFIG - switch to PULL subscription if PULL key detected +5. Start file watcher +6. 
Event loop: + - Check for maxInterval timeout + - Wait for file changes + - On first write: send SQLRSYNC_CHANGED with waitIdle duration, set sessionStarted=true + - On subsequent writes: schedule resend for 10s before expiration + - Apply timing logic (waitIdle, minInterval) + - Execute PUSH with retry on timer expiration + - Reset sessionStarted=false after successful push + +## 🔄 Server-Side Responsibilities + +### Conflict Resolution + +When a client pushes and is not in sync: + +1. Pull down the version the client was aware of → **OLD** +2. Pull down the latest version → **LATEST** +3. Treat client's current copy as → **CANDIDATE** +4. Run `sqlite_diff OLD CANDIDATE` → **second.diff** +5. Run `sqlite_diff OLD LATEST` → **first.diff** +6. If no overlap between primary keys in both diffs: + - Apply second.diff to LATEST + - Push result as new version + - Discard CANDIDATE + +### Key Type Validation + +- Server sends `keyType` in CONFIG message +- Valid values: `"PUSH"` or `"PULL"` +- Client uses this to validate operation mode +- Prevents PUSH operations with PULL-only keys + +### Analytics + +- Server receives SQLRSYNC_CHANGED (0x57) messages with duration +- Can track client activity and expected push timing +- Updates on subsequent writes provide refined timing expectations diff --git a/client/PUSH-Subscription.md b/client/PUSH-Subscription.md new file mode 100644 index 0000000..32947b9 --- /dev/null +++ b/client/PUSH-Subscription.md @@ -0,0 +1,207 @@ +PUSH Subscription mode is a new feature for SQLRsync that allows +you to optionally automatically PUSH changes from the LOCAL to +REMOTE. + +The feature works by using OS native file watchers to monitor changes +to the file, and when a change is detected, begin a PUSH operation after +a variable delay. Users specify how long to a fixed amount of time since +the last change. + +The subscription mode is activated by adding the --subscribe flag to a +PUSH command. 
The first time the command is run, it will do a normal +PUSH, and then begin monitoring the file for changes. When a change is +detected, it will wait the specified delay time (or auto mode) and then +begin a PUSH operation. If another change is detected during the wait +time, the wait timer is reset UNLESS it has been X hours since the last +PUSH, in which case it will immediately begin a PUSH operation. + +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h + +Wait for 10 minutes of idleness before pushing, but in the event that there's +activity every 8 minutes for multiple days straight, it will at least push +24 hours after the last push. + +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on push subscribe" --subscribe --waitIdle 10m --maxInterval 24h + +With push subscribe the database is checked to see if PRAGMA busy_timeout is +greater than waitIdle. If not, it asks the user what value they want to increase +it to and sets it. + +## Coordination Mode (Optional) + +If --coordinate is specified, the client opts into the coordination system. Each client +configures their own waitIdle and maxInterval values independently - these are NOT shared +between clients. 
+ +When --coordinate is enabled: +- Client does NOT send SQLRSYNC_COORDINATE to other clients +- Client registers with server as participating in coordination +- First file change detected: + - Client sends `SQLRSYNC_CLAIM $waitIdle` to server + - Starts extension timer for `waitIdle - 10 seconds` +- Subsequent writes: + - Sets flag "had additional writes since CLAIM" + - Does NOT send CLAIM immediately (minimizes traffic) +- When extension timer fires (10 seconds before claim expires): + - If flag is set: sends `SQLRSYNC_CLAIM $remainingWaitIdle` + - Where `remainingWaitIdle = waitIdle - (seconds since last write)` + - Starts new timer for `remainingWaitIdle - 10 seconds` + - If no additional writes: lets original claim expire naturally +- Server rebroadcasts CLAIM to all other coordinating clients for this replica +- Other clients suppress their own PUSH attempts during the claim window (polite yield) + +Without --coordinate: +- No CLAIM system is used +- Sends `SQLRSYNC_CHANGED` notifications to server for analytics +- All clients push independently based on their local timers +- Risk of concurrent pushes and conflicts (accepted risk) +- Server handles conflicts via SQLite rsync page-level merging + +If we are subscribing with these PUSH flags, then we will PUSH when the command starts. + +--minInterval, which defaults to 1/2 maxInterval but otherwise specifies the minimum +gap between subsequent PUSHes. + +The server will warn and ignore if --waitIdle, --maxInterval, or --minInterval are used with a PULL Key. + +--waitIdle, --minInterval, and --maxInterval both allow time durations like 3s, 10m, 1h30m, 2h, 1h45m, 3d, 1w, 4w, etc. They do not support months. + +The max for waitIdle is 24 days because busy_time is non-negative integer representing milliseconds, which is 24.8 days. The min is 1 second. 
+ +Example: +sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on coordinated subscribe" --subscribe --waitIdle 10m --maxInterval 24h --coordinate + +## Split-Brain Handling + +When two clients send SQLRSYNC_CLAIM simultaneously: +- Server accepts the first CLAIM it receives +- Responds to first client with `SQLRSYNC_CLAIM` (no arguments) = acknowledgment +- Broadcasts to all other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using claimer's waitIdle) +- Second client receives the other client's CLAIM instead of acknowledgment +- Second client suppresses its push attempts for that waitIdle duration +- Server ignores subsequent CLAIMs until the current claim's waitIdle expires + +Note: CLAIMS do not block local writes - both databases continue accepting writes. +The coordination only affects when pushes are attempted. + +In addition to --waitIdle and --maxInterval, another flag is --coordinate, described in Coordination Mode above. + +Areas for Consideration & Additional Scenarios + +1. Error Handling & Recovery + Network failures: What happens if a PUSH fails due to network issues? Should it retry with exponential backoff? + Matt says: Yes, exponential backoff using the existing exponential backoff code. The client will do an HTTPS POST request to $server/sapi/$replicaID with + the error message if it has been more than 5 minutes of failure. + +Partial sync failures: How should the system handle cases where a PUSH partially completes? +Matt says: that'd be handled the same as a network failure: exponential backoff. The client will do an HTTPS POST request to $server/sapi/$replicaID with +the error message 100% of the time. + +Server unavailability: Should the system queue changes locally and sync when the server becomes available? +Matt says: the above network and partial sync cases handle that with existing exponential backoff code. + +2. File System Edge Cases + Temporary files: Should the file watcher ignore temporary files created by editors (e.g., .tmp, .swp files)? 
+ Matt says: yes, both the name of the file and -wal should be monitored. + +Atomic writes: How does it handle applications that write to temporary files and then rename them? +Matt says: The sqlite spec may do that to the local.db-wal file, but that will always trigger a traditional write to the local.db file + +Multiple rapid changes: What about scenarios with hundreds of small changes in quick succession? +Matt says: the --waitIdle code quickly ignores it if it hasn't been 75% of the waitIdle time or --maxInterval since the last PUSH + +File locking: What happens if the SQLite database is locked when trying to PUSH? +Matt says: The sqlrsync client PUSHes without needing a write lock by using a special control table in the sqlite database that has already been built into sqlrsync. + +3. Coordination Protocol Edge Cases + Split-brain scenarios: What if network partitions cause multiple clients to think they have the claim? + Matt says: the server is single threaded and we accept that the first claim the server received will win. + +Client crashes: How is a stale claim cleaned up if a client crashes while holding it? +Matt says: the claim is time-limited to waitIdle, so if a client crashes while holding it, the claim will expire after waitIdle time. + +Clock skew: How does the system handle clients with different system times? +Matt says: figuring in internet latency, since times are relative, other systems might be off by a few seconds. + +Late joiners: What happens when a new client subscribes to a database that already has active claims? +Matt says: The server will send a CLAIM when it connects if one already exists. + +4. Performance & Resource Management + +Resource cleanup: How are file watchers cleaned up when subscription ends? +Matt says: These are long running permanent processes that should be added to systemd or similar to ensure they are restarted if they crash. + +Memory usage: For long-running subscriptions, how is memory managed? 
+Matt says: The file watcher uses minimal memory - mostly waiting, and not using any dynamic memory. + +CPU usage: What's the impact of continuous file watching on system resources? +Matt says: I believe there are hooks into the OS that use no resources until a change is detected. + +5. User Experience + +Status visibility: How can users see the current subscription status, active claims, etc.? +Matt says: This is handled serverside. + +Graceful shutdown: How should users stop subscription mode? +Matt says: This is already handled by ctrl-c + +Logging: What level of logging should be provided for debugging coordination issues? +Matt says: Verbose logging is already available and can be enabled with the --verbose flag. + +# Specific documentation for the server side + +CLAIMS are sent from the worker to the durable object for the account, which makes decisions and coordinates for that replica, since it will be less resource constrained. + +CLAIMS do not block other writes. Both will be saved. + +# Questions for Claude + +1. How do concurrent writes work? + +**Answer**: Based on the specification, here's how concurrent writes are handled: + +## Without --coordinate flag (Default, No Coordination) +- Each client operates independently with no coordination +- The timer logic (`MAX(minInterval - timeSinceLastPush, waitIdle)`) prevents too-frequent pushes from same client +- All clients can push whenever their timers expire +- Server accepts all pushes in order received +- Risk of concurrent pushes and conflicts (accepted risk) +- Server handles conflicts via SQLite rsync page-level merging + +## With --coordinate flag (Optional Coordination) +- **--coordinate is optional** - each client opts in individually +- **Each client configures own waitIdle/maxInterval** - these are NOT shared between clients +- **CLAIMS do not block local writes** - all local databases continue accepting writes + +When Client A (with --coordinate) detects a change: +1. 
First write: + - Sends `SQLRSYNC_CLAIM $waitIdle` to server + - Starts extension timer for `waitIdle - 10 seconds` +2. Subsequent writes: + - Sets flag "had additional writes" + - Does NOT send CLAIM yet +3. When extension timer fires (10s before claim expires): + - If flag set: sends `SQLRSYNC_CLAIM $remainingWaitIdle` + - Where `remainingWaitIdle = waitIdle - (seconds since last write)` + - Starts new timer for `remainingWaitIdle - 10 seconds` + +Server (via durable object) handles CLAIM: +- Accepts first CLAIM for a replica +- Responds to Client A: `SQLRSYNC_CLAIM` (no args) = acknowledgment +- Broadcasts to other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using A's waitIdle) + +Other coordinating clients receiving the CLAIM: +- **Do NOT block local writes** (application continues writing) +- Set a timer for received $waitIdle duration +- Suppress their own PUSH and CLAIM attempts during this window (polite yield) +- After timer expires, can push accumulated changes + +Client A completes its PUSH, then other clients can push their accumulated changes. + +## Conflict Resolution +Since CLAIMS don't block writes and coordination is optional: +1. Client A (coordinating) writes locally + claims +2. Client B (coordinating) writes locally but yields on push +3. Client C (no --coordinate) writes and pushes independently +4. Server accepts pushes in order received +5. Conflicts resolved using SQLite rsync protocol (page-level merging) + +The coordination is about **reducing push collisions** and **giving polite turn-taking** among coordinating clients, not preventing concurrent local writes. Non-coordinating clients can still push anytime. diff --git a/client/SQLDIFF-INTEGRATION.md b/client/SQLDIFF-INTEGRATION.md new file mode 100644 index 0000000..743a990 --- /dev/null +++ b/client/SQLDIFF-INTEGRATION.md @@ -0,0 +1,226 @@ +# SQLDiff Integration + +## Overview + +Created a CGO wrapper to integrate SQLite's sqldiff functionality into the auto-merge feature. 
+ +## Files Created + +### 1. `/bridge/sqldiff_wrapper.h` +C header file defining the interface: +```c +int sqldiff_run(const char *db1, const char *db2, char **result, char **error); +``` + +### 2. `/bridge/sqldiff_wrapper.c` +C implementation that: +- Opens both databases in read-only mode +- Currently returns a placeholder diff +- **TODO**: Integrate full sqldiff.c logic once `sqlite3_stdio.h` is available + +### 3. `/bridge/cgo_sqldiff.go` +Go wrapper providing: + +**Main Function:** +```go +func RunSQLDiff(db1Path, db2Path string) (*DiffResult, error) +``` + +**Data Structures:** +```go +type DiffResult struct { + SQL string // SQL statements to transform db1 to db2 + HasChanges bool // Whether there are any differences + Operations []DiffOperation // Parsed operations from the diff + Conflicts []PrimaryKeyConflict // Detected primary key conflicts +} + +type DiffOperation struct { + Type string // INSERT, UPDATE, DELETE + Table string // Table name + PrimaryKey map[string]interface{} // Primary key values + SQL string // The actual SQL statement +} + +type PrimaryKeyConflict struct { + Table string + PrimaryKey map[string]interface{} + Operation1 string // First operation + Operation2 string // Second operation +} +``` + +**Helper Function:** +```go +func ApplyDiff(dbPath string, diff *DiffResult) error +``` + +## Integration with Auto-Merge + +### Location: `sync/coordinator.go:1112-1156` + +The auto-merge flow now: + +1. **Creates temp file** - Copies local database +2. **Pulls latest** - Gets server's version over temp +3. **Generates diff** - Calls `bridge.RunSQLDiff(tempPath, localPath)` +4. **Checks for changes** - Returns early if databases are identical +5. **Detects conflicts** - Checks for primary key conflicts +6. **Sends notification** - If conflicts exist, notifies server +7. **Applies diff** - If no conflicts, applies changes +8. 
**Retries push** - Pushes merged result + +### Usage in Code + +```go +diffResult, err := bridge.RunSQLDiff(tempPath, c.config.LocalPath) +if err != nil { + return fmt.Errorf("failed to generate diff: %w", err) +} + +if !diffResult.HasChanges { + fmt.Println("✅ No changes detected - databases are identical") + return nil +} + +if len(diffResult.Conflicts) > 0 { + fmt.Printf("❌ Detected %d primary key conflict(s)\n", len(diffResult.Conflicts)) + return c.sendMergeConflictNotification(serverURL, remotePath, latestVersion, []byte(diffResult.SQL)) +} + +if err := bridge.ApplyDiff(tempPath, diffResult); err != nil { + // Fallback to simple copy + c.logger.Warn("Failed to apply diff, using direct copy", zap.Error(err)) +} +``` + +## Current Status + +### ✅ Completed +- CGO wrapper infrastructure +- Go type definitions +- Integration with coordinator +- Error handling and fallbacks +- Placeholder diff generation + +### 🚧 TODO + +#### 1. Full SQLDiff Integration +The `sqldiff.c` file is present but not compiled due to missing `sqlite3_stdio.h`. + +**Next Steps:** +- Obtain `sqlite3_stdio.h` and `sqlite3_stdio.c` from SQLite ext/misc +- Remove `//go:build ignore` tag from `sqldiff.c` +- Modify `sqldiff_wrapper.c` to call actual diff functions +- Update CGO build flags if needed + +#### 2. SQL Parsing (parseSQL) +Currently stubbed in `cgo_sqldiff.go:75-82`. + +**Needs:** +```go +func parseSQL(sql string) []DiffOperation { + // Parse SQL statements + // Extract INSERT/UPDATE/DELETE operations + // Identify table names and primary keys + // Return structured operations +} +``` + +#### 3. Conflict Detection (detectConflicts) +Currently stubbed in `cgo_sqldiff.go:85-92`. 
+ +**Algorithm:** +```go +func detectConflicts(operations []DiffOperation) []PrimaryKeyConflict { + // Group operations by table and primary key + // Find cases where same PK has multiple operations: + // - Multiple UPDATEs on same PK + // - UPDATE + DELETE on same PK + // - INSERT on existing PK + // Return conflicts +} +``` + +#### 4. Diff Application (ApplyDiff) +Currently stubbed in `cgo_sqldiff.go:95-105`. + +**Needs:** +```go +func ApplyDiff(dbPath string, diff *DiffResult) error { + // Open database + // Begin transaction + // Execute each SQL statement from diff + // Commit transaction + // Handle errors with rollback +} +``` + +## Testing Strategy + +### Unit Tests Needed + +1. **TestRunSQLDiff** - Test diff generation + - Identical databases → no changes + - Simple INSERT → detected + - UPDATE operation → detected + - DELETE operation → detected + +2. **TestParseSQL** - Test SQL parsing + - Parse INSERT statements + - Parse UPDATE statements + - Parse DELETE statements + - Extract primary keys correctly + +3. **TestDetectConflicts** - Test conflict detection + - No conflicts case + - UPDATE + UPDATE conflict + - INSERT + INSERT conflict + - UPDATE + DELETE conflict + +4. **TestApplyDiff** - Test diff application + - Apply simple changes + - Rollback on error + - Verify final state + +### Integration Tests Needed + +1. **TestAutoMergeNoConflict** - Full merge flow without conflicts +2. **TestAutoMergeWithConflict** - Merge with conflicts triggers notification +3. 
**TestAutoMergeFallback** - Falls back to copy on apply failure + +## Build Configuration + +The sqldiff.c is currently excluded from build: +```c +//go:build ignore +``` + +Once `sqlite3_stdio.h` is available, remove this tag and ensure: +- CGO can find all required headers +- SQLite library is linked properly +- Build succeeds on all platforms + +## Dependencies + +- SQLite3 library (`-lsqlite3`) +- C standard library +- `sqlite3_stdio.h` (pending) +- `sqlite3_stdio.c` (pending) + +## Error Handling + +The wrapper handles errors at multiple levels: + +1. **C Level** - Opens databases, checks SQLite errors +2. **CGO Level** - Converts C strings/errors to Go +3. **Go Level** - Returns structured errors with context +4. **Coordinator Level** - Falls back to simple copy if diff fails + +## Future Enhancements + +1. **Streaming Diffs** - For very large databases +2. **Incremental Parsing** - Parse SQL as it's generated +3. **Custom Conflict Resolution** - User-defined merge strategies +4. **Diff Caching** - Cache diffs for retry scenarios +5. 
**Progress Reporting** - Show diff generation progress From 3f2dd13254f1e9fefa740170aba1a36aaf213782 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:56:55 -0700 Subject: [PATCH 13/13] cleanup --- client/AUTO MERGE:.md | 13 -- client/AUTO-MERGE-STATUS.md | 240 -------------------------------- client/IMPLEMENTATION-STATUS.md | 185 ------------------------ client/PUSH-Subscription.md | 207 --------------------------- client/SQLDIFF-INTEGRATION.md | 226 ------------------------------ 5 files changed, 871 deletions(-) delete mode 100644 client/AUTO MERGE:.md delete mode 100644 client/AUTO-MERGE-STATUS.md delete mode 100644 client/IMPLEMENTATION-STATUS.md delete mode 100644 client/PUSH-Subscription.md delete mode 100644 client/SQLDIFF-INTEGRATION.md diff --git a/client/AUTO MERGE:.md b/client/AUTO MERGE:.md deleted file mode 100644 index c69bfd8..0000000 --- a/client/AUTO MERGE:.md +++ /dev/null @@ -1,13 +0,0 @@ -AUTO MERGE: - -If the server responds that there was a newer version that we didn't know about but we've already PUSHed some changes, the server rejects the PUSH and tells us what the latest version is. If the user provides the --merge flag, then we: - -- use LOCAL mode to copy this database to /tmp/sqlrsync-merge-local\*randomstuff -- use PULL mode to grab latest and assert it over that /tmp file -- use sqlrsync --diff to diff the /tmp vs LOCAL and generate a patch file -- if the patch doesn't have any conflicting primary keys, then apply it and push, otherwise send a call to POST serverURL/sapi/notification/account/replicaName/. The server will block this server until a human resolves the conflict. 
- -Message body for post: -{ type: "merge-conflict", the diff file as base64, the versions impacted, hostname, wsID } - -Here's the source code for https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c diff --git a/client/AUTO-MERGE-STATUS.md b/client/AUTO-MERGE-STATUS.md deleted file mode 100644 index b07870b..0000000 --- a/client/AUTO-MERGE-STATUS.md +++ /dev/null @@ -1,240 +0,0 @@ -# Auto-Merge Implementation Status - -## ✅ Completed Features - -### 1. CLI Flag - -- `--merge`: Automatically merge changes when server has newer version - -### 2. Version Conflict Detection - -**Location:** `remote/client.go:1279-1297` - -- Detects `ABORT=VERSION_CONFLICT:versionNumber` message from server -- Stores conflict state and latest version number -- Provides methods: - - `HasVersionConflict() bool` - Check if conflict occurred - - `GetLatestVersion() string` - Get server's latest version - - `ResetVersionConflict()` - Clear conflict state - -### 3. Auto-Merge Flow - -**Location:** `sync/coordinator.go:537-557` - -When PUSH fails with version conflict and `--merge` is enabled: - -1. ✅ **Create temp file** - `/tmp/sqlrsync-merge-local-*.sqlite` -2. ✅ **Copy local to temp** - Uses LOCAL mode (`RunDirectSync`) -3. ✅ **PULL latest over temp** - Fetches server version and applies to temp -4. ⚠️ **Generate diff** - Currently stubbed, needs sqldiff integration -5. ⚠️ **Check conflicts** - Currently stubbed, needs primary key conflict detection -6. ✅ **Apply changes** - Copies merged result back to local -7. ✅ **Retry PUSH** - Attempts push again with merged data - -### 4. Conflict Notification - -**Location:** `sync/coordinator.go:1141-1158` - -When conflicts are detected: -- Prepares notification with: - - Type: `"merge-conflict"` - - Diff data (base64 encoded) - - Version information - - Hostname and wsID -- ⚠️ **TODO**: Implement HTTP POST to `$server/sapi/notification/account/$replicaName/` - -## 🚧 TODO: Remaining Work - -### 1. 
SQLDiff Integration - -Need to implement actual diff generation and parsing: - -```go -// Generate diff between temp (latest) and local (our changes) -// Using sqldiff tool or equivalent -diffOutput := runSQLDiff(tempPath, c.config.LocalPath) - -// Parse diff output to extract primary key operations -conflicts := parseDiffForConflicts(diffOutput) -``` - -**Reference:** https://raw.githubusercontent.com/sqlite/sqlite/refs/heads/master/tool/sqldiff.c - -### 2. Primary Key Conflict Detection - -Parse diff output to detect conflicting operations: - -```go -type DiffOperation struct { - Type string // INSERT, UPDATE, DELETE - Table string - PrimaryKey map[string]interface{} -} - -func parseDiffForConflicts(diffOutput []byte) ([]DiffOperation, error) { - // Parse SQL diff output - // Identify operations on same primary keys - // Return conflicting operations -} -``` - -### 3. HTTP Notification Implementation - -Implement the POST request to notify server of conflicts: - -```go -func (c *Coordinator) sendMergeConflictNotification(serverURL, replicaName, version string, diffData []byte) error { - payload := map[string]interface{}{ - "type": "merge-conflict", - "diff": base64.StdEncoding.EncodeToString(diffData), - "versions": []string{c.config.Version, version}, - "hostname": hostname, - "wsID": wsID, - } - - url := fmt.Sprintf("%s/sapi/notification/account/%s/", serverURL, replicaName) - // POST JSON payload -} -``` - -## 📝 Usage Examples - -### Basic Auto-Merge - -```bash -sqlrsync mydb.sqlite namespace/mydb.sqlite --merge -``` - -When pushing and server has newer version: -1. Automatically pulls latest version -2. Merges changes (if no conflicts) -3. 
Retries push with merged data - -### Auto-Merge with Subscription - -```bash -sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --merge -``` - -- Watches for local changes -- Pushes after 10 minutes idle -- Auto-merges if version conflict occurs -- Continues watching after successful merge - -### Expected Behavior - -#### No Conflicts -``` -🔄 Performing PUSH... -⚠️ Version conflict: Server has newer version 42 -🔄 Auto-merge enabled - attempting to merge with server version 42... -📋 Step 1/5: Copying local database to temp file... -📥 Step 2/5: Pulling latest version 42 from server over temp file... -🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... -✅ Step 4/5: No conflicts detected -📝 Step 5/5: Applying changes to local database... -✅ Merge completed successfully -✅ Auto-merge successful - retrying PUSH... -✅ PUSH completed -``` - -#### With Conflicts -``` -🔄 Performing PUSH... -⚠️ Version conflict: Server has newer version 42 -🔄 Auto-merge enabled - attempting to merge with server version 42... -📋 Step 1/5: Copying local database to temp file... -📥 Step 2/5: Pulling latest version 42 from server over temp file... -🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)... -❌ Merge conflict detected - server blocking until manual resolution - Server: wss://sqlrsync.com - Replica: namespace/mydb.sqlite - Version: 42 - Hostname: my-laptop - wsID: abc123 -Error: merge conflict requires manual resolution -``` - -## 🏗️ Architecture - -### Flow Diagram - -``` -┌─────────────┐ -│ PUSH Failed │ -│ (version │ -│ conflict) │ -└──────┬──────┘ - │ - ▼ - ┌───────────┐ - │ --merge? 
│ - └─────┬─────┘ - │ yes - ▼ - ┌──────────────────┐ - │ Copy LOCAL │ - │ to /tmp/file │ - └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ PULL latest │ - │ over /tmp/file │ - └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ Generate diff │ - │ (sqldiff) │ - └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ Check PK │ - │ conflicts? │ - └─────┬────────────┘ - │ - ┌────┴────┐ - │ │ - no yes - │ │ - ▼ ▼ -┌────────┐ ┌──────────────┐ -│ Apply │ │ POST to │ -│ diff │ │ /sapi/notify │ -└───┬────┘ └──────┬───────┘ - │ │ - ▼ ▼ -┌────────┐ ┌──────────────┐ -│ Retry │ │ Error: │ -│ PUSH │ │ Manual fix │ -└────────┘ └──────────────┘ -``` - -### Server Requirements - -The server needs to: - -1. **Detect version conflicts** - When client pushes with old version number -2. **Send ABORT message** - Format: `ABORT=VERSION_CONFLICT:latestVersion` -3. **Handle notification endpoint** - `POST /sapi/notification/account/{replicaName}/` - - Accept JSON payload with conflict details - - Block replica until manual resolution - - Notify account owner of conflict - -### Client State - -During auto-merge, the client maintains: -- Original local database (unchanged until merge succeeds) -- Temp file with server's latest version -- Diff between temp and local -- Conflict detection results -- Version numbers (old and new) - -## 🔐 Security Considerations - -1. **Temp file cleanup** - Always cleaned up via `defer os.Remove(tempPath)` -2. **Authentication** - Uses same auth as regular PULL/PUSH -3. **Version validation** - Server must validate version numbers to prevent replay attacks -4. **Conflict data** - Diff data may contain sensitive information, ensure HTTPS for notifications diff --git a/client/IMPLEMENTATION-STATUS.md b/client/IMPLEMENTATION-STATUS.md deleted file mode 100644 index 7a067fe..0000000 --- a/client/IMPLEMENTATION-STATUS.md +++ /dev/null @@ -1,185 +0,0 @@ -# PUSH Subscription Implementation Status - -## ✅ Completed Features - -### 1. 
CLI Flags - -- `--subscribe`: Enable subscription mode for both PUSH and PULL -- `--waitIdle`: Time to wait for idleness before pushing (e.g., 10m, 1h30m, 3d) - min 10 seconds -- `--maxInterval`: Maximum time between pushes regardless of activity (e.g., 24h, 1w) -- `--minInterval`: Minimum time between subsequent pushes (defaults to 1/2 maxInterval) - -### 2. Duration Parsing - -- Custom duration parser supporting: s, m, h, d (days), w (weeks) -- Does not support months (as specified) -- Validates waitIdle min (10 seconds) and max (24 days) - -### 3. File Watching - -- Uses fsnotify to monitor SQLite database and WAL files -- Detects Write and Create events -- Non-blocking change notifications via channel -- Callback sends SQLRSYNC_CHANGED notifications to server for analytics - -### 4. PUSH Subscription Core Logic - -- **Initial PUSH**: Performs initial PUSH when subscription starts -- **Key type validation**: Checks if PULL key is being used and switches to PULL subscription mode -- **File change detection**: Monitors both .sqlite and .sqlite-wal files -- **Singleton waitIdle timer**: Timer that resets on each file change for another MAX(waitIdle OR lastPush - minInterval). On expiration, performs PUSH and resets sessionStarted=false -- **First write notification**: Sends SQLRSYNC_CHANGED with waitIdle duration, sets sessionStarted=true -- **Subsequent write notifications**: If resend timer isn't running, starts timer for (waitIdle - 10 seconds). When timer fires, resends SQLRSYNC_CHANGED with remaining duration -- **minInterval enforcement**: Ensures minimum spacing between pushes -- **maxInterval enforcement**: Forces push if maxInterval time has elapsed since last push -- **Timer reset on change**: Each file change recalculates and resets the timer - -### 5. 
Error Handling - -- Exponential backoff with retry (5 attempts) -- Delays: 5s, 10s, 20s, 40s, 80s -- Error reporting to server stub (HTTPS POST to $server/sapi/$replicaID) - not yet implemented -- Continues watching even after push failures - -### 6. User Notifications - -- Progress messages for all push operations -- Clear status updates during file watching -- Timer duration logging in verbose mode -- Automatic warning when PULL key is used with --subscribe flag - -### 7. Protocol Messages - -```go -const ( - SQLRSYNC_CONFIG = 0x51 // Config with keys, replicaID, keyType - SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available - SQLRSYNC_KEYREQUEST = 0x53 // Request keys - SQLRSYNC_COMMITMESSAGE = 0x54 // Commit message - SQLRSYNC_CHANGED = 0x57 // Write detected notification with duration -) -``` - -**SQLRSYNC_CHANGED Format:** -- `[0x57][duration: 4 bytes as seconds]` -- Sent on first write with waitIdle duration -- Resent 10 seconds before expiration if there were subsequent writes -- Contains remaining time until push will occur - -### 8. Key Type Detection - -- Server sends `keyType` field in CONFIG message (`"PUSH"` or `"PULL"`) -- Client parses keyType from CONFIG JSON (remote/client.go:1303-1306) -- If PULL key detected with --subscribe flag, client automatically switches to PULL subscription mode -- Prevents misuse of --waitIdle/--maxInterval/--minInterval with PULL keys - -## 🚧 TODO: Remaining Work - -### 1. Error Reporting to Server - -Stub exists at `coordinator.go:912-914`. 
Need to implement: - -```go -func (c *Coordinator) reportErrorToServer(err error) { - // HTTPS POST to $server/sapi/$replicaID - // Include error message - // Only if failures persist for > 5 minutes -} -``` - -## 📝 Usage Examples - -### Basic PUSH subscription - -```bash -sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h -``` - -- Pushes after 10 minutes of idleness -- Forces push at least every 24 hours even with continuous activity -- Sends SQLRSYNC_CHANGED on first write, resends 10s before expiration if subsequent writes - -### Custom intervals - -```bash -sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe \ - --waitIdle 5m --minInterval 10m --maxInterval 1h -``` - -- Timer = MAX(10m - timeSinceLastPush, 5m) -- If last push was 3 min ago: timer = MAX(7m, 5m) = 7m -- If last push was 12 min ago: timer = MAX(-2m, 5m) = 5m - -### Using with PULL key (auto-switches) - -```bash -# This will automatically switch to PULL subscription mode -sqlrsync namespace/mydb.sqlite --subscribe --waitIdle 10m -``` - -- Client detects PULL key from server CONFIG -- Ignores --waitIdle/--maxInterval/--minInterval -- Switches to standard PULL subscription mode - -## 🏗️ Architecture - -### File Structure - -``` -client/ -├── main.go # CLI flags, operation routing -├── sync/ -│ ├── coordinator.go # Main sync orchestration + PUSH subscribe -│ └── duration.go # Duration parsing utilities -├── watcher/ -│ └── watcher.go # File system monitoring -├── remote/ -│ └── client.go # WebSocket client with CHANGED notifications -└── subscription/ - └── manager.go # PULL subscription (existing) -``` - -### Flow for PUSH Subscribe - -1. Parse and validate duration parameters (min 10s for waitIdle) -2. Execute initial PUSH -3. Establish persistent WebSocket connection -4. Check key type from CONFIG - switch to PULL subscription if PULL key detected -5. Start file watcher -6. 
Event loop: - - Check for maxInterval timeout - - Wait for file changes - - On first write: send SQLRSYNC_CHANGED with waitIdle duration, set sessionStarted=true - - On subsequent writes: schedule resend for 10s before expiration - - Apply timing logic (waitIdle, minInterval) - - Execute PUSH with retry on timer expiration - - Reset sessionStarted=false after successful push - -## 🔄 Server-Side Responsibilities - -### Conflict Resolution - -When a client pushes and is not in sync: - -1. Pull down the version the client was aware of → **OLD** -2. Pull down the latest version → **LATEST** -3. Treat client's current copy as → **CANDIDATE** -4. Run `sqlite_diff OLD CANDIDATE` → **second.diff** -5. Run `sqlite_diff OLD LATEST` → **first.diff** -6. If no overlap between primary keys in both diffs: - - Apply second.diff to LATEST - - Push result as new version - - Discard CANDIDATE - -### Key Type Validation - -- Server sends `keyType` in CONFIG message -- Valid values: `"PUSH"` or `"PULL"` -- Client uses this to validate operation mode -- Prevents PUSH operations with PULL-only keys - -### Analytics - -- Server receives SQLRSYNC_CHANGED (0x57) messages with duration -- Can track client activity and expected push timing -- Updates on subsequent writes provide refined timing expectations diff --git a/client/PUSH-Subscription.md b/client/PUSH-Subscription.md deleted file mode 100644 index 32947b9..0000000 --- a/client/PUSH-Subscription.md +++ /dev/null @@ -1,207 +0,0 @@ -PUSH Subscription mode is a new feature for SQLRsync that allows -you to optionally automatically PUSH changes from the LOCAL to -REMOTE. - -The feature works by using OS native file watchers to monitor changes -to the file, and when a change is detected, begin a PUSH operation after -a variable delay. Users specify how long to a fixed amount of time since -the last change. - -The subscription mode is activated by adding the --subscribe flag to a -PUSH command. 
The first time the command is run, it will do a normal -PUSH, and then begin monitoring the file for changes. When a change is -detected, it will wait the specified delay time (or auto mode) and then -begin a PUSH operation. If another change is detected during the wait -time, the wait timer is reset UNLESS it has been X hours since the last -PUSH, in which case it will immediately begin a PUSH operation. - -Example: -sqlrsync mydb.sqlite namespace/mydb.sqlite --subscribe --waitIdle 10m --maxInterval 24h - -Wait for 10 minutes of idleness before pushing, but in the event that there's -activity every 8 minutes for multiple days straight, it will at least push -24 hours after the last push. - -Example: -sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on push subscribe" --subscribe --waitIdle 10m --maxInterval 24h - -With push subscribe the database is checked to see if PRAGMA busy_timeout is -greater than waitIdle. If not, it asks the user what value they want to increase -it to and sets it. - -## Coordination Mode (Optional) - -If --coordinate is specified, the client opts into the coordination system. Each client -configures their own waitIdle and maxInterval values independently - these are NOT shared -between clients. 
- -When --coordinate is enabled: -- Client does NOT send SQLRSYNC_COORDINATE to other clients -- Client registers with server as participating in coordination -- First file change detected: - - Client sends `SQLRSYNC_CLAIM $waitIdle` to server - - Starts extension timer for `waitIdle - 10 seconds` -- Subsequent writes: - - Sets flag "had additional writes since CLAIM" - - Does NOT send CLAIM immediately (minimizes traffic) -- When extension timer fires (10 seconds before claim expires): - - If flag is set: sends `SQLRSYNC_CLAIM $remainingWaitIdle` - - Where `remainingWaitIdle = waitIdle - (seconds since last write)` - - Starts new timer for `remainingWaitIdle - 10 seconds` - - If no additional writes: lets original claim expire naturally -- Server rebroadcasts CLAIM to all other coordinating clients for this replica -- Other clients suppress their own PUSH attempts during the claim window (polite yield) - -Without --coordinate: -- No CLAIM system is used -- Sends `SQLRSYNC_CHANGED` notifications to server for analytics -- All clients push independently based on their local timers -- Risk of concurrent pushes and conflicts (accepted risk) -- Server handles conflicts via SQLite rsync page-level merging - -If we are subscribing with these PUSH flags, then we will PUSH when the command starts. - ---minInterval, which defaults to 1/2 maxInterval but otherwise specifies the minimum -gap between subsequent PUSHes. - -The server will warn and ignore if --waitIdle, --maxInterval, or --minInterval are used with a PULL Key. - ---waitIdle, --minInterval, and --maxInterval both allow time durations like 3s, 10m, 1h30m, 2h, 1h45m, 3d, 1w, 4w, etc. They do not support months. - -The max for waitIdle is 24 days because busy_time is non-negative integer representing milliseconds, which is 24.8 days. The min is 1 second. 
- 
-Example: 
-sqlrsync mydb.sqlite namespace/mydb.sqlite -m "turn on coordinated subscribe" --subscribe --waitIdle 10m --maxInterval 24h --coordinate
-
-## Split-Brain Handling
-
-When two clients send SQLRSYNC_CLAIM simultaneously:
-- Server accepts the first CLAIM it receives
-- Responds to first client with `SQLRSYNC_CLAIM` (no arguments) = acknowledgment
-- Broadcasts to all other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using claimer's waitIdle)
-- Second client receives the other client's CLAIM instead of acknowledgment
-- Second client suppresses its push attempts for that waitIdle duration
-- Server ignores subsequent CLAIMs until the current claim's waitIdle expires
-
-Note: CLAIMS do not block local writes - both databases continue accepting writes.
-The coordination only affects when pushes are attempted.
-
-In addition to --waitIdle and --maxInterval, the other optional flags are --minInterval and --coordinate (both described above).
-
-Areas for Consideration & Additional Scenarios
-
-1. Error Handling & Recovery
- Network failures: What happens if a PUSH fails due to network issues? Should it retry with exponential backoff?
- Matt says: Yes, exponential backoff using the existing exponential backoff code. The client will do an HTTPS POST request to $server/sapi/$replicaID with
- the error message if it has been more than 5 minutes of failure.
-
-Partial sync failures: How should the system handle cases where a PUSH partially completes?
-Matt says: that'd be handled the same as a network failure: exponential backoff. The client will do an HTTPS POST request to $server/sapi/$replicaID with
-the error message 100% of the time.
-
-Server unavailability: Should the system queue changes locally and sync when the server becomes available?
-Matt says: the above network and partial sync cases handle that with existing exponential backoff code.
-
-2. File System Edge Cases
- Temporary files: Should the file watcher ignore temporary files created by editors (e.g., .tmp, .swp files)?
- Matt says: yes, both the name of the file and -wal should be monitored.
-
-Atomic writes: How does it handle applications that write to temporary files and then rename them?
-Matt says: The sqlite spec may do that to the local.db-wal file, but that will always trigger a traditional write to the local.db file
-
-Multiple rapid changes: What about scenarios with hundreds of small changes in quick succession?
-Matt says: the --waitIdle code quickly ignores it if it hasn't been 75% of the waitIdle time or --maxInterval since the last PUSH
-
-File locking: What happens if the SQLite database is locked when trying to PUSH?
-Matt says: The sqlrsync client PUSHes without needing a write lock by using a special control table in the sqlite database that has already been built into sqlrsync.
-
-3. Coordination Protocol Edge Cases
- Split-brain scenarios: What if network partitions cause multiple clients to think they have the claim?
- Matt says: the server is single threaded and we accept that the first claim the server received will win.
-
-Client crashes: How is a stale claim cleaned up if a client crashes while holding it?
-Matt says: the claim is time-limited to waitIdle, so if a client crashes while holding it, the claim will expire after waitIdle time.
-
-Clock skew: How does the system handle clients with different system times?
-Matt says: figuring in internet latency, since times are relative, other systems might be off by a few seconds.
-
-Late joiners: What happens when a new client subscribes to a database that already has active claims?
-Matt says: The server will send a CLAIM when it connects if one already exists.
-
-4. Performance & Resource Management
-
-Resource cleanup: How are file watchers cleaned up when subscription ends?
-Matt says: These are long running permanent processes that should be added to systemd or similar to ensure they are restarted if they crash.
-
-Memory usage: For long-running subscriptions, how is memory managed?
-Matt says: The file watcher uses minimal memory - mostly waiting, and not using any dynamic memory.
-
-CPU usage: What's the impact of continuous file watching on system resources?
-Matt says: I believe there are hooks into the OS that use no resources until a change is detected.
-
-5. User Experience
-
-Status visibility: How can users see the current subscription status, active claims, etc.?
-Matt says: This is handled serverside.
-
-Graceful shutdown: How should users stop subscription mode?
-Matt says: This is already handled by Ctrl-C.
-
-Logging: What level of logging should be provided for debugging coordination issues?
-Matt says: Verbose logging is already available and can be enabled with the --verbose flag.
-
-# Specific documentation for the server side
-
-CLAIMS are sent from the worker to the Durable Object for the account, which makes the decisions and does the coordination, since it will be less resource constrained.
-
-CLAIMS do not block other writes. Both will be saved.
-
-# Questions for Claude
-
-1. How do concurrent writes work?
-
-**Answer**: Based on the specification, here's how concurrent writes are handled:
-
-## Without --coordinate flag (Default, No Coordination)
-- Each client operates independently with no coordination
-- The timer logic (`MAX(minInterval - timeSinceLastPush, waitIdle)`) prevents too-frequent pushes from same client
-- All clients can push whenever their timers expire
-- Server accepts all pushes in order received
-- Risk of concurrent pushes and conflicts (accepted risk)
-- Server handles conflicts via SQLite rsync page-level merging
-
-## With --coordinate flag (Optional Coordination)
-- **--coordinate is optional** - each client opts in individually
-- **Each client configures own waitIdle/maxInterval** - these are NOT shared between clients
-- **CLAIMS do not block local writes** - all local databases continue accepting writes
-
-When Client A (with --coordinate) detects a change:
-1. 
First write: - - Sends `SQLRSYNC_CLAIM $waitIdle` to server - - Starts extension timer for `waitIdle - 10 seconds` -2. Subsequent writes: - - Sets flag "had additional writes" - - Does NOT send CLAIM yet -3. When extension timer fires (10s before claim expires): - - If flag set: sends `SQLRSYNC_CLAIM $remainingWaitIdle` - - Where `remainingWaitIdle = waitIdle - (seconds since last write)` - - Starts new timer for `remainingWaitIdle - 10 seconds` - -Server (via durable object) handles CLAIM: -- Accepts first CLAIM for a replica -- Responds to Client A: `SQLRSYNC_CLAIM` (no args) = acknowledgment -- Broadcasts to other coordinating clients: `SQLRSYNC_CLAIM $waitIdle` (using A's waitIdle) - -Other coordinating clients receiving the CLAIM: -- **Do NOT block local writes** (application continues writing) -- Set a timer for received $waitIdle duration -- Suppress their own PUSH and CLAIM attempts during this window (polite yield) -- After timer expires, can push accumulated changes - -Client A completes its PUSH, then other clients can push their accumulated changes. - -## Conflict Resolution -Since CLAIMS don't block writes and coordination is optional: -1. Client A (coordinating) writes locally + claims -2. Client B (coordinating) writes locally but yields on push -3. Client C (no --coordinate) writes and pushes independently -4. Server accepts pushes in order received -5. Conflicts resolved using SQLite rsync protocol (page-level merging) - -The coordination is about **reducing push collisions** and **giving polite turn-taking** among coordinating clients, not preventing concurrent local writes. Non-coordinating clients can still push anytime. diff --git a/client/SQLDIFF-INTEGRATION.md b/client/SQLDIFF-INTEGRATION.md deleted file mode 100644 index 743a990..0000000 --- a/client/SQLDIFF-INTEGRATION.md +++ /dev/null @@ -1,226 +0,0 @@ -# SQLDiff Integration - -## Overview - -Created a CGO wrapper to integrate SQLite's sqldiff functionality into the auto-merge feature. 
- -## Files Created - -### 1. `/bridge/sqldiff_wrapper.h` -C header file defining the interface: -```c -int sqldiff_run(const char *db1, const char *db2, char **result, char **error); -``` - -### 2. `/bridge/sqldiff_wrapper.c` -C implementation that: -- Opens both databases in read-only mode -- Currently returns a placeholder diff -- **TODO**: Integrate full sqldiff.c logic once `sqlite3_stdio.h` is available - -### 3. `/bridge/cgo_sqldiff.go` -Go wrapper providing: - -**Main Function:** -```go -func RunSQLDiff(db1Path, db2Path string) (*DiffResult, error) -``` - -**Data Structures:** -```go -type DiffResult struct { - SQL string // SQL statements to transform db1 to db2 - HasChanges bool // Whether there are any differences - Operations []DiffOperation // Parsed operations from the diff - Conflicts []PrimaryKeyConflict // Detected primary key conflicts -} - -type DiffOperation struct { - Type string // INSERT, UPDATE, DELETE - Table string // Table name - PrimaryKey map[string]interface{} // Primary key values - SQL string // The actual SQL statement -} - -type PrimaryKeyConflict struct { - Table string - PrimaryKey map[string]interface{} - Operation1 string // First operation - Operation2 string // Second operation -} -``` - -**Helper Function:** -```go -func ApplyDiff(dbPath string, diff *DiffResult) error -``` - -## Integration with Auto-Merge - -### Location: `sync/coordinator.go:1112-1156` - -The auto-merge flow now: - -1. **Creates temp file** - Copies local database -2. **Pulls latest** - Gets server's version over temp -3. **Generates diff** - Calls `bridge.RunSQLDiff(tempPath, localPath)` -4. **Checks for changes** - Returns early if databases are identical -5. **Detects conflicts** - Checks for primary key conflicts -6. **Sends notification** - If conflicts exist, notifies server -7. **Applies diff** - If no conflicts, applies changes -8. 
**Retries push** - Pushes merged result - -### Usage in Code - -```go -diffResult, err := bridge.RunSQLDiff(tempPath, c.config.LocalPath) -if err != nil { - return fmt.Errorf("failed to generate diff: %w", err) -} - -if !diffResult.HasChanges { - fmt.Println("✅ No changes detected - databases are identical") - return nil -} - -if len(diffResult.Conflicts) > 0 { - fmt.Printf("❌ Detected %d primary key conflict(s)\n", len(diffResult.Conflicts)) - return c.sendMergeConflictNotification(serverURL, remotePath, latestVersion, []byte(diffResult.SQL)) -} - -if err := bridge.ApplyDiff(tempPath, diffResult); err != nil { - // Fallback to simple copy - c.logger.Warn("Failed to apply diff, using direct copy", zap.Error(err)) -} -``` - -## Current Status - -### ✅ Completed -- CGO wrapper infrastructure -- Go type definitions -- Integration with coordinator -- Error handling and fallbacks -- Placeholder diff generation - -### 🚧 TODO - -#### 1. Full SQLDiff Integration -The `sqldiff.c` file is present but not compiled due to missing `sqlite3_stdio.h`. - -**Next Steps:** -- Obtain `sqlite3_stdio.h` and `sqlite3_stdio.c` from SQLite ext/misc -- Remove `//go:build ignore` tag from `sqldiff.c` -- Modify `sqldiff_wrapper.c` to call actual diff functions -- Update CGO build flags if needed - -#### 2. SQL Parsing (parseSQL) -Currently stubbed in `cgo_sqldiff.go:75-82`. - -**Needs:** -```go -func parseSQL(sql string) []DiffOperation { - // Parse SQL statements - // Extract INSERT/UPDATE/DELETE operations - // Identify table names and primary keys - // Return structured operations -} -``` - -#### 3. Conflict Detection (detectConflicts) -Currently stubbed in `cgo_sqldiff.go:85-92`. 
- -**Algorithm:** -```go -func detectConflicts(operations []DiffOperation) []PrimaryKeyConflict { - // Group operations by table and primary key - // Find cases where same PK has multiple operations: - // - Multiple UPDATEs on same PK - // - UPDATE + DELETE on same PK - // - INSERT on existing PK - // Return conflicts -} -``` - -#### 4. Diff Application (ApplyDiff) -Currently stubbed in `cgo_sqldiff.go:95-105`. - -**Needs:** -```go -func ApplyDiff(dbPath string, diff *DiffResult) error { - // Open database - // Begin transaction - // Execute each SQL statement from diff - // Commit transaction - // Handle errors with rollback -} -``` - -## Testing Strategy - -### Unit Tests Needed - -1. **TestRunSQLDiff** - Test diff generation - - Identical databases → no changes - - Simple INSERT → detected - - UPDATE operation → detected - - DELETE operation → detected - -2. **TestParseSQL** - Test SQL parsing - - Parse INSERT statements - - Parse UPDATE statements - - Parse DELETE statements - - Extract primary keys correctly - -3. **TestDetectConflicts** - Test conflict detection - - No conflicts case - - UPDATE + UPDATE conflict - - INSERT + INSERT conflict - - UPDATE + DELETE conflict - -4. **TestApplyDiff** - Test diff application - - Apply simple changes - - Rollback on error - - Verify final state - -### Integration Tests Needed - -1. **TestAutoMergeNoConflict** - Full merge flow without conflicts -2. **TestAutoMergeWithConflict** - Merge with conflicts triggers notification -3. 
**TestAutoMergeFallback** - Falls back to copy on apply failure - -## Build Configuration - -The sqldiff.c is currently excluded from build: -```c -//go:build ignore -``` - -Once `sqlite3_stdio.h` is available, remove this tag and ensure: -- CGO can find all required headers -- SQLite library is linked properly -- Build succeeds on all platforms - -## Dependencies - -- SQLite3 library (`-lsqlite3`) -- C standard library -- `sqlite3_stdio.h` (pending) -- `sqlite3_stdio.c` (pending) - -## Error Handling - -The wrapper handles errors at multiple levels: - -1. **C Level** - Opens databases, checks SQLite errors -2. **CGO Level** - Converts C strings/errors to Go -3. **Go Level** - Returns structured errors with context -4. **Coordinator Level** - Falls back to simple copy if diff fails - -## Future Enhancements - -1. **Streaming Diffs** - For very large databases -2. **Incremental Parsing** - Parse SQL as it's generated -3. **Custom Conflict Resolution** - User-defined merge strategies -4. **Diff Caching** - Cache diffs for retry scenarios -5. **Progress Reporting** - Show diff generation progress