Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/ferryctl/cmd/trim_deltas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright © 2017-2019 Solus Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package cmd

import (
"fmt"
"os"
"strconv"

"github.com/spf13/cobra"

"github.com/getsolus/ferryd/src/libferry"
)

var trimDeltasCmd = &cobra.Command{
Use: "deltas [repoName] [maxToKeep]",
Short: "trim deltas back to a maximum of [max to keep]",
Long: "Trim excessive back versions for delta packages in the repository",
Run: trimDeltas,
}

func init() {
TrimCmd.AddCommand(trimDeltasCmd)
}

func trimDeltas(cmd *cobra.Command, args []string) {
if len(args) != 2 {
fmt.Fprintf(os.Stderr, "usage: trim deltas [repoName] [maxToKeep]\n")
return
}

maxKeep, err := strconv.ParseInt(args[1], 10, 32)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid integer: %v\n", err)
return
}

client := libferry.NewClient(socketPath)
defer client.Close()

repoID := args[0]

if err := client.TrimDeltas(repoID, int(maxKeep)); err != nil {
fmt.Fprintf(os.Stderr, "Error while trimming deltas: %v\n", err)
return
}
}
14 changes: 14 additions & 0 deletions src/ferryd/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ func (m *Manager) TrimPackages(repoID string, maxKeep int) error {
return m.Index(repoID)
}

// TrimDeltas will ask the repo to remove excessive packages
func (m *Manager) TrimDeltas(repoID string, maxKeep int) error {
repo, err := m.repo.GetRepo(m.db, repoID)
if err != nil {
return err
}

if err = repo.TrimDeltas(m.db, m.pool, maxKeep); err != nil {
return err
}

return m.Index(repoID)
}

// GetRepos will return all known repositories
func (m *Manager) GetRepos() ([]*Repository, error) {
return m.repo.GetRepos(m.db)
Expand Down
129 changes: 129 additions & 0 deletions src/ferryd/core/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,3 +1330,132 @@ func (r *Repository) TrimPackages(db libdb.Database, pool *Pool, maxKeep int) er

return nil
}

// TrimPackages will trim back the packages in each package entry to a maximum
// amount of packages, which helps to combat the issue of rapidly inserting
// many builds into a repo, i.e. removing old backversions
func (r *Repository) TrimDeltas(db libdb.Database, pool *Pool, maxKeep int) error {
if err := r.checkWrite(); err != nil {
return err
}

// Check for valid maxKeep
if maxKeep < 0 {
return fmt.Errorf("maxKeep of %d is too small. It Must be greater than or equal to 0", maxKeep)
}

// All the guys who we're sending to the big bitsink in the sky
var removalIDs []string

rootBucket := db.Bucket([]byte(DatabaseBucketRepo)).Bucket([]byte(r.ID)).Bucket([]byte(DatabaseBucketPackage))

// Grab every package
err := rootBucket.ForEach(func(k, v []byte) error {
entry := RepoEntry{}
if err := rootBucket.Decode(v, &entry); err != nil {
return err
}

// Find the published release number
pubRelease := -1
if entry.Published != "" {
if pubEntry, err := pool.GetEntry(db, entry.Published); err == nil {
pubRelease = pubEntry.Meta.GetRelease()
}
}

log.WithFields(log.Fields{
"package": entry.Name,
"ndeltas": len(entry.Deltas),
"pubRelease": pubRelease,
"deltas": entry.Deltas,
}).Info("Processing deltas for package")

var candidates []*PoolEntry

for _, id := range entry.Deltas {
poolEntry, err := pool.GetEntry(db, id)
if err != nil {
log.WithFields(log.Fields{
"package": entry.Name,
"id": id,
"error": err,
}).Warning("Failed to find delta in pool")
continue
}
if poolEntry.Delta == nil {
log.WithFields(log.Fields{
"package": entry.Name,
"id": id,
}).Warning("Found a delta entry in pool with no delta information")
continue
}

// If it doesn't target our published version, it's stale, remove it.
if poolEntry.Delta.ToRelease != pubRelease {
log.WithFields(log.Fields{
"package": entry.Name,
"id": id,
"toRelease": poolEntry.Delta.ToRelease,
"pubRelease": pubRelease,
}).Info("Marking stale delta for removal")
removalIDs = append(removalIDs, id)
continue
}

candidates = append(candidates, poolEntry)
}

if len(candidates) <= maxKeep {
return nil
}

sort.Slice(candidates, func(i, j int) bool {
// They all have the same ToRelease now, so sort by FromRelease DESC
return candidates[i].Delta.FromRelease > candidates[j].Delta.FromRelease
})

log.WithFields(log.Fields{
"package": entry.Name,
"count": len(candidates),
"maxKeep": maxKeep,
}).Info("Candidates for delta trimming")

for idx, c := range candidates {
log.WithFields(log.Fields{
"idx": idx,
"name": c.Name,
"to": c.Delta.ToRelease,
"from": c.Delta.FromRelease,
"keep": idx < maxKeep,
}).Info("Delta candidate")
}

for i := maxKeep; i < len(candidates); i++ {
removalIDs = append(removalIDs, candidates[i].Name)
}

return nil
})

if err != nil {
return err
}

// Now attempt to unref every one of the packages marked as obsolete
for _, id := range removalIDs {
log.WithFields(log.Fields{
"repo": r.ID,
"id": id,
}).Info("Trimming old delta package")
if err := r.UnrefPackage(db, pool, id); err != nil {
log.WithFields(log.Fields{
"repo": r.ID,
"id": id,
"error": err,
}).Warning("Failed to trim delta package")
}
}

return nil
}
19 changes: 19 additions & 0 deletions src/ferryd/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,25 @@ func (s *Server) TrimPackages(w http.ResponseWriter, r *http.Request, p httprout
s.jproc.PushJob(jobs.NewTrimPackagesJob(target, req.MaxKeep))
}

// TrimPackages will proxy a job to remove excess fat from a repo
func (s *Server) TrimDeltas(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
target := p.ByName("id")

req := libferry.TrimPackagesRequest{}

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

log.WithFields(log.Fields{
"repo": target,
"maxKeep": req.MaxKeep,
}).Info("Package trim requested")

s.jproc.PushJob(jobs.NewTrimDeltasJob(target, req.MaxKeep))
}

// TrimObsolete will proxy a job to remove obsolete packages from a repo
func (s *Server) TrimObsolete(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
id := p.ByName("id")
Expand Down
5 changes: 5 additions & 0 deletions src/ferryd/jobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ const (
// TrimPackages is a sequential job to trim fat from a repository
TrimPackages = "TrimPackages"

// TrimDeltas is a sequential job to trim fat from a repository
TrimDeltas = "TrimDeltas"

// FreezeRepo is a sequential job to freeze a repository.
FreezeRepo = "FreezeRepo"

Expand Down Expand Up @@ -170,6 +173,8 @@ func NewJobHandler(j *JobEntry) (JobHandler, error) {
return NewTrimObsoleteJobHandler(j)
case TrimPackages:
return NewTrimPackagesJobHandler(j)
case TrimDeltas:
return NewTrimDeltasJobHandler(j)
case FreezeRepo:
return NewFreezeRepoJobHandler(j)
case UnfreezeRepo:
Expand Down
73 changes: 73 additions & 0 deletions src/ferryd/jobs/trim_deltas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Copyright © 2017-2019 Solus Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package jobs

import (
"fmt"
"strconv"

log "github.com/sirupsen/logrus"

"github.com/getsolus/ferryd/src/ferryd/core"
)

// TrimDeltasJobHandler is responsible for removing packages by identifiers
type TrimDeltasJobHandler struct {
repoID string
maxKeep int
}

// NewTrimDeltasJob will return a job suitable for adding to the job processor
func NewTrimDeltasJob(repoID string, maxKeep int) *JobEntry {
return &JobEntry{
sequential: true,
Type: TrimDeltas,
Params: []string{repoID, fmt.Sprintf("%d", maxKeep)},
}
}

// NewTrimDeltasJobHandler will create a job handler for the input job and ensure it validates
func NewTrimDeltasJobHandler(j *JobEntry) (*TrimDeltasJobHandler, error) {
if len(j.Params) != 2 {
return nil, fmt.Errorf("job has invalid parameters")
}
keep, err := strconv.ParseInt(j.Params[1], 10, 32)
if err != nil {
return nil, err
}
return &TrimDeltasJobHandler{
repoID: j.Params[0],
maxKeep: int(keep),
}, nil
}

// Execute will attempt removal of excessive packages in the index
func (j *TrimDeltasJobHandler) Execute(_ *Processor, manager *core.Manager) error {
if err := manager.TrimDeltas(j.repoID, j.maxKeep); err != nil {
return err
}
log.WithFields(log.Fields{
"repo": j.repoID,
"maxKeep": j.maxKeep,
}).Info("Trimmed deltas in repository")
return nil
}

// Describe returns a human readable description for this job
func (j *TrimDeltasJobHandler) Describe() string {
return fmt.Sprintf("Trim deltas to maximum of %d in '%s'", j.maxKeep, j.repoID)
}
1 change: 1 addition & 0 deletions src/ferryd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewServer() (*Server, error) {
// Removal
router.POST("/api/v1/remove/source/:id", s.RemoveSource)
router.POST("/api/v1/trim/packages/:id", s.TrimPackages)
router.POST("/api/v1/trim/deltas/:id", s.TrimDeltas)
router.GET("/api/v1/trim/obsoletes/:id", s.TrimObsolete)

// Reset jobs are special and go straight to the store
Expand Down
8 changes: 8 additions & 0 deletions src/libferry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ func (c *Client) TrimPackages(repoID string, maxKeep int) error {
return c.postBasicResponse(c.formURI("api/v1/trim/packages/"+repoID), &tq, &Response{})
}

// TrimDeltas will request that packages in the repo are trimmed to maxKeep
func (c *Client) TrimDeltas(repoID string, maxKeep int) error {
tq := TrimPackagesRequest{
MaxKeep: maxKeep,
}
return c.postBasicResponse(c.formURI("api/v1/trim/deltas/"+repoID), &tq, &Response{})
}

// TrimObsolete will request that all packages marked obsolete are removed
func (c *Client) TrimObsolete(repoID string) error {
uri := c.formURI("/api/v1/trim/obsoletes/" + repoID)
Expand Down