Skip to content

Commit df0f603

Browse files
committed
Some adjustments to the tests and start of the playground system
1 parent bb6f59a commit df0f603

11 files changed

Lines changed: 797 additions & 17 deletions

File tree

fdb.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,6 @@ func New(ctx context.Context, cfg config.Config) (*FDB, error) {
102102

103103
batchWriter := db.NewBatchWriter(dbI.(*db.Db), 2048, 100*time.Millisecond, 15)
104104

105-
// Node is basically a wrapper around consensus, chain, identity management, peer system and peer discovery system.
106-
// Not to forget metrics and ping-pong game between peers to establish metrics baseline.
107-
// Construction is done here because other services might need it.
108-
// In this state, block producer callback is not set.
109-
// Sequencer will be setting up block producer callback and utilize it.
110105
dNode, dnErr := node.NewNode(ctx, cfg, rbacMgr, zLog, store, obs, stateMgr, dbM, batchWriter)
111106
if dnErr != nil {
112107
return nil, errors.Wrap(dnErr, "failed to initialize node")

pkg/protocols/http/handler_benchmark_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
// setupBenchmarkServer initializes the HTTP server for benchmarking.
2626
func setupBenchmarkServer(t testing.TB, ctx context.Context, cfg config.TcpTransport) (*tcp.Server, logger.Logger, *observability.Observability, string, error) {
2727
// Get a free port
28-
port := tcp.GetFreePort(t)
28+
port := tcp.GetFreePortForTest(t)
2929
addr := fmt.Sprintf("127.0.0.1:%d", port)
3030

3131
// Setup server using existing test helper
@@ -163,8 +163,8 @@ func BenchmarkHTTPHandler_POST_Echo(b *testing.B) {
163163
Type: types.TCPTransportType,
164164
Enabled: true,
165165
IPv4: net.ParseIP("127.0.0.1").String(),
166-
Port: tcp.GetFreePort(&testing.T{}), // Dynamically assign port
167-
TLS: nil, // Disable TLS for simplicity
166+
Port: tcp.GetFreePortForTest(b),
167+
TLS: nil,
168168
}
169169

170170
server, _, _, addr, err := setupBenchmarkServer(b, ctx, cfg)
@@ -243,8 +243,8 @@ func BenchmarkHTTPHandler_Concurrent(b *testing.B) {
243243
Type: types.TCPTransportType,
244244
Enabled: true,
245245
IPv4: net.ParseIP("127.0.0.1").String(),
246-
Port: tcp.GetFreePort(&testing.T{}), // Dynamically assign port
247-
TLS: nil, // Disable TLS for simplicity
246+
Port: tcp.GetFreePortForTest(b),
247+
TLS: nil,
248248
}
249249

250250
server, _, _, addr, err := setupBenchmarkServer(b, ctx, cfg)

pkg/protocols/http/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
func setupHTTPServerTest(t testing.TB, ctx context.Context) (*tcp.Server, logger.Logger, *observability.Observability, string) {
2727
// Get a free port
28-
port := tcp.GetFreePort(t)
28+
port := tcp.GetFreePortForTest(t)
2929
addr := fmt.Sprintf("127.0.0.1:%d", port)
3030

3131
// Create TCP transport configuration without TLS for testing

pkg/protocols/rpc/test_helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func SetupRPCServerForTest(t testing.TB) (rpcInstance *RPC, addr string, cleanup
2121
ctx := context.Background()
2222

2323
// Use a free port.
24-
port := tcp.GetFreePort(t)
24+
port := tcp.GetFreePortForTest(t)
2525

2626
// Define node configuration.
2727
nodeConfig := config.Config{

pkg/protocols/websocket/traffic_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
func setupWebSocketServerTest(t testing.TB, ctx context.Context) (*tcp.Server, logger.Logger, *observability.Observability, string) {
2121
// Get a free port
22-
port := tcp.GetFreePort(t)
22+
port := tcp.GetFreePortForTest(t)
2323
addr := fmt.Sprintf("127.0.0.1:%d", port)
2424

2525
// Create TCP transport configuration without TLS for testing

pkg/transports/tcp/helpers.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package tcp
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"time"
7+
)
8+
9+
// GetFreePort attempts to find an available port and confirm that it's truly available.
10+
func GetFreePort() (int, error) {
11+
maxAttempts := 5
12+
for range maxAttempts {
13+
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
14+
if err != nil {
15+
return -1, err
16+
}
17+
18+
l, err := net.ListenTCP("tcp", addr)
19+
if err != nil {
20+
time.Sleep(500 * time.Millisecond)
21+
continue
22+
}
23+
24+
port := l.Addr().(*net.TCPAddr).Port
25+
l.Close()
26+
return port, nil
27+
}
28+
29+
return -1, fmt.Errorf("failed to acquire a free port after %d attempts", maxAttempts)
30+
}

pkg/transports/tcp/test_helpers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func CreateResponseMessage(payload []byte) []byte {
3636
return append(lengthBuf, payload...)
3737
}
3838

39-
// GetFreePort attempts to find an available port and confirm that it's truly available.
40-
func GetFreePort(t testing.TB) int {
39+
// GetFreePortForTest attempts to find an available port and confirm that it's truly available.
40+
func GetFreePortForTest(t testing.TB) int {
4141
maxAttempts := 5
4242
for i := 0; i < maxAttempts; i++ {
4343
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")

playground/suite/client.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package suite
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"errors"
7+
"fmt"
8+
"time"
9+
10+
"github.com/panjf2000/gnet/v2"
11+
"github.com/unpackdev/fdb/client"
12+
"github.com/unpackdev/fdb/pkg/logger"
13+
"github.com/unpackdev/fdb/pkg/messages"
14+
"github.com/unpackdev/fdb/pkg/types"
15+
"go.uber.org/zap"
16+
)
17+
18+
const (
19+
// maxChunkSize keeps every TCP write comfortably below the default Ethernet
20+
// MTU once TCP/IP framing overhead is added.
21+
maxChunkSize = 60 * 1024 // 60 KiB
22+
)
23+
24+
func CreateClient(ctx context.Context, logger logger.Logger, port int) (*client.Client, error) {
25+
cfg := client.NewConfig()
26+
27+
cli := client.NewClient(ctx, cfg)
28+
29+
// Create a new TCP transport with gnet options optimized for large payloads
30+
tcpTransport := client.NewTCPTransport(fmt.Sprintf("127.0.0.1:%d", port), logger,
31+
gnet.WithMulticore(true),
32+
gnet.WithTCPNoDelay(gnet.TCPNoDelay),
33+
gnet.WithSocketRecvBuffer(256*1024), // 256KB receive buffer
34+
gnet.WithSocketSendBuffer(256*1024), // 256KB send buffer
35+
)
36+
37+
// Register the transport with the client
38+
if err := cli.RegisterTransport("tcp", tcpTransport); err != nil {
39+
return nil, err
40+
}
41+
42+
return cli, nil
43+
}
44+
45+
// -----------------------------------------------------------------------------
46+
// Send helper methods
47+
// -----------------------------------------------------------------------------
48+
49+
// SendMessage sends a message to the target node and returns an error if the
50+
// send fails.
51+
func (t *TestNode) SendMessage(targetNode *TestNode, transportType types.TransportType, handlerType types.HandlerType, data []byte) error {
52+
// Ensure client is initialized
53+
if targetNode.client == nil {
54+
return errors.New("client not initialized")
55+
}
56+
57+
// Get the TCP transport from the client
58+
tcpTransport, err := targetNode.client.GetTransport("tcp")
59+
if err != nil {
60+
return fmt.Errorf("failed to get TCP transport: %w", err)
61+
}
62+
63+
// Create and encode the message
64+
var encodedMsg []byte
65+
if _, decErr := messages.Decode(data); decErr == nil {
66+
// Data is already an encoded message
67+
encodedMsg = data
68+
} else {
69+
// Generate a new message with the data
70+
msg, err := messages.GenerateRandomMessageWithData(handlerType, data)
71+
if err != nil {
72+
return fmt.Errorf("failed to generate message: %w", err)
73+
}
74+
75+
if encodedMsg, err = msg.Encode(); err != nil {
76+
return fmt.Errorf("failed to encode message: %w", err)
77+
}
78+
}
79+
80+
// Send the message using the transport
81+
if err := tcpTransport.Send(encodedMsg); err != nil {
82+
return fmt.Errorf("failed to send message: %w", err)
83+
}
84+
85+
t.logger.Debug("Sent message",
86+
zap.Int("bytes", len(encodedMsg)),
87+
zap.Stringer("handler_type", handlerType))
88+
89+
return nil
90+
}
91+
92+
// -----------------------------------------------------------------------------
93+
// Round‑trip helper (send + blocking read)
94+
// -----------------------------------------------------------------------------
95+
96+
// SendAndReceiveMessage sends a message to another node and waits for a
97+
// response. This implementation uses the client package's ResponseHandler for
98+
// asynchronous but reliable large payload handling.
99+
func (t *TestNode) SendAndReceiveMessage(targetNode *TestNode, transportType types.TransportType, handlerType types.HandlerType, data []byte, timeout time.Duration) ([]byte, error) {
100+
// Ensure client is initialized
101+
if targetNode.client == nil {
102+
return nil, errors.New("client not initialized")
103+
}
104+
105+
// Get the TCP transport from the client
106+
tcp, err := targetNode.client.GetTransport("tcp")
107+
if err != nil {
108+
return nil, fmt.Errorf("failed to get TCP transport: %w", err)
109+
}
110+
111+
tcpTransport, ok := tcp.(*client.TCPTransport)
112+
if !ok {
113+
return nil, errors.New("transport is not a TCPTransport")
114+
}
115+
116+
// Determine the appropriate message type for the response
117+
responseType := client.MessageType(types.HandlerStatusSuccess.Byte())
118+
119+
// Register a response channel before sending the message
120+
responseCh := tcpTransport.RegisterResponseChannel(responseType)
121+
122+
// Prepare the message
123+
var encodedMsg []byte
124+
if _, decErr := messages.Decode(data); decErr == nil {
125+
// Data is already an encoded message
126+
encodedMsg = data
127+
} else {
128+
// Generate a new message with the data
129+
msg, err := messages.GenerateRandomMessageWithData(handlerType, data)
130+
if err != nil {
131+
return nil, fmt.Errorf("failed to generate message: %w", err)
132+
}
133+
134+
encodedMsg, err = msg.Encode()
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to encode message: %w", err)
137+
}
138+
}
139+
140+
// For large payloads, we use a chunking protocol to ensure reliable transmission
141+
if len(encodedMsg) > maxChunkSize {
142+
t.logger.Info("Using chunked message protocol for large payload",
143+
zap.Int("size", len(encodedMsg)),
144+
zap.Stringer("handler", handlerType))
145+
146+
// Prepare the chunked message with a 4-byte length prefix
147+
// The server expects: [total_size(4 bytes)][payload...]
148+
lengthPrefix := make([]byte, 4)
149+
binary.LittleEndian.PutUint32(lengthPrefix, uint32(len(encodedMsg)))
150+
151+
// Prepend the length prefix to the message
152+
chunkedMsg := append(lengthPrefix, encodedMsg...)
153+
154+
// Replace the original message with the chunked version
155+
encodedMsg = chunkedMsg
156+
157+
t.logger.Debug("Prepared chunked message",
158+
zap.Int("original_size", len(encodedMsg)-4),
159+
zap.Int("with_prefix_size", len(encodedMsg)))
160+
}
161+
162+
// Send the message
163+
start := time.Now()
164+
if err := tcpTransport.Send(encodedMsg); err != nil {
165+
// Make sure to unregister the response channel on error
166+
tcpTransport.UnregisterResponseChannel(responseType)
167+
return nil, fmt.Errorf("failed to send message: %w", err)
168+
}
169+
170+
t.logger.Debug("Sent message",
171+
zap.Int("bytes", len(encodedMsg)),
172+
zap.Stringer("handler", handlerType))
173+
174+
// Wait for the response with the provided timeout
175+
response, err := tcpTransport.WaitForResponseWithTimeout(responseCh, timeout)
176+
if err != nil {
177+
return nil, fmt.Errorf("error waiting for response: %w", err)
178+
}
179+
180+
latency := time.Since(start)
181+
t.logger.Debug("Received response",
182+
zap.Int("bytes", len(response)),
183+
zap.Duration("latency", latency))
184+
return response, nil
185+
}

0 commit comments

Comments
 (0)