Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _run/.envrc_run
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ source_up .envrc

AP_RUN_NAME=$(basename "$(pwd)")
AP_RUN_DIR="${DEVCACHE_RUN}/${AP_RUN_NAME}"
KUBECONFIG="${HOME}/.kube/config"

export AKASH_HOME="${AP_RUN_DIR}/.akash"
export AP_RUN_NAME
export AP_RUN_DIR
export KUBECONFIG
5 changes: 5 additions & 0 deletions _run/common-kube.mk
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

K8S_CONTEXT ?= $(shell kubectl config current-context)
KUBE_CREATE := $(AP_RUN_DIR)/.kube-create

Expand Down Expand Up @@ -205,3 +206,7 @@ kube-logs-operator-inventory:
.PHONY: kube-wait-inventory-available
kube-wait-inventory-available:
$(SETUP_KUBE) --retries=60 wait inventory-available

.PHONY: provider-migrate-run
provider-migrate-run:
$(PROVIDER_SERVICES) migrate run
10 changes: 10 additions & 0 deletions cmd/provider-services/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,15 @@ func addRunFlags(cmd *cobra.Command) error {
return err
}

cmd.Flags().Bool(FlagMigrationsEnabled, true, "enable migrations to run automatically on startup")
if err := viper.BindPFlag(FlagMigrationsEnabled, cmd.Flags().Lookup(FlagMigrationsEnabled)); err != nil {
return err
}

cmd.Flags().String(FlagMigrationsStatePath, "", "path to migrations state file (default: $AP_HOME/migrations.json)")
Comment thread
cloud-j-luna marked this conversation as resolved.
if err := viper.BindPFlag(FlagMigrationsStatePath, cmd.Flags().Lookup(FlagMigrationsStatePath)); err != nil {
return err
}

return nil
}
6 changes: 3 additions & 3 deletions cmd/provider-services/cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package cmd

import (
"github.com/spf13/cobra"
"pkg.akt.dev/go/cli"
)

func migrate() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate",
PersistentPreRunE: cli.TxPersistentPreRunE,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this as a migrations are not ran with transaction contexts. Also it seems migrate-hostnames and migrate-endpoints are also not used so should have no impact.

Use: "migrate",
}

cmd.AddCommand(MigrateRunCmd())

return cmd
}
196 changes: 196 additions & 0 deletions cmd/provider-services/cmd/migrate_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package cmd

import (
"context"
"fmt"
"path/filepath"

"cosmossdk.io/log"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
cflags "pkg.akt.dev/go/cli/flags"
"pkg.akt.dev/go/util/ctxlog"

"github.com/akash-network/provider/cluster/kube/clientcommon"
providerflags "github.com/akash-network/provider/cmd/provider-services/cmd/flags"
"github.com/akash-network/provider/cmd/provider-services/cmd/util"
"github.com/akash-network/provider/migrations"
"github.com/akash-network/provider/tools/fromctx"
"github.com/akash-network/provider/version"
)

func MigrateRunCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "run pending migrations",
SilenceUsage: true,
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := clientcommon.SetKubeConfigToCmd(cmd); err != nil {
return err
}

logger := util.OpenLogger()
ctx := ctxlog.WithLogger(cmd.Context(), logger)
cmd.SetContext(ctx)

kubecfg := fromctx.MustKubeConfigFromCtx(ctx)
kc, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

fromctx.CmdSetContextValue(cmd, fromctx.CtxKeyKubeClientSet, kc)

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
return doMigrateRun(cmd.Context(), cmd)
},
}

cmd.Flags().String(FlagMigrationsStatePath, "", "path to migrations state file (default: $AP_HOME/migrations.json)")

if err := providerflags.AddKubeConfigPathFlag(cmd); err != nil {
panic(err.Error())
}
Comment thread
cloud-j-luna marked this conversation as resolved.

return cmd
}

func doMigrateRun(ctx context.Context, cmd *cobra.Command) error {
logger := ctxlog.LogcFromCtx(ctx)

statePath, err := determineStatePath(cmd)
if err != nil {
return fmt.Errorf("determining state path: %w", err)
}

result, err := runMigrations(ctx, statePath, logger)
if err != nil {
return err
}

// Log completion with success/failed counts (even if there are errors)
logger.Info("migrations completed", "successful", result.SuccessCount, "failed", len(result.Errs))

if len(result.Errs) > 0 {
return fmt.Errorf("%d migration(s) failed", len(result.Errs))
}

return nil
}

func determineStatePath(cmd *cobra.Command) (string, error) {
statePath, err := cmd.Flags().GetString(FlagMigrationsStatePath)
if err != nil {
return "", fmt.Errorf("getting migrations state path flag: %w", err)
}

if statePath == "" {
homeDir, err := cmd.Flags().GetString(cflags.FlagHome)
if err != nil {
// Try persistent flags if not found in regular flags
homeDir, err = cmd.PersistentFlags().GetString(cflags.FlagHome)
if err != nil {
return "", fmt.Errorf("unable to get home directory flag: %w", err)
}
}
if homeDir == "" {
return "", fmt.Errorf("home directory flag is not set")
}
statePath = filepath.Join(homeDir, "migrations.json")
}

return statePath, nil
}

func setCurrentVersionOrDefault(registry *migrations.Registry) string {
currentVersion := version.Version
if currentVersion == "" {
currentVersion = "0.0.0"
}
registry.SetCurrentVersion(currentVersion)

return currentVersion
}

// runMigrationsResult contains the results of running migrations.
type runMigrationsResult struct {
SuccessCount int
Errs []error
}

// runMigrations executes pending migrations with the provided state path.
// The logContext parameter is used to customize log messages (e.g., "running pending migrations" vs "running pending migrations on startup").
// The logMigrationStatus parameter controls whether detailed migration status is logged for each migration.
// Returns the migration results and an error if the migration process itself failed (not individual migration errors).
func runMigrations(ctx context.Context, statePath string, logger log.Logger) (*runMigrationsResult, error) {
stateManager := migrations.NewStateManager(statePath)
registry := migrations.NewRegistry(stateManager)

currentVersion := setCurrentVersionOrDefault(registry)

previousVersion, err := stateManager.GetProviderVersion()
if err != nil {
return nil, fmt.Errorf("failed to get previous provider version: %w", err)
}

if previousVersion == "" {
logger.Info("fresh install detected, marking all migrations as applied", "current_version", currentVersion)
allMigrations := migrations.GetAll()
for _, m := range allMigrations {
if err := stateManager.MarkApplied(m.Name()); err != nil {
return nil, fmt.Errorf("failed to mark migration %q as applied: %w", m.Name(), err)
}
}
if err := stateManager.SetProviderVersion(currentVersion); err != nil {
return nil, fmt.Errorf("failed to set provider version: %w", err)
}
return &runMigrationsResult{SuccessCount: 0, Errs: nil}, nil
}

logger.Info("checking for migrations", "previous_version", previousVersion, "current_version", currentVersion)

allMigrations := migrations.GetAll()
logger.Info("discovered migrations", "count", len(allMigrations))
if len(allMigrations) == 0 {
logger.Info("no migrations found")
if err := stateManager.SetProviderVersion(currentVersion); err != nil {
return nil, fmt.Errorf("failed to update provider version: %w", err)
}
return &runMigrationsResult{SuccessCount: 0, Errs: nil}, nil
}

for _, m := range allMigrations {
logger.Info("registered migration", "name", m.Name(), "description", m.Description(), "from_version", m.FromVersion())
Comment thread
cloud-j-luna marked this conversation as resolved.
applied, _ := stateManager.IsApplied(m.Name())
Comment thread
cloud-j-luna marked this conversation as resolved.
logger.Info("migration status", "name", m.Name(), "applied", applied, "previous_version", previousVersion, "from_version", m.FromVersion())
}

pending, err := registry.GetPending(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get pending migrations: %w", err)
}

if len(pending) == 0 {
logger.Info("no pending migrations")
if err := stateManager.SetProviderVersion(currentVersion); err != nil {
return nil, fmt.Errorf("failed to update provider version: %w", err)
}
return &runMigrationsResult{SuccessCount: 0, Errs: nil}, nil
}

logger.Info("running pending migrations", "count", len(pending))

successCount, errs := registry.RunMigrations(ctx)

if len(errs) > 0 {
for _, err := range errs {
logger.Error("migration error", "err", err)
}
return &runMigrationsResult{SuccessCount: successCount, Errs: errs}, nil
Comment thread
cloud-j-luna marked this conversation as resolved.
}

logger.Info("migrations completed successfully", "count", successCount)
return &runMigrationsResult{SuccessCount: successCount, Errs: nil}, nil
}
Comment thread
cloud-j-luna marked this conversation as resolved.
27 changes: 27 additions & 0 deletions cmd/provider-services/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const (
FlagCertIssuerDNSProviders = "cert-issuer-dns-providers"
FlagCertIssuerDNSResolvers = "cert-issuer-dns-resolvers"
FlagCertIssuerEmail = "cert-issuer-email"
FlagMigrationsEnabled = "migrations-enabled"
FlagMigrationsStatePath = "migrations-state-path"
)

const (
Expand Down Expand Up @@ -490,6 +492,13 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {

logger := ctxlog.LogcFromCtx(cmd.Context())

runMigrations := viper.GetBool(FlagMigrationsEnabled)
if runMigrations {
if err := runMigrationsOnStartup(ctx, cmd, logger); err != nil {
return fmt.Errorf("migrations failed: %w", err)
}
}

logger.Info("starting provider service")

var metricsRouter http.Handler
Expand Down Expand Up @@ -743,6 +752,24 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
return nil
}

func runMigrationsOnStartup(ctx context.Context, cmd *cobra.Command, logger log.Logger) error {
statePath, err := determineStatePath(cmd)
if err != nil {
return fmt.Errorf("determining state path: %w", err)
}

result, err := runMigrations(ctx, statePath, logger)
if err != nil {
return err
}

if len(result.Errs) > 0 {
return fmt.Errorf("%d migration(s) failed", len(result.Errs))
}

return nil
}

func createClusterClient(ctx context.Context, log log.Logger, _ *cobra.Command) (cluster.Client, error) {
if !viper.GetBool(FlagClusterK8s) {
// Condition that there is no Kubernetes API to work with.
Expand Down
30 changes: 30 additions & 0 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package migrations

import (
"context"
)

// Migration represents a single migration that can be executed.
// All migrations must be idempotent, running them multiple times
// should have the same effect as running them once.
type Migration interface {
// Name returns a unique identifier for this migration.
// This identifier is used to track whether the migration has been applied.
Name() string

// Description returns a human-readable description of what this migration does.
Description() string

// FromVersion returns the provider version this migration applies from.
// Migrations will only run when upgrading from a version < FromVersion.
// For fresh installs (no previous version), migrations are skipped.
// Return empty string to indicate this migration should always run (not recommended).
FromVersion() string

// Run executes the migration. This method must be idempotent.
// If the migration has already been applied, this should be a no-op.
// The context provides access to Kubernetes clients and other resources.
// The registry tracks applied migrations via the state file, so Run()
// should be safe to call multiple times.
Run(ctx context.Context) error
}
Loading
Loading