From 7e772d06ee7a396cde706b82f082c6f416a264ed Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 24 Oct 2025 15:25:13 +0200 Subject: [PATCH] feat(node): add statefulset-based discovery --- cmd/beekeeper/cmd/cmd.go | 54 -------- cmd/beekeeper/cmd/node.go | 132 +++++++++++++++++++ pkg/node/client.go | 266 ++++++++++++++++++++++++++++++++++---- 3 files changed, 370 insertions(+), 82 deletions(-) create mode 100644 cmd/beekeeper/cmd/node.go diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 118b6739..61e467b2 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -12,12 +12,10 @@ import ( "strings" "time" - "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/config" "github.com/ethersphere/beekeeper/pkg/httpx" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" - "github.com/ethersphere/beekeeper/pkg/node" "github.com/ethersphere/beekeeper/pkg/scheduler" "github.com/ethersphere/beekeeper/pkg/swap" "github.com/go-git/go-billy/v5/memfs" @@ -435,58 +433,6 @@ func (c *command) executePeriodically(ctx context.Context, action func(ctx conte return ctx.Err() } -func (c *command) createNodeClient(ctx context.Context, useDeploymentType bool) (*node.Client, error) { - namespace := c.globalConfig.GetString(optionNameNamespace) - clusterName := c.globalConfig.GetString(optionNameClusterName) - - if clusterName == "" && namespace == "" { - return nil, errors.New("either cluster name or namespace must be provided") - } - - if c.globalConfig.IsSet(optionNameNamespace) && namespace == "" { - return nil, errors.New("namespace cannot be empty if set") - } - - if namespace == "" && useDeploymentType && !isValidDeploymentType(c.globalConfig.GetString(optionNameDeploymentType)) { - return nil, errors.New("unsupported deployment type: must be 'beekeeper' or 'helm'") - } - - if useDeploymentType { - c.log.Infof("using deployment type %s", c.globalConfig.GetString(optionNameDeploymentType)) - } - - var beeClients map[string]*bee.Client - - if clusterName != "" { - cluster, err := c.setupCluster(ctx, clusterName, false) - if err != nil { - return nil, fmt.Errorf("setting up cluster %s: %w", clusterName, err) - } - - beeClients, err = cluster.NodesClients(ctx) - if err != nil { - return nil, fmt.Errorf("failed to retrieve node clients: %w", err) - } - - namespace = cluster.Namespace() - } - - nodeClient := node.New(&node.ClientConfig{ - Log: c.log, - HTTPClient: c.httpClient, - K8sClient: c.k8sClient, - BeeClients: beeClients, - Namespace: namespace, - LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), - DeploymentType: node.DeploymentType(c.globalConfig.GetString(optionNameDeploymentType)), - InCluster: c.globalConfig.GetBool(optionNameInCluster), - UseNamespace: c.globalConfig.IsSet(optionNameNamespace), - NodeGroups: c.globalConfig.GetStringSlice(optionNameNodeGroups), - }) - - return nodeClient, nil -} - func (c *command) setSwapClient() (err error) { if c.globalConfig.IsSet(optionNameGethURL) { gethUrl, err := url.Parse(c.globalConfig.GetString(optionNameGethURL)) diff --git a/cmd/beekeeper/cmd/node.go b/cmd/beekeeper/cmd/node.go new file mode 100644 index 00000000..45c945f5 --- /dev/null +++ b/cmd/beekeeper/cmd/node.go @@ -0,0 +1,132 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/node" +) + +func (c *command) createNodeClient(ctx context.Context, useDeploymentType bool) (*node.Client, error) { + if err := c.validateNodeClientInputs(useDeploymentType); err != nil { + return nil, err + } + + config := c.extractNodeClientConfig() + + beeClients, namespace, err := c.setupBeeClients(ctx, config.clusterName) + if err != nil { + return nil, err + } + + // Update namespace if we got it from cluster + // Namespace is required for all K8s operations (pods, statefulsets, services, ingress) + if namespace != "" { + config.namespace = namespace + } + + discoveryType := c.determineDiscoveryType(beeClients, config.namespace) + + if useDeploymentType { + c.log.Infof("using deployment type %s", config.deploymentType) + } + + return c.buildNodeClient(beeClients, config, discoveryType), nil +} + +// nodeClientConfig holds the extracted configuration values +type nodeClientConfig struct { + namespace string + clusterName string + labelSelector string + deploymentType string + inCluster bool + nodeGroups []string +} + +// validateNodeClientInputs validates the input parameters for node client creation +func (c *command) validateNodeClientInputs(useDeploymentType bool) error { + namespace := c.globalConfig.GetString(optionNameNamespace) + clusterName := c.globalConfig.GetString(optionNameClusterName) + + if clusterName == "" && namespace == "" { + return errors.New("either cluster name or namespace must be provided") + } + + if c.globalConfig.IsSet(optionNameNamespace) && namespace == "" { + return errors.New("namespace cannot be empty if set") + } + + if namespace == "" && useDeploymentType && !isValidDeploymentType(c.globalConfig.GetString(optionNameDeploymentType)) { + return errors.New("unsupported deployment type: must be 'beekeeper' or 'helm'") + } + + // Note: Namespace will be available either from: + // 1. Explicit configuration (optionNameNamespace) + // 2. Cluster setup (cluster.Namespace()) + // This ensures all K8s operations (pods, statefulsets, services, ingress) have the required namespace + + return nil +} + +// extractNodeClientConfig extracts configuration values from global config +func (c *command) extractNodeClientConfig() nodeClientConfig { + return nodeClientConfig{ + namespace: c.globalConfig.GetString(optionNameNamespace), + clusterName: c.globalConfig.GetString(optionNameClusterName), + labelSelector: c.globalConfig.GetString(optionNameLabelSelector), + deploymentType: c.globalConfig.GetString(optionNameDeploymentType), + inCluster: c.globalConfig.GetBool(optionNameInCluster), + nodeGroups: c.globalConfig.GetStringSlice(optionNameNodeGroups), + } +} + +// setupBeeClients sets up bee clients if cluster is specified +func (c *command) setupBeeClients(ctx context.Context, clusterName string) (map[string]*bee.Client, string, error) { + if clusterName == "" { + return nil, "", nil + } + + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return nil, "", fmt.Errorf("setting up cluster %s: %w", clusterName, err) + } + + beeClients, err := cluster.NodesClients(ctx) + if err != nil { + return nil, "", fmt.Errorf("failed to retrieve node clients: %w", err) + } + + return beeClients, cluster.Namespace(), nil +} + +// determineDiscoveryType determines the discovery type based on available resources +func (c *command) determineDiscoveryType(beeClients map[string]*bee.Client, namespace string) node.DiscoveryType { + if len(beeClients) > 0 { + return node.DiscoveryTypeBeeClients + } + + if namespace != "" { + return node.DiscoveryTypeNamespace + } + + return node.DiscoveryTypeNamespace +} + +// buildNodeClient creates the node client with the provided configuration +func (c *command) buildNodeClient(beeClients map[string]*bee.Client, config nodeClientConfig, discoveryType node.DiscoveryType) *node.Client { + return node.New(&node.ClientConfig{ + Log: c.log, + HTTPClient: c.httpClient, + K8sClient: c.k8sClient, + BeeClients: beeClients, + Namespace: config.namespace, + LabelSelector: config.labelSelector, + DeploymentType: node.DeploymentType(config.deploymentType), + InCluster: config.inCluster, + DiscoveryType: discoveryType, + NodeGroups: config.nodeGroups, + }) +} diff --git a/pkg/node/client.go b/pkg/node/client.go index d029fec2..da850d73 100644 --- a/pkg/node/client.go +++ b/pkg/node/client.go @@ -1,3 +1,8 @@ +// Package node provides node discovery and management capabilities for the beekeeper system. +// It supports multiple discovery methods: +// 1. BeeClients-based discovery (using pre-configured orchestration clients) +// 2. Namespace-based discovery (using K8s services/ingress with label selectors) +// 3. StatefulSet-based discovery (discovering nodes from statefulset names and replica counts) package node import ( @@ -20,6 +25,14 @@ const ( DeploymentTypeHelm DeploymentType = "helm" ) +type DiscoveryType string + +const ( + DiscoveryTypeBeeClients DiscoveryType = "beeclients" + DiscoveryTypeNamespace DiscoveryType = "namespace" + DiscoveryTypeStatefulSet DiscoveryType = "statefulset" +) + var ErrNodesNotFound = fmt.Errorf("nodes not found") type NodeProvider interface { @@ -36,8 +49,11 @@ type ClientConfig struct { LabelSelector string DeploymentType DeploymentType InCluster bool - UseNamespace bool // Overrides the usage of the bee clients - NodeGroups []string // Node groups for filtering nodes (only used with beekeeper deployment) + // Discovery method selection + DiscoveryType DiscoveryType // Choose discovery method: beeclients, namespace, or statefulset + NodeGroups []string // Node groups for filtering nodes (only used with beekeeper deployment) + // StatefulSet-based discovery options + StatefulSetNames []string // Names of statefulsets to discover nodes from } type Client struct { @@ -49,8 +65,10 @@ type Client struct { labelSelector string deploymentType DeploymentType inCluster bool - useNamespace bool + discoveryType DiscoveryType nodeGroups []string + // StatefulSet-based discovery fields + statefulSetNames []string } func New(cfg *ClientConfig) *Client { @@ -67,16 +85,17 @@ func New(cfg *ClientConfig) *Client { } return &Client{ - log: cfg.Log, - httpClient: cfg.HTTPClient, - k8sClient: cfg.K8sClient, - beeClients: cfg.BeeClients, - namespace: cfg.Namespace, - labelSelector: cfg.LabelSelector, - deploymentType: cfg.DeploymentType, - inCluster: cfg.InCluster, - useNamespace: cfg.UseNamespace, - nodeGroups: cfg.NodeGroups, + log: cfg.Log, + httpClient: cfg.HTTPClient, + k8sClient: cfg.K8sClient, + beeClients: cfg.BeeClients, + namespace: cfg.Namespace, + labelSelector: cfg.LabelSelector, + deploymentType: cfg.DeploymentType, + inCluster: cfg.InCluster, + discoveryType: cfg.DiscoveryType, + nodeGroups: cfg.NodeGroups, + statefulSetNames: cfg.StatefulSetNames, } } @@ -95,25 +114,38 @@ func (sc *Client) GetNodes(ctx context.Context) (nodes NodeList, err error) { sc.log.Infof("found %d nodes", len(nodes)) }() - if sc.useNamespace && sc.namespace != "" { - return sc.getNamespaceNodes(ctx) - } + // Choose discovery method based on configuration + switch sc.discoveryType { + case DiscoveryTypeStatefulSet: + // Validate configuration before proceeding + if err := sc.ValidateStatefulSetDiscovery(); err != nil { + return nil, fmt.Errorf("statefulset discovery validation failed: %w", err) + } + return sc.getStatefulSetNodes(ctx) - if len(sc.beeClients) == 0 { - return nil, fmt.Errorf("bee clients not provided") - } + case DiscoveryTypeNamespace: + if sc.namespace == "" { + return nil, fmt.Errorf("namespace not provided for namespace discovery") + } + return sc.getNamespaceNodes(ctx) - filteredClients := sc.beeClients.FilterByNodeGroups(sc.nodeGroups) - if len(filteredClients) == 0 { - return nil, ErrNodesNotFound - } + case DiscoveryTypeBeeClients: + if len(sc.beeClients) == 0 { + return nil, fmt.Errorf("bee clients not provided") + } + filteredClients := sc.beeClients.FilterByNodeGroups(sc.nodeGroups) + if len(filteredClients) == 0 { + return nil, ErrNodesNotFound + } + nodes = make(NodeList, 0, len(filteredClients)) + for _, beeClient := range filteredClients { + nodes = append(nodes, *NewNode(beeClient.API(), sc.nodeName(beeClient.Name()))) + } + return nodes.Sort(), nil - nodes = make(NodeList, 0, len(filteredClients)) - for _, beeClient := range filteredClients { - nodes = append(nodes, *NewNode(beeClient.API(), sc.nodeName(beeClient.Name()))) + default: + return nil, fmt.Errorf("unknown discovery type: %s", sc.discoveryType) } - - return nodes.Sort(), nil } func (sc *Client) getNamespaceNodes(ctx context.Context) (nodes []Node, err error) { @@ -200,3 +232,181 @@ func (sc *Client) nodeName(name string) string { } return name } + +// getStatefulSetNodes discovers nodes based on statefulset names and their replicas +func (sc *Client) getStatefulSetNodes(ctx context.Context) ([]Node, error) { + if sc.k8sClient == nil { + return nil, fmt.Errorf("k8s client not provided") + } + + if sc.namespace == "" { + return nil, fmt.Errorf("namespace not provided") + } + + if len(sc.statefulSetNames) == 0 { + return nil, fmt.Errorf("no statefulset names provided") + } + + sc.log.Infof("discovering nodes from %d statefulsets: %v", len(sc.statefulSetNames), sc.statefulSetNames) + + var allNodes []Node + for _, ssName := range sc.statefulSetNames { + nodes, err := sc.getNodesFromStatefulSet(ctx, ssName) + if err != nil { + sc.log.Warningf("failed to get nodes from statefulset %s: %v", ssName, err) + continue + } + allNodes = append(allNodes, nodes...) + } + + if len(allNodes) == 0 { + return nil, ErrNodesNotFound + } + + return allNodes, nil +} + +// getNodesFromStatefulSet discovers all nodes for a specific statefulset +func (sc *Client) getNodesFromStatefulSet(ctx context.Context, statefulSetName string) ([]Node, error) { + // Get the statefulset to determine replica count + ss, err := sc.k8sClient.StatefulSet.Get(ctx, statefulSetName, sc.namespace) + if err != nil { + return nil, fmt.Errorf("failed to get statefulset %s: %w", statefulSetName, err) + } + + replicas := int32(1) + if ss.Spec.Replicas != nil { + replicas = *ss.Spec.Replicas + } + + sc.log.Debugf("statefulset %s has %d replicas", statefulSetName, replicas) + + var nodes []Node + for i := int32(0); i < replicas; i++ { + podName := fmt.Sprintf("%s-%d", statefulSetName, i) + node, err := sc.createNodeFromPod(ctx, podName, statefulSetName, int(i)) + if err != nil { + sc.log.Warningf("failed to create node for pod %s: %v", podName, err) + continue + } + nodes = append(nodes, *node) + } + + return nodes, nil +} + +// createNodeFromPod creates a Node from a pod, determining the endpoint via service or direct pod access +func (sc *Client) createNodeFromPod(ctx context.Context, podName, statefulSetName string, replicaIndex int) (*Node, error) { + // Try to get the pod to verify it exists + pod, err := sc.k8sClient.Pods.Get(ctx, podName, sc.namespace) + if err != nil { + return nil, fmt.Errorf("failed to get pod %s: %w", podName, err) + } + + // Determine the endpoint + var endpoint string + var nodeName string + + if sc.inCluster { + // In-cluster: use service discovery + svcNodes, err := sc.k8sClient.Service.GetNodes(ctx, sc.namespace, fmt.Sprintf("app.kubernetes.io/instance=%s", statefulSetName)) + if err == nil && len(svcNodes) > 0 { + // Use service endpoint + endpoint = svcNodes[0].Endpoint + nodeName = sc.nodeNameFromStatefulSet(statefulSetName, replicaIndex) + } else { + // Fallback to pod IP + if pod.Status.PodIP == "" { + return nil, fmt.Errorf("pod %s has no IP address", podName) + } + endpoint = fmt.Sprintf("http://%s:1633", pod.Status.PodIP) // Assuming bee API port + nodeName = sc.nodeNameFromStatefulSet(statefulSetName, replicaIndex) + } + } else { + // External access: try ingress first, then pod IP + ingressNodes, err := sc.k8sClient.Ingress.GetNodes(ctx, sc.namespace, fmt.Sprintf("app.kubernetes.io/instance=%s", statefulSetName)) + if err == nil && len(ingressNodes) > 0 { + endpoint = fmt.Sprintf("http://%s", ingressNodes[0].Host) + nodeName = sc.nodeNameFromStatefulSet(statefulSetName, replicaIndex) + } else { + // Fallback to pod IP + if pod.Status.PodIP == "" { + return nil, fmt.Errorf("pod %s has no IP address", podName) + } + endpoint = fmt.Sprintf("http://%s:1633", pod.Status.PodIP) + nodeName = sc.nodeNameFromStatefulSet(statefulSetName, replicaIndex) + } + } + + // Parse endpoint and create API client + parsedURL, err := url.Parse(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err) + } + + apiClient, err := api.NewClient(parsedURL, sc.httpClient) + if err != nil { + return nil, fmt.Errorf("failed to create API client for %s: %w", endpoint, err) + } + + return NewNode(apiClient, nodeName), nil +} + +// nodeNameFromStatefulSet creates a node name from statefulset name and replica index +func (sc *Client) nodeNameFromStatefulSet(statefulSetName string, replicaIndex int) string { + return fmt.Sprintf("%s-%d", statefulSetName, replicaIndex) +} + +// GetNodesByStatefulSets is a convenience method to get nodes from specific statefulsets +func (sc *Client) GetNodesByStatefulSets(ctx context.Context, statefulSetNames []string) (NodeList, error) { + if sc.k8sClient == nil { + return nil, fmt.Errorf("k8s client not provided") + } + + if sc.namespace == "" { + return nil, fmt.Errorf("namespace not provided") + } + + if len(statefulSetNames) == 0 { + return nil, fmt.Errorf("no statefulset names provided") + } + + sc.log.Infof("discovering nodes from statefulsets: %v", statefulSetNames) + + var allNodes []Node + for _, ssName := range statefulSetNames { + nodes, err := sc.getNodesFromStatefulSet(ctx, ssName) + if err != nil { + sc.log.Warningf("failed to get nodes from statefulset %s: %v", ssName, err) + continue + } + allNodes = append(allNodes, nodes...) + } + + if len(allNodes) == 0 { + return nil, ErrNodesNotFound + } + + return NodeList(allNodes).Sort(), nil +} + +// ValidateStatefulSetDiscovery validates the configuration for statefulset discovery +func (sc *Client) ValidateStatefulSetDiscovery() error { + if sc.discoveryType != DiscoveryTypeStatefulSet { + return nil // Not using statefulset discovery, no validation needed + } + + if sc.k8sClient == nil { + return fmt.Errorf("k8s client is required for statefulset discovery") + } + + if sc.namespace == "" { + return fmt.Errorf("namespace is required for statefulset discovery") + } + + if len(sc.statefulSetNames) == 0 { + return fmt.Errorf("statefulset names must be provided for statefulset discovery") + } + + return nil +}