From 866338f1e491d742d9e81397bbff900adaad38a6 Mon Sep 17 00:00:00 2001 From: Hariom Verma Date: Thu, 21 Jul 2022 16:19:35 +0530 Subject: [PATCH] Add concurrency in content API Signed-off-by: Hariom Verma --- main.go | 78 +++++++++++++++++++++++++++----------------- utils/common.go | 4 ++- utils/content.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 134 insertions(+), 32 deletions(-) diff --git a/main.go b/main.go index 7e615e1..0c24f72 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "path" "path/filepath" "regexp" + "sort" "strings" "syscall" @@ -503,7 +504,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } if treeEntry.Mode.IsFile() { - var fileContent []*utils.Content + var fileContent []utils.Content blob, err := object.GetBlob(repo.Storer, treeEntry.Hash) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) @@ -512,7 +513,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { http.Error(w, err.Error(), http.StatusNotFound) } - fileContent = append(fileContent, fc) + fileContent = append(fileContent, *fc) if body.IncludeLastCommit { pathCommitId, err := utils.LastCommitForPath(RepoPath, body.RefId, fileContent[0].Path) @@ -550,9 +551,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - var treeContents []*utils.Content + var treeContents []utils.Content pageRes, err := utils.PaginateTreeContentResponse(tree, body.Pagination, 100, body.Path, func(treeContent utils.Content) error { - treeContents = append(treeContents, &treeContent) + treeContents = append(treeContents, treeContent) return nil }) if err != nil { @@ -560,40 +561,57 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + var resContents []utils.Content + if body.IncludeLastCommit { - for i := range treeContents { - pathCommitId, err := utils.LastCommitForPath(RepoPath, body.RefId, treeContents[i].Path) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - pathCommitHash := plumbing.NewHash(pathCommitId) - pathCommitObject, err := object.GetCommit(repo.Storer, pathCommitHash) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - treeContents[i].LastCommit, err = utils.GrabCommit(*pathCommitObject) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + errc := make(chan error, len(treeContents)) + done := make(chan struct{}) + defer close(errc) + defer close(done) + + inputCh := utils.PrepareTreeContentPipeline(treeContents, done) + + mergeInput := make([]<-chan utils.Content, len(treeContents)) + + for i := 0; i < len(treeContents); i++ { + mergeInput[i] = utils.GetLastCommit(inputCh, RepoPath, repo.Storer, body.RefId, errc, done) + } + + final := utils.MergeContentChannel(done, mergeInput...) + + go func() { + for tc := range final { + select { + case <-done: + return + default: + resContents = append(resContents, tc) + } } + errc <- nil + }() + + if err := <-errc; err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } + } else { + resContents = treeContents } - var sortedTREEContents []*utils.Content - var sortedBLOBContents []*utils.Content - for _, tc := range treeContents { - if tc.Type == "TREE" { - sortedTREEContents = append(sortedTREEContents, tc) - } else { - sortedBLOBContents = append(sortedBLOBContents, tc) + /* Sort the contents */ + sort.Slice(resContents[:], func(i, j int) bool { + switch strings.Compare(resContents[i].Type, resContents[j].Type) { + case -1: + return false + case 1: + return true } - } - sortedTreeContents := append(sortedTREEContents, sortedBLOBContents...) + return resContents[i].Name < resContents[j].Name + }) contentResponse := utils.ContentResponse{ - Content: sortedTreeContents, + Content: resContents, Pagination: pageRes, } contentResponseJson, err := json.Marshal(contentResponse) diff --git a/utils/common.go b/utils/common.go index 1e66388..e826294 100644 --- a/utils/common.go +++ b/utils/common.go @@ -1,6 +1,8 @@ package utils -import "encoding/binary" +import ( + "encoding/binary" +) func UInt64ToBytes(id uint64) []byte { bz := make([]byte, 8) diff --git a/utils/content.go b/utils/content.go index ee57f91..a1926d1 100644 --- a/utils/content.go +++ b/utils/content.go @@ -4,8 +4,11 @@ import ( "encoding/base64" "fmt" "io/ioutil" + "sync" + "github.com/gitopia/go-git/v5/plumbing" "github.com/gitopia/go-git/v5/plumbing/object" + "github.com/gitopia/go-git/v5/storage" ) type ContentType int @@ -29,7 +32,7 @@ type ContentRequestBody struct { } type ContentResponse struct { - Content []*Content `json:"content,omitempty"` + Content []Content `json:"content,omitempty"` Pagination *PageResponse `json:"pagination,omitempty"` } @@ -183,3 +186,82 @@ func PaginateTreeContentResponse( return res, nil } + +func PrepareTreeContentPipeline(treeContents []Content, done chan struct{}) <-chan Content { + out := make(chan Content) + + go func() { + defer close(out) + for i := range treeContents { + select { + case out <- treeContents[i]: + case <-done: + return + } + } + }() + + return out +} + +func GetLastCommit(treeEntry <-chan Content, RepoPath string, repoStorer storage.Storer, refId string, errc chan<- error, done <-chan struct{}) <-chan Content { + out := make(chan Content) + + go func() { + defer close(out) + for te := range treeEntry { + select { + case <-done: + return + default: + pathCommitId, err := LastCommitForPath(RepoPath, refId, te.Path) + if err != nil { + errc <- err + return + } + pathCommitHash := plumbing.NewHash(pathCommitId) + pathCommitObject, err := object.GetCommit(repoStorer, pathCommitHash) + if err != nil { + errc <- err + return + } + te.LastCommit, err = GrabCommit(*pathCommitObject) + if err != nil { + errc <- err + return + } + out <- te + } + } + }() + + return out +} + +func MergeContentChannel(done chan struct{}, cs ...<-chan Content) <-chan Content { + out := make(chan Content) + wg := sync.WaitGroup{} + + output := func(c <-chan Content) { + defer wg.Done() + for i := range c { + select { + case out <- i: + case <-done: + return + } + } + } + + wg.Add(len(cs)) + for _, ch := range cs { + go output(ch) + } + + go func() { + wg.Wait() + close(out) + }() + + return out +}