Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 99 additions & 3 deletions internal/cli/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,48 @@ package cli
import (
"context"
"fmt"
"net"
"time"

"github.com/spf13/cobra"
"github.com/syleron/pulseha/internal/client"
"github.com/syleron/pulseha/packages/config"
"github.com/syleron/pulseha/rpc"
)

// isLocalInterfaceIP reports whether the given IP address string is assigned
// to a local network interface. Loopback addresses always count as local.
//
// The address is parsed and compared with net.IP.Equal rather than by string
// equality, so textual variants of the same address (IPv6 upper/lower case,
// zero compression, IPv4-mapped IPv6 forms) are matched correctly.
func isLocalInterfaceIP(ip string) bool {
	if ip == "" {
		return false
	}
	target := net.ParseIP(ip)
	if target == nil {
		// Not a parseable IP literal, so it cannot match any interface.
		return false
	}
	// Loopback is always local, even if interface enumeration fails below.
	if target.IsLoopback() {
		return true
	}
	ifaces, err := net.Interfaces()
	if err != nil {
		return false
	}
	for _, iface := range ifaces {
		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}
		for _, addr := range addrs {
			var ifaceIP net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ifaceIP = v.IP
			case *net.IPAddr:
				ifaceIP = v.IP
			}
			// Equal performs a semantic comparison, unlike the previous
			// String() == ip check which missed equivalent spellings.
			if ifaceIP != nil && ifaceIP.Equal(target) {
				return true
			}
		}
	}
	return false
}

func NewClusterCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cluster",
Expand All @@ -24,6 +59,7 @@ func NewClusterCmd() *cobra.Command {
newClusterTokenCmd(),
newClusterModeCmd(),
newNetworkCmd(),
newClusterDoctorCmd(),
)

return cmd
Expand Down Expand Up @@ -86,18 +122,24 @@ func newClusterJoinCmd() *cobra.Command {
}

// newClusterLeaveCmd builds the "cluster leave" subcommand. By default the
// leave request is sent to the local daemon; --address redirects it to another
// cluster member, and --node-id selects which node to remove.
func newClusterLeaveCmd() *cobra.Command {
	var (
		peerAddr   string
		targetNode string
	)
	cmd := &cobra.Command{
		Use:   "leave",
		Short: "Leave the current cluster",
		RunE: func(cmd *cobra.Command, args []string) error {
			cli, err := client.New()
			if err != nil {
				return err
			}
			// An explicit peer address routes the leave through that member.
			if peerAddr == "" {
				return cli.LeaveCluster()
			}
			return cli.LeaveClusterVia(peerAddr, targetNode)
		},
	}
	cmd.Flags().StringVar(&peerAddr, "address", "", "Address of a cluster member to send leave to (IP:port)")
	cmd.Flags().StringVar(&targetNode, "node-id", "", "Node ID to remove (defaults to local node)")

	return cmd
}
Expand Down Expand Up @@ -215,6 +257,60 @@ func newNetworkResyncCmd() *cobra.Command {
return cmd
}

// newClusterDoctorCmd builds the "cluster doctor" subcommand, which runs a
// series of local diagnostics for common misconfigurations: an unassigned
// bind IP, a duplicate bind tuple, and unreachable peers.
func newClusterDoctorCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "doctor",
		Short: "Diagnose common cluster issues on this node",
		RunE: func(cmd *cobra.Command, args []string) error {
			// Read the on-disk configuration and locate this node's entry.
			cfg := config.New()
			localID, err := cfg.GetLocalNodeUUID()
			if err != nil {
				return fmt.Errorf("no local node configured: %v", err)
			}
			local := cfg.Nodes[localID]
			if local == nil {
				return fmt.Errorf("local node %s not found in config", localID)
			}

			// Check 1: the configured bind IP must exist on this machine.
			if !isLocalInterfaceIP(local.IP) {
				return fmt.Errorf("bind-ip %s is not assigned to any local interface", local.IP)
			}

			// Check 2: no other configured node may share our bind tuple.
			for id, peer := range cfg.Nodes {
				if id == localID {
					continue
				}
				if peer.IP == local.IP && peer.Port == local.Port {
					return fmt.Errorf("bind %s:%s conflicts with node %s (%s)", peer.IP, peer.Port, id, peer.Hostname)
				}
			}

			// Check 3: every peer must accept a basic TCP connection.
			for id, peer := range cfg.Nodes {
				if id == localID {
					continue
				}
				addr := net.JoinHostPort(peer.IP, peer.Port)
				conn, dialErr := net.DialTimeout("tcp", addr, 750*time.Millisecond)
				if dialErr != nil {
					return fmt.Errorf("cannot reach peer %s at %s: %v", peer.Hostname, addr, dialErr)
				}
				_ = conn.Close()
			}

			fmt.Println("Doctor checks passed: local bind IP valid, no conflicts, peers reachable")
			return nil
		},
	}
	return cmd
}

// createCluster creates a new cluster
func createCluster(cmd *cobra.Command, args []string) error {
// Get bind IP and port
Expand Down
44 changes: 41 additions & 3 deletions internal/cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func printClusterStatus(status *client.ClusterStatus) error {
fmt.Printf("Mode: %s\n", status.Mode)
fmt.Printf("==============\n")

// Print node information
fmt.Printf("\nNodes:\n")
fmt.Printf("------\n")
// Print live member information
fmt.Printf("\nLive Members:\n")
fmt.Printf("-------------\n")
for _, member := range status.Members {
fmt.Printf("\nNode: %s\n", member.Hostname)
fmt.Printf("Address: %s:%s\n", member.IP, member.Port)
Expand All @@ -130,6 +130,44 @@ func printClusterStatus(status *client.ClusterStatus) error {
}
}

// Derive configured-but-not-live nodes
liveHosts := make(map[string]struct{})
liveAddr := make(map[string]struct{})
for _, m := range status.Members {
if m.Hostname != "" {
liveHosts[m.Hostname] = struct{}{}
}
if m.IP != "" && m.Port != "" {
liveAddr[m.IP+":"+m.Port] = struct{}{}
}
}

// Fetch configured nodes from current config
c, err := client.New()
if err == nil {
cfg := c.GetConfig()
var printedHeader bool
for id, node := range cfg.Nodes {
_, hostLive := liveHosts[node.Hostname]
_, addrLive := liveAddr[node.IP+":"+node.Port]
if hostLive || addrLive {
continue
}
if !printedHeader {
fmt.Printf("\nConfigured Nodes (not live):\n")
fmt.Printf("---------------------------\n")
printedHeader = true
}
fmt.Printf("\nNode ID: %s\n", id)
fmt.Printf("Hostname: %s\n", node.Hostname)
fmt.Printf("Configured Address: %s:%s\n", node.IP, node.Port)
// Hint if address conflicts with a live member
if _, conflict := liveAddr[node.IP+":"+node.Port]; conflict {
fmt.Printf("Hint: Address conflicts with a live member at %s:%s\n", node.IP, node.Port)
}
}
}

// Print group information
if len(status.Groups) > 0 {
fmt.Printf("\nFloating IP Groups:\n")
Expand Down
136 changes: 132 additions & 4 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"strings"
"time"
Expand All @@ -21,6 +22,36 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

// isLocalIP checks whether the given IP address is assigned to a local
// interface. Loopback addresses always count as local.
//
// The input is parsed and compared with net.IP.Equal instead of string
// equality, so semantically-equal spellings of the same address (IPv6
// upper/lower case, zero compression, IPv4-mapped forms) are recognized.
func isLocalIP(ip string) bool {
	target := net.ParseIP(ip)
	if target == nil {
		// Not a parseable IP literal, so it cannot match any interface.
		return false
	}
	// Loopback is always local, even if interface enumeration fails below.
	if target.IsLoopback() {
		return true
	}
	ifaces, err := net.Interfaces()
	if err != nil {
		return false
	}
	for _, iface := range ifaces {
		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}
		for _, addr := range addrs {
			var ifaceIP net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ifaceIP = v.IP
			case *net.IPAddr:
				ifaceIP = v.IP
			}
			// Semantic comparison; the previous String() == ip check
			// missed equivalent textual representations.
			if ifaceIP != nil && ifaceIP.Equal(target) {
				return true
			}
		}
	}
	return false
}

type Client struct {
Connection *grpc.ClientConn
server rpc.ServerClient
Expand Down Expand Up @@ -455,6 +486,58 @@ func (c *Client) JoinClusterWithNodeID(address, token, bindIP, bindPort, customN
return fmt.Errorf("successfully joined cluster but failed to save local config: %v", err)
}

// Post-join validation and health checks
// 1) Validate bind-ip is local (if provided)
var warnings []string
if bindIP != "" {
if !isLocalIP(bindIP) {
warnings = append(warnings, fmt.Sprintf("bind-ip %s does not match any local interface", bindIP))
}
}

// 2) Detect duplicate bind tuple (IP:Port) with existing nodes
if localNode, ok := cfg.Nodes[resp.NodeId]; ok {
for id, node := range cfg.Nodes {
if id == resp.NodeId {
continue
}
if node.IP == localNode.IP && node.Port == localNode.Port && node.IP != "" && node.Port != "" {
warnings = append(warnings, fmt.Sprintf("bind %s:%s conflicts with existing node %s", node.IP, node.Port, id))
}
}
}

// 3) Wait briefly for local daemon to load new config and report full membership
// Poll Status up to ~8s
expectedMembers := len(cfg.Nodes)
membershipEstablished := false
for i := 0; i < 8; i++ {
statusResp, statusErr := c.CLI().Status(context.Background(), &rpc.StatusRequest{})
if statusErr == nil && statusResp != nil {
if len(statusResp.Members) >= expectedMembers {
membershipEstablished = true
break
}
}
time.Sleep(1 * time.Second)
}

if !membershipEstablished {
warnings = append(warnings, "node membership not established within timeout")
}

// If we detected any critical issues, provide actionable guidance and return an error
if len(warnings) > 0 {
for _, w := range warnings {
fmt.Printf("WARN %s\n", w)
}
// Suggest corrective action when bind-ip seems wrong and a conflict exists
if bindIP != "" && !isLocalIP(bindIP) {
fmt.Printf("HINT Re-run with a local --bind-ip or update /etc/pulseha/config.json and restart pulseha.\n")
}
return fmt.Errorf("join configuration applied, but cluster membership not healthy")
}

fmt.Printf("Successfully joined cluster with node ID: %s\n", resp.NodeId)
return nil
}
Expand All @@ -468,10 +551,55 @@ func (c *Client) LeaveCluster() error {
return fmt.Errorf("failed to get local node ID: %v", err)
}

_, err = c.CLI().Leave(context.Background(), &rpc.LeaveRequest{
NodeId: localNodeID,
})
return err
resp, err := c.CLI().Leave(context.Background(), &rpc.LeaveRequest{NodeId: localNodeID})
if err != nil {
return fmt.Errorf("leave request failed: %v\nHINT If the local daemon is unreachable, try: pulsectl cluster leave --address <leader-ip:port>", err)
}
if resp == nil || !resp.Success {
msg := "unknown error"
if resp != nil && resp.Message != "" {
msg = resp.Message
}
return fmt.Errorf("leave failed: %s", msg)
}
fmt.Println(resp.Message)
return nil
}

// LeaveClusterVia attempts to issue a leave against a specific cluster member
// (leader/peer). The address may be a bare host (default port 8080), a
// "host:port" pair, or a bracketed IPv6 literal such as "[::1]:8080". When
// nodeID is empty, the local node's ID from the current config is used.
func (c *Client) LeaveClusterVia(address string, nodeID string) error {
	// net.SplitHostPort handles bracketed IPv6 literals correctly; the naive
	// strings.Split on ":" mangled them. Anything unparseable (e.g. a bare
	// hostname with no port) falls back to the default port.
	host, port, splitErr := net.SplitHostPort(address)
	if splitErr != nil {
		host, port = address, "8080"
	}
	if err := c.Connect(host, port, false); err != nil {
		return fmt.Errorf("failed to connect to %s:%s: %v", host, port, err)
	}
	// Default to removing the local node when no explicit target is given.
	if nodeID == "" {
		cfg := c.GetConfig()
		var err error
		nodeID, err = cfg.GetLocalNodeUUID()
		if err != nil {
			return fmt.Errorf("failed to get local node ID: %v", err)
		}
	}
	resp, err := c.CLI().Leave(context.Background(), &rpc.LeaveRequest{NodeId: nodeID})
	if err != nil {
		return fmt.Errorf("leave request to %s:%s failed: %v", host, port, err)
	}
	// Treat a nil or unsuccessful response as a failure with the best
	// available message from the server.
	if resp == nil || !resp.Success {
		msg := "unknown error"
		if resp != nil && resp.Message != "" {
			msg = resp.Message
		}
		return fmt.Errorf("leave failed via %s:%s: %s", host, port, msg)
	}
	fmt.Println(resp.Message)
	return nil
}

// GetClusterStatus returns the current cluster status
Expand Down