Skip to content

Commit 45a0135

Browse files
committed
Some final touches on distribution test
1 parent ee34b73 commit 45a0135

10 files changed

Lines changed: 489 additions & 121 deletions

File tree

client/tcp.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,11 @@ func (h *tcpEventHandler) OnTraffic(c gnet.Conn) gnet.Action {
234234
// Append the chunk to our buffer
235235
buf = append(buf, chunk...)
236236

237+
// Check if we have a complete message in the buffer
238+
if isCompleteMessage(buf) {
239+
break
240+
}
241+
237242
// If we received less than what would fill a typical buffer,
238243
// we've likely received the complete message for now
239244
if len(chunk) < 4096 {
@@ -407,6 +412,50 @@ func (h *tcpEventHandler) OnTraffic(c gnet.Conn) gnet.Action {
407412
return gnet.None
408413
}
409414

415+
// isCompleteMessage checks if we have a complete valid message in the buffer
416+
// by verifying message structure and length requirements
417+
func isCompleteMessage(data []byte) bool {
418+
// Need at least 1 byte for message type
419+
if len(data) < 1 {
420+
return false
421+
}
422+
423+
// Check if message is a success response (which includes a payload length)
424+
msgType := MessageType(data[0])
425+
if msgType == MessageType(types.HandlerStatusSuccess.Byte()) {
426+
// Need at least 5 bytes for header (1 status + 4 length)
427+
if len(data) < 5 {
428+
return false
429+
}
430+
431+
// Check if actual data length matches expected length from header
432+
expectedLength := binary.BigEndian.Uint32(data[1:5])
433+
actualDataLength := len(data) - 5 // subtract header bytes
434+
435+
// Only complete if we have all the expected data
436+
return uint32(actualDataLength) >= expectedLength
437+
}
438+
439+
// For write/read handler messages which have a specific format
440+
if msgType == MessageType('W') || msgType == MessageType('R') {
441+
// These also use a length field - need at least 5 bytes (1 type + 4 length)
442+
if len(data) < 5 {
443+
return false
444+
}
445+
446+
// Get the expected message length
447+
expectedLength := binary.BigEndian.Uint32(data[1:5])
448+
actualDataLength := len(data) - 5
449+
450+
// Check if we have the complete message
451+
return uint32(actualDataLength) >= expectedLength
452+
}
453+
454+
// For other message types where we can't easily determine completeness
455+
// by examining headers, use a more conservative approach
456+
return false
457+
}
458+
410459
// OnTick is called periodically
411460
func (h *tcpEventHandler) OnTick() (time.Duration, gnet.Action) {
412461
return time.Second, gnet.None

db/writer.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package db
22

33
import (
4+
"context"
45
"github.com/erigontech/mdbx-go/mdbx"
56
"github.com/pkg/errors"
67
"go.uber.org/zap"
@@ -89,19 +90,28 @@ func (bw *BatchWriter) BufferWrite(key [32]byte, value []byte) error {
8990
// XOR the first and last bytes for slightly better distribution
9091
workerID := int(key[0]^key[31]) % bw.workers
9192

92-
// Use non-blocking send to prevent backpressure
93+
// Create a timeout context for the send operation
94+
timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
95+
defer cancel()
96+
97+
// Try to send with timeout
9398
select {
9499
case bw.workerChannels[workerID] <- WriteRequest{Key: key, Value: value}:
95100
// Successfully sent to channel
96101
return nil
97-
default:
98-
// Channel is full, handle gracefully by using a goroutine to avoid blocking
99-
// This helps during high-load scenarios to prevent caller blocking
102+
case <-timeoutCtx.Done():
103+
// Channel send timed out, try in background goroutine
100104
go func() {
101-
// This will block in the goroutine but not block the caller
102-
bw.workerChannels[workerID] <- WriteRequest{Key: key, Value: value}
105+
select {
106+
case bw.workerChannels[workerID] <- WriteRequest{Key: key, Value: value}:
107+
// Successfully sent
108+
case <-time.After(5 * time.Second):
109+
// If we can't send after a long timeout, log the error
110+
zap.L().Error("Failed to queue record for batch writing after extended timeout",
111+
zap.Binary("key_prefix", key[:8]))
112+
}
103113
}()
104-
return nil
114+
return errors.New("queue operation timed out, write queued in background")
105115
}
106116
}
107117

node/db_read_handler.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package node
22

33
import (
4-
"fmt"
5-
64
"github.com/unpackdev/fdb/db"
75
"github.com/unpackdev/fdb/logger"
86
"github.com/unpackdev/fdb/observability"
@@ -122,7 +120,7 @@ func (rh *DbReadHandler) Handle(conn transports.Connection, frame []byte) {
122120
// Encode the response to bytes
123121
response := dbResp.Encode()
124122

125-
fmt.Println("SENDING RESPONSE PREFIX", response[0:10], "with status byte:", response[0], "data length:", dbResp.Length)
123+
//fmt.Println("SENDING RESPONSE PREFIX", response[0:10], "with status byte:", response[0], "data length:", dbResp.Length)
126124

127125
// Send the formatted response back to the client
128126
conn.Send(response)

node/db_write_handler.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package node
22

33
import (
4-
"encoding/binary"
5-
"fmt"
6-
74
"github.com/unpackdev/fdb/db"
85
"github.com/unpackdev/fdb/logger"
96
"github.com/unpackdev/fdb/observability"
@@ -43,15 +40,15 @@ func (wh *DbWriteHandler) ForceFlush() {
4340

4441
// Handle processes the incoming message using the TCPWriteHandler
4542
func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
46-
fmt.Println("DID I REACH WRITE DB HANLDER? FRAME LENGTH:", len(frame))
43+
//fmt.Println("DID I REACH WRITE DB HANLDER? FRAME LENGTH:", len(frame))
4744
// Debug the raw incoming frame
48-
fmt.Printf("WRITE HANDLER RAW FRAME (first 20 bytes): %v\n", frame[:min(20, len(frame))])
45+
//fmt.Printf("WRITE HANDLER RAW FRAME (first 20 bytes): %v\n", frame[:min(20, len(frame))])
4946

5047
// Check if first byte is our special marker 0xF0
5148
offset := 0
5249
if len(frame) > 0 && frame[0] == 0xF0 {
5350
offset = 1 // Skip the marker byte
54-
fmt.Println("DETECTED MARKER BYTE 0xF0, offset set to", offset)
51+
//fmt.Println("DETECTED MARKER BYTE 0xF0, offset set to", offset)
5552
}
5653

5754
// Adjust minimum length check based on whether we have a marker
@@ -80,15 +77,15 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
8077
}
8178

8279
// Debug action byte
83-
actionByte := frame[offset]
84-
fmt.Printf("ACTION BYTE: 0x%02x\n", actionByte)
80+
//actionByte := frame[offset]
81+
//fmt.Printf("ACTION BYTE: 0x%02x\n", actionByte)
8582

8683
// Create a [32]byte key from the frame without using the pool
8784
var key [32]byte
8885
copy(key[:], frame[offset+1:offset+33]) // Copy directly from frame, accounting for offset
8986

9087
// Debug the key being extracted
91-
fmt.Printf("KEY (first 8 bytes): %v\n", key[:8])
88+
//fmt.Printf("KEY (first 8 bytes): %v\n", key[:8])
9289

9390
// For the message protocol format: [1-byte action][32-byte key][4-byte length][actual data]
9491
// We need to skip 4 bytes after the key to get to the actual data
@@ -114,7 +111,7 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
114111
// Extract only the actual data, skipping the length field
115112
value := frame[valueStart:]
116113

117-
fmt.Printf("ACTUAL VALUE (first %d bytes): %v\n", min(10, len(value)), value[:min(10, len(value))])
114+
//fmt.Printf("ACTUAL VALUE (first %d bytes): %v\n", min(10, len(value)), value[:min(10, len(value))])
118115

119116
// Add detailed logging for debugging large payload issues
120117
wh.logger.Debug("Received write request",
@@ -125,7 +122,7 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
125122
zap.Binary("value_prefix", value[:min(10, len(value))]))
126123

127124
// Log what we're about to write to the database
128-
fmt.Printf("ABOUT TO WRITE TO DB - KEY: %v, VALUE (first 20 bytes): %v\n", key[:8], value[:min(20, len(value))])
125+
//fmt.Printf("ABOUT TO WRITE TO DB - KEY: %v, VALUE (first 20 bytes): %v\n", key[:8], value[:min(20, len(value))])
129126

130127
// Buffer the write request with the key as [32]byte
131128
err := wh.writer.BufferWrite(key, value)
@@ -144,7 +141,7 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
144141

145142
// Encode the response to bytes
146143
response := dbResp.Encode()
147-
fmt.Printf("SENDING ERROR RESPONSE: %v\n", response[:min(20, len(response))])
144+
//fmt.Printf("SENDING ERROR RESPONSE: %v\n", response[:min(20, len(response))])
148145
conn.Send(response)
149146
return
150147
}
@@ -174,12 +171,12 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
174171
response := dbResp.Encode()
175172

176173
// Detailed debug of the full response being sent
177-
fmt.Println("SENDING WRITE SUCCESS RESPONSE", response[:min(20, len(response))], "with status byte:", response[0])
178-
fmt.Printf("FULL RESPONSE DETAILS:\n - Status: %d\n - Length bytes: %v (uint32: %d)\n - Data: %v\n",
179-
response[0],
180-
response[1:5],
181-
binary.BigEndian.Uint32(response[1:5]),
182-
response[5:min(25, len(response))])
174+
// fmt.Println("SENDING WRITE SUCCESS RESPONSE", response[:min(20, len(response))], "with status byte:", response[0])
175+
// fmt.Printf("FULL RESPONSE DETAILS:\n - Status: %d\n - Length bytes: %v (uint32: %d)\n - Data: %v\n",
176+
// response[0],
177+
// response[1:5],
178+
// binary.BigEndian.Uint32(response[1:5]),
179+
// response[5:min(25, len(response))])
183180

184181
// Send the formatted response back to the client
185182
conn.Send(response)

0 commit comments

Comments
 (0)