Skip to content

Commit d7d5369

Browse files
committed
Add commit message
1 parent 818dbb8 commit d7d5369

File tree

3 files changed

+130
-60
lines changed

3 files changed

+130
-60
lines changed

client/main.go

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,23 @@ import (
1717

1818
var VERSION = "0.0.2"
1919
var (
20-
serverURL string
21-
verbose bool
22-
dryRun bool
23-
SetPublic bool
24-
SetUnlisted bool
25-
subscribing bool
26-
pullKey string
27-
pushKey string
28-
replicaID string
29-
logger *zap.Logger
30-
showVersion bool
20+
serverURL string
21+
verbose bool
22+
dryRun bool
23+
SetPublic bool
24+
SetUnlisted bool
25+
subscribing bool
26+
pullKey string
27+
pushKey string
28+
commitMessageParam string
29+
replicaID string
30+
logger *zap.Logger
31+
showVersion bool
3132
)
3233

34+
var NoCommitMessage = "<!--NO_SQLRSYNC_MESSAGE-->"
35+
var MAX_MESSAGE_SIZE = 4096
36+
3337
var rootCmd = &cobra.Command{
3438
Use: "sqlrsync [ORIGIN] [REPLICA] or [LOCAL] or [REMOTE]",
3539
Short: "SQLRsync v" + VERSION,
@@ -70,6 +74,19 @@ func runSync(cmd *cobra.Command, args []string) error {
7074
return cmd.Help()
7175
}
7276

77+
var commitMessage []byte
78+
79+
if commitMessageParam == NoCommitMessage {
80+
commitMessage = nil
81+
} else if len(commitMessageParam) == 0 {
82+
83+
} else {
84+
if len(commitMessageParam) > MAX_MESSAGE_SIZE {
85+
return fmt.Errorf("commit message too long (max %d characters)", MAX_MESSAGE_SIZE)
86+
}
87+
commitMessage = []byte(commitMessageParam)
88+
}
89+
7390
// Preprocess variables
7491
serverURL = strings.TrimRight(serverURL, "/")
7592

@@ -81,13 +98,45 @@ func runSync(cmd *cobra.Command, args []string) error {
8198

8299
versionRaw := strings.SplitN(remotePath, "@", 2)
83100
version := "latest"
101+
102+
// permitted version formats:
103+
// # <none - no @ or anything>
104+
// @ # just the at sign
105+
// @latest
106+
// @1
107+
// @30
108+
// @v1
109+
// @v30
110+
// @latest-1
111+
// @latest-20
112+
//
113+
114+
// NOT permitted:
115+
// @latest1
116+
// @latest+1
117+
// Therefore this is a good regexp for this https://regex101.com/r/LooJFS/1 /^(latest-\d+)|(latest)|v?(\d+)$/
84118
if len(versionRaw) == 2 {
85-
version = strings.TrimPrefix(strings.ToLower(versionRaw[1]), "v")
119+
verStr := strings.ToLower(strings.TrimPrefix(versionRaw[1], "v"))
86120
remotePath = versionRaw[0]
87-
versionCheck, _ := strconv.Atoi(version)
88-
if strings.HasPrefix(version, "latest") && versionCheck <= 0 {
89-
return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-<number>`, or `<number>` where the number is greater than 0)", version)
121+
122+
if !strings.HasPrefix(verStr, "latest") && !strings.HasPrefix(verStr, "latest-") {
123+
// Accept plain numbers
124+
if _, err := strconv.Atoi(verStr); err != nil {
125+
return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-<number>`, or `<number>`)", verStr)
126+
}
127+
} else {
128+
// Accept latest or latest-N
129+
if !strings.HasPrefix(verStr, "latest") {
130+
return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-<number>`, or `<number>`)", verStr)
131+
}
132+
if strings.HasPrefix(verStr, "latest-") {
133+
numStr := strings.TrimPrefix(verStr, "latest-")
134+
if n, err := strconv.Atoi(numStr); err != nil || n <= 0 {
135+
return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-<number>`, or `<number>` where number > 0)", verStr)
136+
}
137+
}
90138
}
139+
version = verStr
91140
}
92141

93142
visibility := 0
@@ -111,6 +160,7 @@ func runSync(cmd *cobra.Command, args []string) error {
111160
ReplicaPath: remotePath, // For LOCAL TO LOCAL, remotePath is actually the replica path
112161
Version: version, // Could be extended to parse @version syntax
113162
Operation: operation,
163+
CommitMessage: commitMessage,
114164
SetVisibility: visibility,
115165
DryRun: dryRun,
116166
Logger: logger,
@@ -220,8 +270,9 @@ func setupLogger() {
220270
func init() {
221271
rootCmd.Flags().StringVar(&pullKey, "pullKey", "", "Authentication key for PULL operations")
222272
rootCmd.Flags().StringVar(&pushKey, "pushKey", "", "Authentication key for PUSH operations")
273+
rootCmd.Flags().StringVarP(&commitMessageParam, "message", "m", NoCommitMessage, "Commit message for the PUSH operation")
223274
rootCmd.Flags().StringVar(&replicaID, "replicaID", "", "Replica ID for the remote database")
224-
rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for operations")
275+
rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for remote operations")
225276
rootCmd.Flags().BoolVar(&subscribing, "subscribe", false, "Enable subscription to PULL changes")
226277
rootCmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose logging")
227278
rootCmd.Flags().BoolVar(&SetUnlisted, "unlisted", false, "Enable unlisted access to the replica (initial PUSH only)")

client/remote/client.go

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
SQLRSYNC_CONFIG = 0x51 // Send to keys and replicaID
2222
SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available
2323
SQLRSYNC_KEYREQUEST = 0x53 // request keys
24+
SQLRSYNC_COMMITMESSAGE = 0x54 // commit message
2425
)
2526

2627
// ProgressPhase represents the current phase of the sync operation
@@ -132,9 +133,9 @@ func NewTrafficInspector(logger *zap.Logger, depth int) *TrafficInspector {
132133
}
133134

134135
// InspectOutbound logs outbound traffic (Go → Remote) and returns true if it's ORIGIN_END
135-
func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) bool {
136+
func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) string {
136137
if len(data) == 0 {
137-
return false
138+
return ""
138139
}
139140

140141
msgType := t.parseMessageType(data)
@@ -153,7 +154,7 @@ func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) bool
153154
}
154155

155156
// Return whether this is an ORIGIN_END message
156-
return msgType == "ORIGIN_END"
157+
return msgType
157158
}
158159

159160
// InspectInbound logs inbound traffic (Remote → Go) and returns true if it's ORIGIN_END
@@ -389,18 +390,19 @@ func (t *TrafficInspector) parseMessageType(data []byte) string {
389390

390391
// Config holds the configuration for the remote WebSocket client
391392
type Config struct {
392-
ServerURL string
393-
Version string
394-
ReplicaID string
395-
Subscribe bool
396-
SetVisibility int // for PUSH
397-
Timeout int // in milliseconds
398-
Logger *zap.Logger
399-
EnableTrafficInspection bool // Enable detailed traffic logging
400-
InspectionDepth int // How many bytes to inspect (default: 32)
401-
PingPong bool
402-
AuthToken string
403-
SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token
393+
ServerURL string
394+
Version string
395+
ReplicaID string
396+
Subscribe bool
397+
SetVisibility int // for PUSH
398+
CommitMessage []byte
399+
Timeout int // in milliseconds
400+
Logger *zap.Logger
401+
EnableTrafficInspection bool // Enable detailed traffic logging
402+
InspectionDepth int // How many bytes to inspect (default: 32)
403+
PingPong bool
404+
AuthToken string
405+
SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token
404406

405407
SendConfigCmd bool // we don't have the version number or remote path
406408
LocalHostname string
@@ -863,11 +865,23 @@ func (c *Client) isSyncCompleted() bool {
863865
// handleOutboundTraffic inspects outbound data and handles sync completion detection
864866
func (c *Client) handleOutboundTraffic(data []byte) {
865867
// Always inspect for protocol messages (sync completion detection)
866-
isOriginEnd := c.inspector.InspectOutbound(data, c.config.EnableTrafficInspection)
867-
if isOriginEnd {
868+
outboundCommand := c.inspector.InspectOutbound(data, c.config.EnableTrafficInspection)
869+
if outboundCommand == "ORIGIN_END" {
868870
c.logger.Info("ORIGIN_END detected - sync completing")
869871
c.setSyncCompleted(true)
870872
}
873+
if outboundCommand == "ORIGIN_BEGIN" {
874+
if len(c.config.CommitMessage) > 0 {
875+
length := len(c.config.CommitMessage)
876+
// Encode length as 2 bytes (big-endian), 2 bytes is ~65k max
877+
lenBytes := []byte{
878+
byte(length >> 8),
879+
byte(length),
880+
}
881+
c.writeQueue <- append([]byte{SQLRSYNC_COMMITMESSAGE}, append(lenBytes, c.config.CommitMessage...)...)
882+
c.config.CommitMessage = nil
883+
}
884+
}
871885

872886
// Handle progress tracking
873887
if c.config.ProgressCallback != nil {
@@ -1357,15 +1371,6 @@ func (c *Client) writeLoop() {
13571371
// Set write deadline
13581372
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
13591373

1360-
if c.config.SendConfigCmd {
1361-
conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG})
1362-
c.config.SendConfigCmd = false
1363-
}
1364-
if c.config.SendKeyRequest {
1365-
conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_KEYREQUEST})
1366-
c.config.SendKeyRequest = false
1367-
}
1368-
13691374
// Inspect raw WebSocket outbound traffic
13701375
c.inspector.LogWebSocketTraffic(data, "OUT (Client → Server)", c.config.EnableTrafficInspection)
13711376

@@ -1383,6 +1388,18 @@ func (c *Client) writeLoop() {
13831388
return
13841389
}
13851390

1391+
// consider moving this to InspectorOutbound
1392+
1393+
// Do this here so ORIGIN_BEGIN sends first
1394+
if c.config.SendConfigCmd {
1395+
conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG})
1396+
c.config.SendConfigCmd = false
1397+
}
1398+
if c.config.SendKeyRequest {
1399+
conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_KEYREQUEST})
1400+
c.config.SendKeyRequest = false
1401+
}
1402+
13861403
c.logger.Debug("Sent message to remote", zap.Int("bytes", len(data)))
13871404
}
13881405
}

client/sync/coordinator.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type CoordinatorConfig struct {
4242
Version string
4343
Operation Operation
4444
SetVisibility int
45+
CommitMessage []byte
4546
DryRun bool
4647
Logger *zap.Logger
4748
Verbose bool
@@ -360,18 +361,18 @@ func (c *Coordinator) executePull(isSubscription bool) error {
360361

361362
// Create remote client for WebSocket transport
362363
remoteClient, err := remote.New(&remote.Config{
363-
ServerURL: serverURL + "/sapi/pull/" + remotePath,
364-
AuthToken: authResult.AccessToken,
365-
ReplicaID: authResult.ReplicaID,
366-
Timeout: 8000,
367-
PingPong: false, // No ping/pong needed for single sync
368-
Logger: c.logger.Named("remote"),
369-
Subscribe: false, // Subscription handled separately
370-
EnableTrafficInspection: c.config.Verbose,
371-
InspectionDepth: 5,
372-
Version: version,
373-
SendConfigCmd: true,
374-
SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath),
364+
ServerURL: serverURL + "/sapi/pull/" + remotePath,
365+
AuthToken: authResult.AccessToken,
366+
ReplicaID: authResult.ReplicaID,
367+
Timeout: 8000,
368+
PingPong: false, // No ping/pong needed for single sync
369+
Logger: c.logger.Named("remote"),
370+
Subscribe: false, // Subscription handled separately
371+
EnableTrafficInspection: c.config.Verbose,
372+
InspectionDepth: 5,
373+
Version: version,
374+
SendConfigCmd: true,
375+
SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath),
375376
//ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple),
376377
ProgressCallback: nil,
377378
ProgressConfig: &remote.ProgressConfig{
@@ -396,9 +397,9 @@ func (c *Coordinator) executePull(isSubscription bool) error {
396397

397398
// Create local client for SQLite operations
398399
localClient, err := bridge.New(&bridge.BridgeConfig{
399-
DatabasePath: c.config.LocalPath,
400-
DryRun: c.config.DryRun,
401-
Logger: c.logger.Named("local"),
400+
DatabasePath: c.config.LocalPath,
401+
DryRun: c.config.DryRun,
402+
Logger: c.logger.Named("local"),
402403
EnableSQLiteRsyncLogging: c.config.Verbose,
403404
})
404405
if err != nil {
@@ -458,9 +459,9 @@ func (c *Coordinator) executePush() error {
458459

459460
// Create local client for SQLite operations
460461
localClient, err := bridge.New(&bridge.BridgeConfig{
461-
DatabasePath: c.config.LocalPath,
462-
DryRun: c.config.DryRun,
463-
Logger: c.logger.Named("local"),
462+
DatabasePath: c.config.LocalPath,
463+
DryRun: c.config.DryRun,
464+
Logger: c.logger.Named("local"),
464465
EnableSQLiteRsyncLogging: c.config.Verbose,
465466
})
466467
if err != nil {
@@ -497,6 +498,7 @@ func (c *Coordinator) executePush() error {
497498
SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath),
498499
SendConfigCmd: true,
499500
SetVisibility: c.config.SetVisibility,
501+
CommitMessage: c.config.CommitMessage,
500502
ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple),
501503
ProgressConfig: &remote.ProgressConfig{
502504
Enabled: true,

0 commit comments

Comments
 (0)