diff --git a/src/ferryctl/cmd/delta.go b/src/ferryctl/cmd/delta.go index a793a97..a22fa30 100644 --- a/src/ferryctl/cmd/delta.go +++ b/src/ferryctl/cmd/delta.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "os" + "strconv" "github.com/spf13/cobra" @@ -37,15 +38,21 @@ func init() { } func delta(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintf(os.Stderr, "Usage: delta [repoName]\n") + if len(args) != 2 { + fmt.Fprintf(os.Stderr, "Usage: delta [repoName] [maxGenerate]\n") + return + } + + maxGen, err := strconv.ParseInt(args[1], 10, 32) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid integer: %v\n", err) return } client := libferry.NewClient(socketPath) defer client.Close() - if err := client.DeltaRepo(args[0]); err != nil { + if err := client.DeltaRepo(args[0], int(maxGen)); err != nil { fmt.Fprintf(os.Stderr, "Error while creating deltas: %v\n", err) return } diff --git a/src/ferryd/handlers.go b/src/ferryd/handlers.go index ce8988c..1294c4f 100644 --- a/src/ferryd/handlers.go +++ b/src/ferryd/handlers.go @@ -161,10 +161,20 @@ func (s *Server) DeleteRepo(w http.ResponseWriter, r *http.Request, p httprouter // DeltaRepo will handle remote requests for repository deltaing func (s *Server) DeltaRepo(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") + + req := libferry.DeltaPackagesRequest{} + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.WithFields(log.Fields{ - "id": id, + "id": id, + "maxGenerate": req.MaxGenerate, }).Info("Repository delta requested") - s.jproc.PushJob(jobs.NewDeltaRepoJob(id)) + + s.jproc.PushJob(jobs.NewDeltaRepoJob(id, req.MaxGenerate)) } // IndexRepo will handle remote requests for repository indexing diff --git a/src/ferryd/jobs/delta.go b/src/ferryd/jobs/delta.go index bbfc8ad..e111321 100644 --- a/src/ferryd/jobs/delta.go +++ b/src/ferryd/jobs/delta.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "sort" + "strconv" log "github.com/sirupsen/logrus" @@ -35,43 +36,51 @@ type DeltaJobHandler struct { packageName string indexRepo bool nDeltas int // Track how many deltas we actually produce + maxGenerate int // Limit how many deltas are generated per package } // NewDeltaJob will return a job suitable for adding to the job processor -func NewDeltaJob(repoID, packageID string) *JobEntry { +func NewDeltaJob(repoID, packageID string, maxGenerate int) *JobEntry { return &JobEntry{ sequential: false, Type: Delta, - Params: []string{repoID, packageID}, + Params: []string{repoID, packageID, fmt.Sprintf("%d", maxGenerate)}, } } // NewDeltaIndexJob will return a new job for creating delta packages as well // as scheduling an index operation when complete. -func NewDeltaIndexJob(repoID, packageID string) *JobEntry { +func NewDeltaIndexJob(repoID, packageID string, maxGenerate int) *JobEntry { return &JobEntry{ sequential: false, Type: DeltaIndex, - Params: []string{repoID, packageID}, + Params: []string{repoID, packageID, fmt.Sprintf("%d", maxGenerate)}, } } // NewDeltaJobHandler will create a job handler for the input job and ensure it validates func NewDeltaJobHandler(j *JobEntry, indexRepo bool) (*DeltaJobHandler, error) { - if len(j.Params) != 2 { + if len(j.Params) != 3 { return nil, fmt.Errorf("job has invalid parameters") } + + maxGen, err := strconv.ParseInt(j.Params[2], 10, 32) + if err != nil { + return nil, err + } + return &DeltaJobHandler{ repoID: j.Params[0], packageName: j.Params[1], indexRepo: indexRepo, nDeltas: 0, + maxGenerate: int(maxGen), }, nil } // executeInternal is the common code shared in the delta jobs, and is // split out to save duplication. -func (j *DeltaJobHandler) executeInternal(manager *core.Manager) error { +func (j *DeltaJobHandler) executeInternal(manager *core.Manager, maxGenerate int) error { pkgs, err := manager.GetPackages(j.repoID, j.packageName) if err != nil { return err @@ -89,8 +98,17 @@ func (j *DeltaJobHandler) executeInternal(manager *core.Manager) error { sort.Sort(libeopkg.PackageSet(pkgs)) tip := pkgs[len(pkgs)-1] + // Determine the start index for deltas + startIndex := 0 + if maxGenerate > 0 { + numHistorical := len(pkgs) - 1 + if numHistorical > maxGenerate { + startIndex = numHistorical - maxGenerate + } + } + // Process all potential deltas - for i := 0; i < len(pkgs)-1; i++ { + for i := startIndex; i < len(pkgs)-1; i++ { old := pkgs[i] fields := log.Fields{ "old": old.GetID(), @@ -188,7 +206,7 @@ func (j *DeltaJobHandler) includeDelta(manager *core.Manager, mapping *core.Delt // Execute will delta the target package within the target repository. func (j *DeltaJobHandler) Execute(_ *Processor, manager *core.Manager) error { - err := j.executeInternal(manager) + err := j.executeInternal(manager, j.maxGenerate) if err != nil { return err } diff --git a/src/ferryd/jobs/delta_repo.go b/src/ferryd/jobs/delta_repo.go index 60f70ba..a77b321 100644 --- a/src/ferryd/jobs/delta_repo.go +++ b/src/ferryd/jobs/delta_repo.go @@ -18,6 +18,7 @@ package jobs import ( "fmt" + "strconv" log "github.com/sirupsen/logrus" @@ -27,25 +28,33 @@ import ( // DeltaRepoJobHandler is responsible for delta'ing repositories and should only // ever be used in sequential queues. type DeltaRepoJobHandler struct { - repoID string + repoID string + maxGenerate int } // NewDeltaRepoJob will return a job suitable for adding to the job processor -func NewDeltaRepoJob(id string) *JobEntry { +func NewDeltaRepoJob(id string, maxGenerate int) *JobEntry { return &JobEntry{ sequential: true, Type: DeltaRepo, - Params: []string{id}, + Params: []string{id, fmt.Sprintf("%d", maxGenerate)}, } } // NewDeltaRepoJobHandler will create a job handler for the input job and ensure it validates func NewDeltaRepoJobHandler(j *JobEntry) (*DeltaRepoJobHandler, error) { - if len(j.Params) != 1 { + if len(j.Params) != 2 { return nil, fmt.Errorf("job has invalid parameters") } + + generate, err := strconv.ParseInt(j.Params[1], 10, 32) + if err != nil { + return nil, err + } + return &DeltaRepoJobHandler{ - repoID: j.Params[0], + repoID: j.Params[0], + maxGenerate: int(generate), }, nil } @@ -72,7 +81,7 @@ func (j *DeltaRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) e // Fire off parallel delta jobs for every package in this repository for _, name := range packageNames { - jproc.PushJob(NewDeltaJob(j.repoID, name)) + jproc.PushJob(NewDeltaJob(j.repoID, name, j.maxGenerate)) } return nil diff --git a/src/ferryd/jobs/pull_repo.go b/src/ferryd/jobs/pull_repo.go index 58eb3f6..f02ac7e 100644 --- a/src/ferryd/jobs/pull_repo.go +++ b/src/ferryd/jobs/pull_repo.go @@ -74,7 +74,7 @@ func (j *PullRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) er // Create delta job in this repository on the changed names // Don't cause indexing because it causes noise for _, pkg := range changedNames { - jproc.PushJob(NewDeltaIndexJob(j.targetID, pkg)) + jproc.PushJob(NewDeltaIndexJob(j.targetID, pkg, 0)) } return nil diff --git a/src/ferryd/jobs/transit_job.go b/src/ferryd/jobs/transit_job.go index 5701274..add1079 100644 --- a/src/ferryd/jobs/transit_job.go +++ b/src/ferryd/jobs/transit_job.go @@ -111,7 +111,7 @@ func (j *TransitJobHandler) Execute(jproc *Processor, manager *core.Manager) err if ent != nil { return err } - jproc.PushJob(NewDeltaIndexJob(repo, p.Name)) + jproc.PushJob(NewDeltaIndexJob(repo, p.Name, 0)) } return nil diff --git a/src/ferryd/server.go b/src/ferryd/server.go index b2386d5..688f2fc 100644 --- a/src/ferryd/server.go +++ b/src/ferryd/server.go @@ -94,7 +94,7 @@ func NewServer() (*Server, error) { // Repo management router.GET("/api/v1/create/repo/:id", s.CreateRepo) router.GET("/api/v1/remove/repo/:id", s.DeleteRepo) - router.GET("/api/v1/delta/repo/:id", s.DeltaRepo) + router.POST("/api/v1/delta/repo/:id", s.DeltaRepo) router.GET("/api/v1/index/repo/:id", s.IndexRepo) // Client sends us data diff --git a/src/libferry/main.go b/src/libferry/main.go index f8fdc5c..a24fb06 100644 --- a/src/libferry/main.go +++ b/src/libferry/main.go @@ -154,9 +154,11 @@ func (c *Client) DeleteRepo(id string) error { } // DeltaRepo will attempt to reproduce deltas in the given repo -func (c *Client) DeltaRepo(id string) error { - uri := c.formURI("/api/v1/delta/repo/" + id) - return c.getBasicResponse(uri, &Response{}) +func (c *Client) DeltaRepo(id string, maxGenerate int) error { + tq := DeltaPackagesRequest{ + MaxGenerate: maxGenerate, + } + return c.postBasicResponse(c.formURI("api/v1/delta/repo/"+id), &tq, &Response{}) } // IndexRepo will attempt to index a repository in the daemon diff --git a/src/libferry/types.go b/src/libferry/types.go index dc9a7a5..8539e62 100644 --- a/src/libferry/types.go +++ b/src/libferry/types.go @@ -91,6 +91,12 @@ type TrimPackagesRequest struct { MaxKeep int `json:"maxPackages"` } +// DeltaPackagesRequest is sent when generating deltas for a repository. +type DeltaPackagesRequest struct { + Response + MaxGenerate int `json:"maxPackages"` +} + // TimingInformation stores relevant timing stats on jobs so we can know what // kind of latency we're dealing with, etc. //