diff --git a/src/ferryctl/cmd/trim_deltas.go b/src/ferryctl/cmd/trim_deltas.go
new file mode 100644
index 0000000..521f6af
--- /dev/null
+++ b/src/ferryctl/cmd/trim_deltas.go
@@ -0,0 +1,68 @@
+//
+// Copyright © 2017-2019 Solus Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package cmd
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+
+	"github.com/spf13/cobra"
+
+	"github.com/getsolus/ferryd/src/libferry"
+)
+
+// trimDeltasCmd is the "deltas" subcommand registered under TrimCmd
+var trimDeltasCmd = &cobra.Command{
+	Use:   "deltas [repoName] [maxToKeep]",
+	Short: "trim deltas back to a maximum of [maxToKeep]",
+	Long:  "Trim excessive back versions for delta packages in the repository",
+	Run:   trimDeltas,
+}
+
+// init hooks the deltas subcommand into the trim command
+func init() {
+	TrimCmd.AddCommand(trimDeltasCmd)
+}
+
+// trimDeltas validates the command line arguments and asks the server to trim
+// the deltas of the named repository. Failures are reported on stderr and the
+// process exits with a non-zero status so callers can detect them.
+func trimDeltas(cmd *cobra.Command, args []string) {
+	if len(args) != 2 {
+		fmt.Fprintf(os.Stderr, "usage: trim deltas [repoName] [maxToKeep]\n")
+		os.Exit(1)
+	}
+
+	// bitSize 32 keeps the later int conversion lossless
+	maxKeep, err := strconv.ParseInt(args[1], 10, 32)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Invalid integer: %v\n", err)
+		os.Exit(1)
+	}
+
+	client := libferry.NewClient(socketPath)
+	repoID := args[0]
+
+	// Close explicitly instead of deferring: os.Exit skips deferred calls
+	err = client.TrimDeltas(repoID, int(maxKeep))
+	client.Close()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Error while trimming deltas: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/src/ferryd/core/api.go b/src/ferryd/core/api.go
index 14e2150..067c2bb 100644
--- a/src/ferryd/core/api.go
+++ b/src/ferryd/core/api.go
@@ -157,6 +157,20 @@ func (m *Manager) TrimPackages(repoID string, maxKeep
int) error {
 	return m.Index(repoID)
 }
 
+// TrimDeltas will ask the repo to remove excessive deltas
+func (m *Manager) TrimDeltas(repoID string, maxKeep int) error {
+	repo, err := m.repo.GetRepo(m.db, repoID)
+	if err != nil {
+		return err
+	}
+
+	if err = repo.TrimDeltas(m.db, m.pool, maxKeep); err != nil {
+		return err
+	}
+
+	return m.Index(repoID)
+}
+
 // GetRepos will return all known repositories
 func (m *Manager) GetRepos() ([]*Repository, error) {
 	return m.repo.GetRepos(m.db)
diff --git a/src/ferryd/core/repo.go b/src/ferryd/core/repo.go
index 0172cc9..050eb60 100644
--- a/src/ferryd/core/repo.go
+++ b/src/ferryd/core/repo.go
@@ -1330,3 +1330,144 @@ func (r *Repository) TrimPackages(db libdb.Database, pool *Pool, maxKeep int) er
 
 	return nil
 }
+
+// TrimDeltas will trim back the deltas in each package entry. Deltas that do
+// not target the currently published release are considered stale and are
+// always removed; of the remaining deltas only the maxKeep with the highest
+// source release are kept. Failures to unref an individual delta are logged
+// and do not abort the trim.
+func (r *Repository) TrimDeltas(db libdb.Database, pool *Pool, maxKeep int) error {
+	if err := r.checkWrite(); err != nil {
+		return err
+	}
+
+	// Check for valid maxKeep
+	if maxKeep < 0 {
+		return fmt.Errorf("maxKeep of %d is too small. It Must be greater than or equal to 0", maxKeep)
+	}
+
+	// Pool IDs of every delta marked for removal across all packages
+	var removalIDs []string
+
+	rootBucket := db.Bucket([]byte(DatabaseBucketRepo)).Bucket([]byte(r.ID)).Bucket([]byte(DatabaseBucketPackage))
+
+	// Grab every package
+	err := rootBucket.ForEach(func(k, v []byte) error {
+		entry := RepoEntry{}
+		if err := rootBucket.Decode(v, &entry); err != nil {
+			return err
+		}
+
+		// Find the published release number; -1 means nothing is published
+		pubRelease := -1
+		if entry.Published != "" {
+			pubEntry, err := pool.GetEntry(db, entry.Published)
+			if err != nil {
+				// Without the published release we cannot tell which deltas
+				// are stale, so leave this package untouched rather than
+				// marking every delta for removal.
+				log.WithFields(log.Fields{
+					"package": entry.Name,
+					"id":      entry.Published,
+					"error":   err,
+				}).Warning("Failed to find published package in pool, skipping delta trim")
+				return nil
+			}
+			pubRelease = pubEntry.Meta.GetRelease()
+		}
+
+		log.WithFields(log.Fields{
+			"package":    entry.Name,
+			"ndeltas":    len(entry.Deltas),
+			"pubRelease": pubRelease,
+			"deltas":     entry.Deltas,
+		}).Info("Processing deltas for package")
+
+		var candidates []*PoolEntry
+
+		for _, id := range entry.Deltas {
+			poolEntry, err := pool.GetEntry(db, id)
+			if err != nil {
+				log.WithFields(log.Fields{
+					"package": entry.Name,
+					"id":      id,
+					"error":   err,
+				}).Warning("Failed to find delta in pool")
+				continue
+			}
+			if poolEntry.Delta == nil {
+				log.WithFields(log.Fields{
+					"package": entry.Name,
+					"id":      id,
+				}).Warning("Found a delta entry in pool with no delta information")
+				continue
+			}
+
+			// If it doesn't target our published version, it's stale, remove it.
+			if poolEntry.Delta.ToRelease != pubRelease {
+				log.WithFields(log.Fields{
+					"package":    entry.Name,
+					"id":         id,
+					"toRelease":  poolEntry.Delta.ToRelease,
+					"pubRelease": pubRelease,
+				}).Info("Marking stale delta for removal")
+				removalIDs = append(removalIDs, id)
+				continue
+			}
+
+			candidates = append(candidates, poolEntry)
+		}
+
+		if len(candidates) <= maxKeep {
+			return nil
+		}
+
+		sort.Slice(candidates, func(i, j int) bool {
+			// They all have the same ToRelease now, so sort by FromRelease DESC
+			return candidates[i].Delta.FromRelease > candidates[j].Delta.FromRelease
+		})
+
+		log.WithFields(log.Fields{
+			"package": entry.Name,
+			"count":   len(candidates),
+			"maxKeep": maxKeep,
+		}).Info("Candidates for delta trimming")
+
+		for idx, c := range candidates {
+			log.WithFields(log.Fields{
+				"idx":  idx,
+				"name": c.Name,
+				"to":   c.Delta.ToRelease,
+				"from": c.Delta.FromRelease,
+				"keep": idx < maxKeep,
+			}).Info("Delta candidate")
+		}
+
+		for i := maxKeep; i < len(candidates); i++ {
+			removalIDs = append(removalIDs, candidates[i].Name)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return err
+	}
+
+	// Now attempt to unref every one of the deltas marked for removal
+	for _, id := range removalIDs {
+		log.WithFields(log.Fields{
+			"repo": r.ID,
+			"id":   id,
+		}).Info("Trimming old delta package")
+		if err := r.UnrefPackage(db, pool, id); err != nil {
+			log.WithFields(log.Fields{
+				"repo":  r.ID,
+				"id":    id,
+				"error": err,
+			}).Warning("Failed to trim delta package")
+		}
+	}
+
+	return nil
+}
diff --git a/src/ferryd/handlers.go b/src/ferryd/handlers.go
index ce8988c..30474fd 100644
--- a/src/ferryd/handlers.go
+++ b/src/ferryd/handlers.go
@@ -295,6 +295,25 @@ func (s *Server) TrimPackages(w http.ResponseWriter, r *http.Request, p httprout
 	s.jproc.PushJob(jobs.NewTrimPackagesJob(target, req.MaxKeep))
 }
 
+// TrimDeltas will proxy a job to remove excess deltas from a repo
+func (s *Server) TrimDeltas(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
+	target := p.ByName("id")
+
+	req := libferry.TrimPackagesRequest{}
+
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	log.WithFields(log.Fields{
+		"repo":    target,
+		"maxKeep": req.MaxKeep,
+	}).Info("Delta trim requested")
+
+	s.jproc.PushJob(jobs.NewTrimDeltasJob(target, req.MaxKeep))
+}
+
 // TrimObsolete will proxy a job to remove obsolete packages from a repo
 func (s *Server) TrimObsolete(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
 	id := p.ByName("id")
diff --git a/src/ferryd/jobs/main.go b/src/ferryd/jobs/main.go
index 22dc2ee..6ee5e6e 100644
--- a/src/ferryd/jobs/main.go
+++ b/src/ferryd/jobs/main.go
@@ -78,6 +78,9 @@ const (
 	// TrimPackages is a sequential job to trim fat from a repository
 	TrimPackages = "TrimPackages"
 
+	// TrimDeltas is a sequential job to trim excess deltas from a repository
+	TrimDeltas = "TrimDeltas"
+
 	// FreezeRepo is a sequential job to freeze a repository.
 	FreezeRepo = "FreezeRepo"
 
@@ -170,6 +173,8 @@ func NewJobHandler(j *JobEntry) (JobHandler, error) {
 		return NewTrimObsoleteJobHandler(j)
 	case TrimPackages:
 		return NewTrimPackagesJobHandler(j)
+	case TrimDeltas:
+		return NewTrimDeltasJobHandler(j)
 	case FreezeRepo:
 		return NewFreezeRepoJobHandler(j)
 	case UnfreezeRepo:
diff --git a/src/ferryd/jobs/trim_deltas.go b/src/ferryd/jobs/trim_deltas.go
new file mode 100644
index 0000000..c1fccab
--- /dev/null
+++ b/src/ferryd/jobs/trim_deltas.go
@@ -0,0 +1,73 @@
+//
+// Copyright © 2017-2019 Solus Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package jobs
+
+import (
+	"fmt"
+	"strconv"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/getsolus/ferryd/src/ferryd/core"
+)
+
+// TrimDeltasJobHandler is responsible for trimming excess deltas in a repository
+type TrimDeltasJobHandler struct {
+	repoID  string
+	maxKeep int
+}
+
+// NewTrimDeltasJob will return a job suitable for adding to the job processor
+func NewTrimDeltasJob(repoID string, maxKeep int) *JobEntry {
+	return &JobEntry{
+		sequential: true,
+		Type:       TrimDeltas,
+		Params:     []string{repoID, strconv.Itoa(maxKeep)},
+	}
+}
+
+// NewTrimDeltasJobHandler will create a job handler for the input job and ensure it validates
+func NewTrimDeltasJobHandler(j *JobEntry) (*TrimDeltasJobHandler, error) {
+	if len(j.Params) != 2 {
+		return nil, fmt.Errorf("job has invalid parameters")
+	}
+	keep, err := strconv.ParseInt(j.Params[1], 10, 32)
+	if err != nil {
+		return nil, err
+	}
+	return &TrimDeltasJobHandler{
+		repoID:  j.Params[0],
+		maxKeep: int(keep),
+	}, nil
+}
+
+// Execute will attempt removal of excessive deltas in the repository
+func (j *TrimDeltasJobHandler) Execute(_ *Processor, manager *core.Manager) error {
+	if err := manager.TrimDeltas(j.repoID, j.maxKeep); err != nil {
+		return err
+	}
+	log.WithFields(log.Fields{
+		"repo":    j.repoID,
+		"maxKeep": j.maxKeep,
+	}).Info("Trimmed deltas in repository")
+	return nil
+}
+
+// Describe returns a human readable description for this job
+func (j *TrimDeltasJobHandler) Describe() string {
+	return fmt.Sprintf("Trim deltas to maximum of %d in '%s'", j.maxKeep, j.repoID)
+}
diff
--git a/src/ferryd/server.go b/src/ferryd/server.go
index b2386d5..bba16b2 100644
--- a/src/ferryd/server.go
+++ b/src/ferryd/server.go
@@ -110,6 +110,7 @@ func NewServer() (*Server, error) {
 	// Removal
 	router.POST("/api/v1/remove/source/:id", s.RemoveSource)
 	router.POST("/api/v1/trim/packages/:id", s.TrimPackages)
+	router.POST("/api/v1/trim/deltas/:id", s.TrimDeltas)
 	router.GET("/api/v1/trim/obsoletes/:id", s.TrimObsolete)
 
 	// Reset jobs are special and go straight to the store
diff --git a/src/libferry/main.go b/src/libferry/main.go
index f8fdc5c..907c0e5 100644
--- a/src/libferry/main.go
+++ b/src/libferry/main.go
@@ -219,6 +219,14 @@ func (c *Client) TrimPackages(repoID string, maxKeep int) error {
 	return c.postBasicResponse(c.formURI("api/v1/trim/packages/"+repoID), &tq, &Response{})
 }
 
+// TrimDeltas will request that deltas in the repo are trimmed to maxKeep
+func (c *Client) TrimDeltas(repoID string, maxKeep int) error {
+	tq := TrimPackagesRequest{
+		MaxKeep: maxKeep,
+	}
+	return c.postBasicResponse(c.formURI("api/v1/trim/deltas/"+repoID), &tq, &Response{})
+}
+
 // TrimObsolete will request that all packages marked obsolete are removed
 func (c *Client) TrimObsolete(repoID string) error {
 	uri := c.formURI("/api/v1/trim/obsoletes/" + repoID)