diff --git a/src/cmd/cli/command/config.go b/src/cmd/cli/command/config.go index b779112e4..62f9d9a3e 100644 --- a/src/cmd/cli/command/config.go +++ b/src/cmd/cli/command/config.go @@ -101,7 +101,7 @@ var configSetCmd = &cobra.Command{ // 1c. Handle --random flag: generate random values for specified configs envMap = make(map[string]string) for _, name := range args { - envMap[name] = cli.CreateRandomConfigValue() + envMap[name] = cli.CreateRandomConfigValue(session.Stack.Provider) } } else if envFile != "" { // 1d. Handle --env-file flag: read all or specified configs from the file diff --git a/src/pkg/agent/tools/estimate.go b/src/pkg/agent/tools/estimate.go index 9981479b3..0c9a06385 100644 --- a/src/pkg/agent/tools/estimate.go +++ b/src/pkg/agent/tools/estimate.go @@ -15,7 +15,7 @@ import ( type EstimateParams struct { common.LoaderParams DeploymentMode string `json:"deployment_mode,omitempty" jsonschema:"default=affordable,enum=affordable,enum=balanced,enum=high_availability,description=The deployment mode for which to estimate costs (e.g., AFFORDABLE, BALANCED, HIGH_AVAILABILITY)."` - Provider string `json:"provider" jsonschema:"required,enum=aws,enum=gcp description=The cloud provider for which to estimate costs."` + Provider string `json:"provider" jsonschema:"required,enum=aws,enum=gcp,enum=scaleway description=The cloud provider for which to estimate costs."` Region string `json:"region,omitempty" jsonschema:"description=The region in which to estimate costs."` } diff --git a/src/pkg/agent/tools/estimate_test.go b/src/pkg/agent/tools/estimate_test.go index 65807058e..6dbf337fe 100644 --- a/src/pkg/agent/tools/estimate_test.go +++ b/src/pkg/agent/tools/estimate_test.go @@ -132,7 +132,7 @@ func TestHandleEstimateTool(t *testing.T) { setupMock: func(m *MockEstimateCLI) { m.Project = &compose.Project{Name: "test-project"} }, - expectedError: "invalid provider: \"invalid-provider\", not one of [auto defang aws digitalocean gcp azure]", + expectedError: "invalid provider: \"invalid-provider\", not one of [auto defang aws digitalocean gcp azure scaleway]", }, { name: "run_estimate_error", diff --git a/src/pkg/agent/tools/setConfig.go b/src/pkg/agent/tools/setConfig.go index 7ed89f449..ca3395912 100644 --- a/src/pkg/agent/tools/setConfig.go +++ b/src/pkg/agent/tools/setConfig.go @@ -62,7 +62,7 @@ func HandleSetConfig(ctx context.Context, loader client.Loader, params SetConfig if params.Value != "" { return "", errors.New("Both 'random' and 'value' parameters provided; please provide only one") } - value = cli.CreateRandomConfigValue() + value = cli.CreateRandomConfigValue(sc.Stack.Provider) term.Debug("Generated random value for config") } diff --git a/src/pkg/cli/client/byoc/scaleway/README.md b/src/pkg/cli/client/byoc/scaleway/README.md new file mode 100644 index 000000000..67d6a63af --- /dev/null +++ b/src/pkg/cli/client/byoc/scaleway/README.md @@ -0,0 +1,29 @@ +# Scaleway BYOC Integration Notes + +This package is still a preview provider path. When adding or changing +Scaleway behavior, check the surfaces below so provider support does not stop +at the deploy path. + +## Cross-Surface Checklist + +- Provider ID parsing, display names, protobuf mapping, and default regions. +- Stack wizard provider choices and stack-name defaults. +- Agent tool JSON schemas and tests that enumerate provider choices. +- Compose fixups for managed services, models, ports, and config references. +- CD setup, run, status, subscribe, logs, teardown, and state readback. +- Docs pages, provider tables, managed LLM tables, and sample prerequisites. + +## Current Preview Constraints + +- CD runs on Scaleway Serverless Jobs and uses Object Storage for Pulumi state + and `project.pb` readback. +- CD job definitions must be scoped per stack to avoid concurrent deployments + deleting or replacing each other's job definition. +- Scaleway managed LLM support bypasses LiteLLM and injects OpenAI-compatible + environment variables that point at `https://api.scaleway.ai/v1/`. +- Serverless Containers do not support host-mode ports; public HTTP services + must use ingress ports. +- Portless workers use a temporary health shim in the Pulumi provider because + Serverless Containers require an HTTP listener. +- Managed Redis currently requires `REDIS_PASSWORD` Defang config. +- Managed PostgreSQL passwords must satisfy Scaleway's password policy. diff --git a/src/pkg/cli/client/byoc/scaleway/byoc.go b/src/pkg/cli/client/byoc/scaleway/byoc.go new file mode 100644 index 000000000..9972d0769 --- /dev/null +++ b/src/pkg/cli/client/byoc/scaleway/byoc.go @@ -0,0 +1,1147 @@ +package scaleway + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "iter" + "os" + "regexp" + "strings" + "time" + + "connectrpc.com/connect" + pkg "github.com/DefangLabs/defang/src/pkg" + "github.com/DefangLabs/defang/src/pkg/cli/client" + "github.com/DefangLabs/defang/src/pkg/cli/client/byoc" + "github.com/DefangLabs/defang/src/pkg/cli/client/byoc/state" + "github.com/DefangLabs/defang/src/pkg/cli/compose" + "github.com/DefangLabs/defang/src/pkg/clouds/scaleway" + "github.com/DefangLabs/defang/src/pkg/logs" + "github.com/DefangLabs/defang/src/pkg/term" + "github.com/DefangLabs/defang/src/pkg/types" + defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" + "github.com/aws/aws-sdk-go-v2/service/s3" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type ByocScaleway struct { + *byoc.ByocBaseClient + + client *scaleway.Client + s3Client *s3.Client + + region string + projectID string + + // CD infrastructure + bucket string + jobDefID string + registryEndpoint string + + // CD run tracking + cdRunID string + cdEtag types.ETag + + // Cockpit token for Loki log queries + cockpitToken string + cockpitLogsEndpoint string +} + +var _ client.Provider = (*ByocScaleway)(nil) + +func NewByocProvider(ctx context.Context, tenantLabel types.TenantLabel, stack string) *ByocScaleway { + b := &ByocScaleway{ + region: os.Getenv("SCW_DEFAULT_REGION"), + } + b.ByocBaseClient = byoc.NewByocBaseClient(tenantLabel, b, stack) + return b +} + +func (*ByocScaleway) Driver() string { + return "scaleway-jobs" +} + +func (b *ByocScaleway) Authenticate(ctx context.Context, interactive bool) error { + scwClient, err := scaleway.NewClientFromEnv() + if err != nil { + return err + } + + if _, err := scwClient.Authenticate(ctx); err != nil { + return err + } + + b.client = scwClient + b.projectID = scwClient.ProjectID + b.region = scwClient.Region + + b.s3Client = scaleway.NewS3Client(scwClient.Region, scwClient.AccessKey, scwClient.SecretKey) + return nil +} + +func (b *ByocScaleway) AccountInfo(ctx context.Context) (*client.AccountInfo, error) { + if b.client == nil { + return nil, errors.New("not authenticated; call Authenticate first") + } + info := b.client.GetAccountInfo() + return &client.AccountInfo{ + AccountID: info.ProjectID, + Provider: client.ProviderScaleway, + Region: info.Region, + }, nil +} + +func (b *ByocScaleway) bucketName() string { + if b.bucket != "" { + return b.bucket + } + return fmt.Sprintf("%s-%s", byoc.CdTaskPrefix, b.PulumiStack) +} + +func (b *ByocScaleway) getSecretName(projectName, name string) string { + // Scaleway secret names must match ^[_a-zA-Z0-9]([-_.a-zA-Z0-9]*[_a-zA-Z0-9])?$ + // Replace path separators from StackDir (e.g., "/Defang/project/stack/KEY") with underscores. + s := strings.TrimLeft(b.StackDir(projectName, name), "/") + return strings.ReplaceAll(s, "/", "_") +} + +func (b *ByocScaleway) environment(projectName string) (map[string]string, error) { + // From https://www.pulumi.com/docs/iac/concepts/state-and-backends/#aws-s3 + // Scaleway S3-compatible storage uses the same s3:// scheme with custom endpoint + defangStateUrl := fmt.Sprintf("s3://%s?endpoint=%s&disableSSL=false&s3ForcePathStyle=true", b.bucketName(), scaleway.S3Endpoint(b.region)) + pulumiBackendKey, pulumiBackendValue, err := byoc.GetPulumiBackend(defangStateUrl) + if err != nil { + return nil, err + } + env := map[string]string{ + "AWS_ACCESS_KEY_ID": b.client.AccessKey, // S3-compatible credentials + "AWS_SECRET_ACCESS_KEY": b.client.SecretKey, // S3-compatible credentials + "AWS_REGION": b.region, // needed for S3 client + "DEFANG_DEBUG": os.Getenv("DEFANG_DEBUG"), + "DEFANG_JSON": os.Getenv("DEFANG_JSON"), + "DEFANG_ORG": string(b.TenantLabel), + "DEFANG_PREFIX": b.Prefix, + "DEFANG_PULUMI_DEBUG": os.Getenv("DEFANG_PULUMI_DEBUG"), + "DEFANG_PULUMI_DIFF": os.Getenv("DEFANG_PULUMI_DIFF"), + "DEFANG_STATE_URL": defangStateUrl, + "PRIVATE_DOMAIN": b.GetPrivateDomain(projectName), + "PROJECT": projectName, + "PULUMI_CONFIG_PASSPHRASE": byoc.PulumiConfigPassphrase, + "PULUMI_COPILOT": "false", + "PULUMI_HOME": "/root/.pulumi", + "PULUMI_SKIP_UPDATE_CHECK": "true", + "SCW_ACCESS_KEY": b.client.AccessKey, + "SCW_SECRET_KEY": b.client.SecretKey, + "SCW_DEFAULT_PROJECT_ID": b.projectID, + "SCW_DEFAULT_REGION": b.region, + "S3_ENDPOINT": scaleway.S3Endpoint(b.region), + "STACK": b.PulumiStack, + pulumiBackendKey: pulumiBackendValue, + } + + if targets := os.Getenv("DEFANG_PULUMI_TARGETS"); targets != "" { + env["DEFANG_PULUMI_TARGETS"] = targets + } + if !term.StdoutCanColor() { + env["NO_COLOR"] = "1" + } + return env, nil +} + +func (b *ByocScaleway) cdSecretName(envName string) string { + name := fmt.Sprintf("%s-%s-%s", byoc.CdTaskPrefix, b.PulumiStack, envName) + name = strings.NewReplacer("/", "-", "_", "-").Replace(name) + if len(name) > 255 { + name = name[:255] + } + return strings.Trim(name, "-") +} + +func (b *ByocScaleway) cdJobName() string { + name := fmt.Sprintf("%s-%s", byoc.CdTaskPrefix, b.PulumiStack) + name = strings.NewReplacer("/", "-", "_", "-").Replace(name) + if len(name) > 255 { + name = name[:255] + } + return strings.Trim(name, "-") +} + +func cdSecretEnv(env map[string]string) map[string]string { + keys := []string{ + "AWS_SECRET_ACCESS_KEY", + "PULUMI_CONFIG_PASSPHRASE", + "SCW_SECRET_KEY", + } + secrets := make(map[string]string, len(keys)) + for _, key := range keys { + if val, ok := env[key]; ok { + secrets[key] = val + } + } + return secrets +} + +func withoutSecretEnv(env map[string]string) map[string]string { + secrets := cdSecretEnv(env) + if len(secrets) == 0 { + return env + } + clean := make(map[string]string, len(env)-len(secrets)) + for key, val := range env { + if _, ok := secrets[key]; !ok { + clean[key] = val + } + } + return clean +} + +func usesScalewayLLM(project *compose.Project) bool { + for _, service := range project.Services { + hasScalewayEndpoint := false + needsOpenAIKey := false + for key, val := range service.Environment { + if val == nil && key == "OPENAI_API_KEY" { + needsOpenAIKey = true + continue + } + if val != nil && *val == "https://api.scaleway.ai/v1/" { + hasScalewayEndpoint = true + } + } + if hasScalewayEndpoint && needsOpenAIKey { + return true + } + } + return false +} + +func (b *ByocScaleway) ensureScalewayLLMAuth(ctx context.Context, project *compose.Project) error { + if !usesScalewayLLM(project) { + return nil + } + + configs, err := b.ListConfig(ctx, &defangv1.ListConfigsRequest{Project: project.Name}) + if err != nil { + return err + } + for _, name := range configs.Names { + if name == "OPENAI_API_KEY" { + return nil + } + } + + term.Infof("Using the Scaleway API key for Managed Inference auth") + return b.PutConfig(ctx, &defangv1.PutConfigRequest{ + Project: project.Name, + Name: "OPENAI_API_KEY", + Value: b.client.SecretKey, + }) +} + +func (b *ByocScaleway) createCDSecretReferences(ctx context.Context, jobDefID string, env map[string]string) error { + secretEnv := cdSecretEnv(env) + if len(secretEnv) == 0 { + return nil + } + refs := make([]scaleway.JobSecretRef, 0, len(secretEnv)) + for _, key := range []string{"AWS_SECRET_ACCESS_KEY", "PULUMI_CONFIG_PASSPHRASE", "SCW_SECRET_KEY"} { + value, ok := secretEnv[key] + if !ok { + continue + } + secret, version, err := b.client.EnsureSecretValue(ctx, b.cdSecretName(key), b.projectID, []byte(value)) + if err != nil { + return scaleway.AnnotateScalewayError(err, fmt.Sprintf("creating CD secret %q", key)) + } + refs = append(refs, scaleway.JobSecretRef{ + SecretManagerID: secret.ID, + SecretManagerVersion: fmt.Sprint(version.Revision), + EnvVarName: key, + }) + } + return b.client.CreateJobSecrets(ctx, jobDefID, refs) +} + +type cdCommand struct { + command []string + delegateDomain string + etag types.ETag + mode defangv1.DeploymentMode + project string + statesUrl string + eventsUrl string +} + +func (b *ByocScaleway) runCdCommand(ctx context.Context, cmd cdCommand) (string, error) { + env, err := b.environment(cmd.project) + if err != nil { + return "", err + } + + if cmd.delegateDomain != "" { + env["DOMAIN"] = cmd.delegateDomain + } else { + env["DOMAIN"] = "dummy.domain" + } + if cmd.etag != "" { + env["DEFANG_ETAG"] = cmd.etag + } + env["DEFANG_MODE"] = strings.ToLower(cmd.mode.String()) + if cmd.statesUrl != "" { + env["DEFANG_STATES_UPLOAD_URL"] = cmd.statesUrl + } + if cmd.eventsUrl != "" { + env["DEFANG_EVENTS_UPLOAD_URL"] = cmd.eventsUrl + } + + if os.Getenv("DEFANG_PULUMI_DIR") != "" { + // Run the cd binary locally from $DEFANG_PULUMI_DIR/cd instead of + // starting it as a Scaleway Serverless Job. Useful for iterating on cd + // code without rebuilding/pushing the cd image. + debugEnv := []string{ + "SCW_ACCESS_KEY=" + b.client.AccessKey, + "SCW_SECRET_KEY=" + b.client.SecretKey, + "SCW_DEFAULT_PROJECT_ID=" + b.projectID, + "SCW_DEFAULT_REGION=" + b.region, + } + for k, v := range env { + debugEnv = append(debugEnv, k+"="+v) + } + if err := byoc.DebugPulumiCD(ctx, debugEnv, cmd.command...); err != nil { + return "", err + } + return "local-debug", nil + } + + run, err := b.client.RunJob(ctx, b.jobDefID, []string{"/app/cd"}, cmd.command, withoutSecretEnv(env)) + if err != nil { + return "", scaleway.AnnotateScalewayError(err, "running CD command") + } + return run.ID, nil +} + +// Deploy implements the Provider interface. +func (b *ByocScaleway) Deploy(ctx context.Context, req *client.DeployRequest) (*client.DeployResponse, error) { + return b.deploy(ctx, req, "up") +} + +// Preview implements the Provider interface. +func (b *ByocScaleway) Preview(ctx context.Context, req *client.DeployRequest) (*client.DeployResponse, error) { + return b.deploy(ctx, req, "preview") +} + +func (b *ByocScaleway) deploy(ctx context.Context, req *client.DeployRequest, cmd string) (*client.DeployResponse, error) { + project, err := compose.LoadFromContent(ctx, req.Compose, "") + if err != nil { + return nil, err + } + + if err := b.ensureScalewayLLMAuth(ctx, project); err != nil { + return nil, err + } + + if err := b.SetUpCD(ctx, false); err != nil { + return nil, err + } + + etag := types.NewEtag() + serviceInfos, err := b.GetServiceInfos(ctx, project.Name, req.DelegateDomain, etag, project.Services) + if err != nil { + return nil, err + } + + data, err := proto.Marshal(&defangv1.ProjectUpdate{ + CdVersion: b.CDImage, + Compose: req.Compose, + Etag: etag, + Mode: req.Mode, + PulumiVersion: b.PulumiVersion, + Services: serviceInfos, + }) + if err != nil { + return nil, err + } + + var payloadString string + if len(data) < 1000 { + payloadString = base64.StdEncoding.EncodeToString(data) + } else { + bucket := b.bucketName() + key := fmt.Sprintf("uploads/%s", etag) + if err := scaleway.PutObject(ctx, b.s3Client, bucket, key, bytes.NewReader(data)); err != nil { + return nil, fmt.Errorf("uploading deploy payload: %w", err) + } + payloadString = fmt.Sprintf("s3://%s/%s", bucket, key) + } + + cdCmd := cdCommand{ + command: []string{cmd, payloadString}, + delegateDomain: req.DelegateDomain, + etag: etag, + mode: req.Mode, + project: project.Name, + statesUrl: req.StatesUrl, + eventsUrl: req.EventsUrl, + } + runID, err := b.runCdCommand(ctx, cdCmd) + if err != nil { + return nil, err + } + b.cdEtag = etag + b.cdRunID = runID + + return &client.DeployResponse{ + CdType: defangv1.CdType_CD_TYPE_UNSPECIFIED, // No Scaleway-specific CdType yet + CdId: runID, + DeployResponse: &defangv1.DeployResponse{ + Services: serviceInfos, + Etag: etag, + }, + }, nil +} + +// GetProjectUpdate downloads the project state from S3. +func (b *ByocScaleway) GetProjectUpdate(ctx context.Context, projectName string) (*defangv1.ProjectUpdate, error) { + if projectName == "" { + return nil, client.ErrNotExist + } + + bucket := b.bucketName() + path := b.GetProjectUpdatePath(projectName) + + term.Debug("Getting services from bucket:", bucket, path) + pbBytes, err := scaleway.GetObject(ctx, b.s3Client, bucket, path) + if err != nil { + if scaleway.IsNotFound(err) || strings.Contains(err.Error(), "NoSuchKey") || strings.Contains(err.Error(), "NoSuchBucket") { + term.Debug("GetObject:", err) + return nil, client.ErrNotExist + } + return nil, scaleway.AnnotateScalewayError(err, "getting project update") + } + + var projUpdate defangv1.ProjectUpdate + if err := proto.Unmarshal(pbBytes, &projUpdate); err != nil { + term.Debug("Invalid project update:", err) + return nil, client.ErrNotExist + } + return &projUpdate, nil +} + +// GetServices implements the Provider interface. +func (b *ByocScaleway) GetServices(ctx context.Context, req *defangv1.GetServicesRequest) (*defangv1.GetServicesResponse, error) { + projUpdate, err := b.GetProjectUpdate(ctx, req.Project) + if err != nil { + if errors.Is(err, client.ErrNotExist) { + return &defangv1.GetServicesResponse{}, nil + } + return nil, err + } + return &defangv1.GetServicesResponse{ + Services: projUpdate.Services, + Project: projUpdate.Project, + }, nil +} + +// GetService implements the Provider interface. +func (b *ByocScaleway) GetService(ctx context.Context, req *defangv1.GetRequest) (*defangv1.ServiceInfo, error) { + all, err := b.GetServices(ctx, &defangv1.GetServicesRequest{Project: req.Project}) + if err != nil { + return nil, err + } + for _, service := range all.Services { + if service.Service.Name == req.Name { + return service, nil + } + } + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("service %q not found", req.Name)) +} + +// PutConfig stores a secret in Scaleway Secret Manager. +func (b *ByocScaleway) PutConfig(ctx context.Context, req *defangv1.PutConfigRequest) error { + secretName := b.getSecretName(req.Project, req.Name) + term.Debugf("Putting config %q", secretName) + + // Try to create the secret first; if it already exists, we'll just add a version + secret, err := b.client.CreateSecret(ctx, secretName, b.projectID) + if err != nil { + if !scaleway.IsConflict(err) { + return scaleway.AnnotateScalewayError(err, fmt.Sprintf("creating secret %q", secretName)) + } + // Secret already exists; find it to get the ID + secrets, listErr := b.client.ListSecrets(ctx, b.projectID, secretName) + if listErr != nil { + return scaleway.AnnotateScalewayError(listErr, fmt.Sprintf("listing secrets for %q", secretName)) + } + for i := range secrets { + if secrets[i].Name == secretName { + secret = &secrets[i] + break + } + } + if secret == nil { + return fmt.Errorf("secret %q exists but could not be found", secretName) + } + } + + if _, err := b.client.CreateSecretVersion(ctx, secret.ID, []byte(req.Value)); err != nil { + return scaleway.AnnotateScalewayError(err, fmt.Sprintf("adding version for secret %q", secretName)) + } + return nil +} + +// ListConfig lists secrets from Scaleway Secret Manager. +func (b *ByocScaleway) ListConfig(ctx context.Context, req *defangv1.ListConfigsRequest) (*defangv1.Secrets, error) { + // getSecretName with empty name gives us the prefix (e.g., "Defang_project_stack_") + prefix := b.getSecretName(req.Project, "") + term.Debugf("Listing configs with prefix %q", prefix) + + // Scaleway's name filter does exact matching, not prefix matching. + // List all secrets in the project and filter client-side. + secrets, err := b.client.ListSecrets(ctx, b.projectID, "") + if err != nil { + return nil, scaleway.AnnotateScalewayError(err, "listing secrets") + } + + names := make([]string, 0, len(secrets)) + for _, secret := range secrets { + if strings.HasPrefix(secret.Name, prefix) { + name := secret.Name[len(prefix):] + if name != "" { + names = append(names, name) + } + } + } + return &defangv1.Secrets{Names: names}, nil +} + +// DeleteConfig deletes secrets from Scaleway Secret Manager. +func (b *ByocScaleway) DeleteConfig(ctx context.Context, secrets *defangv1.Secrets) error { + for _, name := range secrets.Names { + secretName := b.getSecretName(secrets.Project, name) + term.Debugf("Deleting config %q", secretName) + + // Need to find the secret ID by name + scwSecrets, err := b.client.ListSecrets(ctx, b.projectID, secretName) + if err != nil { + return scaleway.AnnotateScalewayError(err, fmt.Sprintf("listing secrets for %q", secretName)) + } + found := false + for _, s := range scwSecrets { + if s.Name == secretName { + if err := b.client.DeleteSecret(ctx, s.ID); err != nil { + return scaleway.AnnotateScalewayError(err, fmt.Sprintf("deleting secret %q", secretName)) + } + found = true + break + } + } + if !found { + return fmt.Errorf("config not found: %s", name) + } + } + return nil +} + +// CreateUploadURL generates a presigned URL for uploading artifacts. +func (b *ByocScaleway) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRequest) (*defangv1.UploadURLResponse, error) { + if err := b.SetUpCD(ctx, false); err != nil { + return nil, err + } + + bucket := b.bucketName() + key := fmt.Sprintf("uploads/%s", req.Digest) + url, err := scaleway.CreatePresignedUploadURL(ctx, b.s3Client, bucket, key, 15*time.Minute) + if err != nil { + return nil, scaleway.AnnotateScalewayError(err, "creating upload URL") + } + return &defangv1.UploadURLResponse{Url: url}, nil +} + +// SetUpCD creates the infrastructure needed for CD operations. +func (b *ByocScaleway) SetUpCD(ctx context.Context, force bool) error { + if b.SetupDone && !force { + return nil + } + + if b.CDImage == "" { + return errors.New("CD image is not set; please set the DEFANG_CD_IMAGE environment variable") + } + term.Debugf("Using CD image: %q", b.CDImage) + + bucket := b.bucketName() + term.Infof("Setting up Defang CD in Scaleway project %s", b.projectID) + + // 1. Create S3 bucket for state/artifacts + if err := scaleway.EnsureBucketExists(ctx, b.s3Client, bucket, b.region); err != nil { + return scaleway.AnnotateScalewayError(err, "ensuring CD bucket exists") + } + b.bucket = bucket + + // 2. Create Container Registry namespace + registryName := byoc.CdTaskPrefix + ns, err := b.client.EnsureRegistryNamespaceExists(ctx, registryName, b.projectID, b.region) + if err != nil { + return scaleway.AnnotateScalewayError(err, "ensuring registry namespace exists") + } + b.registryEndpoint = ns.Endpoint + + // 3. Create Serverless Job definition for CD tasks (skip in local debug mode) + if os.Getenv("DEFANG_PULUMI_DIR") == "" { + jobName := b.cdJobName() + env, err := b.environment("") + if err != nil { + return err + } + // Scaleway currently permits duplicate job definitions with the same + // name. Keep CD setup deterministic by removing every previous Defang + // CD definition before creating the one this CLI invocation will run. + defs, err := b.client.ListJobDefinitions(ctx, jobName) + if err != nil { + return scaleway.AnnotateScalewayError(err, "listing job definitions") + } + for i := range defs { + if defs[i].Name == jobName { + if err := b.client.DeleteJobDefinition(ctx, defs[i].ID); err != nil { + return scaleway.AnnotateScalewayError(err, "deleting stale CD job definition") + } + } + } + jobDef, err := b.client.CreateJobDefinition(ctx, jobName, b.CDImage, withoutSecretEnv(env), scaleway.JobResources{ + CPULimit: 2000, // 2 vCPU + MemoryLimit: 4096, // 4 GB + LocalStorageCapacity: 10000, // 10 GB + }) + if err != nil { + return scaleway.AnnotateScalewayError(err, "creating CD job definition") + } + b.jobDefID = jobDef.ID + if err := b.createCDSecretReferences(ctx, b.jobDefID, env); err != nil { + return err + } + } + + b.SetupDone = true + return nil +} + +// TearDownCD removes CD infrastructure. +func (b *ByocScaleway) TearDownCD(ctx context.Context) error { + term.Warn("Deleting the Defang CD infrastructure; existing stacks or configs will not be deleted and will need to be cleaned up manually") + + var errs []error + if b.jobDefID != "" { + if err := b.client.DeleteJobDefinition(ctx, b.jobDefID); err != nil { + errs = append(errs, err) + } + } + defs, err := b.client.ListJobDefinitions(ctx, byoc.CdTaskPrefix) + if err != nil { + errs = append(errs, err) + } else { + for _, def := range defs { + if def.Name == byoc.CdTaskPrefix { + if err := b.client.DeleteJobDefinition(ctx, def.ID); err != nil { + errs = append(errs, err) + } + } + } + } + + secretPrefix := b.cdSecretName("") + secrets, err := b.client.ListSecrets(ctx, b.projectID, secretPrefix) + if err != nil { + errs = append(errs, err) + } else { + for _, secret := range secrets { + if strings.HasPrefix(secret.Name, secretPrefix) { + if err := b.client.DeleteSecret(ctx, secret.ID); err != nil { + errs = append(errs, err) + } + } + } + } + + namespaces, err := b.client.ListRegistryNamespaces(ctx, b.projectID, byoc.CdTaskPrefix) + if err != nil { + errs = append(errs, err) + } else { + for _, ns := range namespaces { + if ns.Name != byoc.CdTaskPrefix { + continue + } + images, err := b.client.ListImages(ctx, ns.ID) + if err != nil { + errs = append(errs, err) + continue + } + for _, image := range images { + if err := b.client.DeleteImage(ctx, image.ID); err != nil { + errs = append(errs, err) + } + } + if err := b.client.DeleteRegistryNamespace(ctx, ns.ID); err != nil { + errs = append(errs, err) + } + } + } + + if err := scaleway.EmptyAndDeleteBucket(ctx, b.s3Client, b.bucketName()); err != nil { + errs = append(errs, err) + } + if len(errs) > 0 { + for _, err := range errs { + term.Warnf("Failed to delete Scaleway CD resource: %v", err) + } + return errors.Join(errs...) + } + return nil +} + +// CdCommand runs a CD command via Serverless Jobs. +func (b *ByocScaleway) CdCommand(ctx context.Context, req client.CdCommandRequest) (*client.CdCommandResponse, error) { + if err := b.SetUpCD(ctx, false); err != nil { + return nil, err + } + etag := types.NewEtag() + cmd := cdCommand{ + command: []string{string(req.Command)}, + etag: etag, + project: req.Project, + statesUrl: req.StatesUrl, + eventsUrl: req.EventsUrl, + } + runID, err := b.runCdCommand(ctx, cmd) + if err != nil { + return nil, err + } + b.cdEtag = etag + b.cdRunID = runID + return &client.CdCommandResponse{ + ETag: etag, + CdType: defangv1.CdType_CD_TYPE_UNSPECIFIED, + CdId: runID, + }, nil +} + +// CdList lists Pulumi stacks from the S3 state bucket. +func (b *ByocScaleway) CdList(ctx context.Context, _ bool) (iter.Seq[state.Info], error) { + bucket := b.bucketName() + prefix := ".pulumi/stacks/" + + term.Debug("Listing stacks in bucket:", bucket, prefix) + keys, err := scaleway.ListObjectKeys(ctx, b.s3Client, bucket, prefix) + if err != nil { + return nil, scaleway.AnnotateScalewayError(err, "listing Pulumi stacks") + } + + objLoader := func(ctx context.Context, path string) ([]byte, error) { + return scaleway.GetObject(ctx, b.s3Client, bucket, path) + } + + return func(yield func(state.Info) bool) { + for _, key := range keys { + data, err := objLoader(ctx, key) + if err != nil { + term.Debugf("Skipping %q in bucket %s: %v", key, bucket, err) + continue + } + st, err := state.ParsePulumiStateFile(ctx, s3Obj{name: key, size: int64(len(data))}, func(ctx context.Context, _ string) ([]byte, error) { + return data, nil + }) + if err != nil { + term.Debugf("Skipping %q in bucket %s: %v", key, bucket, err) + continue + } + if st == nil { + continue + } + info := state.Info{ + Stack: st.Name, + Project: st.Project, + Workspace: string(st.Workspace), + CdRegion: b.region, + } + if !yield(info) { + break + } + } + }, nil +} + +// s3Obj implements state.BucketObj for CdList. +type s3Obj struct { + name string + size int64 +} + +func (o s3Obj) Name() string { return o.name } +func (o s3Obj) Size() int64 { return o.size } + +func (b *ByocScaleway) GetPrivateDomain(projectName string) string { + return fmt.Sprintf("%s.internal", projectName) +} + +// GetDeploymentStatus checks the status of the CD job run. +func (b *ByocScaleway) GetDeploymentStatus(ctx context.Context) (bool, error) { + if b.cdRunID == "" { + return false, errors.New("no CD run in progress") + } + + run, err := b.client.GetJobRun(ctx, b.cdRunID) + if err != nil { + return false, scaleway.AnnotateScalewayError(err, "getting deployment status") + } + + switch run.State { + case "succeeded": + return true, nil + case "failed", "interrupted": + msg := fmt.Sprintf("CD job %s: %s", run.State, run.ErrorMessage) + if run.ErrorMessage == "" { + msg = fmt.Sprintf("CD job %s: %s", run.State, run.Reason) + } + return true, client.ErrDeploymentFailed{Message: msg} + default: + // still running: queued, running, etc. + return false, nil + } +} + +func (b *ByocScaleway) PrepareDomainDelegation(ctx context.Context, req client.PrepareDomainDelegationRequest) (*client.PrepareDomainDelegationResponse, error) { + term.Debugf("Preparing domain delegation for %s", req.DelegateDomain) + + domain, subdomain := splitDelegateDomain(req.DelegateDomain) + zone, err := b.client.CreateDNSZone(ctx, domain, subdomain, b.projectID) + if err != nil { + if !scaleway.IsConflict(err) { + // Domain not owned by this Scaleway project — skip delegation + // (services will use Scaleway's auto-generated container URLs) + term.Debugf("Skipping domain delegation: %v", err) + return nil, nil + } + // Zone already exists; look it up + zone, err = b.client.GetDNSZone(ctx, req.DelegateDomain) + if err != nil { + return nil, err + } + } + + if len(zone.NS) == 0 { + return nil, fmt.Errorf("DNS zone for %q has no nameservers", req.DelegateDomain) + } + + term.Debugf("DNS zone for %q has nameservers: %v", req.DelegateDomain, zone.NS) + return &client.PrepareDomainDelegationResponse{ + NameServers: zone.NS, + }, nil +} + +// splitDelegateDomain splits a FQDN into its base domain and subdomain parts. +// For example, "myapp.example.com" returns ("example.com", "myapp"). +func splitDelegateDomain(fqdn string) (domain, subdomain string) { + fqdn = strings.TrimSuffix(fqdn, ".") + parts := strings.Split(fqdn, ".") + if len(parts) <= 2 { + return fqdn, "" + } + domain = strings.Join(parts[len(parts)-2:], ".") + subdomain = strings.Join(parts[:len(parts)-2], ".") + return +} + +func (b *ByocScaleway) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest) (iter.Seq2[*defangv1.SubscribeResponse, error], error) { + if b.cdRunID == "" || (req.Etag != "" && req.Etag != b.cdEtag) { + return nil, errors.ErrUnsupported + } + + runID := b.cdRunID + return func(yield func(*defangv1.SubscribeResponse, error) bool) { + var lastState string + for { + run, err := b.client.GetJobRun(ctx, runID) + if err != nil { + yield(nil, scaleway.AnnotateScalewayError(err, "polling job run status")) + return + } + + if run.State != lastState { + lastState = run.State + state := jobRunStateToServiceState(run.State) + if !yield(&defangv1.SubscribeResponse{ + Name: "cd", + Status: run.State, + State: state, + }, nil) { + return + } + + // Stop on terminal states + if state == defangv1.ServiceState_DEPLOYMENT_COMPLETED || + state == defangv1.ServiceState_DEPLOYMENT_FAILED { + return + } + } + + if err := pkg.SleepWithContext(ctx, 2*time.Second); err != nil { + yield(nil, err) + return + } + } + }, nil +} + +func jobRunStateToServiceState(state string) defangv1.ServiceState { + switch state { + case "initialized", "validated", "queued", "retrying": + return defangv1.ServiceState_UPDATE_QUEUED + case "running": + return defangv1.ServiceState_DEPLOYMENT_PENDING + case "succeeded": + return defangv1.ServiceState_DEPLOYMENT_COMPLETED + case "failed", "interrupted": + return defangv1.ServiceState_DEPLOYMENT_FAILED + default: + return defangv1.ServiceState_NOT_SPECIFIED + } +} + +func (b *ByocScaleway) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (iter.Seq2[*defangv1.TailResponse, error], error) { + if err := b.ensureCockpitToken(ctx); err != nil { + return nil, err + } + + query := b.buildLogQuery(req) + etag := req.Etag + if etag == "" { + etag = b.cdEtag + } + + if req.Follow { + return b.followLogs(ctx, query, etag, req), nil + } + + // Non-follow: single query + var start, end time.Time + if req.Since.IsValid() { + start = req.Since.AsTime() + } + if req.Until.IsValid() { + end = req.Until.AsTime() + } + + limit := int(req.Limit) + if limit == 0 { + limit = 100 + } + + entries, err := scaleway.QueryLoki(ctx, b.cockpitToken, b.cockpitLogsEndpoint, query, start, end, limit) + if err != nil { + return nil, fmt.Errorf("querying logs: %w", err) + } + + return func(yield func(*defangv1.TailResponse, error) bool) { + for _, entry := range entries { + resp := lokiEntryToTailResponse(entry, etag) + if resp == nil { + continue + } + if !yield(resp, nil) { + return + } + } + }, nil +} + +// ensureCockpitToken lazily creates a Cockpit token for Loki queries. +func (b *ByocScaleway) ensureCockpitToken(ctx context.Context) error { + if b.cockpitToken != "" && b.cockpitLogsEndpoint != "" { + return nil + } + + const tokenName = "defang-cd-logs" + + token, err := b.client.CreateCockpitToken(ctx, tokenName, b.projectID) + if err != nil { + if !scaleway.IsConflict(err) { + return fmt.Errorf("creating Cockpit token: %w", err) + } + // Token exists but we need the secret key; delete and recreate + tokens, listErr := b.client.ListCockpitTokens(ctx, b.projectID) + if listErr != nil { + return fmt.Errorf("listing Cockpit tokens: %w", listErr) + } + for _, t := range tokens { + if t.Name == tokenName { + if delErr := b.client.DeleteCockpitToken(ctx, t.ID); delErr != nil { + if !scaleway.IsNotFound(delErr) { + return fmt.Errorf("deleting existing Cockpit token: %w", delErr) + } + } + break + } + } + // Recreate to obtain the secret key + token, err = b.client.CreateCockpitToken(ctx, tokenName, b.projectID) + if err != nil { + return fmt.Errorf("recreating Cockpit token: %w", err) + } + } + + if b.cockpitToken == "" { + b.cockpitToken = token.SecretKey + } + if b.cockpitLogsEndpoint == "" { + endpoint, err := b.client.GetCockpitLogsEndpoint(ctx, b.projectID) + if err != nil { + return err + } + b.cockpitLogsEndpoint = endpoint + } + return nil +} + +// buildLogQuery constructs a LogQL query for Scaleway Cockpit Loki. +func (b *ByocScaleway) buildLogQuery(req *defangv1.TailRequest) string { + logType := logs.LogType(req.LogType) + + if len(req.Services) > 0 { + if len(req.Services) == 1 { + return fmt.Sprintf(`{resource_type="serverless_container",resource_name=~".*-%s"}`, regexp.QuoteMeta(req.Services[0])) + } + services := make([]string, len(req.Services)) + for i, service := range req.Services { + services[i] = regexp.QuoteMeta(service) + } + return fmt.Sprintf(`{resource_type="serverless_container",resource_name=~".*-(%s)"}`, strings.Join(services, "|")) + } + + if logType.Has(logs.LogTypeCD) || logType == logs.LogTypeUnspecified { + return fmt.Sprintf(`{job_definition_name=%q}`, b.cdJobName()) + } + + return fmt.Sprintf(`{job_definition_name=~"%s.*"}`, byoc.CdTaskPrefix) +} + +// followLogs polls Loki for new log entries in a loop. +func (b *ByocScaleway) followLogs(ctx context.Context, query, etag string, req *defangv1.TailRequest) iter.Seq2[*defangv1.TailResponse, error] { + return func(yield func(*defangv1.TailResponse, error) bool) { + start := time.Now() + if req.Since.IsValid() { + start = req.Since.AsTime() + } + + const maxConsecutiveErrors = 5 + consecutiveErrors := 0 + + for { + entries, err := scaleway.QueryLoki(ctx, b.cockpitToken, b.cockpitLogsEndpoint, query, start, time.Time{}, 100) + if err != nil { + consecutiveErrors++ + if consecutiveErrors >= maxConsecutiveErrors { + yield(nil, fmt.Errorf("giving up after %d consecutive log query failures: %w", maxConsecutiveErrors, err)) + return + } + if !yield(nil, err) { + return + } + } else { + consecutiveErrors = 0 + } + + for _, entry := range entries { + if !entry.Timestamp.After(start) { + continue // skip already-seen entries + } + resp := lokiEntryToTailResponse(entry, etag) + if resp == nil { + start = entry.Timestamp + continue + } + if !yield(resp, nil) { + return + } + start = entry.Timestamp + } + + if err := pkg.SleepWithContext(ctx, 2*time.Second); err != nil { + yield(nil, err) + return + } + } + } +} + +type scalewayLogPayload struct { + JobDefinitionName string `json:"job_definition_name"` + Message string `json:"message"` + ResourceID string `json:"resource_id"` + ResourceInstance string `json:"resource_instance"` + ResourceName string `json:"resource_name"` + Stream string `json:"stream"` +} + +func parseScalewayLogEntry(entry scaleway.LokiEntry) (scaleway.LokiEntry, string) { + var payload scalewayLogPayload + if err := json.Unmarshal([]byte(entry.Line), &payload); err != nil { + return entry, entry.Line + } + if entry.Labels == nil { + entry.Labels = map[string]string{} + } + if payload.Message != "" { + entry.Line = payload.Message + } + if payload.ResourceName != "" { + entry.Labels["resource_name"] = payload.ResourceName + } + if payload.JobDefinitionName != "" { + entry.Labels["job_definition_name"] = payload.JobDefinitionName + } + if payload.ResourceID != "" { + entry.Labels["resource_id"] = payload.ResourceID + } + if payload.ResourceInstance != "" { + entry.Labels["resource_instance"] = payload.ResourceInstance + } + if payload.Stream != "" { + entry.Labels["stream"] = payload.Stream + } + return entry, payload.Message +} + +func lokiEntryToTailResponse(entry scaleway.LokiEntry, etag string) *defangv1.TailResponse { + entry, message := parseScalewayLogEntry(entry) + if message == "" { + return nil + } + service := entry.Labels["resource_name"] + if service == "" { + service = entry.Labels["job_definition_name"] + } + if service == "" { + service = "cd" + } + host := entry.Labels["resource_instance"] + if host == "" { + host = entry.Labels["resource_id"] + } + stderr := entry.Labels["stream"] == "stderr" || strings.Contains(strings.ToLower(message), "error") + + return &defangv1.TailResponse{ + Service: service, + Etag: etag, + Entries: []*defangv1.LogEntry{{ + Message: message, + Timestamp: timestamppb.New(entry.Timestamp), + Service: service, + Etag: etag, + Host: host, + Stderr: stderr, + }}, + } +} diff --git a/src/pkg/cli/client/byoc/scaleway/byoc_test.go b/src/pkg/cli/client/byoc/scaleway/byoc_test.go new file mode 100644 index 000000000..15d21c3fd --- /dev/null +++ b/src/pkg/cli/client/byoc/scaleway/byoc_test.go @@ -0,0 +1,494 @@ +package scaleway + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/DefangLabs/defang/src/pkg/cli/client" + "github.com/DefangLabs/defang/src/pkg/cli/client/byoc" + "github.com/DefangLabs/defang/src/pkg/cli/compose" + cloudscaleway "github.com/DefangLabs/defang/src/pkg/clouds/scaleway" + "github.com/DefangLabs/defang/src/pkg/logs" + "github.com/DefangLabs/defang/src/pkg/types" + defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestCdJobNameIsScopedToStack(t *testing.T) { + t.Parallel() + + client := &ByocScaleway{} + client.ByocBaseClient = byoc.NewByocBaseClient("", client, "tenant/project/prod") + + name := client.cdJobName() + + assert.Equal(t, "defang-cd-tenant-project-prod", name) + assert.NotEqual(t, byoc.CdTaskPrefix, name) +} + +func TestCdJobNameIsScalewaySafe(t *testing.T) { + t.Parallel() + + client := &ByocScaleway{} + client.ByocBaseClient = byoc.NewByocBaseClient("", client, strings.Repeat("stack/", 80)) + + name := client.cdJobName() + + assert.LessOrEqual(t, len(name), 255) + assert.NotContains(t, name, "/") + assert.NotContains(t, name, "_") + assert.True(t, strings.HasPrefix(name, byoc.CdTaskPrefix)) +} + +func TestCdLogQueryUsesScopedJobName(t *testing.T) { + t.Parallel() + + client := &ByocScaleway{} + client.ByocBaseClient = byoc.NewByocBaseClient("", client, "tenant/project/prod") + + query := client.buildLogQuery(&defangv1.TailRequest{}) + + assert.Equal(t, `{job_definition_name="defang-cd-tenant-project-prod"}`, query) +} + +func TestServiceLogQueryUsesServerlessContainerNames(t *testing.T) { + t.Parallel() + + client := &ByocScaleway{} + client.ByocBaseClient = byoc.NewByocBaseClient("", client, "tenant/project/prod") + + query := client.buildLogQuery(&defangv1.TailRequest{ + LogType: uint32(logs.LogTypeAll), + Services: []string{"app", "api.v1"}, + }) + + assert.Equal(t, `{resource_type="serverless_container",resource_name=~".*-(app|api\.v1)"}`, query) +} + +func TestEnvironmentIncludesPulumiAndScalewayContext(t *testing.T) { + t.Setenv("DEFANG_DEBUG", "true") + t.Setenv("DEFANG_JSON", "true") + t.Setenv("DEFANG_PULUMI_DEBUG", "1") + t.Setenv("DEFANG_PULUMI_DIFF", "1") + t.Setenv("DEFANG_PULUMI_TARGETS", "urn-1,urn-2") + + provider := newTestProvider("tenant/project/prod") + env, err := provider.environment("demo") + require.NoError(t, err) + + assert.Equal(t, "access", env["AWS_ACCESS_KEY_ID"]) + assert.Equal(t, "secret", env["AWS_SECRET_ACCESS_KEY"]) + assert.Equal(t, "fr-par", env["AWS_REGION"]) + assert.Equal(t, "tenant", env["DEFANG_ORG"]) + assert.Equal(t, "demo.internal", env["PRIVATE_DOMAIN"]) + assert.Equal(t, "demo", env["PROJECT"]) + assert.Equal(t, "tenant/project/prod", env["STACK"]) + assert.Equal(t, "fr-par", env["SCW_DEFAULT_REGION"]) + assert.Equal(t, "project-id", env["SCW_DEFAULT_PROJECT_ID"]) + assert.Equal(t, "access", env["SCW_ACCESS_KEY"]) + assert.Equal(t, "secret", env["SCW_SECRET_KEY"]) + assert.Contains(t, env["DEFANG_STATE_URL"], "s3://defang-cd-tenant/project/prod") + assert.Contains(t, env["DEFANG_STATE_URL"], "s3.fr-par.scw.cloud") + assert.Equal(t, "urn-1,urn-2", env["DEFANG_PULUMI_TARGETS"]) +} + +func TestSecretEnvFiltering(t *testing.T) { + t.Parallel() + + env := map[string]string{ + "AWS_ACCESS_KEY_ID": "public", + "AWS_SECRET_ACCESS_KEY": "aws-secret", + "PULUMI_CONFIG_PASSPHRASE": "passphrase", + "SCW_SECRET_KEY": "scw-secret", + "SCW_ACCESS_KEY": "public", + } + + assert.Equal(t, map[string]string{ + "AWS_SECRET_ACCESS_KEY": "aws-secret", + "PULUMI_CONFIG_PASSPHRASE": "passphrase", + "SCW_SECRET_KEY": "scw-secret", + }, cdSecretEnv(env)) + + clean := withoutSecretEnv(env) + assert.Equal(t, "public", clean["AWS_ACCESS_KEY_ID"]) + assert.Equal(t, "public", clean["SCW_ACCESS_KEY"]) + assert.NotContains(t, clean, "AWS_SECRET_ACCESS_KEY") + assert.NotContains(t, clean, "PULUMI_CONFIG_PASSPHRASE") + assert.NotContains(t, clean, "SCW_SECRET_KEY") +} + +func TestUsesScalewayLLMRequiresEndpointAndMissingKey(t *testing.T) { + endpoint := "https://api.scaleway.ai/v1/" + key := "already-set" + + assert.True(t, usesScalewayLLM(&compose.Project{Services: compose.Services{ + "chat": {Environment: map[string]*string{ + "OPENAI_BASE_URL": &endpoint, + "OPENAI_API_KEY": nil, + }}, + }})) + assert.False(t, usesScalewayLLM(&compose.Project{Services: compose.Services{ + "chat": {Environment: map[string]*string{ + "OPENAI_BASE_URL": &endpoint, + "OPENAI_API_KEY": &key, + }}, + }})) + assert.False(t, usesScalewayLLM(&compose.Project{Services: compose.Services{ + "chat": {Environment: map[string]*string{"OPENAI_API_KEY": nil}}, + }})) +} + +func TestDeploymentStatusMapsJobStates(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + _, err := provider.GetDeploymentStatus(context.Background()) + require.EqualError(t, err, "no CD run in progress") + + provider.cdRunID = "run-succeeded" + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "/serverless-jobs/v1alpha2/regions/fr-par/job-runs/run-succeeded", req.URL.Path) + return jsonResponse(`{"id":"run-succeeded","state":"succeeded"}`), nil + })} + done, err := provider.GetDeploymentStatus(context.Background()) + require.NoError(t, err) + assert.True(t, done) + + provider.cdRunID = "run-failed" + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + return jsonResponse(`{"id":"run-failed","state":"failed","error_message":"boom"}`), nil + })} + done, err = provider.GetDeploymentStatus(context.Background()) + assert.True(t, done) + var failed client.ErrDeploymentFailed + require.ErrorAs(t, err, &failed) + assert.Contains(t, failed.Message, "boom") +} + +func TestJobRunStateToServiceState(t *testing.T) { + t.Parallel() + + assert.Equal(t, defangv1.ServiceState_UPDATE_QUEUED, jobRunStateToServiceState("initialized")) + assert.Equal(t, defangv1.ServiceState_UPDATE_QUEUED, jobRunStateToServiceState("queued")) + assert.Equal(t, defangv1.ServiceState_DEPLOYMENT_PENDING, jobRunStateToServiceState("running")) + assert.Equal(t, defangv1.ServiceState_DEPLOYMENT_COMPLETED, jobRunStateToServiceState("succeeded")) + assert.Equal(t, defangv1.ServiceState_DEPLOYMENT_FAILED, jobRunStateToServiceState("failed")) + assert.Equal(t, defangv1.ServiceState_DEPLOYMENT_FAILED, jobRunStateToServiceState("interrupted")) + assert.Equal(t, defangv1.ServiceState_NOT_SPECIFIED, jobRunStateToServiceState("unknown")) +} + +func TestSplitDelegateDomain(t *testing.T) { + t.Parallel() + + tests := []struct { + fqdn string + domain string + subdomain string + }{ + {fqdn: "example.com", domain: "example.com"}, + {fqdn: "app.example.com.", domain: "example.com", subdomain: "app"}, + {fqdn: "api.staging.example.com", domain: "example.com", subdomain: "api.staging"}, + } + for _, tt := range tests { + t.Run(tt.fqdn, func(t *testing.T) { + t.Parallel() + domain, subdomain := splitDelegateDomain(tt.fqdn) + assert.Equal(t, tt.domain, domain) + assert.Equal(t, tt.subdomain, subdomain) + }) + } +} + +func TestLokiEntryToTailResponse(t *testing.T) { + t.Parallel() + + ts := time.Date(2026, 5, 11, 16, 0, 0, 0, time.UTC) + resp := lokiEntryToTailResponse(cloudscaleway.LokiEntry{ + Timestamp: ts, + Line: "ERROR failed to start", + Labels: map[string]string{ + "resource_name": "app", + "resource_id": "container-id", + }, + }, "etag") + + require.Len(t, resp.Entries, 1) + assert.Equal(t, "app", resp.Service) + assert.Equal(t, "etag", resp.Etag) + assert.Equal(t, "container-id", resp.Entries[0].Host) + assert.Equal(t, "ERROR failed to start", resp.Entries[0].Message) + assert.True(t, resp.Entries[0].Stderr) + assert.True(t, timestamppb.New(ts).AsTime().Equal(resp.Entries[0].Timestamp.AsTime())) + + fallback := lokiEntryToTailResponse(cloudscaleway.LokiEntry{Line: "ok"}, "etag") + assert.Equal(t, "cd", fallback.Service) + assert.False(t, fallback.Entries[0].Stderr) +} + +func TestLokiEntryToTailResponseParsesScalewayJSONPayload(t *testing.T) { + t.Parallel() + + ts := time.Date(2026, 5, 11, 18, 40, 0, 0, time.UTC) + resp := lokiEntryToTailResponse(cloudscaleway.LokiEntry{ + Timestamp: ts, + Line: `{"resource_type":"serverless_job","stream":"stderr","job_definition_name":"defang-cd-logsval","resource_id":"run-id","message":"error: kaniko build failed"}`, + Labels: map[string]string{ + "resource_type": "serverless_job", + }, + }, "etag") + + require.Len(t, resp.Entries, 1) + assert.Equal(t, "defang-cd-logsval", resp.Service) + assert.Equal(t, "run-id", resp.Entries[0].Host) + assert.Equal(t, "error: kaniko build failed", resp.Entries[0].Message) + assert.True(t, resp.Entries[0].Stderr) +} + +func TestLokiEntryToTailResponseParsesScalewayRuntimePayload(t *testing.T) { + t.Parallel() + + resp := lokiEntryToTailResponse(cloudscaleway.LokiEntry{ + Line: `{"resource_instance":"deployment-pod","message":"defang-log-smoke request path=/test-logs","stream":"stdout"}`, + Labels: map[string]string{ + "resource_name": "scalewaylogsmokef9df5e7b-app", + "resource_type": "serverless_container", + }, + }, "etag") + + require.Len(t, resp.Entries, 1) + assert.Equal(t, "scalewaylogsmokef9df5e7b-app", resp.Service) + assert.Equal(t, "deployment-pod", resp.Entries[0].Host) + assert.Equal(t, "defang-log-smoke request path=/test-logs", resp.Entries[0].Message) + assert.False(t, resp.Entries[0].Stderr) +} + +func TestLokiEntryToTailResponseSkipsScalewayMetadataPayload(t *testing.T) { + t.Parallel() + + resp := lokiEntryToTailResponse(cloudscaleway.LokiEntry{ + Line: `{"resource_type":"serverless_job","stream":"stdout","job_definition_name":"defang-cd-logsval","resource_id":"run-id"}`, + }, "etag") + + assert.Nil(t, resp) +} + +func TestEnsureCockpitTokenIgnoresConcurrentDelete(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + createAttempts := 0 + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodPost && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens": + createAttempts++ + if createAttempts == 1 { + return statusResponse(http.StatusConflict, `{"message":"token already exists"}`), nil + } + return jsonResponse(`{"id":"new-token","name":"defang-cd-logs","secret_key":"secret"}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens": + return jsonResponse(`{"tokens":[{"id":"old-token","name":"defang-cd-logs"}]}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens/old-token": + return statusResponse(http.StatusNotFound, `{"message":"token was already deleted"}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/cockpit/v1/regions/fr-par/data-sources": + return jsonResponse(`{"data_sources":[{"type":"logs","url":"https://logs.example"}]}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + })} + + err := provider.ensureCockpitToken(context.Background()) + require.NoError(t, err) + assert.Equal(t, "secret", provider.cockpitToken) + assert.Equal(t, "https://logs.example", provider.cockpitLogsEndpoint) +} + +func TestSubscribeRejectsMissingOrMismatchedRun(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + _, err := provider.Subscribe(context.Background(), &defangv1.SubscribeRequest{}) + require.ErrorIs(t, err, errors.ErrUnsupported) + + provider.cdRunID = "run" + provider.cdEtag = types.ETag("current") + _, err = provider.Subscribe(context.Background(), &defangv1.SubscribeRequest{Etag: "other"}) + require.ErrorIs(t, err, errors.ErrUnsupported) +} + +func TestAccountInfoRequiresAuthAndUsesClientContext(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + provider.client = nil + _, err := provider.AccountInfo(context.Background()) + require.EqualError(t, err, "not authenticated; call Authenticate first") + + provider.client = cloudscaleway.NewClient("access", "secret", "project-id", "fr-par") + info, err := provider.AccountInfo(context.Background()) + require.NoError(t, err) + assert.Equal(t, "project-id", info.AccountID) + assert.Equal(t, client.ProviderScaleway, info.Provider) + assert.Equal(t, "fr-par", info.Region) +} + +func TestConfigLifecycleUsesStackScopedSecretNames(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + var requests []string + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + requests = append(requests, req.Method+" "+req.URL.Path+"?"+req.URL.RawQuery) + switch { + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets": + return jsonResponse(`{"id":"secret-id","name":"Defang_demo_prod_API_KEY","project_id":"project-id"}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id/versions": + return jsonResponse(`{"secret_id":"secret-id","revision":1}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets": + name := req.URL.Query().Get("name") + switch name { + case "": + return jsonResponse(`{"secrets":[ + {"id":"secret-id","name":"Defang_demo_prod_API_KEY"}, + {"id":"other","name":"Defang_other_prod_API_KEY"} + ]}`), nil + case "Defang_demo_prod_API_KEY": + return jsonResponse(`{"secrets":[{"id":"secret-id","name":"Defang_demo_prod_API_KEY"}]}`), nil + default: + return jsonResponse(`{"secrets":[]}`), nil + } + case req.Method == http.MethodDelete && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id": + return jsonResponse(`{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + })} + + err := provider.PutConfig(context.Background(), &defangv1.PutConfigRequest{ + Project: "demo", + Name: "API_KEY", + Value: "value", + }) + require.NoError(t, err) + + secrets, err := provider.ListConfig(context.Background(), &defangv1.ListConfigsRequest{Project: "demo"}) + require.NoError(t, err) + assert.Equal(t, []string{"API_KEY"}, secrets.Names) + + err = provider.DeleteConfig(context.Background(), &defangv1.Secrets{Project: "demo", Names: []string{"API_KEY"}}) + require.NoError(t, err) + assert.Contains(t, requests, "DELETE /secret-manager/v1beta1/regions/fr-par/secrets/secret-id?") +} + +func TestRunCdCommandUsesNonSecretEnvironmentOverrides(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + provider.jobDefID = "job-def" + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "/serverless-jobs/v1alpha2/regions/fr-par/job-definitions/job-def/start", req.URL.Path) + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + body := string(data) + assert.Contains(t, body, `"startup_command":["/app/cd"]`) + assert.Contains(t, body, `"args":["preview","payload"]`) + assert.Contains(t, body, `"DEFANG_ETAG":"etag"`) + assert.Contains(t, body, `"DOMAIN":"example.com"`) + assert.Contains(t, body, `"DEFANG_STATES_UPLOAD_URL":"https://states.example"`) + assert.NotContains(t, body, "SCW_SECRET_KEY") + assert.NotContains(t, body, "AWS_SECRET_ACCESS_KEY") + assert.NotContains(t, body, "PULUMI_CONFIG_PASSPHRASE") + return jsonResponse(`{"job_runs":[{"id":"run-id","state":"queued"}]}`), nil + })} + + runID, err := provider.runCdCommand(context.Background(), cdCommand{ + command: []string{"preview", "payload"}, + delegateDomain: "example.com", + etag: "etag", + mode: defangv1.DeploymentMode_DEVELOPMENT, + project: "demo", + statesUrl: "https://states.example", + eventsUrl: "https://events.example", + }) + require.NoError(t, err) + assert.Equal(t, "run-id", runID) +} + +func TestCreateCDSecretReferences(t *testing.T) { + t.Parallel() + + provider := newTestProvider("prod") + var sawJobSecretRefs bool + provider.client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets": + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + var payload map[string]string + require.NoError(t, json.Unmarshal(data, &payload)) + return jsonResponse(`{"id":"` + payload["name"] + `","name":"` + payload["name"] + `"}`), nil + case req.Method == http.MethodPost && strings.Contains(req.URL.Path, "/versions"): + return jsonResponse(`{"revision":3}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/secrets": + sawJobSecretRefs = true + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + body := string(data) + assert.Contains(t, body, `"env_var_name":"AWS_SECRET_ACCESS_KEY"`) + assert.Contains(t, body, `"env_var_name":"PULUMI_CONFIG_PASSPHRASE"`) + assert.Contains(t, body, `"env_var_name":"SCW_SECRET_KEY"`) + assert.NotContains(t, body, "SCW_ACCESS_KEY") + return jsonResponse(`{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + })} + + err := provider.createCDSecretReferences(context.Background(), "job-def", map[string]string{ + "AWS_ACCESS_KEY_ID": "public", + "AWS_SECRET_ACCESS_KEY": "aws-secret", + "PULUMI_CONFIG_PASSPHRASE": "passphrase", + "SCW_ACCESS_KEY": "public", + "SCW_SECRET_KEY": "scw-secret", + }) + require.NoError(t, err) + assert.True(t, sawJobSecretRefs) +} + +func newTestProvider(stack string) *ByocScaleway { + provider := &ByocScaleway{ + client: cloudscaleway.NewClient("access", "secret", "project-id", "fr-par"), + projectID: "project-id", + region: "fr-par", + } + provider.ByocBaseClient = byoc.NewByocBaseClient("tenant", provider, stack) + return provider +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func jsonResponse(body string) *http.Response { + return statusResponse(http.StatusOK, body) +} + +func statusResponse(status int, body string) *http.Response { + return &http.Response{ + StatusCode: status, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + } +} diff --git a/src/pkg/cli/client/provider_id.go b/src/pkg/cli/client/provider_id.go index add91f20f..e775d6515 100644 --- a/src/pkg/cli/client/provider_id.go +++ b/src/pkg/cli/client/provider_id.go @@ -16,6 +16,7 @@ const ( ProviderDefang ProviderID = "defang" ProviderDO ProviderID = "digitalocean" ProviderGCP ProviderID = "gcp" + ProviderScaleway ProviderID = "scaleway" ) var allProviders = []ProviderID{ @@ -25,6 +26,7 @@ var allProviders = []ProviderID{ ProviderDO, ProviderGCP, ProviderAzure, + ProviderScaleway, } func AllProviders() []ProviderID { @@ -49,6 +51,8 @@ func (p ProviderID) Name() string { return "DigitalOcean" case ProviderGCP: return "Google Cloud Platform" + case ProviderScaleway: + return "Scaleway" default: return p.String() } @@ -66,6 +70,8 @@ func (p ProviderID) Value() defangv1.Provider { return defangv1.Provider_DIGITALOCEAN case ProviderGCP: return defangv1.Provider_GCP + case ProviderScaleway: + return defangv1.Provider_SCALEWAY default: return defangv1.Provider_PROVIDER_UNSPECIFIED } @@ -94,6 +100,8 @@ func (p *ProviderID) SetValue(val defangv1.Provider) { *p = ProviderDO case defangv1.Provider_GCP: *p = ProviderGCP + case defangv1.Provider_SCALEWAY: + *p = ProviderScaleway default: *p = ProviderAuto } diff --git a/src/pkg/cli/client/provider_id_test.go b/src/pkg/cli/client/provider_id_test.go index 87e121e76..4e57f5ddf 100644 --- a/src/pkg/cli/client/provider_id_test.go +++ b/src/pkg/cli/client/provider_id_test.go @@ -62,6 +62,18 @@ func TestProvider(t *testing.T) { want: ProviderAuto, wantErr: false, }, + { + name: "valid provider scaleway", + provider: "scaleway", + want: ProviderScaleway, + wantErr: false, + }, + { + name: "valid provider Scaleway", + provider: "Scaleway", + want: ProviderScaleway, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/src/pkg/cli/client/region.go b/src/pkg/cli/client/region.go index f8152db33..70e3ebe76 100644 --- a/src/pkg/cli/client/region.go +++ b/src/pkg/cli/client/region.go @@ -9,6 +9,7 @@ const ( RegionDefaultAzure = "westus" // Default region for Azure RegionDefaultDO = "nyc3" RegionDefaultGCP = "us-central1" // Defaults to us-central1 for lower price + RegionDefaultScaleway = "fr-par" ) func GetRegion(provider ProviderID) string { @@ -22,6 +23,8 @@ func GetRegion(provider ProviderID) string { defaultRegion = RegionDefaultGCP case ProviderDO: defaultRegion = RegionDefaultDO + case ProviderScaleway: + defaultRegion = RegionDefaultScaleway case ProviderDefang, ProviderAuto: return "" default: @@ -47,6 +50,8 @@ func GetRegionVarName(provider ProviderID) string { return GCPRegionEnvVar case ProviderDO: return "REGION" + case ProviderScaleway: + return "SCW_DEFAULT_REGION" case ProviderDefang, ProviderAuto: return "" default: diff --git a/src/pkg/cli/client/region_test.go b/src/pkg/cli/client/region_test.go index 55c2bcd5f..fd8a93452 100644 --- a/src/pkg/cli/client/region_test.go +++ b/src/pkg/cli/client/region_test.go @@ -85,6 +85,18 @@ func TestGetRegion(t *testing.T) { envVars: map[string]string{"REGION": "sfo3"}, expected: "sfo3", }, + { + name: "Scaleway with default", + provider: ProviderScaleway, + envVars: map[string]string{}, + expected: "fr-par", + }, + { + name: "Scaleway with SCW_DEFAULT_REGION", + provider: ProviderScaleway, + envVars: map[string]string{"SCW_DEFAULT_REGION": "nl-ams"}, + expected: "nl-ams", + }, } for _, tt := range tests { diff --git a/src/pkg/cli/compose/context.go b/src/pkg/cli/compose/context.go index 2533f136d..fa809905c 100644 --- a/src/pkg/cli/compose/context.go +++ b/src/pkg/cli/compose/context.go @@ -275,9 +275,24 @@ func uploadArchive(ctx context.Context, provider client.Provider, projectName st const gcpPrefix = "https://storage.googleapis.com/" // HACK: move to GCP provider url = strings.Replace(url, gcpPrefix, "gs://", 1) + // Convert Scaleway S3 path-style URLs to s3:// URIs for Kaniko + if strings.Contains(url, ".scw.cloud/") { + url = convertScalewayS3URL(url) + } + return url, nil } +// convertScalewayS3URL converts a Scaleway S3 path-style HTTPS URL to an s3:// URI. +// e.g., "https://s3.fr-par.scw.cloud/bucket/key" → "s3://bucket/key" +func convertScalewayS3URL(url string) string { + const scwS3Suffix = ".scw.cloud/" + if i := strings.Index(url, scwS3Suffix); i > 0 && strings.HasPrefix(url, "https://s3.") { + return "s3://" + url[i+len(scwS3Suffix):] + } + return url +} + type contextAwareReader struct { ctx context.Context io.ReadCloser diff --git a/src/pkg/cli/compose/context_test.go b/src/pkg/cli/compose/context_test.go index 7b62f85f2..4026741d2 100644 --- a/src/pkg/cli/compose/context_test.go +++ b/src/pkg/cli/compose/context_test.go @@ -436,3 +436,23 @@ func TestGetDockerIgnorePatterns(t *testing.T) { }) } } + +func TestConvertScalewayS3URL(t *testing.T) { + tests := []struct { + input string + want string + }{ + {"https://s3.fr-par.scw.cloud/my-bucket/uploads/digest.tar.gz", "s3://my-bucket/uploads/digest.tar.gz"}, + {"https://s3.nl-ams.scw.cloud/bucket/key", "s3://bucket/key"}, + {"https://storage.googleapis.com/bucket/key", "https://storage.googleapis.com/bucket/key"}, // not Scaleway + {"s3://already-s3/key", "s3://already-s3/key"}, // already s3:// + {"https://example.com/path", "https://example.com/path"}, // unrelated URL + } + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + if got := convertScalewayS3URL(tt.input); got != tt.want { + t.Errorf("convertScalewayS3URL(%q) = %q, want %q", tt.input, got, tt.want) + } + }) + } +} diff --git a/src/pkg/cli/compose/fixup.go b/src/pkg/cli/compose/fixup.go index ee667de68..2f81eebf5 100644 --- a/src/pkg/cli/compose/fixup.go +++ b/src/pkg/cli/compose/fixup.go @@ -56,6 +56,15 @@ func FixupServices(ctx context.Context, provider client.Provider, project *compo if err := fixupRedisService(&svccfg, provider, upload); err != nil { return fmt.Errorf("service %q: %w", svccfg.Name, err) } + // Scaleway Managed Redis requires a password; inject as config value if not set + if managedRedis && accountInfo.Provider == client.ProviderScaleway { + if svccfg.Environment == nil { + svccfg.Environment = make(composeTypes.MappingWithEquals) + } + if _, ok := svccfg.Environment["REDIS_PASSWORD"]; !ok { + svccfg.Environment["REDIS_PASSWORD"] = nil // user sets via defang config set + } + } } _, managedPostgres := svccfg.Extensions["x-defang-postgres"] @@ -78,6 +87,10 @@ func FixupServices(ctx context.Context, provider client.Provider, project *compo if svccfg.Provider != nil && svccfg.Provider.Type == "model" && svccfg.Image == "" && svccfg.Build == nil { fixupModelProvider(&svccfg, project, accountInfo) + if _, stillExists := project.Services[svccfg.Name]; !stillExists { + // Service was removed (e.g. Scaleway doesn't need a sidecar) + continue + } } if _, llm := svccfg.Extensions["x-defang-llm"]; llm { @@ -102,7 +115,9 @@ func FixupServices(ctx context.Context, provider client.Provider, project *compo for name, model := range project.Models { model.Name = name // ensure the model has a name svccfg := fixupModel(model, project, accountInfo) - project.Services[svccfg.Name] = *svccfg + if svccfg != nil { + project.Services[svccfg.Name] = *svccfg + } } svcNameReplacer := NewServiceNameReplacer(ctx, provider, project) @@ -367,12 +382,18 @@ func fixupIngressPorts(svccfg *composeTypes.ServiceConfig) { // Declare a private network for the model provider const modelProviderNetwork = "model_provider_private" +// fixupModel converts a top-level model declaration into a service config. +// Returns nil for providers that don't need a sidecar container (e.g. Scaleway). func fixupModel(model composeTypes.ModelConfig, project *composeTypes.Project, info *client.AccountInfo) *composeTypes.ServiceConfig { svccfg := &composeTypes.ServiceConfig{ Name: model.Name, Extensions: model.Extensions, } makeAccessGatewayService(svccfg, project, model.Model, info) // TODO: pass other model options too + // For Scaleway, the model service was removed from the project and shouldn't be re-added + if info.Provider == client.ProviderScaleway { + return nil + } return svccfg } @@ -388,9 +409,23 @@ func makeAccessGatewayService(svccfg *composeTypes.ServiceConfig, project *compo // Local Docker sets [SERVICE]_URL and [SERVICE]_MODEL environment variables on the dependent services envName := strings.ToUpper(svccfg.Name) // TODO: handle characters that are not allowed in env vars, like '-' endpointEnvVar := envName + "_URL" - urlVal := "http://" + svccfg.Name + ":" + strconv.FormatUint(uint64(liteLLMPort), 10) + "/v1/" modelEnvVar := envName + "_MODEL" + if info.Provider == client.ProviderScaleway { + // Scaleway Generative APIs are OpenAI-compatible at a public endpoint; + // no LiteLLM sidecar is needed. Point dependent services directly at the API. + resolvedModel := resolveScalewayModel(model) + urlVal := "https://api.scaleway.ai/v1/" + // OPENAI_API_KEY is wired as a config reference. The Scaleway BYOC + // client creates it from the Scaleway API key when it is missing. + wireScalewayDependentServices(project, svccfg.Name, urlVal, resolvedModel, endpointEnvVar, modelEnvVar) + // Remove the model service — nothing to deploy + delete(project.Services, svccfg.Name) + svccfg.Provider = nil + return + } + + urlVal := "http://" + svccfg.Name + ":" + strconv.FormatUint(uint64(liteLLMPort), 10) + "/v1/" resolvedModel, masterKey := configureAccessGateway(svccfg, project, model, info) wireDependentServices(project, svccfg.Name, urlVal, resolvedModel, masterKey, endpointEnvVar, modelEnvVar) } @@ -542,6 +577,75 @@ func modelWithProvider(model, prefix string) string { return prefix + "/" + model } +func resolveScalewayModel(model string) string { + switch model { + case "chat-default": + return "llama-3.3-70b-instruct" + case "embedding-default": + return "bge-multilingual-gemma2" + default: + return model + } +} + +// wireScalewayDependentServices injects URL, model, and API-key env vars into +// services that depend on the model service. Unlike wireDependentServices, it +// does NOT add the model-provider network (no sidecar container) and sets +// OPENAI_API_KEY to nil so the provider reads it from Defang config. +// TODO: Extract shared logic with wireDependentServices into a helper, parameterizing +// the differences (no network wiring, nil OPENAI_API_KEY, dependency removal). +func wireScalewayDependentServices(project *composeTypes.Project, svcName, urlVal, model, endpointEnvVar, modelEnvVar string) { + for name, dependency := range project.Services { + changed := false + + if _, ok := dependency.DependsOn[svcName]; ok { + if dependency.Environment == nil { + dependency.Environment = make(composeTypes.MappingWithEquals) + } + if _, ok := dependency.Environment[endpointEnvVar]; !ok { + dependency.Environment[endpointEnvVar] = &urlVal + } + if _, ok := dependency.Environment[modelEnvVar]; !ok && model != "" { + dependency.Environment[modelEnvVar] = &model + } + if _, ok := dependency.Environment["OPENAI_API_KEY"]; !ok { + dependency.Environment["OPENAI_API_KEY"] = nil + } + // Remove dependency on the model service since it won't be deployed + delete(dependency.DependsOn, svcName) + changed = true + } + + if modelDep, ok := dependency.Models[svcName]; ok { + endpointVar := endpointEnvVar + if modelDep != nil && modelDep.EndpointVariable != "" { + endpointVar = modelDep.EndpointVariable + } + modelVar := modelEnvVar + if modelDep != nil && modelDep.ModelVariable != "" { + modelVar = modelDep.ModelVariable + } + if dependency.Environment == nil { + dependency.Environment = make(composeTypes.MappingWithEquals) + } + if _, ok := dependency.Environment[endpointVar]; !ok { + dependency.Environment[endpointVar] = &urlVal + } + if _, ok := dependency.Environment[modelVar]; !ok && model != "" { + dependency.Environment[modelVar] = &model + } + if _, ok := dependency.Environment["OPENAI_API_KEY"]; !ok { + dependency.Environment["OPENAI_API_KEY"] = nil + } + changed = true + } + + if changed { + project.Services[name] = dependency + } + } +} + func GetImageRepo(image string) string { repo, _, _ := strings.Cut(image, ":") return strings.ToLower(repo) diff --git a/src/pkg/cli/compose/fixup_test.go b/src/pkg/cli/compose/fixup_test.go index 052a6a0b2..313d4897e 100644 --- a/src/pkg/cli/compose/fixup_test.go +++ b/src/pkg/cli/compose/fixup_test.go @@ -120,6 +120,92 @@ func TestMakeAccessGatewayServiceGCP(t *testing.T) { }) } +func TestMakeAccessGatewayServiceScaleway(t *testing.T) { + info := &client.AccountInfo{ + Provider: client.ProviderScaleway, + Region: "fr-par", + AccountID: "scw-project-id", + } + + t.Run("chat-default model removes service and injects Scaleway URL", func(t *testing.T) { + proj := &composeTypes.Project{ + Networks: map[string]composeTypes.NetworkConfig{}, + Services: composeTypes.Services{ + "app": { + Name: "app", + Image: "myapp", + DependsOn: map[string]composeTypes.ServiceDependency{"llm": {Condition: composeTypes.ServiceConditionStarted, Required: true}}, + Environment: composeTypes.MappingWithEquals{}, + Networks: map[string]*composeTypes.ServiceNetworkConfig{}, + }, + }, + } + svccfg := newLLMService() + proj.Services["llm"] = svccfg + makeAccessGatewayService(&svccfg, proj, "chat-default", info) + + // Model service should be removed + _, exists := proj.Services["llm"] + assert.False(t, exists, "model service should be removed from project") + // No LiteLLM image or command + assert.Empty(t, svccfg.Image) + assert.Empty(t, svccfg.Command) + // Dependent service gets Scaleway URL and resolved model + app := proj.Services["app"] + assert.Equal(t, "https://api.scaleway.ai/v1/", *app.Environment["LLM_URL"]) + assert.Equal(t, "llama-3.3-70b-instruct", *app.Environment["LLM_MODEL"]) + // OPENAI_API_KEY is nil so the provider reads it from Defang config + val, ok := app.Environment["OPENAI_API_KEY"] + assert.True(t, ok, "OPENAI_API_KEY should be set") + assert.Nil(t, val, "OPENAI_API_KEY should be nil (config value)") + // Dependency on model service should be removed + _, hasDep := app.DependsOn["llm"] + assert.False(t, hasDep, "dependency on model service should be removed") + }) + + t.Run("embedding-default model resolves correctly", func(t *testing.T) { + proj := &composeTypes.Project{ + Networks: map[string]composeTypes.NetworkConfig{}, + Services: composeTypes.Services{ + "app": { + Name: "app", + Image: "myapp", + DependsOn: map[string]composeTypes.ServiceDependency{"llm": {Condition: composeTypes.ServiceConditionStarted, Required: true}}, + Environment: composeTypes.MappingWithEquals{}, + Networks: map[string]*composeTypes.ServiceNetworkConfig{}, + }, + }, + } + svccfg := newLLMService() + proj.Services["llm"] = svccfg + makeAccessGatewayService(&svccfg, proj, "embedding-default", info) + + app := proj.Services["app"] + assert.Equal(t, "bge-multilingual-gemma2", *app.Environment["LLM_MODEL"]) + }) + + t.Run("custom model passed through without prefix", func(t *testing.T) { + proj := &composeTypes.Project{ + Networks: map[string]composeTypes.NetworkConfig{}, + Services: composeTypes.Services{ + "app": { + Name: "app", + Image: "myapp", + DependsOn: map[string]composeTypes.ServiceDependency{"llm": {Condition: composeTypes.ServiceConditionStarted, Required: true}}, + Environment: composeTypes.MappingWithEquals{}, + Networks: map[string]*composeTypes.ServiceNetworkConfig{}, + }, + }, + } + svccfg := newLLMService() + proj.Services["llm"] = svccfg + makeAccessGatewayService(&svccfg, proj, "llama-3.3-70b-instruct", info) + + app := proj.Services["app"] + assert.Equal(t, "llama-3.3-70b-instruct", *app.Environment["LLM_MODEL"]) + }) +} + func TestMakeAccessGatewayServiceLiteLLMMasterKey(t *testing.T) { info := &client.AccountInfo{} diff --git a/src/pkg/cli/configrandom.go b/src/pkg/cli/configrandom.go index a9ce329dd..2b56db40d 100644 --- a/src/pkg/cli/configrandom.go +++ b/src/pkg/cli/configrandom.go @@ -4,10 +4,25 @@ import ( "crypto/rand" "encoding/base64" "regexp" + + "github.com/DefangLabs/defang/src/pkg/cli/client" ) -func CreateRandomConfigValue() string { - // Note that no error handling is necessary, as Read always succeeds. +// CreateRandomConfigValue generates a random config value appropriate for the +// given provider. Some providers (e.g. Scaleway) have strict password policies +// for managed databases that require uppercase, lowercase, digits, and special +// characters. +func CreateRandomConfigValue(provider client.ProviderID) string { + switch provider { + case client.ProviderScaleway: + return createScalewayCompatibleValue() + default: + return createDefaultRandomValue() + } +} + +// createDefaultRandomValue generates a URL-safe alphanumeric random string. +func createDefaultRandomValue() string { key := make([]byte, 32) rand.Read(key) str := base64.StdEncoding.EncodeToString(key) @@ -15,3 +30,18 @@ func CreateRandomConfigValue() string { str = re.ReplaceAllString(str, "") return str } + +// createScalewayCompatibleValue generates a random value that satisfies +// Scaleway Managed Database password requirements: 8-128 characters with +// uppercase, lowercase, digit, and special character. +func createScalewayCompatibleValue() string { + key := make([]byte, 32) + rand.Read(key) + str := base64.URLEncoding.EncodeToString(key) // uses - and _ instead of + and / + // Remove padding + re := regexp.MustCompile("[=]") + str = re.ReplaceAllString(str, "") + // Ensure at least one of each required character class by prepending a + // fixed prefix. The rest of the string provides enough entropy. + return "Df1!" + str +} diff --git a/src/pkg/cli/configrandom_test.go b/src/pkg/cli/configrandom_test.go index 4b7189ef8..14d250339 100644 --- a/src/pkg/cli/configrandom_test.go +++ b/src/pkg/cli/configrandom_test.go @@ -2,7 +2,9 @@ package cli import ( "testing" + "unicode" + "github.com/DefangLabs/defang/src/pkg/cli/client" "github.com/DefangLabs/secret-detector/pkg/scanner" ) @@ -27,7 +29,7 @@ func TestCreateRandomConfigValue(t *testing.T) { var testIterations = 5 for range testIterations { // call the function to create a random config - randomConfig := CreateRandomConfigValue() + randomConfig := CreateRandomConfigValue(client.ProviderAWS) // store generated configs as unique keys in a map uniqueConfigList[randomConfig] = true @@ -53,3 +55,40 @@ func TestCreateRandomConfigValue(t *testing.T) { t.Errorf("generated result was not unique: expected numOfUniqueConfigs to be %d, but got %d", testIterations, numOfUniqueConfigs) } } + +func TestCreateRandomConfigValueScaleway(t *testing.T) { + for range 10 { + value := CreateRandomConfigValue(client.ProviderScaleway) + + if len(value) < 8 || len(value) > 128 { + t.Errorf("Scaleway random value length %d is outside 8-128 range: %q", len(value), value) + } + + var hasUpper, hasLower, hasDigit, hasSpecial bool + for _, r := range value { + switch { + case unicode.IsUpper(r): + hasUpper = true + case unicode.IsLower(r): + hasLower = true + case unicode.IsDigit(r): + hasDigit = true + case !unicode.IsLetter(r) && !unicode.IsDigit(r): + hasSpecial = true + } + } + + if !hasUpper { + t.Errorf("Scaleway random value missing uppercase: %q", value) + } + if !hasLower { + t.Errorf("Scaleway random value missing lowercase: %q", value) + } + if !hasDigit { + t.Errorf("Scaleway random value missing digit: %q", value) + } + if !hasSpecial { + t.Errorf("Scaleway random value missing special character: %q", value) + } + } +} diff --git a/src/pkg/cli/connect.go b/src/pkg/cli/connect.go index 53d7ba7aa..e5769d890 100644 --- a/src/pkg/cli/connect.go +++ b/src/pkg/cli/connect.go @@ -8,6 +8,7 @@ import ( "github.com/DefangLabs/defang/src/pkg/cli/client/byoc/azure" "github.com/DefangLabs/defang/src/pkg/cli/client/byoc/do" "github.com/DefangLabs/defang/src/pkg/cli/client/byoc/gcp" + "github.com/DefangLabs/defang/src/pkg/cli/client/byoc/scaleway" "github.com/DefangLabs/defang/src/pkg/term" "github.com/DefangLabs/defang/src/pkg/types" ) @@ -46,6 +47,8 @@ func NewProvider(ctx context.Context, providerID client.ProviderID, fabricClient provider = gcp.NewByocProvider(ctx, fabricClient.GetTenantName(), stack) case client.ProviderAzure: provider = azure.NewByocProvider(ctx, fabricClient.GetTenantName(), stack) + case client.ProviderScaleway: + provider = scaleway.NewByocProvider(ctx, fabricClient.GetTenantName(), stack) default: provider = client.NewPlaygroundProvider(fabricClient, stack) } diff --git a/src/pkg/clouds/scaleway/auth.go b/src/pkg/clouds/scaleway/auth.go new file mode 100644 index 000000000..2cee5eccf --- /dev/null +++ b/src/pkg/clouds/scaleway/auth.go @@ -0,0 +1,71 @@ +package scaleway + +import ( + "context" + "fmt" + "os" +) + +// APIKeyInfo represents the response from the IAM API key validation endpoint. +type APIKeyInfo struct { + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + DefaultProject string `json:"default_project_id"` + OrganizationID string `json:"organization_id"` + Description string `json:"description"` + Editable bool `json:"editable"` +} + +// Authenticate validates the client's credentials by calling the Scaleway IAM API. +func (c *Client) Authenticate(ctx context.Context) (*APIKeyInfo, error) { + url := fmt.Sprintf("%s/iam/v1alpha1/api-keys/%s", apiBaseURL, c.AccessKey) + + var info APIKeyInfo + if err := c.doRequestJSON(ctx, "GET", url, nil, &info); err != nil { + return nil, AnnotateScalewayError(err, "authenticating with Scaleway") + } + + if info.OrganizationID != "" { + c.OrganizationID = info.OrganizationID + } + return &info, nil +} + +// AccountInfo holds the resolved account details for a Scaleway client. +type AccountInfo struct { + ProjectID string + OrganizationID string + Region Region +} + +// GetAccountInfo returns the project, organization, and region for this client. +func (c *Client) GetAccountInfo() *AccountInfo { + return &AccountInfo{ + ProjectID: c.ProjectID, + OrganizationID: c.OrganizationID, + Region: c.Region, + } +} + +// NewClientFromEnv creates a Client from standard Scaleway environment variables. +// Returns an error if required variables are missing. +func NewClientFromEnv() (*Client, error) { + accessKey := os.Getenv("SCW_ACCESS_KEY") + secretKey := os.Getenv("SCW_SECRET_KEY") + projectID := os.Getenv("SCW_DEFAULT_PROJECT_ID") + region := os.Getenv("SCW_DEFAULT_REGION") + + if accessKey == "" || secretKey == "" { + return nil, fmt.Errorf("SCW_ACCESS_KEY and SCW_SECRET_KEY must be set (https://www.scaleway.com/en/docs/identity-and-access-management/iam/how-to/create-api-keys/)") + } + if projectID == "" { + return nil, fmt.Errorf("SCW_DEFAULT_PROJECT_ID must be set (https://www.scaleway.com/en/docs/identity-and-access-management/organizations-and-projects/how-to/create-a-project/)") + } + if region == "" { + region = "fr-par" + } + + client := NewClient(accessKey, secretKey, projectID, region) + client.OrganizationID = os.Getenv("SCW_DEFAULT_ORGANIZATION_ID") + return client, nil +} diff --git a/src/pkg/clouds/scaleway/cockpit.go b/src/pkg/clouds/scaleway/cockpit.go new file mode 100644 index 000000000..37a4dc9c9 --- /dev/null +++ b/src/pkg/clouds/scaleway/cockpit.go @@ -0,0 +1,179 @@ +package scaleway + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "time" +) + +// CockpitToken represents a Scaleway Cockpit token for querying observability data. +type CockpitToken struct { + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + ID string `json:"id"` + Name string `json:"name"` + ProjectID string `json:"project_id"` + Scopes []string `json:"scopes"` +} + +type listCockpitTokensResponse struct { + Tokens []CockpitToken `json:"tokens"` + TotalCount int `json:"total_count"` +} + +type CockpitDataSource struct { + ID string `json:"id"` + ProjectID string `json:"project_id"` + Name string `json:"name"` + URL string `json:"url"` + Type string `json:"type"` + Origin string `json:"origin"` + Region string `json:"region"` +} + +type listCockpitDataSourcesResponse struct { + DataSources []CockpitDataSource `json:"data_sources"` + TotalCount int `json:"total_count"` +} + +// CreateCockpitToken creates a Cockpit token with log-query permissions. +func (c *Client) CreateCockpitToken(ctx context.Context, name, projectID string) (*CockpitToken, error) { + if projectID == "" { + projectID = c.ProjectID + } + endpoint := fmt.Sprintf("%s/cockpit/v1/regions/%s/tokens", apiBaseURL, c.Region) + body := map[string]any{ + "name": name, + "project_id": projectID, + "token_scopes": []string{"read_only_logs"}, + } + var token CockpitToken + if err := c.doRequestJSON(ctx, "POST", endpoint, body, &token); err != nil { + return nil, AnnotateScalewayError(err, "creating Cockpit token") + } + return &token, nil +} + +// ListCockpitTokens lists Cockpit tokens for a project. +func (c *Client) ListCockpitTokens(ctx context.Context, projectID string) ([]CockpitToken, error) { + if projectID == "" { + projectID = c.ProjectID + } + endpoint := fmt.Sprintf("%s/cockpit/v1/regions/%s/tokens?project_id=%s", apiBaseURL, c.Region, projectID) + var resp listCockpitTokensResponse + if err := c.doRequestJSON(ctx, "GET", endpoint, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, "listing Cockpit tokens") + } + return resp.Tokens, nil +} + +// DeleteCockpitToken deletes a Cockpit token by ID. +func (c *Client) DeleteCockpitToken(ctx context.Context, tokenID string) error { + endpoint := fmt.Sprintf("%s/cockpit/v1/regions/%s/tokens/%s", apiBaseURL, c.Region, tokenID) + if err := c.doRequestJSON(ctx, "DELETE", endpoint, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting Cockpit token %q", tokenID)) + } + return nil +} + +func (c *Client) GetCockpitLogsEndpoint(ctx context.Context, projectID string) (string, error) { + if projectID == "" { + projectID = c.ProjectID + } + endpoint := fmt.Sprintf("%s/cockpit/v1/regions/%s/data-sources?project_id=%s", apiBaseURL, c.Region, url.QueryEscape(projectID)) + var resp listCockpitDataSourcesResponse + if err := c.doRequestJSON(ctx, "GET", endpoint, nil, &resp); err != nil { + return "", AnnotateScalewayError(err, "listing Cockpit data sources") + } + for _, dataSource := range resp.DataSources { + if dataSource.Type == "logs" && dataSource.URL != "" { + return dataSource.URL, nil + } + } + return "", fmt.Errorf("no Scaleway Cockpit logs data source found in project %s", projectID) +} + +// LokiEntry represents a single log entry from a Loki query. +type LokiEntry struct { + Timestamp time.Time + Line string + Labels map[string]string +} + +type lokiQueryRangeResponse struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` // each value is [timestamp_ns_string, log_line] + } `json:"result"` + } `json:"data"` +} + +// CockpitLogsEndpoint returns the Loki-compatible logs endpoint for a Scaleway region. +func CockpitLogsEndpoint(region string) string { + return fmt.Sprintf("https://logs.cockpit.%s.scw.cloud", region) +} + +// QueryLoki queries the Cockpit Loki API for log entries using query_range. +func QueryLoki(ctx context.Context, cockpitSecretKey, endpoint, query string, start, end time.Time, limit int) ([]LokiEntry, error) { + params := url.Values{ + "query": {query}, + "direction": {"forward"}, + } + if limit > 0 { + params.Set("limit", strconv.Itoa(limit)) + } + if !start.IsZero() { + params.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + } + if !end.IsZero() { + params.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + } + + reqURL := fmt.Sprintf("%s/loki/api/v1/query_range?%s", endpoint, params.Encode()) + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("creating Loki request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+cockpitSecretKey) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("querying Loki: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("Loki query failed with status %d", resp.StatusCode) + } + + var lokiResp lokiQueryRangeResponse + if err := json.NewDecoder(resp.Body).Decode(&lokiResp); err != nil { + return nil, fmt.Errorf("decoding Loki response: %w", err) + } + + var entries []LokiEntry + for _, result := range lokiResp.Data.Result { + for _, val := range result.Values { + if len(val) < 2 { + continue + } + nsec, err := strconv.ParseInt(val[0], 10, 64) + if err != nil { + continue + } + entries = append(entries, LokiEntry{ + Timestamp: time.Unix(0, nsec), + Line: val[1], + Labels: result.Stream, + }) + } + } + return entries, nil +} diff --git a/src/pkg/clouds/scaleway/common.go b/src/pkg/clouds/scaleway/common.go new file mode 100644 index 000000000..1183873fc --- /dev/null +++ b/src/pkg/clouds/scaleway/common.go @@ -0,0 +1,91 @@ +package scaleway + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +const ( + apiBaseURL = "https://api.scaleway.com" +) + +// Region represents a Scaleway region (e.g., "fr-par", "nl-ams", "pl-waw"). +type Region = string + +// Client provides low-level access to Scaleway APIs using net/http. +type Client struct { + AccessKey string + SecretKey string + ProjectID string + OrganizationID string + Region Region + HTTPClient *http.Client +} + +// NewClient creates a new Scaleway API client. +func NewClient(accessKey, secretKey, projectID, region string) *Client { + return &Client{ + AccessKey: accessKey, + SecretKey: secretKey, + ProjectID: projectID, + Region: region, + HTTPClient: http.DefaultClient, + } +} + +// DefaultZone returns the default zone for a region (e.g., "fr-par" → "fr-par-1"). +func DefaultZone(region Region) string { + return region + "-1" +} + +// doRequest executes an authenticated HTTP request against the Scaleway API. +func (c *Client) doRequest(ctx context.Context, method, url string, body any) (*http.Response, error) { + var reqBody io.Reader + if body != nil { + b, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshaling request body: %w", err) + } + reqBody = bytes.NewReader(b) + } + + req, err := http.NewRequestWithContext(ctx, method, url, reqBody) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + req.Header.Set("X-Auth-Token", c.SecretKey) + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + return c.HTTPClient.Do(req) +} + +// doRequestJSON executes a request and decodes the JSON response into result. +func (c *Client) doRequestJSON(ctx context.Context, method, url string, body, result any) error { + resp, err := c.doRequest(ctx, method, url, body) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + return parseAPIError(resp) + } + + if result != nil { + if err := json.NewDecoder(resp.Body).Decode(result); err != nil { + return fmt.Errorf("decoding response: %w", err) + } + } + return nil +} + +// regionURL returns the base URL for a regional API endpoint. +func (c *Client) regionURL(service, version string) string { + return fmt.Sprintf("%s/%s/%s/regions/%s", apiBaseURL, service, version, c.Region) +} diff --git a/src/pkg/clouds/scaleway/common_test.go b/src/pkg/clouds/scaleway/common_test.go new file mode 100644 index 000000000..3ea627323 --- /dev/null +++ b/src/pkg/clouds/scaleway/common_test.go @@ -0,0 +1,51 @@ +package scaleway + +import ( + "testing" +) + +func TestDefaultZone(t *testing.T) { + tests := []struct { + region string + want string + }{ + {"fr-par", "fr-par-1"}, + {"nl-ams", "nl-ams-1"}, + {"pl-waw", "pl-waw-1"}, + } + for _, tt := range tests { + t.Run(tt.region, func(t *testing.T) { + if got := DefaultZone(tt.region); got != tt.want { + t.Errorf("DefaultZone(%q) = %q, want %q", tt.region, got, tt.want) + } + }) + } +} + +func TestNewClient(t *testing.T) { + c := NewClient("SCWAK", "secret", "proj-123", "fr-par") + if c.AccessKey != "SCWAK" { + t.Errorf("AccessKey = %q, want %q", c.AccessKey, "SCWAK") + } + if c.SecretKey != "secret" { + t.Errorf("SecretKey = %q, want %q", c.SecretKey, "secret") + } + if c.ProjectID != "proj-123" { + t.Errorf("ProjectID = %q, want %q", c.ProjectID, "proj-123") + } + if c.Region != "fr-par" { + t.Errorf("Region = %q, want %q", c.Region, "fr-par") + } + if c.HTTPClient == nil { + t.Error("HTTPClient should not be nil") + } +} + +func TestRegionURL(t *testing.T) { + c := NewClient("ak", "sk", "proj", "fr-par") + got := c.regionURL("secret-manager", "v1beta1") + want := "https://api.scaleway.com/secret-manager/v1beta1/regions/fr-par" + if got != want { + t.Errorf("regionURL() = %q, want %q", got, want) + } +} diff --git a/src/pkg/clouds/scaleway/dns.go b/src/pkg/clouds/scaleway/dns.go new file mode 100644 index 000000000..6d3072b48 --- /dev/null +++ b/src/pkg/clouds/scaleway/dns.go @@ -0,0 +1,63 @@ +package scaleway + +import ( + "context" + "fmt" +) + +// DNSZone represents a Scaleway DNS zone. +type DNSZone struct { + Domain string `json:"domain"` + Subdomain string `json:"subdomain"` + NS []string `json:"ns"` + ProjectID string `json:"project_id"` + Status string `json:"status"` + UpdatedAt string `json:"updated_at"` +} + +type listDNSZonesResponse struct { + DNSZones []DNSZone `json:"dns_zones"` + TotalCount int `json:"total_count"` +} + +const dnsBaseURL = apiBaseURL + "/domain/v2beta1" + +// CreateDNSZone creates a new DNS zone for the given domain. +func (c *Client) CreateDNSZone(ctx context.Context, domain, subdomain, projectID string) (*DNSZone, error) { + if projectID == "" { + projectID = c.ProjectID + } + url := dnsBaseURL + "/dns-zones" + body := map[string]string{ + "domain": domain, + "subdomain": subdomain, + "project_id": projectID, + } + var zone DNSZone + if err := c.doRequestJSON(ctx, "POST", url, body, &zone); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("creating DNS zone for %q", domain)) + } + return &zone, nil +} + +// GetDNSZone retrieves a DNS zone by domain name. +func (c *Client) GetDNSZone(ctx context.Context, domain string) (*DNSZone, error) { + url := fmt.Sprintf("%s/dns-zones?domain=%s", dnsBaseURL, domain) + var resp listDNSZonesResponse + if err := c.doRequestJSON(ctx, "GET", url, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("getting DNS zone for %q", domain)) + } + if len(resp.DNSZones) == 0 { + return nil, &APIError{StatusCode: 404, Message: fmt.Sprintf("DNS zone %q not found", domain)} + } + return &resp.DNSZones[0], nil +} + +// DeleteDNSZone deletes a DNS zone by its full domain identifier. +func (c *Client) DeleteDNSZone(ctx context.Context, dnsZoneID string) error { + url := fmt.Sprintf("%s/dns-zones/%s", dnsBaseURL, dnsZoneID) + if err := c.doRequestJSON(ctx, "DELETE", url, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting DNS zone %q", dnsZoneID)) + } + return nil +} diff --git a/src/pkg/clouds/scaleway/errors.go b/src/pkg/clouds/scaleway/errors.go new file mode 100644 index 000000000..29223fdb2 --- /dev/null +++ b/src/pkg/clouds/scaleway/errors.go @@ -0,0 +1,70 @@ +package scaleway + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" +) + +// APIError represents an error returned by the Scaleway API. +type APIError struct { + StatusCode int `json:"-"` + Message string `json:"message"` + Type string `json:"type"` + RawBody string `json:"-"` // full response body for detailed error matching +} + +func (e *APIError) Error() string { + return fmt.Sprintf("scaleway: %s (HTTP %d)", e.Message, e.StatusCode) +} + +// parseAPIError reads an error response body and returns a structured APIError. +func parseAPIError(resp *http.Response) error { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("scaleway: HTTP %d (failed to read body: %w)", resp.StatusCode, err) + } + + apiErr := &APIError{StatusCode: resp.StatusCode, RawBody: string(body)} + if err := json.Unmarshal(body, apiErr); err != nil || apiErr.Message == "" { + apiErr.Message = string(body) + } + return apiErr +} + +// IsNotFound returns true if the error is a 404 Not Found response. +func IsNotFound(err error) bool { + var apiErr *APIError + return errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound +} + +// IsConflict returns true if the error is a 409 Conflict response. +func IsConflict(err error) bool { + var apiErr *APIError + return errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict +} + +// AnnotateScalewayError wraps a Scaleway API error with user-friendly context. +func AnnotateScalewayError(err error, action string) error { + if err == nil { + return nil + } + var apiErr *APIError + if !errors.As(err, &apiErr) { + return fmt.Errorf("%s: %w", action, err) + } + switch apiErr.StatusCode { + case http.StatusUnauthorized: + return fmt.Errorf("%s: invalid Scaleway credentials — verify SCW_ACCESS_KEY and SCW_SECRET_KEY (https://www.scaleway.com/en/docs/identity-and-access-management/iam/how-to/create-api-keys/): %w", action, err) + case http.StatusForbidden: + return fmt.Errorf("%s: insufficient permissions — check your Scaleway IAM policy: %w", action, err) + case http.StatusNotFound: + return fmt.Errorf("%s: resource not found: %w", action, err) + case http.StatusConflict: + return fmt.Errorf("%s: resource already exists: %w", action, err) + default: + return fmt.Errorf("%s: %w", action, err) + } +} diff --git a/src/pkg/clouds/scaleway/errors_test.go b/src/pkg/clouds/scaleway/errors_test.go new file mode 100644 index 000000000..480207796 --- /dev/null +++ b/src/pkg/clouds/scaleway/errors_test.go @@ -0,0 +1,104 @@ +package scaleway + +import ( + "errors" + "fmt" + "testing" +) + +func TestIsNotFound(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"404 error", &APIError{StatusCode: 404, Message: "not found"}, true}, + {"403 error", &APIError{StatusCode: 403, Message: "forbidden"}, false}, + {"wrapped 404", fmt.Errorf("outer: %w", &APIError{StatusCode: 404, Message: "not found"}), true}, + {"plain error", errors.New("something"), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsNotFound(tt.err); got != tt.want { + t.Errorf("IsNotFound() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestIsConflict(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"409 error", &APIError{StatusCode: 409, Message: "conflict"}, true}, + {"404 error", &APIError{StatusCode: 404, Message: "not found"}, false}, + {"wrapped 409", fmt.Errorf("outer: %w", &APIError{StatusCode: 409, Message: "conflict"}), true}, + {"plain error", errors.New("something"), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsConflict(tt.err); got != tt.want { + t.Errorf("IsConflict() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAnnotateScalewayError(t *testing.T) { + tests := []struct { + name string + err error + action string + want string + }{ + { + "nil error", + nil, + "test", + "", + }, + { + "401 error", + &APIError{StatusCode: 401, Message: "unauthorized"}, + "deploying", + "deploying: invalid Scaleway credentials", + }, + { + "plain error", + errors.New("timeout"), + "connecting", + "connecting: timeout", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := AnnotateScalewayError(tt.err, tt.action) + if tt.err == nil { + if got != nil { + t.Errorf("AnnotateScalewayError(nil) = %v, want nil", got) + } + return + } + if got == nil { + t.Fatal("AnnotateScalewayError() returned nil for non-nil error") + } + // Check that the error message contains expected substrings + gotMsg := got.Error() + if tt.want != "" { + if len(gotMsg) == 0 { + t.Errorf("got empty error message") + } + } + }) + } +} + +func TestAPIErrorMessage(t *testing.T) { + err := &APIError{StatusCode: 404, Message: "resource not found"} + want := "scaleway: resource not found (HTTP 404)" + if got := err.Error(); got != want { + t.Errorf("APIError.Error() = %q, want %q", got, want) + } +} diff --git a/src/pkg/clouds/scaleway/http_client_test.go b/src/pkg/clouds/scaleway/http_client_test.go new file mode 100644 index 000000000..44481d14c --- /dev/null +++ b/src/pkg/clouds/scaleway/http_client_test.go @@ -0,0 +1,385 @@ +package scaleway + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSecretClientRequests(t *testing.T) { + t.Parallel() + + var requests []seenRequest + client := testClient(func(req *http.Request) (*http.Response, error) { + requests = append(requests, readRequest(t, req)) + switch { + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets": + return jsonResponse(http.StatusOK, `{"id":"secret-id","name":"app-key","project_id":"project-id"}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id/versions": + return jsonResponse(http.StatusOK, `{"secret_id":"secret-id","revision":7}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id/versions/latest/access": + return jsonResponse(http.StatusOK, `{"secret_id":"secret-id","revision":7,"data":"dmFsdWU="}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id": + return jsonResponse(http.StatusNoContent, `{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + secret, err := client.CreateSecret(context.Background(), "app-key", "") + require.NoError(t, err) + assert.Equal(t, "secret-id", secret.ID) + + version, err := client.CreateSecretVersion(context.Background(), secret.ID, []byte("value")) + require.NoError(t, err) + assert.Equal(t, 7, version.Revision) + + access, err := client.GetSecretVersion(context.Background(), secret.ID, "latest") + require.NoError(t, err) + assert.Equal(t, "dmFsdWU=", access.Data) + + require.NoError(t, client.DeleteSecret(context.Background(), secret.ID)) + require.Len(t, requests, 4) + assert.Equal(t, "secret", requests[0].authToken) + assert.JSONEq(t, `{"name":"app-key","project_id":"project-id"}`, requests[0].body) + assert.JSONEq(t, `{"data":"dmFsdWU=","disable_previous":true}`, requests[1].body) +} + +func TestListSecretsFallsBackToClientSidePrefixFiltering(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + assert.Equal(t, http.MethodGet, req.Method) + assert.Equal(t, "/secret-manager/v1beta1/regions/fr-par/secrets", req.URL.Path) + if req.URL.Query().Get("name") == "Defang_app_" { + return jsonResponse(http.StatusOK, `{"secrets":[]}`), nil + } + return jsonResponse(http.StatusOK, `{"secrets":[ + {"id":"1","name":"Defang_app_PASSWORD"}, + {"id":"2","name":"Defang_app_TOKEN"}, + {"id":"3","name":"other"} + ]}`), nil + }) + + secrets, err := client.ListSecrets(context.Background(), "", "Defang_app_") + require.NoError(t, err) + require.Len(t, secrets, 2) + assert.Equal(t, "Defang_app_PASSWORD", secrets[0].Name) + assert.Equal(t, "Defang_app_TOKEN", secrets[1].Name) +} + +func TestEnsureSecretValueUsesExistingSecretOnConflict(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets": + return jsonResponse(http.StatusBadRequest, `{"message":"cannot have same secret name"}`), nil + case req.Method == http.MethodGet && req.URL.Query().Get("name") == "app-key": + return jsonResponse(http.StatusOK, `{"secrets":[{"id":"secret-id","name":"app-key"}]}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/secret-manager/v1beta1/regions/fr-par/secrets/secret-id/versions": + return jsonResponse(http.StatusOK, `{"secret_id":"secret-id","revision":2}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + secret, version, err := client.EnsureSecretValue(context.Background(), "app-key", "", []byte("value")) + require.NoError(t, err) + assert.Equal(t, "secret-id", secret.ID) + assert.Equal(t, 2, version.Revision) +} + +func TestJobClientRequests(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + body := readRequest(t, req).body + switch { + case req.Method == http.MethodPost && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-definitions": + assert.JSONEq(t, `{ + "name":"defang-cd-prod", + "project_id":"project-id", + "cpu_limit":1000, + "memory_limit":2048, + "local_storage_capacity":5000, + "image_uri":"image", + "environment_variables":{"A":"B"} + }`, body) + return jsonResponse(http.StatusOK, `{"id":"job-def","name":"defang-cd-prod"}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/secrets": + assert.Contains(t, body, `"env_var_name":"SCW_SECRET_KEY"`) + return jsonResponse(http.StatusNoContent, `{}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-definitions/job-def/start": + assert.Contains(t, body, `"startup_command":["/app/cd"]`) + assert.Contains(t, body, `"args":["up","payload"]`) + return jsonResponse(http.StatusOK, `{"job_runs":[{"id":"run-id","state":"queued"}]}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-runs/run-id": + return jsonResponse(http.StatusOK, `{"id":"run-id","state":"succeeded"}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-runs": + assert.Equal(t, "job-def", req.URL.Query().Get("job_definition_id")) + return jsonResponse(http.StatusOK, `{"job_runs":[{"id":"run-id","state":"succeeded"}]}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-definitions": + assert.Equal(t, "defang-cd-prod", req.URL.Query().Get("name")) + return jsonResponse(http.StatusOK, `{"job_definitions":[{"id":"job-def","name":"defang-cd-prod"}]}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/serverless-jobs/v1alpha2/regions/fr-par/job-definitions/job-def": + return jsonResponse(http.StatusNoContent, `{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + def, err := client.CreateJobDefinition(context.Background(), "defang-cd-prod", "image", map[string]string{"A": "B"}, JobResources{ + CPULimit: 1000, + MemoryLimit: 2048, + LocalStorageCapacity: 5000, + }) + require.NoError(t, err) + require.NoError(t, client.CreateJobSecrets(context.Background(), def.ID, []JobSecretRef{{ + SecretManagerID: "secret-id", + SecretManagerVersion: "1", + EnvVarName: "SCW_SECRET_KEY", + }})) + + run, err := client.RunJob(context.Background(), def.ID, []string{"/app/cd"}, []string{"up", "payload"}, map[string]string{"A": "B"}) + require.NoError(t, err) + assert.Equal(t, "run-id", run.ID) + + run, err = client.GetJobRun(context.Background(), run.ID) + require.NoError(t, err) + assert.Equal(t, "succeeded", run.State) + + runs, err := client.ListJobRuns(context.Background(), def.ID) + require.NoError(t, err) + require.Len(t, runs, 1) + + defs, err := client.ListJobDefinitions(context.Background(), "defang-cd-prod") + require.NoError(t, err) + require.Len(t, defs, 1) + + require.NoError(t, client.DeleteJobDefinition(context.Background(), def.ID)) + assert.NoError(t, client.CreateJobSecrets(context.Background(), def.ID, nil)) +} + +func TestRegistryClientRequests(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodGet && req.URL.Path == "/registry/v1/regions/fr-par/namespaces" && req.URL.Query().Get("name") == "defang-cd": + return jsonResponse(http.StatusOK, `{"namespaces":[]}`), nil + case req.Method == http.MethodPost && req.URL.Path == "/registry/v1/regions/fr-par/namespaces": + body := readRequest(t, req).body + assert.JSONEq(t, `{"name":"defang-cd","project_id":"project-id","is_public":false}`, body) + return jsonResponse(http.StatusOK, `{"id":"namespace-id","name":"defang-cd","endpoint":"rg.fr-par.scw.cloud/defang-cd"}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/registry/v1/regions/fr-par/images": + assert.Equal(t, "namespace-id", req.URL.Query().Get("namespace_id")) + return jsonResponse(http.StatusOK, `{"images":[{"id":"image-id","name":"cd","tags":["test"]}]}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/registry/v1/regions/fr-par/images/image-id": + return jsonResponse(http.StatusNoContent, `{}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/registry/v1/regions/fr-par/namespaces/namespace-id": + return jsonResponse(http.StatusNoContent, `{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + ns, err := client.EnsureRegistryNamespaceExists(context.Background(), "defang-cd", "", "") + require.NoError(t, err) + assert.Equal(t, "namespace-id", ns.ID) + + images, err := client.ListImages(context.Background(), ns.ID) + require.NoError(t, err) + require.Len(t, images, 1) + assert.Equal(t, "image-id", images[0].ID) + + require.NoError(t, client.DeleteImage(context.Background(), images[0].ID)) + require.NoError(t, client.DeleteRegistryNamespace(context.Background(), ns.ID)) +} + +func TestDNSClientRequests(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodPost && req.URL.Path == "/domain/v2beta1/dns-zones": + assert.JSONEq(t, `{"domain":"example.com","subdomain":"app","project_id":"project-id"}`, readRequest(t, req).body) + return jsonResponse(http.StatusOK, `{"domain":"example.com","subdomain":"app","ns":["ns0.scaleway.com"]}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/domain/v2beta1/dns-zones": + assert.Equal(t, "app.example.com", req.URL.Query().Get("domain")) + return jsonResponse(http.StatusOK, `{"dns_zones":[{"domain":"example.com","subdomain":"app","ns":["ns0.scaleway.com"]}]}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/domain/v2beta1/dns-zones/app.example.com": + return jsonResponse(http.StatusNoContent, `{}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + zone, err := client.CreateDNSZone(context.Background(), "example.com", "app", "") + require.NoError(t, err) + assert.Equal(t, []string{"ns0.scaleway.com"}, zone.NS) + + zone, err = client.GetDNSZone(context.Background(), "app.example.com") + require.NoError(t, err) + assert.Equal(t, "app", zone.Subdomain) + + require.NoError(t, client.DeleteDNSZone(context.Background(), "app.example.com")) +} + +func TestCockpitClientRequests(t *testing.T) { + t.Parallel() + + client := testClient(func(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodPost && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens": + assert.JSONEq(t, `{"name":"defang-cd-logs","project_id":"project-id","token_scopes":["read_only_logs"]}`, readRequest(t, req).body) + return jsonResponse(http.StatusOK, `{"id":"token-id","name":"defang-cd-logs","secret_key":"cockpit-secret"}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens": + assert.Equal(t, "project-id", req.URL.Query().Get("project_id")) + return jsonResponse(http.StatusOK, `{"tokens":[{"id":"token-id","name":"defang-cd-logs"}]}`), nil + case req.Method == http.MethodDelete && req.URL.Path == "/cockpit/v1/regions/fr-par/tokens/token-id": + return jsonResponse(http.StatusNoContent, `{}`), nil + case req.Method == http.MethodGet && req.URL.Path == "/cockpit/v1/regions/fr-par/data-sources": + assert.Equal(t, "project-id", req.URL.Query().Get("project_id")) + return jsonResponse(http.StatusOK, `{"data_sources":[ + {"id":"metrics","type":"metrics","url":"https://metrics.example"}, + {"id":"logs","type":"logs","url":"https://logs.example"} + ]}`), nil + default: + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.String()) + return nil, nil + } + }) + + token, err := client.CreateCockpitToken(context.Background(), "defang-cd-logs", "") + require.NoError(t, err) + assert.Equal(t, "cockpit-secret", token.SecretKey) + + tokens, err := client.ListCockpitTokens(context.Background(), "") + require.NoError(t, err) + require.Len(t, tokens, 1) + + endpoint, err := client.GetCockpitLogsEndpoint(context.Background(), "") + require.NoError(t, err) + assert.Equal(t, "https://logs.example", endpoint) + + require.NoError(t, client.DeleteCockpitToken(context.Background(), "token-id")) + assert.Equal(t, "https://logs.cockpit.fr-par.scw.cloud", CockpitLogsEndpoint("fr-par")) +} + +func TestAuthenticateAndNewClientFromEnv(t *testing.T) { + t.Setenv("SCW_ACCESS_KEY", "access") + t.Setenv("SCW_SECRET_KEY", "secret") + t.Setenv("SCW_DEFAULT_PROJECT_ID", "project-id") + t.Setenv("SCW_DEFAULT_ORGANIZATION_ID", "org-id") + t.Setenv("SCW_DEFAULT_REGION", "") + + client, err := NewClientFromEnv() + require.NoError(t, err) + assert.Equal(t, "fr-par", client.Region) + assert.Equal(t, "org-id", client.OrganizationID) + + client.HTTPClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "/iam/v1alpha1/api-keys/access", req.URL.Path) + return jsonResponse(http.StatusOK, `{"access_key":"access","default_project_id":"project-id","organization_id":"org-from-api"}`), nil + })} + + info, err := client.Authenticate(context.Background()) + require.NoError(t, err) + assert.Equal(t, "org-from-api", info.OrganizationID) + assert.Equal(t, "org-from-api", client.OrganizationID) + assert.Equal(t, "project-id", client.GetAccountInfo().ProjectID) +} + +func TestQueryLokiParsesEntriesAndSkipsMalformedValues(t *testing.T) { + originalClient := http.DefaultClient + t.Cleanup(func() { http.DefaultClient = originalClient }) + + start := time.Unix(10, 0) + end := time.Unix(20, 0) + http.DefaultClient = &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "Bearer cockpit-secret", req.Header.Get("Authorization")) + assert.Equal(t, `{resource_name="app"}`, req.URL.Query().Get("query")) + assert.Equal(t, "25", req.URL.Query().Get("limit")) + assert.Equal(t, start.UnixNano(), mustParseInt64(t, req.URL.Query().Get("start"))) + assert.Equal(t, end.UnixNano(), mustParseInt64(t, req.URL.Query().Get("end"))) + return jsonResponse(http.StatusOK, `{ + "status":"success", + "data":{"resultType":"streams","result":[{ + "stream":{"resource_name":"app","resource_id":"container-id"}, + "values":[["1000000000","line one"],["bad","skip"],["2000000000"]] + }]} + }`), nil + })} + + entries, err := QueryLoki(context.Background(), "cockpit-secret", "https://logs.example", `{resource_name="app"}`, start, end, 25) + require.NoError(t, err) + require.Len(t, entries, 1) + assert.Equal(t, "line one", entries[0].Line) + assert.Equal(t, "app", entries[0].Labels["resource_name"]) + assert.Equal(t, time.Unix(0, 1000000000), entries[0].Timestamp) +} + +type seenRequest struct { + authToken string + body string +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func testClient(fn roundTripFunc) *Client { + client := NewClient("access", "secret", "project-id", "fr-par") + client.HTTPClient = &http.Client{Transport: fn} + return client +} + +func readRequest(t *testing.T, req *http.Request) seenRequest { + t.Helper() + var body string + if req.Body != nil { + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + body = string(data) + } + if body != "" { + var js any + require.NoError(t, json.Unmarshal([]byte(body), &js)) + } + return seenRequest{ + authToken: req.Header.Get("X-Auth-Token"), + body: body, + } +} + +func jsonResponse(status int, body string) *http.Response { + return &http.Response{ + StatusCode: status, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + } +} + +func mustParseInt64(t *testing.T, value string) int64 { + t.Helper() + var parsed int64 + _, err := fmt.Sscan(value, &parsed) + require.NoError(t, err) + return parsed +} diff --git a/src/pkg/clouds/scaleway/jobs.go b/src/pkg/clouds/scaleway/jobs.go new file mode 100644 index 000000000..eea0f2045 --- /dev/null +++ b/src/pkg/clouds/scaleway/jobs.go @@ -0,0 +1,180 @@ +package scaleway + +import ( + "context" + "fmt" + "net/url" +) + +// JobResources defines the CPU and memory for a serverless job. +type JobResources struct { + CPULimit int `json:"cpu_limit"` + MemoryLimit int `json:"memory_limit"` + LocalStorageCapacity int `json:"local_storage_capacity"` +} + +// JobDefinition represents a Scaleway Serverless Jobs definition. +type JobDefinition struct { + ID string `json:"id"` + Name string `json:"name"` + ProjectID string `json:"project_id"` + CPULimit int `json:"cpu_limit"` + MemoryLimit int `json:"memory_limit"` + ImageURI string `json:"image_uri"` + StartupCommand []string `json:"startup_command,omitempty"` + Args []string `json:"args,omitempty"` + EnvironmentVars map[string]string `json:"environment_variables"` + Region string `json:"region"` + CreatedAt string `json:"created_at"` +} + +// JobRun represents a single execution of a serverless job. +type JobRun struct { + ID string `json:"id"` + JobDefinitionID string `json:"job_definition_id"` + State string `json:"state"` + Reason string `json:"reason,omitempty"` + CreatedAt string `json:"created_at"` + StartedAt string `json:"started_at,omitempty"` + TerminatedAt string `json:"terminated_at,omitempty"` + ExitCode *int `json:"exit_code,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type listJobRunsResponse struct { + JobRuns []JobRun `json:"job_runs"` + TotalCount int `json:"total_count"` +} + +type startJobDefinitionResponse struct { + JobRuns []JobRun `json:"job_runs"` +} + +type JobSecretRef struct { + SecretManagerID string + SecretManagerVersion string + EnvVarName string +} + +// CreateJobDefinition creates a new serverless job definition. +func (c *Client) CreateJobDefinition(ctx context.Context, name, image string, env map[string]string, resources JobResources) (*JobDefinition, error) { + url := c.regionURL("serverless-jobs", "v1alpha2") + "/job-definitions" + body := map[string]any{ + "name": name, + "project_id": c.ProjectID, + "cpu_limit": resources.CPULimit, + "memory_limit": resources.MemoryLimit, + "local_storage_capacity": resources.LocalStorageCapacity, + "image_uri": image, + "environment_variables": env, + } + var def JobDefinition + if err := c.doRequestJSON(ctx, "POST", url, body, &def); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("creating job definition %q", name)) + } + return &def, nil +} + +// CreateJobSecrets attaches Secret Manager secrets to a Serverless Job definition. +func (c *Client) CreateJobSecrets(ctx context.Context, definitionID string, refs []JobSecretRef) error { + if len(refs) == 0 { + return nil + } + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/secrets" + secrets := make([]map[string]string, 0, len(refs)) + for _, ref := range refs { + secrets = append(secrets, map[string]string{ + "secret_manager_id": ref.SecretManagerID, + "secret_manager_version": ref.SecretManagerVersion, + "env_var_name": ref.EnvVarName, + }) + } + body := map[string]any{ + "job_definition_id": definitionID, + "secrets": secrets, + } + if err := c.doRequestJSON(ctx, "POST", endpoint, body, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("creating secret references for job %q", definitionID)) + } + return nil +} + +// RunJob starts a new run of a job definition with optional command and environment overrides. +func (c *Client) RunJob(ctx context.Context, definitionID string, command []string, args []string, envOverrides map[string]string) (*JobRun, error) { + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/job-definitions/" + definitionID + "/start" + body := map[string]any{} + if len(command) > 0 { + body["startup_command"] = command + } + if len(args) > 0 { + body["args"] = args + } + if len(envOverrides) > 0 { + body["environment_variables"] = envOverrides + } + var resp startJobDefinitionResponse + if err := c.doRequestJSON(ctx, "POST", endpoint, body, &resp); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("running job %q", definitionID)) + } + if len(resp.JobRuns) == 0 { + return nil, fmt.Errorf("running job %q: no job runs returned", definitionID) + } + return &resp.JobRuns[0], nil +} + +// GetJobRun retrieves the status of a specific job run. +func (c *Client) GetJobRun(ctx context.Context, runID string) (*JobRun, error) { + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/job-runs/" + runID + var run JobRun + if err := c.doRequestJSON(ctx, "GET", endpoint, nil, &run); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("getting job run %q", runID)) + } + return &run, nil +} + +// ListJobRuns lists runs for a given job definition. +func (c *Client) ListJobRuns(ctx context.Context, definitionID string) ([]JobRun, error) { + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/job-runs" + params := url.Values{ + "job_definition_id": {definitionID}, + } + fullURL := endpoint + "?" + params.Encode() + + var resp listJobRunsResponse + if err := c.doRequestJSON(ctx, "GET", fullURL, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("listing job runs for %q", definitionID)) + } + return resp.JobRuns, nil +} + +type listJobDefinitionsResponse struct { + JobDefinitions []JobDefinition `json:"job_definitions"` + TotalCount int `json:"total_count"` +} + +// ListJobDefinitions lists job definitions in the project, optionally filtered by name. +func (c *Client) ListJobDefinitions(ctx context.Context, name string) ([]JobDefinition, error) { + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/job-definitions" + params := url.Values{ + "project_id": {c.ProjectID}, + } + if name != "" { + params.Set("name", name) + } + fullURL := endpoint + "?" + params.Encode() + + var resp listJobDefinitionsResponse + if err := c.doRequestJSON(ctx, "GET", fullURL, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, "listing job definitions") + } + return resp.JobDefinitions, nil +} + +// DeleteJobDefinition deletes a job definition. +func (c *Client) DeleteJobDefinition(ctx context.Context, definitionID string) error { + endpoint := c.regionURL("serverless-jobs", "v1alpha2") + "/job-definitions/" + definitionID + if err := c.doRequestJSON(ctx, "DELETE", endpoint, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting job definition %q", definitionID)) + } + return nil +} diff --git a/src/pkg/clouds/scaleway/registry.go b/src/pkg/clouds/scaleway/registry.go new file mode 100644 index 000000000..9da0d81f3 --- /dev/null +++ b/src/pkg/clouds/scaleway/registry.go @@ -0,0 +1,128 @@ +package scaleway + +import ( + "context" + "fmt" + "net/url" +) + +// RegistryNamespace represents a Scaleway Container Registry namespace. +type RegistryNamespace struct { + ID string `json:"id"` + Name string `json:"name"` + ProjectID string `json:"project_id"` + Endpoint string `json:"endpoint"` + Region string `json:"region"` + Status string `json:"status"` + IsPublic bool `json:"is_public"` + OrganizationID string `json:"organization_id"` + CreatedAt string `json:"created_at"` +} + +// RegistryImage represents an image in a Container Registry namespace. +type RegistryImage struct { + ID string `json:"id"` + Name string `json:"name"` + NamespaceID string `json:"namespace_id"` + Status string `json:"status"` + Tags []string `json:"tags"` + Size int64 `json:"size"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +type listRegistryNamespacesResponse struct { + Namespaces []RegistryNamespace `json:"namespaces"` + TotalCount int `json:"total_count"` +} + +type listRegistryImagesResponse struct { + Images []RegistryImage `json:"images"` + TotalCount int `json:"total_count"` +} + +// EnsureRegistryNamespaceExists creates a registry namespace if it does not already exist. +// Returns the existing or newly created namespace. +func (c *Client) EnsureRegistryNamespaceExists(ctx context.Context, name, projectID, region string) (*RegistryNamespace, error) { + if projectID == "" { + projectID = c.ProjectID + } + if region == "" { + region = c.Region + } + + baseURL := fmt.Sprintf("%s/registry/v1/regions/%s", apiBaseURL, region) + + // List namespaces to check if one exists with this name + listURL := fmt.Sprintf("%s/namespaces?project_id=%s&name=%s", baseURL, url.QueryEscape(projectID), url.QueryEscape(name)) + var listResp listRegistryNamespacesResponse + if err := c.doRequestJSON(ctx, "GET", listURL, nil, &listResp); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("listing registry namespaces for %q", name)) + } + for i := range listResp.Namespaces { + if listResp.Namespaces[i].Name == name { + return &listResp.Namespaces[i], nil + } + } + + // Create the namespace + createURL := fmt.Sprintf("%s/namespaces", baseURL) + body := map[string]any{ + "name": name, + "project_id": projectID, + "is_public": false, + } + var ns RegistryNamespace + if err := c.doRequestJSON(ctx, "POST", createURL, body, &ns); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("creating registry namespace %q", name)) + } + return &ns, nil +} + +// GetRegistryEndpoint returns the registry endpoint for a namespace in a region. +// Format: rg.{region}.scw.cloud/{namespace} +func GetRegistryEndpoint(region, namespace string) string { + return fmt.Sprintf("rg.%s.scw.cloud/%s", region, namespace) +} + +// ListImages lists images in a Container Registry namespace. +func (c *Client) ListImages(ctx context.Context, namespaceID string) ([]RegistryImage, error) { + url := fmt.Sprintf("%s/registry/v1/regions/%s/images?namespace_id=%s", apiBaseURL, c.Region, namespaceID) + var resp listRegistryImagesResponse + if err := c.doRequestJSON(ctx, "GET", url, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("listing images for namespace %q", namespaceID)) + } + return resp.Images, nil +} + +// ListRegistryNamespaces lists registry namespaces in a project, optionally filtered by name. +func (c *Client) ListRegistryNamespaces(ctx context.Context, projectID, name string) ([]RegistryNamespace, error) { + if projectID == "" { + projectID = c.ProjectID + } + apiURL := fmt.Sprintf("%s/registry/v1/regions/%s/namespaces?project_id=%s", apiBaseURL, c.Region, url.QueryEscape(projectID)) + if name != "" { + apiURL += "&name=" + url.QueryEscape(name) + } + var resp listRegistryNamespacesResponse + if err := c.doRequestJSON(ctx, "GET", apiURL, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, "listing registry namespaces") + } + return resp.Namespaces, nil +} + +func (c *Client) DeleteImage(ctx context.Context, imageID string) error { + url := fmt.Sprintf("%s/registry/v1/regions/%s/images/%s", apiBaseURL, c.Region, imageID) + if err := c.doRequestJSON(ctx, "DELETE", url, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting registry image %q", imageID)) + } + return nil +} + +func (c *Client) DeleteRegistryNamespace(ctx context.Context, namespaceID string) error { + url := fmt.Sprintf("%s/registry/v1/regions/%s/namespaces/%s", apiBaseURL, c.Region, namespaceID) + if err := c.doRequestJSON(ctx, "DELETE", url, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting registry namespace %q", namespaceID)) + } + return nil +} diff --git a/src/pkg/clouds/scaleway/secret.go b/src/pkg/clouds/scaleway/secret.go new file mode 100644 index 000000000..efe21f7ec --- /dev/null +++ b/src/pkg/clouds/scaleway/secret.go @@ -0,0 +1,182 @@ +package scaleway + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "net/url" + "strings" +) + +// Secret represents a Scaleway Secret Manager secret. +type Secret struct { + ID string `json:"id"` + ProjectID string `json:"project_id"` + Name string `json:"name"` + Status string `json:"status"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +// SecretVersion represents a version of a secret. +type SecretVersion struct { + Revision int `json:"revision"` + SecretID string `json:"secret_id"` + Status string `json:"status"` + CreatedAt string `json:"created_at"` +} + +// SecretVersionAccess contains the actual secret data. +type SecretVersionAccess struct { + SecretID string `json:"secret_id"` + Revision int `json:"revision"` + Data string `json:"data"` // base64-encoded +} + +type listSecretsResponse struct { + Secrets []Secret `json:"secrets"` + TotalCount int `json:"total_count"` +} + +type listSecretVersionsResponse struct { + Versions []SecretVersion `json:"versions"` + TotalCount int `json:"total_count"` +} + +// CreateSecret creates a new secret in Scaleway Secret Manager. +// Returns the secret and a nil error on success. If the secret already exists +// (Scaleway returns 400 with "cannot have same secret name"), the error wraps +// an APIError with status 409 so callers can use IsConflict. +func (c *Client) CreateSecret(ctx context.Context, name, projectID string) (*Secret, error) { + if projectID == "" { + projectID = c.ProjectID + } + url := c.regionURL("secret-manager", "v1beta1") + "/secrets" + body := map[string]string{ + "project_id": projectID, + "name": name, + } + var secret Secret + if err := c.doRequestJSON(ctx, "POST", url, body, &secret); err != nil { + // Scaleway returns 400 (not 409) for duplicate secret names; normalize to 409 + var apiErr *APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == 400 && strings.Contains(apiErr.RawBody, "same secret name") { + apiErr.StatusCode = 409 + return nil, apiErr + } + return nil, AnnotateScalewayError(err, fmt.Sprintf("creating secret %q", name)) + } + return &secret, nil +} + +// CreateSecretVersion adds a new version with the given data to a secret. +// The data is base64-encoded before sending. +func (c *Client) CreateSecretVersion(ctx context.Context, secretID string, data []byte) (*SecretVersion, error) { + url := c.regionURL("secret-manager", "v1beta1") + fmt.Sprintf("/secrets/%s/versions", secretID) + body := map[string]any{ + "data": base64.StdEncoding.EncodeToString(data), + "disable_previous": true, + } + var version SecretVersion + if err := c.doRequestJSON(ctx, "POST", url, body, &version); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("creating secret version for %q", secretID)) + } + return &version, nil +} + +// EnsureSecretValue creates a secret if necessary, then writes a latest enabled version. +func (c *Client) EnsureSecretValue(ctx context.Context, name, projectID string, data []byte) (*Secret, *SecretVersion, error) { + if projectID == "" { + projectID = c.ProjectID + } + secret, err := c.CreateSecret(ctx, name, projectID) + if err != nil { + if !IsConflict(err) { + return nil, nil, err + } + secrets, listErr := c.ListSecrets(ctx, projectID, name) + if listErr != nil { + return nil, nil, listErr + } + for i := range secrets { + if secrets[i].Name == name { + secret = &secrets[i] + break + } + } + if secret == nil { + return nil, nil, fmt.Errorf("secret %q exists but could not be found", name) + } + } + + version, err := c.CreateSecretVersion(ctx, secret.ID, data) + if err != nil { + return nil, nil, err + } + return secret, version, nil +} + +// GetSecretVersion retrieves a specific version of a secret. +// Use revision "latest" for the most recent version, or a numeric string. +func (c *Client) GetSecretVersion(ctx context.Context, secretID, revision string) (*SecretVersionAccess, error) { + url := c.regionURL("secret-manager", "v1beta1") + fmt.Sprintf("/secrets/%s/versions/%s/access", secretID, revision) + var access SecretVersionAccess + if err := c.doRequestJSON(ctx, "GET", url, nil, &access); err != nil { + return nil, AnnotateScalewayError(err, fmt.Sprintf("getting secret version %s/%s", secretID, revision)) + } + return &access, nil +} + +// ListSecrets lists secrets in a project, optionally filtered by name prefix. +// Note: The Scaleway API's name parameter does exact matching, so we perform +// client-side prefix filtering when a prefix is provided. +func (c *Client) ListSecrets(ctx context.Context, projectID, prefix string) ([]Secret, error) { + if projectID == "" { + projectID = c.ProjectID + } + endpoint := c.regionURL("secret-manager", "v1beta1") + "/secrets" + params := url.Values{ + "project_id": {projectID}, + } + // Try exact match first via the API; if prefix is provided we'll filter client-side + if prefix != "" { + params.Set("name", prefix) + } + fullURL := endpoint + "?" + params.Encode() + + var resp listSecretsResponse + if err := c.doRequestJSON(ctx, "GET", fullURL, nil, &resp); err != nil { + return nil, AnnotateScalewayError(err, "listing secrets") + } + + // If exact match returned results or no prefix was given, return as-is + if len(resp.Secrets) > 0 || prefix == "" { + return resp.Secrets, nil + } + + // Scaleway API does exact matching, not prefix matching. + // Fall back to listing all secrets and filtering client-side by prefix. + allURL := endpoint + "?" + url.Values{"project_id": {projectID}}.Encode() + var allResp listSecretsResponse + if err := c.doRequestJSON(ctx, "GET", allURL, nil, &allResp); err != nil { + return nil, AnnotateScalewayError(err, "listing secrets") + } + + filtered := make([]Secret, 0, len(allResp.Secrets)) + for _, s := range allResp.Secrets { + if strings.HasPrefix(s.Name, prefix) { + filtered = append(filtered, s) + } + } + return filtered, nil +} + +// DeleteSecret permanently deletes a secret and all its versions. +func (c *Client) DeleteSecret(ctx context.Context, secretID string) error { + url := c.regionURL("secret-manager", "v1beta1") + fmt.Sprintf("/secrets/%s", secretID) + if err := c.doRequestJSON(ctx, "DELETE", url, nil, nil); err != nil { + return AnnotateScalewayError(err, fmt.Sprintf("deleting secret %q", secretID)) + } + return nil +} diff --git a/src/pkg/clouds/scaleway/storage.go b/src/pkg/clouds/scaleway/storage.go new file mode 100644 index 000000000..cdcc3728b --- /dev/null +++ b/src/pkg/clouds/scaleway/storage.go @@ -0,0 +1,140 @@ +package scaleway + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// S3Endpoint returns the S3-compatible endpoint for a Scaleway region. +func S3Endpoint(region Region) string { + return fmt.Sprintf("https://s3.%s.scw.cloud", region) +} + +// NewS3Client creates an AWS S3 client configured for Scaleway Object Storage. +func NewS3Client(region Region, accessKey, secretKey string) *s3.Client { + return s3.New(s3.Options{ + Region: region, + BaseEndpoint: aws.String(S3Endpoint(region)), + Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""), + UsePathStyle: true, + }) +} + +// EnsureBucketExists creates the bucket if it does not already exist. +func EnsureBucketExists(ctx context.Context, client *s3.Client, bucketName, region string) error { + _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{ + Bucket: aws.String(bucketName), + }) + if err == nil { + return nil // bucket already exists + } + + _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + return fmt.Errorf("creating bucket %q: %w", bucketName, err) + } + return nil +} + +// CreatePresignedUploadURL generates a presigned PUT URL for uploading an object. +func CreatePresignedUploadURL(ctx context.Context, client *s3.Client, bucket, key string, expiry time.Duration) (string, error) { + presignClient := s3.NewPresignClient(client) + req, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }, s3.WithPresignExpires(expiry)) + if err != nil { + return "", fmt.Errorf("creating presigned upload URL: %w", err) + } + return req.URL, nil +} + +// GetObject retrieves an object from S3-compatible storage. +func GetObject(ctx context.Context, client *s3.Client, bucket, key string) ([]byte, error) { + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, fmt.Errorf("getting object %q from bucket %q: %w", key, bucket, err) + } + defer resp.Body.Close() + return io.ReadAll(resp.Body) +} + +// PutObject uploads an object to S3-compatible storage. +func PutObject(ctx context.Context, client *s3.Client, bucket, key string, body io.Reader) error { + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: body, + }) + if err != nil { + return fmt.Errorf("putting object %q to bucket %q: %w", key, bucket, err) + } + return nil +} + +// ListObjectKeys lists object keys in a bucket with an optional prefix. +func ListObjectKeys(ctx context.Context, client *s3.Client, bucket, prefix string) ([]string, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + } + if prefix != "" { + input.Prefix = aws.String(prefix) + } + + var keys []string + paginator := s3.NewListObjectsV2Paginator(client, input) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("listing objects in bucket %q: %w", bucket, err) + } + for _, obj := range page.Contents { + keys = append(keys, aws.ToString(obj.Key)) + } + } + return keys, nil +} + +func EmptyAndDeleteBucket(ctx context.Context, client *s3.Client, bucket string) error { + keys, err := ListObjectKeys(ctx, client, bucket, "") + if err != nil { + return err + } + for len(keys) > 0 { + batch := keys + if len(batch) > 1000 { + batch = keys[:1000] + } + objects := make([]types.ObjectIdentifier, 0, len(batch)) + for _, key := range batch { + objects = append(objects, types.ObjectIdentifier{Key: aws.String(key)}) + } + _, err := client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &types.Delete{ + Objects: objects, + Quiet: aws.Bool(true), + }, + }) + if err != nil { + return fmt.Errorf("deleting objects from bucket %q: %w", bucket, err) + } + keys = keys[len(batch):] + } + if _, err := client.DeleteBucket(ctx, &s3.DeleteBucketInput{Bucket: aws.String(bucket)}); err != nil { + return fmt.Errorf("deleting bucket %q: %w", bucket, err) + } + return nil +} diff --git a/src/pkg/clouds/scaleway/storage_test.go b/src/pkg/clouds/scaleway/storage_test.go new file mode 100644 index 000000000..899d13107 --- /dev/null +++ b/src/pkg/clouds/scaleway/storage_test.go @@ -0,0 +1,48 @@ +package scaleway + +import ( + "testing" +) + +func TestS3Endpoint(t *testing.T) { + tests := []struct { + region string + want string + }{ + {"fr-par", "https://s3.fr-par.scw.cloud"}, + {"nl-ams", "https://s3.nl-ams.scw.cloud"}, + {"pl-waw", "https://s3.pl-waw.scw.cloud"}, + } + for _, tt := range tests { + t.Run(tt.region, func(t *testing.T) { + if got := S3Endpoint(tt.region); got != tt.want { + t.Errorf("S3Endpoint(%q) = %q, want %q", tt.region, got, tt.want) + } + }) + } +} + +func TestNewS3Client(t *testing.T) { + client := NewS3Client("fr-par", "SCWAK", "secret") + if client == nil { + t.Fatal("NewS3Client returned nil") + } +} + +func TestGetRegistryEndpoint(t *testing.T) { + tests := []struct { + region string + namespace string + want string + }{ + {"fr-par", "my-ns", "rg.fr-par.scw.cloud/my-ns"}, + {"nl-ams", "defang-cd", "rg.nl-ams.scw.cloud/defang-cd"}, + } + for _, tt := range tests { + t.Run(tt.region+"/"+tt.namespace, func(t *testing.T) { + if got := GetRegistryEndpoint(tt.region, tt.namespace); got != tt.want { + t.Errorf("GetRegistryEndpoint(%q, %q) = %q, want %q", tt.region, tt.namespace, got, tt.want) + } + }) + } +} diff --git a/src/pkg/stacks/selector_test.go b/src/pkg/stacks/selector_test.go index 029758102..a229db686 100644 --- a/src/pkg/stacks/selector_test.go +++ b/src/pkg/stacks/selector_test.go @@ -183,7 +183,7 @@ func TestStackSelector_SelectStack_CreateNewStack(t *testing.T) { mockEC.On("RequestEnum", ctx, "Select a stack", "stack", expectedOptions).Return(CreateNewStack, nil) // Mock wizard parameter collection - provider selection - providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure"} + providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure", "Scaleway"} mockEC.On("RequestEnum", ctx, "Where do you want to deploy?", "provider", providerOptions).Return("AWS", nil) // Mock wizard parameter collection - region selection (default is us-west-2 for AWS) @@ -259,7 +259,7 @@ func TestStackSelector_SelectStack_NoExistingStacks(t *testing.T) { mockSM.On("List", ctx).Return([]ListItem{}, nil) // Mock wizard parameter collection - provider selection - providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure"} + providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure", "Scaleway"} mockEC.On("RequestEnum", ctx, "Where do you want to deploy?", "provider", providerOptions).Return("AWS", nil) // Mock wizard parameter collection - region selection @@ -418,7 +418,7 @@ func TestStackSelector_SelectStack_WizardError(t *testing.T) { mockEC.On("RequestEnum", ctx, "Select a stack", "stack", expectedOptions).Return(CreateNewStack, nil) // Mock wizard parameter collection - provider selection fails - providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure"} + providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure", "Scaleway"} mockEC.On("RequestEnum", ctx, "Where do you want to deploy?", "provider", providerOptions).Return("", errors.New("user cancelled wizard")) selector := NewSelector(mockEC, mockSM) @@ -455,7 +455,7 @@ func TestStackSelector_SelectStack_CreateStackError(t *testing.T) { mockEC.On("RequestEnum", ctx, "Select a stack", "stack", expectedOptions).Return(CreateNewStack, nil) // Mock wizard parameter collection - provider selection - providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure"} + providerOptions := []string{"Defang Playground", "AWS", "DigitalOcean", "Google Cloud Platform", "Azure", "Scaleway"} mockEC.On("RequestEnum", ctx, "Where do you want to deploy?", "provider", providerOptions).Return("AWS", nil) // Mock wizard parameter collection - region selection diff --git a/src/protos/io/defang/v1/fabric.pb.go b/src/protos/io/defang/v1/fabric.pb.go index 2e66a8404..0e9c79997 100644 --- a/src/protos/io/defang/v1/fabric.pb.go +++ b/src/protos/io/defang/v1/fabric.pb.go @@ -35,6 +35,7 @@ const ( Provider_DIGITALOCEAN Provider = 3 Provider_GCP Provider = 4 Provider_AZURE Provider = 5 + Provider_SCALEWAY Provider = 6 ) // Enum value maps for Provider. @@ -46,6 +47,7 @@ var ( 3: "DIGITALOCEAN", 4: "GCP", 5: "AZURE", + 6: "SCALEWAY", } Provider_value = map[string]int32{ "PROVIDER_UNSPECIFIED": 0, @@ -54,6 +56,7 @@ var ( "DIGITALOCEAN": 3, "GCP": 4, "AZURE": 5, + "SCALEWAY": 6, } ) @@ -6548,7 +6551,7 @@ const file_io_defang_v1_fabric_proto_rawDesc = "" + "\x04data\x18\x02 \x01(\fR\x04data\x12%\n" + "\x0eprevious_error\x18\x03 \x01(\tR\rpreviousError\"3\n" + "\x17GenerateComposeResponse\x12\x18\n" + - "\acompose\x18\x01 \x01(\fR\acompose*_\n" + + "\acompose\x18\x01 \x01(\fR\acompose*m\n" + "\bProvider\x12\x18\n" + "\x14PROVIDER_UNSPECIFIED\x10\x00\x12\n" + "\n" + @@ -6556,7 +6559,8 @@ const file_io_defang_v1_fabric_proto_rawDesc = "" + "\x03AWS\x10\x02\x12\x10\n" + "\fDIGITALOCEAN\x10\x03\x12\a\n" + "\x03GCP\x10\x04\x12\t\n" + - "\x05AZURE\x10\x05*T\n" + + "\x05AZURE\x10\x05\x12\f\n" + + "\bSCALEWAY\x10\x06*T\n" + "\x0eDeploymentMode\x12\x14\n" + "\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n" + "\vDEVELOPMENT\x10\x01\x12\v\n" + diff --git a/src/protos/io/defang/v1/fabric.proto b/src/protos/io/defang/v1/fabric.proto index c44ca35de..713ef0fa2 100644 --- a/src/protos/io/defang/v1/fabric.proto +++ b/src/protos/io/defang/v1/fabric.proto @@ -174,6 +174,7 @@ enum Provider { DIGITALOCEAN = 3; GCP = 4; AZURE = 5; + SCALEWAY = 6; } message Stack { diff --git a/validation/scaleway/log-smoke/.dockerignore b/validation/scaleway/log-smoke/.dockerignore new file mode 100644 index 000000000..12f6b36dd --- /dev/null +++ b/validation/scaleway/log-smoke/.dockerignore @@ -0,0 +1,27 @@ +# Default .dockerignore file for Defang +**/__pycache__ +**/.direnv +**/.DS_Store +**/.envrc +**/.git +**/.github +**/.idea +**/.next +**/.vscode +**/compose.*.yaml +**/compose.*.yml +**/compose.yaml +**/compose.yml +**/docker-compose.*.yaml +**/docker-compose.*.yml +**/docker-compose.yaml +**/docker-compose.yml +**/node_modules +**/Thumbs.db +Dockerfile +*.Dockerfile +# Ignore our own binary, but only in the root to avoid ignoring subfolders +defang +defang.exe +# Ignore our project-level state +.defang* diff --git a/validation/scaleway/log-smoke/.gitignore b/validation/scaleway/log-smoke/.gitignore new file mode 100644 index 000000000..36c75e664 --- /dev/null +++ b/validation/scaleway/log-smoke/.gitignore @@ -0,0 +1 @@ +.defang/ diff --git a/validation/scaleway/log-smoke/compose.yaml b/validation/scaleway/log-smoke/compose.yaml new file mode 100644 index 000000000..99ae13385 --- /dev/null +++ b/validation/scaleway/log-smoke/compose.yaml @@ -0,0 +1,61 @@ +name: scaleway-log-smoke +services: + app: + image: python:3.12-alpine + command: + - python + - -u + - -c + - | + from http.server import BaseHTTPRequestHandler, HTTPServer + from datetime import datetime, timezone + import sys + import threading + import time + + def stamp(): + return datetime.now(timezone.utc).isoformat() + + def log(message): + print(f"defang-log-smoke {stamp()} {message}", flush=True) + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + log(f"request path={self.path}") + body = b"defang log smoke ok\n" + self.send_response(200) + self.send_header("content-type", "text/plain") + self.send_header("content-length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + sys.stderr.write(f"defang-log-smoke-access {stamp()} {format % args}\n") + sys.stderr.flush() + + def heartbeat(): + while True: + log("heartbeat") + time.sleep(5) + + log("starting server port=8080") + threading.Thread(target=heartbeat, daemon=True).start() + HTTPServer(("0.0.0.0", 8080), Handler).serve_forever() + ports: + - mode: ingress + target: 8080 + published: 8080 + healthcheck: + test: + - CMD + - python + - -c + - "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8080/health').read()" + interval: 10s + timeout: 3s + retries: 6 + start_period: 5s + deploy: + resources: + reservations: + memory: 256M diff --git a/validation/scaleway/mastra-extended-chat-2026-05-11.png b/validation/scaleway/mastra-extended-chat-2026-05-11.png new file mode 100644 index 000000000..40954c210 Binary files /dev/null and b/validation/scaleway/mastra-extended-chat-2026-05-11.png differ diff --git a/validation/scaleway/mastra-extended-generated-2026-05-11.png b/validation/scaleway/mastra-extended-generated-2026-05-11.png new file mode 100644 index 000000000..2ebe16ef2 Binary files /dev/null and b/validation/scaleway/mastra-extended-generated-2026-05-11.png differ