Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions cloud/linode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
DefaultLinodeAPIURL = "https://api.linode.com"
)

type TokenProvider func(context.Context) (string, error)

type Client interface {
GetInstance(context.Context, int) (*linodego.Instance, error)
ListInstances(context.Context, *linodego.ListOptions) ([]linodego.Instance, error)
Expand Down Expand Up @@ -75,21 +77,48 @@ type Client interface {
// linodego.Client implements Client
var _ Client = (*linodego.Client)(nil)

// New creates a new linode client with a given token and default timeout
func New(token string, timeout time.Duration) (*linodego.Client, error) {
type tokenTransport struct {
base http.RoundTripper
tokenProvider TokenProvider
}

func (t *tokenTransport) RoundTrip(req *http.Request) (*http.Response, error) {
token, err := t.tokenProvider(req.Context())
if err != nil {
return nil, err
}

clone := req.Clone(req.Context())
clone.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

return t.base.RoundTrip(clone)
}

// New creates a new linode client with a given token and default timeout.
func New(token string, timeout time.Duration, tokenProvider TokenProvider) (*linodego.Client, error) {
userAgent := fmt.Sprintf("linode-cloud-controller-manager %s", linodego.DefaultUserAgent)
apiURL := os.Getenv("LINODE_URL")
if apiURL == "" {
apiURL = DefaultLinodeAPIURL
}
if tokenProvider == nil {
tokenProvider = func(context.Context) (string, error) {
return token, nil
}
}

httpClient := &http.Client{Timeout: timeout}
httpClient.Transport = &tokenTransport{
base: http.DefaultTransport,
tokenProvider: tokenProvider,
}

linodeClient := linodego.NewClient(&http.Client{Timeout: timeout})
linodeClient := linodego.NewClient(httpClient)
client, err := linodeClient.UseURL(apiURL)
if err != nil {
return nil, err
}
client.SetUserAgent(userAgent)
client.SetToken(token)

klog.V(3).Infof("Linode client created with default timeout of %v", timeout)
return client, nil
Expand Down
154 changes: 140 additions & 14 deletions cloud/linode/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
"os"
"regexp"
"strconv"
"sync"
"time"

"golang.org/x/exp/slices"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"

Expand All @@ -21,12 +26,16 @@

const (
// The name of this cloudprovider
ProviderName = "linode"
accessTokenEnv = "LINODE_API_TOKEN"
regionEnv = "LINODE_REGION"
ciliumLBType = "cilium-bgp"
nodeBalancerLBType = "nodebalancer"
tokenHealthCheckPeriod = 5 * time.Minute
ProviderName = "linode"
regionEnv = "LINODE_REGION"
defaultTokenSecretName = "ccm-linode"
defaultTokenSecretKey = "apiToken"
defaultTokenSecretNamespace = "kube-system"
tokenSecretCacheTTLEnv = "LINODE_API_TOKEN_CACHE_TTL_SECONDS"
defaultTokenSecretCacheTTL = time.Minute
ciliumLBType = "cilium-bgp"
nodeBalancerLBType = "nodebalancer"
tokenHealthCheckPeriod = 5 * time.Minute
)

var supportedLoadBalancerTypes = []string{ciliumLBType, nodeBalancerLBType}
Expand All @@ -43,8 +52,69 @@
instanceCache *services.Instances
ipHolderCharLimit int = 23
NodeBalancerPrefixCharLimit int = 19

newKubernetesClient = defaultKubernetesClient
)

type tokenSecretProvider struct {
kubeClient kubernetes.Interface
namespace string
name string
key string
now func() time.Time
cacheTTL time.Duration

mu sync.RWMutex
cachedToken string
expiresAt time.Time
}

func (t *tokenSecretProvider) String() string {
return fmt.Sprintf("%s/%s[%s]", t.namespace, t.name, t.key)
}

func (t *tokenSecretProvider) nowTime() time.Time {
if t.now != nil {
return t.now()
}

return time.Now()
}

func (t *tokenSecretProvider) GetToken(ctx context.Context) (string, error) {
now := t.nowTime()

t.mu.RLock()
if t.cachedToken != "" && now.Before(t.expiresAt) {
token := t.cachedToken
t.mu.RUnlock()
return token, nil
}
t.mu.RUnlock()

secret, err := t.kubeClient.CoreV1().Secrets(t.namespace).Get(ctx, t.name, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get secret %s: %w", t.String(), err)
}

rawToken, found := secret.Data[t.key]
if !found {
return "", fmt.Errorf("secret %s does not contain key %q", t.String(), t.key)
}

token := string(rawToken)
if token == "" {
return "", fmt.Errorf("secret %s key %q is empty", t.String(), t.key)
}
Comment on lines +105 to +108

t.mu.Lock()
t.cachedToken = token
t.expiresAt = t.nowTime().Add(t.cacheTTL)
t.mu.Unlock()

return token, nil
}

func init() {
registerMetrics()
cloudprovider.RegisterCloudProvider(
Expand All @@ -56,8 +126,8 @@

// newLinodeClientWithPrometheus creates a new client kept in its own local
// scope and returns an instrumented one that should be used and passed around
func newLinodeClientWithPrometheus(apiToken string, timeout time.Duration) (client.Client, error) {
linodeClient, err := client.New(apiToken, timeout)
func newLinodeClientWithPrometheus(apiToken string, timeout time.Duration, tokenProvider client.TokenProvider) (client.Client, error) {
linodeClient, err := client.New(apiToken, timeout, tokenProvider)
if err != nil {
return nil, fmt.Errorf("client was not created successfully: %w", err)
}
Expand All @@ -69,27 +139,83 @@
return client.NewClientWithPrometheus(linodeClient), nil
}

func defaultKubernetesClient() (kubernetes.Interface, error) {
var (
kubeConfig *rest.Config
err error
)

kubeconfigFlag := options.Options.KubeconfigFlag
if kubeconfigFlag == nil || kubeconfigFlag.Value.String() == "" {
kubeConfig, err = rest.InClusterConfig()
} else {
kubeConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfigFlag.Value.String())
}
if err != nil {
return nil, err
}

return kubernetes.NewForConfig(kubeConfig)
}

func tokenSecretCacheTTLFromEnv() time.Duration {
tokenCacheTTL := defaultTokenSecretCacheTTL
if raw, ok := os.LookupEnv(tokenSecretCacheTTLEnv); ok {
if ttlSeconds, err := strconv.Atoi(raw); err == nil && ttlSeconds > 0 {
tokenCacheTTL = time.Duration(ttlSeconds) * time.Second
}
}

return tokenCacheTTL
}

func newCloud() (cloudprovider.Interface, error) {
region := os.Getenv(regionEnv)
if region == "" {
return nil, fmt.Errorf("%s must be set in the environment (use a k8s secret)", regionEnv)
}

// Read environment variables (from secrets)
apiToken := os.Getenv(accessTokenEnv)
if apiToken == "" {
return nil, fmt.Errorf("%s must be set in the environment (use a k8s secret)", accessTokenEnv)
secretName := options.Options.LinodeAPITokenSecretName
if secretName == "" {
secretName = defaultTokenSecretName
}
secretKey := options.Options.LinodeAPITokenSecretKey
if secretKey == "" {
secretKey = defaultTokenSecretKey
}
secretNamespace := options.Options.LinodeAPITokenSecretNamespace
if secretNamespace == "" {
secretNamespace = defaultTokenSecretNamespace
}

kubeClient, err := newKubernetesClient()
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client for token secret retrieval: %w", err)
}

tokenProvider := tokenSecretProvider{
kubeClient: kubeClient,
namespace: secretNamespace,
name: secretName,
key: secretKey,
}

apiToken, err := tokenProvider.GetToken(context.Background())
if err != nil {
return nil, err
}
Comment on lines +203 to 206

// set timeout used by linodeclient for API calls
timeout := client.DefaultClientTimeout
if raw, ok := os.LookupEnv("LINODE_REQUEST_TIMEOUT_SECONDS"); ok {
if t, err := strconv.Atoi(raw); err == nil && t > 0 {

Check failure on line 211 in cloud/linode/cloud.go

View workflow job for this annotation

GitHub Actions / build-test

shadow: declaration of "err" shadows declaration at line 191 (govet)
timeout = time.Duration(t) * time.Second
}
}

linodeClient, err := newLinodeClientWithPrometheus(apiToken, timeout)
tokenProvider.cacheTTL = tokenSecretCacheTTLFromEnv()

linodeClient, err := newLinodeClientWithPrometheus(apiToken, timeout, tokenProvider.GetToken)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +230,7 @@
}

if !authenticated {
return nil, fmt.Errorf("linode api token %q is invalid", accessTokenEnv)
return nil, fmt.Errorf("linode api token from secret %s is invalid", tokenProvider.String())
}

healthChecker = newHealthChecker(linodeClient, tokenHealthCheckPeriod, options.Options.GlobalStopChannel)
Expand Down
Loading
Loading