18 changes: 10 additions & 8 deletions src/pkg/api.go
@@ -3,6 +3,7 @@ package pkg
import (
"fmt"
"strings"
"sync"
"time"

"github.com/go-resty/resty/v2"
@@ -13,26 +14,28 @@ import (
)

var (
_version string
_clientRest *resty.Client
_clientGQL *opslevel.Client
_version string
_clientRest *resty.Client
_clientGQL *opslevel.Client
_clientRestOnce sync.Once
_clientGQLOnce sync.Once
)

func SetClientVersion(version string) {
_version = version
}

func NewRestClient() *resty.Client {
if _clientRest == nil {
_clientRestOnce.Do(func() {
_clientRest = opslevel.NewRestClient(opslevel.SetURL(viper.GetString("api-url")))
}
})
return _clientRest
}

func NewGraphClient() *opslevel.Client {
if _clientGQL == nil {
_clientGQLOnce.Do(func() {
_clientGQL = newGraphClient()
}
})
return _clientGQL
}

@@ -57,7 +60,6 @@ func newGraphClient() *opslevel.Client {
cobra.CheckErr(clientErr)
}
}
cobra.CheckErr(clientErr)

return client
}
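
Note on the hunks above: the bare `if _clientRest == nil` check is a data race if these constructors are ever called from multiple goroutines, while `sync.Once` guarantees the initializer runs exactly once. A minimal standalone sketch of the pattern (hypothetical names, not this package's API):

```go
package main

import (
	"fmt"
	"sync"
)

type client struct{ url string }

var (
	instance *client
	once     sync.Once
)

// getClient lazily constructs the singleton. sync.Once guarantees the
// initializer runs exactly once, even with concurrent callers, unlike
// an unsynchronized `if instance == nil` check.
func getClient() *client {
	once.Do(func() {
		instance = &client{url: "https://example.com"}
	})
	return instance
}

func main() {
	fmt.Println(getClient() == getClient()) // true: same instance reused
}
```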
59 changes: 33 additions & 26 deletions src/pkg/k8s.go
@@ -29,6 +29,11 @@ import (
"github.com/spf13/viper"
)

const (
ContainerNameHelper = "helper"
ContainerNameJob = "job"
)

var (
ImageTagVersion string
k8sValidated bool
@@ -204,7 +209,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo
NodeSelector: s.podConfig.NodeSelector,
InitContainers: []corev1.Container{
{
Name: "helper",
Name: ContainerNameHelper,
Image: "public.ecr.aws/opslevel/opslevel-runner:v2024.1.3", // TODO: fmt.Sprintf("public.ecr.aws/opslevel/opslevel-runner:v%s", ImageTagVersion),
ImagePullPolicy: s.podConfig.PullPolicy,
Command: []string{
@@ -223,7 +228,7 @@
},
Containers: []corev1.Container{
{
Name: "job",
Name: ContainerNameJob,
Image: job.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
@@ -296,26 +301,26 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
}
}
// TODO: manage pods based on image for re-use?
cfgMap, err := s.CreateConfigMap(s.getConfigMapObject(identifier, job))
defer s.DeleteConfigMap(cfgMap) // TODO: if we reuse pods then delete should not happen?
cfgMap, err := s.CreateConfigMap(ctx, s.getConfigMapObject(identifier, job))
defer s.DeleteConfigMap(context.Background(), cfgMap) // Use Background for cleanup to ensure it completes
if err != nil {
return JobOutcome{
Message: fmt.Sprintf("failed to create configmap REASON: %s", err),
Outcome: opslevel.RunnerJobOutcomeEnumFailed,
}
}

pdb, err := s.CreatePDB(s.getPBDObject(identifier, labelSelector))
defer s.DeletePDB(pdb) // TODO: if we reuse pods then delete should not happen?
pdb, err := s.CreatePDB(ctx, s.getPBDObject(identifier, labelSelector))
defer s.DeletePDB(context.Background(), pdb) // Use Background for cleanup to ensure it completes
if err != nil {
return JobOutcome{
Message: fmt.Sprintf("failed to create pod disruption budget REASON: %s", err),
Outcome: opslevel.RunnerJobOutcomeEnumFailed,
}
}

pod, err := s.CreatePod(s.getPodObject(identifier, labels, job))
defer s.DeletePod(pod) // TODO: if we reuse pods then delete should not happen
pod, err := s.CreatePod(ctx, s.getPodObject(identifier, labels, job))
defer s.DeletePod(context.Background(), pod) // Use Background for cleanup to ensure it completes
if err != nil {
return JobOutcome{
Message: fmt.Sprintf("failed to create pod REASON: %s", err),
@@ -324,7 +329,7 @@ }
}

timeout := time.Second * time.Duration(viper.GetInt("job-pod-max-wait"))
waitErr := s.WaitForPod(pod, timeout)
waitErr := s.WaitForPod(ctx, pod, timeout)
if waitErr != nil {
// TODO: get pod status or status message?
return JobOutcome{
@@ -410,24 +415,24 @@ func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *c
})
}

func (s *JobRunner) CreateConfigMap(config *corev1.ConfigMap) (*corev1.ConfigMap, error) {
func (s *JobRunner) CreateConfigMap(ctx context.Context, config *corev1.ConfigMap) (*corev1.ConfigMap, error) {
s.logger.Trace().Msgf("Creating configmap %s/%s ...", config.Namespace, config.Name)
return s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
return s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
}

func (s *JobRunner) CreatePDB(config *policyv1.PodDisruptionBudget) (*policyv1.PodDisruptionBudget, error) {
func (s *JobRunner) CreatePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) (*policyv1.PodDisruptionBudget, error) {
s.logger.Trace().Msgf("Creating pod disruption budget %s/%s ...", config.Namespace, config.Name)
return s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
return s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
}

func (s *JobRunner) CreatePod(config *corev1.Pod) (*corev1.Pod, error) {
func (s *JobRunner) CreatePod(ctx context.Context, config *corev1.Pod) (*corev1.Pod, error) {
s.logger.Trace().Msgf("Creating pod %s/%s ...", config.Namespace, config.Name)
return s.clientset.CoreV1().Pods(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
return s.clientset.CoreV1().Pods(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
}

func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWithContextFunc {
return func(context.Context) (bool, error) {
pod, err := s.clientset.CoreV1().Pods(podConfig.Namespace).Get(context.TODO(), podConfig.Name, metav1.GetOptions{})
return func(ctx context.Context) (bool, error) {
pod, err := s.clientset.CoreV1().Pods(podConfig.Namespace).Get(ctx, podConfig.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -441,30 +446,32 @@ func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWit
}
}

func (s *JobRunner) WaitForPod(podConfig *corev1.Pod, timeout time.Duration) error {
func (s *JobRunner) WaitForPod(ctx context.Context, podConfig *corev1.Pod, timeout time.Duration) error {
s.logger.Debug().Msgf("Waiting for pod %s/%s to be ready in %s ...", podConfig.Namespace, podConfig.Name, timeout)
return wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, false, s.isPodInDesiredState(podConfig))
waitCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return wait.PollUntilContextTimeout(waitCtx, time.Second, timeout, false, s.isPodInDesiredState(podConfig))
}

func (s *JobRunner) DeleteConfigMap(config *corev1.ConfigMap) {
func (s *JobRunner) DeleteConfigMap(ctx context.Context, config *corev1.ConfigMap) {
s.logger.Trace().Msgf("Deleting configmap %s/%s ...", config.Namespace, config.Name)
err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
if err != nil {
s.logger.Error().Err(err).Msgf("received error on ConfigMap deletion")
}
}

func (s *JobRunner) DeletePDB(config *policyv1.PodDisruptionBudget) {
s.logger.Trace().Msgf("Deleting configmap %s/%s ...", config.Namespace, config.Name)
err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
func (s *JobRunner) DeletePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) {
s.logger.Trace().Msgf("Deleting PDB %s/%s ...", config.Namespace, config.Name)
err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
if err != nil {
s.logger.Error().Err(err).Msgf("received error on PDB deletion")
}
}

func (s *JobRunner) DeletePod(config *corev1.Pod) {
func (s *JobRunner) DeletePod(ctx context.Context, config *corev1.Pod) {
s.logger.Trace().Msgf("Deleting pod %s/%s ...", config.Namespace, config.Name)
err := s.clientset.CoreV1().Pods(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
err := s.clientset.CoreV1().Pods(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
if err != nil {
s.logger.Error().Err(err).Msgf("received error on Pod deletion")
}
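
Note on the cleanup contexts in this file: the deferred Delete* calls now receive context.Background() instead of the job's ctx, so resource deletion still proceeds when the job's context has been cancelled or timed out. A minimal sketch of the shape (hypothetical helpers, not the runner's code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type resource struct{ name string }

func createResource(ctx context.Context) (*resource, error) {
	return &resource{name: "demo"}, ctx.Err()
}

func deleteResource(ctx context.Context, r *resource) {
	if r == nil {
		return
	}
	// With context.Background() this delete proceeds even if the
	// caller's ctx was cancelled before the defer fired.
	fmt.Println("deleting", r.name)
}

func runWithCleanup(ctx context.Context) error {
	res, err := createResource(ctx)
	// Deferred before the error check, mirroring the diff: cleanup is
	// registered immediately and uses a fresh context.
	defer deleteResource(context.Background(), res)
	if err != nil {
		return err
	}
	// ... do the actual work under ctx ...
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	time.Sleep(2 * time.Millisecond) // let ctx expire to show cleanup still runs
	fmt.Println(runWithCleanup(ctx)) // "deleting demo", then the ctx error
}
```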
64 changes: 35 additions & 29 deletions src/pkg/leaderElection.go
@@ -48,40 +48,46 @@ func RunLeaderElection(ctx context.Context, runnerId opslevel.ID, lockName, lock
isLeader = true
logger.Info().Msgf("leader is %s", lockIdentity)
deploymentsClient := client.AppsV1().Deployments(lockNamespace)
// Not allowing this sleep interval to be configurable for now
// to prevent it being set too low and causing thundering herd
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
// Not allowing this sleep interval to be configurable for now
// to prevent it being set too low and causing thundering herd
time.Sleep(60 * time.Second)
result, getErr := deploymentsClient.Get(context.TODO(), lockName, metav1.GetOptions{})
if getErr != nil {
logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
continue
}
replicaCount, err := getReplicaCount(runnerId, int(*result.Spec.Replicas))
if err != nil {
logger.Error().Err(err).Msg("Failed to get replica count")
continue
}
logger.Info().Msgf("Ideal replica count is %v", replicaCount)
// Retry is being used below to prevent deployment update from overwriting a
// separate and unrelated update action per client-go's recommendation:
// https://github.com/kubernetes/client-go/blob/master/examples/create-update-delete-deployment/main.go#L117
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := deploymentsClient.Get(context.TODO(), lockName, metav1.GetOptions{})
select {
case <-c.Done():
logger.Info().Msg("Leader election context cancelled, stopping scaling loop")
return
case <-ticker.C:
result, getErr := deploymentsClient.Get(c, lockName, metav1.GetOptions{})
if getErr != nil {
logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
return getErr
continue
}
replicaCount, err := getReplicaCount(runnerId, int(*result.Spec.Replicas))
if err != nil {
logger.Error().Err(err).Msg("Failed to get replica count")
continue
}
result.Spec.Replicas = &replicaCount
_, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
logger.Error().Err(retryErr).Msg("Failed to set replica count")
continue
logger.Info().Msgf("Ideal replica count is %v", replicaCount)
// Retry is being used below to prevent deployment update from overwriting a
// separate and unrelated update action per client-go's recommendation:
// https://github.com/kubernetes/client-go/blob/master/examples/create-update-delete-deployment/main.go#L117
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := deploymentsClient.Get(c, lockName, metav1.GetOptions{})
if getErr != nil {
logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
return getErr
}
result.Spec.Replicas = &replicaCount
_, updateErr := deploymentsClient.Update(c, result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
logger.Error().Err(retryErr).Msg("Failed to set replica count")
continue
}
logger.Info().Msgf("Successfully set replica count to %v", replicaCount)
}
logger.Info().Msgf("Successfully set replica count to %v", replicaCount)

}
},
OnStoppedLeading: func() {
21 changes: 15 additions & 6 deletions src/pkg/logs.go
@@ -52,6 +52,8 @@ func (s *LogStreamer) GetLogBuffer() []string {

func (s *LogStreamer) Run(ctx context.Context) {
s.logger.Trace().Msg("Starting log streamer ...")
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
@@ -60,7 +62,7 @@ func (s *LogStreamer) Run(ctx context.Context) {
case <-s.quit:
s.logger.Trace().Msg("Shutting down log streamer ...")
return
default:
case <-ticker.C:
Contributor Author: This is the main offender for the performance issue, doing a busy-wait for log streaming. This change takes the CPU from 100% to less than 1% per job.
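
For reference, the before/after shape in miniature (a sketch, not the streamer's actual code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	go func() { time.Sleep(200 * time.Millisecond); close(done) }()

	// Before (busy-wait): a `default` case makes the select non-blocking,
	// so the loop spins at full CPU speed whenever no channel is ready:
	//
	//   for {
	//       select {
	//       case <-done:
	//           return
	//       default:
	//           drainBuffers()
	//       }
	//   }

	// After: the ticker blocks the loop until the next tick, so it wakes
	// at most once per 50ms instead of spinning.
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			fmt.Println("shutting down")
			return
		case <-ticker.C:
			fmt.Println("drain buffers") // stand-in for the log-draining work
		}
	}
}
```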

for len(s.Stderr.String()) > 0 {
line, err := s.Stderr.ReadString('\n')
if err == nil {
@@ -89,12 +91,19 @@

func (s *LogStreamer) Flush(outcome JobOutcome) {
s.logger.Trace().Msg("Starting log streamer flush ...")
for len(s.Stderr.String()) > 0 {
time.Sleep(200 * time.Millisecond)
}
for len(s.Stdout.String()) > 0 {
time.Sleep(200 * time.Millisecond)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(30 * time.Second)
for len(s.Stderr.String()) > 0 || len(s.Stdout.String()) > 0 {
select {
case <-ticker.C:
// Continue waiting
case <-timeout:
s.logger.Warn().Msg("Flush timeout reached, proceeding with remaining data")
goto done
}
}
done:
s.logger.Trace().Msg("Finished log streamer flush ...")
s.quit <- true
time.Sleep(200 * time.Millisecond) // Allow 'Run' goroutine to quit
3 changes: 1 addition & 2 deletions src/pkg/opslevelAppendLogProcessor.go
@@ -97,6 +97,5 @@ func (s *OpsLevelAppendLogProcessor) submit() {
}
}
s.logLinesBytesSize = 0
s.logLines = nil
s.logLines = []string{}
s.logLines = s.logLines[:0]
}
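
One note on this final hunk: `s.logLines = s.logLines[:0]` truncates the slice to length zero while keeping its backing array, so subsequent appends reuse the already-allocated capacity; assigning `nil` or `[]string{}` would force a fresh allocation on the next append. A quick illustration:

```go
package main

import "fmt"

func main() {
	lines := make([]string, 0, 8)
	lines = append(lines, "a", "b", "c")
	fmt.Println(len(lines), cap(lines)) // 3 8

	// Reslice to zero length: the backing array (capacity 8) is kept,
	// so the next appends do not allocate.
	lines = lines[:0]
	fmt.Println(len(lines), cap(lines)) // 0 8
}
```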