diff --git a/.gitignore b/.gitignore index 579b3e2..f4637d5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /lerobot-go +/bin/ /go-lerobot /lerobot-ds *.test diff --git a/internal/v21/integrity.go b/internal/v21/integrity.go new file mode 100644 index 0000000..c4f36f2 --- /dev/null +++ b/internal/v21/integrity.go @@ -0,0 +1,100 @@ +package v21 + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/ioai-tech/lerobot-go/internal/meta" +) + +func requiredVideoKeys(features map[string]meta.FeatureSpec) []string { + var keys []string + for k, f := range features { + if f.DType == "video" { + keys = append(keys, k) + } + } + return keys +} + +// ValidateOutputIntegrity checks merged v2.1 dataset has per-episode parquet and video files. +func ValidateOutputIntegrity(outputRoot string, info meta.DatasetInfo, features map[string]meta.FeatureSpec) error { + if info.TotalEpisodes <= 0 { + matches, err := filepath.Glob(filepath.Join(outputRoot, meta.DataDir, "*", "episode_*.parquet")) + if err != nil { + return err + } + if len(matches) == 0 { + matches, err = filepath.Glob(filepath.Join(outputRoot, meta.DataDir, "*", "*.parquet")) + if err != nil { + return err + } + } + if len(matches) == 0 { + return fmt.Errorf("no v2.1 data parquet files under %s", outputRoot) + } + for _, p := range matches { + if err := nonEmptyFile(p); err != nil { + return err + } + } + } else { + for ep := 0; ep < info.TotalEpisodes; ep++ { + p := filepath.Join(outputRoot, meta.V21DataPathFromInfo(info, ep)) + if err := nonEmptyFile(p); err != nil { + return fmt.Errorf("episode %d: %w", ep, err) + } + } + } + if !hasVideoFeatures(features) { + return nil + } + for _, vk := range requiredVideoKeys(features) { + found := 0 + for ep := 0; ep < max(1, info.TotalEpisodes); ep++ { + p := filepath.Join(outputRoot, meta.V21VideoPathFromInfo(info, vk, ep)) + if err := nonEmptyFile(p); err == nil { + found++ + continue + } + alt := filepath.Join(outputRoot, meta.V21VideoPathFromInfo(info, filepath.Base(vk), ep)) + if err := nonEmptyFile(alt); err == nil { + found++ + } + } + if found == 0 { + glob, _ := filepath.Glob(filepath.Join(outputRoot, "videos", "*", vk, "episode_*.mp4")) + if len(glob) == 0 { + glob, _ = filepath.Glob(filepath.Join(outputRoot, "videos", "*", filepath.Base(vk), "episode_*.mp4")) + } + if len(glob) == 0 { + return fmt.Errorf("no merged video files for feature %q", vk) + } + for _, f := range glob { + if err := nonEmptyFile(f); err != nil { + return err + } + } + } + } + return nil +} + +func nonEmptyFile(path string) error { + fi, err := os.Stat(path) + if err != nil { + return fmt.Errorf("file missing %s: %w", path, err) + } + if fi.Size() == 0 { + return fmt.Errorf("file empty: %s", path) + } + return nil +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/internal/v21/integrity_test.go b/internal/v21/integrity_test.go new file mode 100644 index 0000000..4d356a3 --- /dev/null +++ b/internal/v21/integrity_test.go @@ -0,0 +1,68 @@ +package v21 + +import ( + "os" + "path/filepath" + "testing" + + "github.com/ioai-tech/lerobot-go/internal/meta" +) + +func TestValidateOutputIntegrityParquetOK(t *testing.T) { + root := t.TempDir() + info := meta.NewDatasetInfo(meta.CodebaseV21, 30, map[string]meta.FeatureSpec{ + "observation.state": {DType: "float32", Shape: []int{2}}, + }, false, "test") + info.TotalEpisodes = 1 + pq := filepath.Join(root, meta.V21DataPathFromInfo(info, 0)) + if err := os.MkdirAll(filepath.Dir(pq), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(pq, []byte("par1"), 0o644); err != nil { + t.Fatal(err) + } + if err := ValidateOutputIntegrity(root, info, info.Features); err != nil { + t.Fatalf("expected ok: %v", err) + } +} + +func TestValidateOutputIntegrityMissingParquet(t *testing.T) { + root := t.TempDir() + info := meta.NewDatasetInfo(meta.CodebaseV21, 30, map[string]meta.FeatureSpec{ + "observation.state": {DType: "float32", Shape: []int{2}}, + }, false, "test") + info.TotalEpisodes = 1 + if err := ValidateOutputIntegrity(root, info, info.Features); err == nil { + t.Fatal("expected error for missing parquet") + } +} + +func TestValidateOutputIntegrityVideoRequired(t *testing.T) { + root := t.TempDir() + features := map[string]meta.FeatureSpec{ + "observation.state": {DType: "float32", Shape: []int{2}}, + "observation.images.cam": {DType: "video", Shape: []int{4, 4, 3}}, + } + info := meta.NewDatasetInfo(meta.CodebaseV21, 30, features, true, "test") + info.TotalEpisodes = 1 + pq := filepath.Join(root, meta.V21DataPathFromInfo(info, 0)) + if err := os.MkdirAll(filepath.Dir(pq), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(pq, []byte("par1"), 0o644); err != nil { + t.Fatal(err) + } + if err := ValidateOutputIntegrity(root, info, features); err == nil { + t.Fatal("expected error for missing video") + } + vid := filepath.Join(root, meta.V21VideoPathFromInfo(info, "observation.images.cam", 0)) + if err := os.MkdirAll(filepath.Dir(vid), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vid, []byte("mp4"), 0o644); err != nil { + t.Fatal(err) + } + if err := ValidateOutputIntegrity(root, info, features); err != nil { + t.Fatalf("expected ok with video: %v", err) + } +} diff --git a/internal/v21/merge.go b/internal/v21/merge.go index 9cea32d..4ecf3f0 100644 --- a/internal/v21/merge.go +++ b/internal/v21/merge.go @@ -113,7 +113,13 @@ func Merge(ctx context.Context, cfg MergeConfig) error { if err := meta.UpdateVideoFeaturesInfo(ctx, &info, cfg.OutputRoot, cfg.Locator); err != nil { return err } - return meta.WriteInfo(cfg.OutputRoot, info) + if err := meta.WriteInfo(cfg.OutputRoot, info); err != nil { + return err + } + if err := ValidateOutputIntegrity(cfg.OutputRoot, info, cfg.Features); err != nil { + return fmt.Errorf("merged output integrity: %w", err) + } + return nil } func copyFile(src, dst string) error { diff --git a/internal/v21/staging.go b/internal/v21/staging.go index 2a695b3..6760d14 100644 --- a/internal/v21/staging.go +++ b/internal/v21/staging.go @@ -277,7 +277,14 @@ func (w *StagingWriter) finalizeVideos(ctx context.Context) (map[string]string, return videos, durations, nil } if w.cfg.H264Remux { - return w.finalizeH264RemuxVideos(ctx) + remuxVideos, remuxDurations, err := w.finalizeH264RemuxVideos(ctx) + if err != nil { + return nil, nil, err + } + if len(remuxVideos) > 0 { + return remuxVideos, remuxDurations, nil + } + // Decode-fallback path may have streamed RGB into per-episode encoders without SetH264Remux. } type encClose struct { key string diff --git a/internal/v21/staging_h264_fallback_test.go b/internal/v21/staging_h264_fallback_test.go new file mode 100644 index 0000000..1d20e02 --- /dev/null +++ b/internal/v21/staging_h264_fallback_test.go @@ -0,0 +1,65 @@ +package v21 + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/ioai-tech/lerobot-go/internal/manifest" + "github.com/ioai-tech/lerobot-go/internal/meta" + "github.com/ioai-tech/lerobot-go/internal/video" +) + +func TestStagingH264DecodeFallbackPopulatesVideos(t *testing.T) { + if _, err := exec.LookPath("ffmpeg"); err != nil { + t.Skip("ffmpeg not installed") + } + dir := t.TempDir() + key := "observation.images.cam" + w, err := NewStagingWriter(StagingConfig{ + Dir: dir, + Episode: 0, + FPS: 10, + UseVideos: true, + H264Remux: true, + Features: map[string]meta.FeatureSpec{ + key: {DType: "video", Shape: []int{4, 4, 3}}, + }, + }) + if err != nil { + t.Fatalf("NewStagingWriter: %v", err) + } + frame := video.VideoFrameRGB24{ + Width: 4, Height: 4, + Data: make([]byte, 4*4*3), + } + for i := 0; i < 3; i++ { + if err := w.AppendRGBVideoFrame(context.Background(), key, frame); err != nil { + t.Fatalf("AppendRGBVideoFrame: %v", err) + } + if err := w.AddFrame(context.Background(), map[string]any{"task": "t"}); err != nil { + t.Fatalf("AddFrame: %v", err) + } + } + ep, err := w.SaveEpisode(context.Background()) + if err != nil { + t.Fatalf("SaveEpisode: %v", err) + } + if len(ep.Videos) == 0 || ep.Videos[key] == "" { + t.Fatalf("expected videos map entry, got %#v", ep.Videos) + } + out := manifest.StagingMediaPath(dir, ep.Videos[key]) + fi, err := os.Stat(out) + if err != nil { + t.Fatalf("missing staged mp4: %v", err) + } + if fi.Size() == 0 { + t.Fatal("empty staged mp4") + } + rel := filepath.Base(out) + if rel != "observation.images.cam.mp4" { + t.Fatalf("unexpected video rel basename: %s", rel) + } +} diff --git a/lerobot/writer.go b/lerobot/writer.go index 10a3ae5..1eba41b 100644 --- a/lerobot/writer.go +++ b/lerobot/writer.go @@ -6,6 +6,7 @@ import ( "os" "github.com/ioai-tech/lerobot-go/internal/manifest" + "github.com/ioai-tech/lerobot-go/internal/meta" v21 "github.com/ioai-tech/lerobot-go/internal/v21" v30 "github.com/ioai-tech/lerobot-go/internal/v30" "github.com/ioai-tech/lerobot-go/internal/video" @@ -201,7 +202,16 @@ func (d *serialDataset) Root() string { return d.root } // ValidateOutputIntegrity checks merged dataset has data parquet and video files. func ValidateOutputIntegrity(root string, features map[string]FeatureSpec) error { - return v30.ValidateOutputIntegrity(root, features) + info, err := meta.LoadInfo(root) + if err != nil { + return v30.ValidateOutputIntegrity(root, features) + } + switch info.CodebaseVersion { + case meta.CodebaseV21: + return v21.ValidateOutputIntegrity(root, info, features) + default: + return v30.ValidateOutputIntegrity(root, features) + } } // Merge finalizes completed staging episodes into the official on-disk layout.