Skip to content

Commit da1f3f0

Browse files
committed
feat: ipfs get progress via dag stat and dag stat file count
- get: show progress (Items: N files, Blocks, Saving to) using dag stat API output only
- dag stat: add NumFiles (UnixFS file count, excluding chunk nodes); single traversal distinguishing file vs directory type; show Files column and Total Files in summary
- dag: add NumFiles to DagStat and DagStatSummary for JSON and text output

Signed-off-by: Chayan Das <01chayandas@gmail.com>
1 parent 9c894a7 commit da1f3f0

3 files changed

Lines changed: 178 additions & 18 deletions

File tree

core/commands/dag/dag.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ type DagStat struct {
297297
Cid cid.Cid
298298
Size uint64 `json:",omitempty"`
299299
NumBlocks int64 `json:",omitempty"`
300+
NumFiles int64 `json:",omitempty"` // UnixFS file nodes (actual files, including nested)
300301
}
301302

302303
func (s *DagStat) String() string {
@@ -349,13 +350,15 @@ type DagStatSummary struct {
349350
TotalSize uint64 `json:",omitempty"`
350351
SharedSize uint64 `json:",omitempty"`
351352
Ratio float32 `json:",omitempty"`
353+
NumFiles int64 `json:",omitempty"` // total UnixFS file count across all DAGs
352354
DagStatsArray []*DagStat `json:"DagStats,omitempty"`
353355
}
354356

355357
func (s *DagStatSummary) String() string {
356-
return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f",
358+
return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nTotal Files: %d\nShared Size: %d (%s)\nRatio: %f",
357359
s.TotalSize, humanize.Bytes(s.TotalSize),
358360
s.UniqueBlocks,
361+
s.NumFiles,
359362
s.SharedSize, humanize.Bytes(s.SharedSize),
360363
s.Ratio)
361364
}
@@ -405,15 +408,17 @@ Note: This command skips duplicate blocks in reporting both size and the number
405408
csvWriter := csv.NewWriter(w)
406409
csvWriter.Comma = '\t'
407410
cidSpacing := len(event.DagStatsArray[0].Cid.String())
408-
header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), "Size"}
411+
header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), fmt.Sprintf("%-10s", "Files"), "Size"}
409412
if err := csvWriter.Write(header); err != nil {
410413
return err
411414
}
412415
for _, dagStat := range event.DagStatsArray {
413416
numBlocksStr := fmt.Sprint(dagStat.NumBlocks)
417+
numFilesStr := fmt.Sprint(dagStat.NumFiles)
414418
err := csvWriter.Write([]string{
415419
dagStat.Cid.String(),
416420
fmt.Sprintf("%-15s", numBlocksStr),
421+
fmt.Sprintf("%-10s", numFilesStr),
417422
fmt.Sprint(dagStat.Size),
418423
})
419424
if err != nil {

core/commands/dag/stat.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/dustin/go-humanize"
99
mdag "github.com/ipfs/boxo/ipld/merkledag"
1010
"github.com/ipfs/boxo/ipld/merkledag/traverse"
11+
"github.com/ipfs/boxo/ipld/unixfs"
1112
cid "github.com/ipfs/go-cid"
1213
cmds "github.com/ipfs/go-ipfs-cmds"
1314
"github.com/ipfs/kubo/core/commands/cmdenv"
@@ -25,6 +26,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
2526
if val, specified := req.Options[progressOptionName].(bool); specified {
2627
progressive = val
2728
}
29+
2830
api, err := cmdenv.GetApi(env, req)
2931
if err != nil {
3032
return err
@@ -33,6 +35,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
3335

3436
cidSet := cid.NewSet()
3537
dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}
38+
3639
for _, a := range req.Arguments {
3740
p, err := cmdutils.PathOrCidPath(a)
3841
if err != nil {
@@ -50,24 +53,45 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
5053
if err != nil {
5154
return err
5255
}
56+
5357
dagstats := &DagStat{Cid: rp.RootCid()}
5458
dagStatSummary.appendStats(dagstats)
59+
60+
chunkCids := cid.NewSet()
5561
err = traverse.Traverse(obj, traverse.Options{
5662
DAG: nodeGetter,
5763
Order: traverse.DFSPre,
5864
Func: func(current traverse.State) error {
59-
currentNodeSize := uint64(len(current.Node.RawData()))
65+
nd := current.Node
66+
currentNodeSize := uint64(len(nd.RawData()))
6067
dagstats.Size += currentNodeSize
6168
dagstats.NumBlocks++
62-
if !cidSet.Has(current.Node.Cid()) {
69+
70+
if pn, ok := nd.(*mdag.ProtoNode); ok {
71+
if fsn, err := unixfs.FSNodeFromBytes(pn.Data()); err == nil {
72+
if fsn.Type() == unixfs.TFile && len(pn.Links()) > 0 {
73+
for _, l := range pn.Links() {
74+
chunkCids.Add(l.Cid)
75+
}
76+
}
77+
if !chunkCids.Has(nd.Cid()) {
78+
// Count as a file only if not a chunk block
79+
switch fsn.Type() {
80+
case unixfs.TFile, unixfs.TRaw, unixfs.TSymlink, unixfs.TMetadata:
81+
dagstats.NumFiles++
82+
}
83+
}
84+
}
85+
}
86+
87+
if !cidSet.Has(nd.Cid()) {
6388
dagStatSummary.incrementTotalSize(currentNodeSize)
6489
}
6590
dagStatSummary.incrementRedundantSize(currentNodeSize)
66-
cidSet.Add(current.Node.Cid())
91+
cidSet.Add(nd.Cid())
92+
6793
if progressive {
68-
if err := res.Emit(dagStatSummary); err != nil {
69-
return err
70-
}
94+
return res.Emit(dagStatSummary)
7195
}
7296
return nil
7397
},
@@ -82,10 +106,13 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
82106
dagStatSummary.UniqueBlocks = cidSet.Len()
83107
dagStatSummary.calculateSummary()
84108

85-
if err := res.Emit(dagStatSummary); err != nil {
86-
return err
109+
var totalFiles int64
110+
for _, s := range dagStatSummary.DagStatsArray {
111+
totalFiles += s.NumFiles
87112
}
88-
return nil
113+
dagStatSummary.NumFiles = totalFiles
114+
115+
return res.Emit(dagStatSummary)
89116
}
90117

91118
func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
@@ -122,7 +149,10 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
122149
totalBlocks += stat.NumBlocks
123150
totalSize += stat.Size
124151
}
125-
fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize))
152+
fmt.Fprintf(os.Stderr,
153+
"Fetched/Processed %d blocks, %d bytes (%s)\r",
154+
totalBlocks, totalSize, humanize.Bytes(totalSize),
155+
)
126156
}
127157
default:
128158
return e.TypeErr(out, v)

core/commands/get.go

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import (
44
gotar "archive/tar"
55
"bufio"
66
"compress/gzip"
7+
"encoding/json"
78
"errors"
89
"fmt"
910
"io"
11+
"net/http"
12+
"net/url"
1013
"os"
1114
gopath "path"
1215
"path/filepath"
@@ -15,11 +18,13 @@ import (
1518
"github.com/ipfs/kubo/core/commands/cmdenv"
1619
"github.com/ipfs/kubo/core/commands/cmdutils"
1720
"github.com/ipfs/kubo/core/commands/e"
21+
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
1822

1923
"github.com/cheggaaa/pb"
2024
"github.com/ipfs/boxo/files"
2125
"github.com/ipfs/boxo/tar"
2226
cmds "github.com/ipfs/go-ipfs-cmds"
27+
manet "github.com/multiformats/go-multiaddr/net"
2328
)
2429

2530
var ErrInvalidCompressionLevel = errors.New("compression level must be between 1 and 9")
@@ -29,6 +34,7 @@ const (
2934
archiveOptionName = "archive"
3035
compressOptionName = "compress"
3136
compressionLevelOptionName = "compression-level"
37+
getTotalBlocksKey = "_getTotalBlocks"
3238
)
3339

3440
var GetCmd = &cmds.Command{
@@ -61,8 +67,29 @@ may also specify the level of compression by specifying '-l=<1-9>'.
6167
cmds.BoolOption(progressOptionName, "p", "Stream progress data.").WithDefault(true),
6268
},
6369
PreRun: func(req *cmds.Request, env cmds.Environment) error {
64-
_, err := getCompressOptions(req)
65-
return err
70+
if _, err := getCompressOptions(req); err != nil {
71+
return err
72+
}
73+
74+
progress, _ := req.Options[progressOptionName].(bool)
75+
if !progress {
76+
return nil
77+
}
78+
79+
baseURL, err := getAPIBaseURL(env)
80+
if err != nil {
81+
return nil
82+
}
83+
84+
rootCID, err := resolveRootCID(baseURL, req.Arguments[0])
85+
if err != nil || rootCID == "" {
86+
return nil
87+
}
88+
89+
totalBlocks, files := fetchStatInfo(baseURL, rootCID)
90+
req.Options[getTotalBlocksKey] = totalBlocks
91+
printGetProgress(os.Stderr, rootCID, files, totalBlocks)
92+
return nil
6693
},
6794
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
6895
ctx := req.Context
@@ -141,6 +168,10 @@ may also specify the level of compression by specifying '-l=<1-9>'.
141168

142169
archive, _ := req.Options[archiveOptionName].(bool)
143170
progress, _ := req.Options[progressOptionName].(bool)
171+
totalBlocks, _ := req.Options[getTotalBlocksKey].(int)
172+
if totalBlocks <= 0 {
173+
totalBlocks = 1
174+
}
144175

145176
gw := getWriter{
146177
Out: os.Stdout,
@@ -149,13 +180,93 @@ may also specify the level of compression by specifying '-l=<1-9>'.
149180
Compression: cmplvl,
150181
Size: int64(res.Length()),
151182
Progress: progress,
183+
TotalBlocks: totalBlocks,
152184
}
153185

154186
return gw.Write(outReader, outPath)
155187
},
156188
},
157189
}
158190

191+
func getAPIBaseURL(env cmds.Environment) (string, error) {
192+
configRoot, err := cmdenv.GetConfigRoot(env)
193+
if err != nil {
194+
return "", err
195+
}
196+
apiAddr, err := fsrepo.APIAddr(configRoot)
197+
if err != nil {
198+
return "", err
199+
}
200+
network, host, err := manet.DialArgs(apiAddr)
201+
if err != nil || (network != "tcp" && network != "tcp4" && network != "tcp6") {
202+
return "", errors.New("unsupported network")
203+
}
204+
return "http://" + host, nil
205+
}
206+
207+
// resolveRootCID asks the daemon's dag/resolve endpoint for the root CID of
// the given path. It returns "" with a nil error when the response carries no
// recognizable CID; callers treat an empty result as "skip progress display".
func resolveRootCID(baseURL, pathStr string) (string, error) {
	resp, err := http.Post(baseURL+"/api/v0/dag/resolve?arg="+url.QueryEscape(pathStr), "", nil)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	// Without this check an error page (e.g. HTML from a 500) would be fed
	// to the JSON decoder and produce a confusing decode error.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("dag/resolve: unexpected status %s", resp.Status)
	}
	var out struct {
		Cid interface{} `json:"Cid"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	// The API may encode the CID either as a plain string or in the
	// DAG-JSON {"/" : "<cid>"} form; accept both.
	switch v := out.Cid.(type) {
	case string:
		return v, nil
	case map[string]interface{}:
		c, _ := v["/"].(string)
		return c, nil
	}
	return "", nil
}
228+
229+
// fetchStatInfo calls the daemon's dag/stat endpoint for rootCID and returns
// the DAG's total block count and UnixFS file count (taken directly from
// NumFiles). It is best-effort: on any failure it falls back to (1, 1) so the
// progress display degrades gracefully instead of aborting the get.
func fetchStatInfo(baseURL, rootCID string) (totalBlocks, numFiles int) {
	totalBlocks, numFiles = 1, 1

	resp, err := http.Post(
		baseURL+"/api/v0/dag/stat?arg="+url.QueryEscape("/ipfs/"+rootCID)+"&progress=false",
		"", nil,
	)
	if err != nil {
		return
	}
	// Close unconditionally: the original early return on a non-200 status
	// leaked the response body and its underlying connection.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return
	}

	var out struct {
		UniqueBlocks int `json:"UniqueBlocks"`
		DagStats     []struct {
			NumBlocks int64 `json:"NumBlocks"`
			NumFiles  int64 `json:"NumFiles"`
		} `json:"DagStats"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return
	}

	if len(out.DagStats) > 0 {
		ds := out.DagStats[0]
		if ds.NumBlocks > 0 {
			totalBlocks = int(ds.NumBlocks)
		} else if out.UniqueBlocks > 0 {
			// Per-DAG count missing; fall back to the summary total.
			totalBlocks = out.UniqueBlocks
		}
		if ds.NumFiles > 0 {
			numFiles = int(ds.NumFiles)
		}
	} else if out.UniqueBlocks > 0 {
		totalBlocks = out.UniqueBlocks
	}
	return
}
260+
261+
// printGetProgress writes the pre-transfer summary shown by `ipfs get`:
// the root CID being fetched plus the item and block counts obtained
// from dag stat.
func printGetProgress(w io.Writer, cidStr string, numFiles, totalBlocks int) {
	fmt.Fprintf(w, "\nFetching CID: %s\n", cidStr)
	label := "files"
	if numFiles == 1 {
		label = "file"
	}
	fmt.Fprintf(w, "Items: %d %s | Blocks: %d\n\n", numFiles, label, totalBlocks)
}
269+
159270
type clearlineReader struct {
160271
io.Reader
161272
out io.Writer
@@ -164,8 +275,7 @@ type clearlineReader struct {
164275
// Read proxies to the wrapped Reader; once the stream reports EOF it emits
// an ANSI erase-line sequence so the progress bar line is cleared.
func (r *clearlineReader) Read(p []byte) (int, error) {
	n, err := r.Reader.Read(p)
	if err == io.EOF {
		fmt.Fprint(r.out, "\033[2K\r")
	}
	return n, err
}
@@ -210,6 +320,7 @@ type getWriter struct {
210320
Compression int
211321
Size int64
212322
Progress bool
323+
TotalBlocks int
213324
}
214325

215326
func (gw *getWriter) Write(r io.Reader, fpath string) error {
@@ -258,10 +369,24 @@ func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
258369
var progressCb func(int64) int64
259370
if gw.Progress {
260371
bar := makeProgressBar(gw.Err, gw.Size)
372+
totalBlocks := gw.TotalBlocks
373+
fmt.Fprintf(gw.Err, "Blocks: 0 / %d\n", totalBlocks)
261374
bar.Start()
262375
defer bar.Finish()
263-
defer bar.Set64(gw.Size)
264-
progressCb = bar.Add64
376+
defer func() {
377+
bar.Set64(gw.Size)
378+
fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", totalBlocks, totalBlocks)
379+
}()
380+
size := gw.Size
381+
progressCb = func(delta int64) int64 {
382+
total := bar.Add64(delta)
383+
blocksEst := totalBlocks
384+
if total < size && size > 0 {
385+
blocksEst = int(int64(totalBlocks) * total / size)
386+
}
387+
fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", blocksEst, totalBlocks)
388+
return total
389+
}
265390
}
266391

267392
extractor := &tar.Extractor{Path: fpath, Progress: progressCb}

0 commit comments

Comments
 (0)