-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclusterkit.go
More file actions
604 lines (504 loc) · 16.9 KB
/
clusterkit.go
File metadata and controls
604 lines (504 loc) · 16.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
package clusterkit
import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"
)
// ClusterKit is the main entry point for the library. It owns the local
// view of the cluster, the inter-node HTTP transport, the Raft consensus
// manager, and background health monitoring for this node.
type ClusterKit struct {
	cluster          *Cluster      // Local copy of the replicated cluster state
	nodeID           string        // Current node's ID
	stateFile        string        // Path of the on-disk cluster-state JSON snapshot
	httpAddr         string        // Listen address for inter-node HTTP traffic
	httpServer       *http.Server  // Inter-node HTTP server (nil until started)
	httpClient       *http.Client  // Shared client for calls to peers (10s timeout, set in New)
	knownNodes       []string      // Seed addresses used to join an existing cluster
	mu               sync.RWMutex  // Guards cluster and the metrics fields below
	stopChan         chan struct{} // Closed by Stop to signal background loops
	syncInterval     time.Duration // Interval between state syncs
	consensusManager *ConsensusManager
	hookManager      *HookManager   // Partition change hooks
	healthChecker    *HealthChecker // Health monitoring
	logger           Logger         // Structured logger

	// Metrics tracking (read under mu by GetMetrics/HealthCheck)
	startTime    time.Time // Set in New; basis for uptime reporting
	lastSync     time.Time
	requestCount int64
	errorCount   int64
}
// Options configures a new ClusterKit instance. Only NodeID and HTTPAddr
// are required; New fills in defaults for every other field.
type Options struct {
	// Required
	NodeID   string // Unique ID for this node (e.g., "node-1")
	HTTPAddr string // Address to listen on (e.g., ":8080")

	// Optional - Auto-generated if not provided
	NodeName     string        // Human-readable name (default: auto-generated from NodeID)
	RaftAddr     string        // Raft bind address (default: auto-calculated from HTTPAddr)
	DataDir      string        // Directory to store state (default: "./clusterkit-data")
	SyncInterval time.Duration // Sync interval (default: 5s)

	// Cluster Configuration - Flattened (no nested Config struct)
	ClusterName       string // Name of the cluster (default: "clusterkit-cluster")
	PartitionCount    int    // Number of partitions (default: 16)
	ReplicationFactor int    // Replication factor (default: 3)

	// Cluster Formation
	JoinAddr  string // Address of existing node to join (empty for first node)
	Bootstrap bool   // Set to true for first node (default: auto-detect)

	// Health Checking
	HealthCheck HealthCheckConfig // Health check configuration

	// Application Services
	Services map[string]string // Service name -> address mapping (e.g., {"kv": ":9080", "api": ":3000"})

	// Optional Logger
	Logger Logger // Custom logger (default: DefaultLogger with Info level)
}
// New initializes a new ClusterKit instance.
//
// NodeID and HTTPAddr are required; every other option is defaulted (see
// applyOptionDefaults). The data directory is created if missing, the
// node adds itself to a fresh cluster, and any previously persisted
// state is loaded best-effort (a missing snapshot is not an error).
func New(opts Options) (*ClusterKit, error) {
	if err := validateOptions(opts); err != nil {
		return nil, err
	}
	applyOptionDefaults(&opts)

	// Create data directory if it doesn't exist
	if err := os.MkdirAll(opts.DataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %v", err)
	}
	stateFile := filepath.Join(opts.DataDir, "cluster-state.json")

	// Initialize cluster with flattened config
	cluster := &Cluster{
		ID:           opts.ClusterName,
		Name:         opts.ClusterName,
		Nodes:        []Node{},
		PartitionMap: &PartitionMap{Partitions: make(map[string]*Partition)},
		Config: &Config{
			ClusterName:       opts.ClusterName,
			PartitionCount:    opts.PartitionCount,
			ReplicationFactor: opts.ReplicationFactor,
		},
	}

	// Add self as a node
	selfNode := Node{
		ID:       opts.NodeID,
		Name:     opts.NodeName,
		IP:       opts.HTTPAddr,
		Status:   "active",
		Services: opts.Services, // Include application services
	}
	cluster.Nodes = append(cluster.Nodes, selfNode)
	cluster.rebuildNodeMap() // Initialize NodeMap for O(1) lookups

	// Set default logger if not provided
	logger := opts.Logger
	if logger == nil {
		logger = NewDefaultLogger(LogLevelInfo)
	}

	ck := &ClusterKit{
		cluster:      cluster,
		nodeID:       opts.NodeID,
		stateFile:    stateFile,
		httpAddr:     opts.HTTPAddr,
		httpClient:   &http.Client{Timeout: 10 * time.Second},
		knownNodes:   []string{},
		stopChan:     make(chan struct{}),
		syncInterval: opts.SyncInterval,
		logger:       logger,
		startTime:    time.Now(),
	}

	// Seed the join list if the caller pointed us at an existing node.
	if opts.JoinAddr != "" {
		ck.knownNodes = []string{opts.JoinAddr}
	}

	ck.consensusManager = NewConsensusManager(ck, opts.Bootstrap, opts.RaftAddr)
	ck.hookManager = newHookManager()
	ck.healthChecker = NewHealthChecker(ck, opts.HealthCheck)

	// Load existing state if available; absence just means a fresh start.
	if err := ck.loadState(); err != nil {
		fmt.Printf("No existing state found, starting fresh: %v\n", err)
	}
	return ck, nil
}

// validateOptions checks the required fields and the format of any
// configured service addresses. Returns the first violation found.
func validateOptions(opts Options) error {
	if opts.NodeID == "" {
		return fmt.Errorf("NodeID is required")
	}
	if opts.HTTPAddr == "" {
		return fmt.Errorf("HTTPAddr is required")
	}
	// Ranging over a nil map is a no-op, so no nil guard is needed.
	for serviceName, serviceAddr := range opts.Services {
		if serviceName == "" {
			return fmt.Errorf("service name cannot be empty")
		}
		if serviceAddr == "" {
			return fmt.Errorf("service address for '%s' cannot be empty", serviceName)
		}
		// Addresses must carry a port: ":port" or "host:port". A leading
		// ':' already implies Contains, so one check covers both forms.
		if !strings.Contains(serviceAddr, ":") {
			return fmt.Errorf("invalid service address format for '%s': %s (expected format: ':port' or 'host:port')", serviceName, serviceAddr)
		}
	}
	return nil
}

// applyOptionDefaults fills in every optional field the caller left at
// its zero value.
func applyOptionDefaults(opts *Options) {
	if opts.NodeName == "" {
		opts.NodeName = generateNodeName(opts.NodeID)
	}
	if opts.RaftAddr == "" {
		opts.RaftAddr = calculateRaftAddr(opts.HTTPAddr)
	}
	if opts.ClusterName == "" {
		opts.ClusterName = "clusterkit-cluster"
	}
	if opts.PartitionCount <= 0 {
		opts.PartitionCount = 16
	}
	if opts.ReplicationFactor <= 0 {
		opts.ReplicationFactor = 3
	}
	if opts.DataDir == "" {
		opts.DataDir = "./clusterkit-data"
	}
	if opts.SyncInterval == 0 {
		opts.SyncInterval = 5 * time.Second
	}
}
// Start begins the ClusterKit operations: it brings up the inter-node
// HTTP server and Raft consensus, starts health checking, kicks off an
// async join when seed nodes are configured, and — on the bootstrap
// node — launches a background goroutine that auto-creates partitions
// once enough nodes have joined.
func (ck *ClusterKit) Start() error {
	// Start HTTP server for inter-node communication
	if err := ck.startHTTPServer(); err != nil {
		return fmt.Errorf("failed to start HTTP server: %v", err)
	}
	// Start consensus manager
	if err := ck.consensusManager.Start(); err != nil {
		return fmt.Errorf("failed to start consensus: %v", err)
	}
	// Start health checker
	ck.healthChecker.Start()
	// Discover and join known nodes (async so Start doesn't block on peers)
	if len(ck.knownNodes) > 0 {
		fmt.Printf("[JOIN] Attempting to join cluster via %v\n", ck.knownNodes)
		go ck.discoverNodes()
	} else {
		fmt.Printf("[JOIN] No known nodes to join (bootstrap=%v)\n", ck.consensusManager.isBootstrap)
	}
	// Auto-create partitions if bootstrap node and no partitions exist.
	// NOTE(review): this goroutine has no stopChan check, so it can run
	// up to ~30s after Stop — confirm that is acceptable.
	if ck.consensusManager.isBootstrap {
		go func() {
			// Wait for leader election
			// (fixed 3s sleep; presumably tuned to Raft election timing — verify)
			time.Sleep(3 * time.Second)
			// Bootstrap node: Propose self addition through Raft to ensure all nodes have it
			if ck.consensusManager.IsLeader() {
				ck.mu.RLock()
				selfNode := ck.cluster.Nodes[0] // Bootstrap node is always first
				ck.mu.RUnlock()
				fmt.Printf("Bootstrap node proposing self through Raft: %s\n", selfNode.ID)
				if err := ck.consensusManager.ProposeAction("add_node", map[string]interface{}{
					"id":       selfNode.ID,
					"name":     selfNode.Name,
					"ip":       selfNode.IP,
					"status":   selfNode.Status,
					"services": selfNode.Services,
				}); err != nil {
					fmt.Printf("Warning: Failed to propose bootstrap node: %v\n", err)
				}
				// Wait for Raft replication
				time.Sleep(1 * time.Second)
			}
			ck.mu.RLock()
			replicationFactor := ck.cluster.Config.ReplicationFactor
			ck.mu.RUnlock()
			// Wait for enough nodes to join (up to 30 seconds)
			maxWait := 30
			if replicationFactor == 1 {
				maxWait = 3 // Single-node clusters don't need to wait long
			}
			// Poll once per second: bail if leadership is lost, finish if
			// partitions appear, otherwise try to create them once the
			// node count reaches the replication factor.
			for i := 0; i < maxWait; i++ {
				if !ck.consensusManager.IsLeader() {
					return // Not leader anymore
				}
				ck.mu.RLock()
				nodeCount := len(ck.cluster.Nodes)
				partitionCount := len(ck.cluster.PartitionMap.Partitions)
				ck.mu.RUnlock()
				// If partitions already exist, we're done
				if partitionCount > 0 {
					return
				}
				// Create partitions if we have enough nodes
				if nodeCount >= replicationFactor {
					// Deduplicate nodes before creating partitions
					ck.deduplicateNodes()
					ck.mu.RLock()
					finalNodeCount := len(ck.cluster.Nodes)
					ck.mu.RUnlock()
					fmt.Printf("Auto-creating partitions with %d nodes (replication factor: %d)...\n", finalNodeCount, replicationFactor)
					if err := ck.CreatePartitions(); err != nil {
						fmt.Printf("Failed to auto-create partitions: %v\n", err)
						// Continue loop to retry
					} else {
						fmt.Printf("✓ Created %d partitions automatically\n", ck.cluster.Config.PartitionCount)
						return // Success!
					}
				}
				// Wait before checking again
				time.Sleep(1 * time.Second)
			}
			// Timeout reached - log warning
			ck.mu.RLock()
			nodeCount := len(ck.cluster.Nodes)
			ck.mu.RUnlock()
			fmt.Printf("Warning: Partition auto-creation timed out after %d seconds (have %d nodes, need %d)\n",
				maxWait, nodeCount, replicationFactor)
		}()
	}
	fmt.Printf("ClusterKit started on %s\n", ck.httpAddr)
	return nil
}
// Stop gracefully shuts down ClusterKit: it signals background loops,
// drains the inter-node HTTP server, stops the health checker and the
// consensus manager, then persists a final state snapshot.
//
// NOTE(review): stopChan is closed unconditionally, so a second call to
// Stop panics — confirm callers invoke it at most once.
func (ck *ClusterKit) Stop() error {
	close(ck.stopChan)

	// Give in-flight HTTP requests up to five seconds to finish.
	if srv := ck.httpServer; srv != nil {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			fmt.Printf("Failed to shutdown HTTP server gracefully: %v\n", err)
		}
	}

	ck.healthChecker.Stop()
	ck.consensusManager.Stop()

	// Persist the final cluster state so a restart can resume from it.
	if err := ck.saveState(); err != nil {
		return fmt.Errorf("failed to save state: %v", err)
	}
	fmt.Println("ClusterKit stopped")
	return nil
}
// GetCluster returns the current cluster state.
//
// NOTE(review): this hands out the live *Cluster pointer, so the RLock
// only protects the pointer read itself — callers can observe concurrent
// mutations of the returned struct. Confirm callers treat it as
// read-only, or consider returning a deep copy.
func (ck *ClusterKit) GetCluster() *Cluster {
	ck.mu.RLock()
	defer ck.mu.RUnlock()
	return ck.cluster
}
// GetConsensusManager returns the consensus manager for this node.
// The field is assigned once in New and never reassigned afterwards,
// which is why no lock is taken here.
func (ck *ClusterKit) GetConsensusManager() *ConsensusManager {
	return ck.consensusManager
}
// saveState persists the cluster state to disk atomically: the JSON
// snapshot is written to a sibling temp file and then renamed into
// place, so readers never observe a half-written file.
func (ck *ClusterKit) saveState() error {
	ck.mu.RLock()
	defer ck.mu.RUnlock()

	encoded, err := json.MarshalIndent(ck.cluster, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal state: %v", err)
	}

	// Write-then-rename for atomicity.
	tmpPath := ck.stateFile + ".tmp"
	if err := os.WriteFile(tmpPath, encoded, 0644); err != nil {
		return fmt.Errorf("failed to write temp state file: %v", err)
	}
	if err := os.Rename(tmpPath, ck.stateFile); err != nil {
		os.Remove(tmpPath) // best-effort cleanup of the orphaned temp file
		return fmt.Errorf("failed to rename state file: %v", err)
	}
	return nil
}
// loadState restores the cluster state from the on-disk JSON snapshot.
// The raw read error is returned unchanged when the file is missing so
// callers can treat "no snapshot yet" as a fresh start.
func (ck *ClusterKit) loadState() error {
	raw, err := os.ReadFile(ck.stateFile)
	if err != nil {
		return err
	}

	ck.mu.Lock()
	defer ck.mu.Unlock()
	if err := json.Unmarshal(raw, ck.cluster); err != nil {
		return fmt.Errorf("failed to unmarshal state: %v", err)
	}
	return nil
}
// discoverNodes tries to join the cluster through each known seed
// address, retrying up to three times per address. Failures are logged
// and the next seed is attempted.
func (ck *ClusterKit) discoverNodes() {
	fmt.Printf("[JOIN] discoverNodes called with %d known nodes\n", len(ck.knownNodes))
	for _, addr := range ck.knownNodes {
		fmt.Printf("[JOIN] Attempting to join %s...\n", addr)
		err := ck.joinNodeWithRetry(addr, 3)
		if err != nil {
			fmt.Printf("[JOIN] Failed to join node %s after retries: %v\n", addr, err)
			continue
		}
		fmt.Printf("[JOIN] Successfully joined cluster via %s\n", addr)
	}
}
// deduplicateNodes removes nodes with duplicate IDs from the cluster,
// keeping the first occurrence of each ID, and logs the before/after
// node counts.
func (ck *ClusterKit) deduplicateNodes() {
	ck.mu.Lock()
	defer ck.mu.Unlock()

	// BUG FIX: the summary line previously derived the "before" count
	// from ck.cluster.Nodes AFTER it had been overwritten, so it always
	// printed identical numbers. Capture the original count up front.
	originalCount := len(ck.cluster.Nodes)

	seen := make(map[string]bool, originalCount)
	uniqueNodes := make([]Node, 0, originalCount)
	for _, node := range ck.cluster.Nodes {
		if seen[node.ID] {
			fmt.Printf("Removing duplicate node: %s (%s)\n", node.Name, node.ID)
			continue
		}
		seen[node.ID] = true
		uniqueNodes = append(uniqueNodes, node)
	}
	ck.cluster.Nodes = uniqueNodes

	fmt.Printf("Deduplicated nodes: %d -> %d\n", originalCount, len(uniqueNodes))
}
// GetMetrics returns a point-in-time snapshot of cluster metrics for
// this node: node/partition counts, request and error counters, the
// Raft role, and uptime in seconds.
func (ck *ClusterKit) GetMetrics() *Metrics {
	ck.mu.RLock()
	defer ck.mu.RUnlock()

	leader, state := false, ""
	if cm := ck.consensusManager; cm != nil {
		leader = cm.IsLeader()
		if stats := cm.GetStats(); stats != nil {
			state = stats.State
		}
	}

	return &Metrics{
		NodeCount:      len(ck.cluster.Nodes),
		PartitionCount: len(ck.cluster.PartitionMap.Partitions),
		RequestCount:   ck.requestCount,
		ErrorCount:     ck.errorCount,
		LastSync:       ck.lastSync,
		IsLeader:       leader,
		RaftState:      state,
		UptimeSeconds:  int64(time.Since(ck.startTime).Seconds()),
	}
}
// HealthCheck returns detailed health status for this node, including
// its identity, Raft role, cluster size, and uptime.
func (ck *ClusterKit) HealthCheck() *HealthStatus {
	ck.mu.RLock()
	defer ck.mu.RUnlock()

	var raftState string
	var isLeader bool
	if ck.consensusManager != nil {
		isLeader = ck.consensusManager.IsLeader()
		if stats := ck.consensusManager.GetStats(); stats != nil {
			raftState = stats.State
		}
	}

	// BUG FIX: the self node is not guaranteed to sit at index 0 once
	// peers have joined or after deduplication; look it up by this
	// node's ID and fall back to the first entry only if no match.
	var nodeID, nodeName string
	for _, n := range ck.cluster.Nodes {
		if n.ID == ck.nodeID {
			nodeID = n.ID
			nodeName = n.Name
			break
		}
	}
	if nodeID == "" && len(ck.cluster.Nodes) > 0 {
		nodeID = ck.cluster.Nodes[0].ID
		nodeName = ck.cluster.Nodes[0].Name
	}

	uptime := time.Since(ck.startTime)
	return &HealthStatus{
		Healthy:        true,
		NodeID:         nodeID,
		NodeName:       nodeName,
		IsLeader:       isLeader,
		NodeCount:      len(ck.cluster.Nodes),
		PartitionCount: len(ck.cluster.PartitionMap.Partitions),
		RaftState:      raftState,
		LastSync:       ck.lastSync,
		Uptime:         uptime.String(),
	}
}
// generateNodeName creates a human-readable name from a NodeID.
// "node-N", "server-N", and a bare "N" all map to "Server-N"; any other
// non-empty ID is returned with its first letter upper-cased, and an
// empty ID is returned unchanged.
func generateNodeName(nodeID string) string {
	// Recognized numeric patterns, checked in order.
	var num int
	for _, pattern := range []string{"node-%d", "server-%d", "%d"} {
		if _, err := fmt.Sscanf(nodeID, pattern, &num); err == nil {
			return fmt.Sprintf("Server-%d", num)
		}
	}
	if nodeID == "" {
		return nodeID
	}
	// BUG FIX: the old code subtracted 32 from the first byte, which is
	// only correct for lowercase ASCII letters and corrupts digits,
	// punctuation, already-uppercase letters, and multi-byte runes
	// (e.g. "Alpha" became "!lpha"). strings.ToUpper handles all cases
	// safely (non-letters are returned unchanged).
	return strings.ToUpper(nodeID[:1]) + nodeID[1:]
}
// calculateRaftAddr auto-calculates the Raft bind address from the HTTP
// address. Examples: ":8080" -> "127.0.0.1:10001", ":8081" -> "127.0.0.1:10002".
// The 10001+ range avoids conflicts with common application ports (9000-9999).
//
// BUG FIX: the old fmt.Sscanf(":%d") pattern rejected "host:port" forms,
// silently collapsing every such node onto the default port, and the
// offset arithmetic could produce a port above 65535. Both the
// ":port" and "host:port" forms are now accepted, and any unparsable or
// out-of-range result falls back to "127.0.0.1:10001".
func calculateRaftAddr(httpAddr string) string {
	const defaultAddr = "127.0.0.1:10001"

	// Take everything after the last colon as the port number.
	idx := strings.LastIndex(httpAddr, ":")
	if idx < 0 {
		return defaultAddr
	}
	httpPort, err := strconv.Atoi(httpAddr[idx+1:])
	if err != nil {
		return defaultAddr
	}

	// Raft port: 10001 + (httpPort - 8080), keeping Raft in the 10000+ range.
	raftPort := 10001 + (httpPort - 8080)
	if raftPort < 1 || raftPort > 65535 {
		return defaultAddr
	}
	return fmt.Sprintf("127.0.0.1:%d", raftPort)
}
// SetCustomData stores custom data that is replicated to every node via
// Raft consensus. The value is raw bytes, so any serialization format
// works (JSON, protobuf, msgpack, ...). The proposal fails when this
// node is not the leader. Values are capped at 1MB to prevent excessive
// memory usage.
func (ck *ClusterKit) SetCustomData(key string, value []byte) error {
	if key == "" {
		return fmt.Errorf("key cannot be empty")
	}
	const maxValueSize = 1024 * 1024 // 1MB
	if len(value) > maxValueSize {
		return fmt.Errorf("value too large (max 1MB, got %d bytes)", len(value))
	}

	// Base64-encode the bytes so the payload survives JSON serialization
	// inside the Raft log.
	payload := map[string]interface{}{
		"key":   key,
		"value": base64.StdEncoding.EncodeToString(value),
	}
	return ck.consensusManager.ProposeAction("set_custom_data", payload)
}
// GetCustomData retrieves custom data from the local cluster state.
// This is a purely local read (no consensus round-trip). An error is
// returned when the key does not exist.
func (ck *ClusterKit) GetCustomData(key string) ([]byte, error) {
	ck.mu.RLock()
	defer ck.mu.RUnlock()

	// Indexing a nil map yields ok=false, so a single comma-ok lookup
	// covers both the "no custom data yet" and "key absent" cases.
	stored, ok := ck.cluster.CustomData[key]
	if !ok {
		return nil, fmt.Errorf("key not found: %s", key)
	}

	// Hand back a copy so callers cannot mutate the replicated state.
	out := make([]byte, len(stored))
	copy(out, stored)
	return out, nil
}
// DeleteCustomData removes custom data from all nodes via Raft
// consensus. The proposal fails when this node is not the leader.
func (ck *ClusterKit) DeleteCustomData(key string) error {
	if key == "" {
		return fmt.Errorf("key cannot be empty")
	}
	return ck.consensusManager.ProposeAction("delete_custom_data", key)
}
// ListCustomDataKeys returns every custom-data key currently stored in
// the cluster. This is a local read with no consensus round-trip; the
// result is never nil and its order is unspecified.
func (ck *ClusterKit) ListCustomDataKeys() []string {
	ck.mu.RLock()
	defer ck.mu.RUnlock()

	// Ranging over a nil map iterates zero times, so a single loop
	// handles both the empty and populated cases and always yields a
	// non-nil slice.
	keys := []string{}
	for k := range ck.cluster.CustomData {
		keys = append(keys, k)
	}
	return keys
}