Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/lerobot-go
/bin/
/go-lerobot
/lerobot-ds
*.test
Expand Down
100 changes: 100 additions & 0 deletions internal/v21/integrity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package v21

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"

	"github.com/ioai-tech/lerobot-go/internal/meta"
)

// requiredVideoKeys returns the names of all features whose DType is "video",
// sorted lexicographically.
//
// Go randomizes map iteration order, so the keys are sorted to keep the
// validation order (and therefore which feature is named in the first
// reported error) stable across runs. The result is nil when no feature has
// DType "video".
func requiredVideoKeys(features map[string]meta.FeatureSpec) []string {
	var keys []string
	for name, spec := range features {
		if spec.DType == "video" {
			keys = append(keys, name)
		}
	}
	sort.Strings(keys)
	return keys
}

// ValidateOutputIntegrity checks that a merged v2.1 dataset under outputRoot
// has non-empty data parquet files and, when video features are declared,
// non-empty video files.
//
// Parquet check:
//   - When info.TotalEpisodes is positive, the parquet path derived from info
//     for every episode in [0, TotalEpisodes) must exist and be non-empty.
//   - Otherwise the data directory is globbed ("episode_*.parquet" first, then
//     "*.parquet"); at least one match is required and every match must be
//     non-empty.
//
// Video check (skipped when hasVideoFeatures reports false): for each key
// returned by requiredVideoKeys, at least ONE episode video must be present at
// the path derived from info, using either the full key or filepath.Base(key).
// If none is found that way, "videos/*/<key>/episode_*.mp4" (then the same
// pattern with the base key) is globbed; at least one match is required and
// every match must be non-empty. Note that this does not require a video for
// every episode.
//
// It returns nil when all checks pass, or an error describing the first
// problem found. Glob pattern errors are returned rather than ignored.
func ValidateOutputIntegrity(outputRoot string, info meta.DatasetInfo, features map[string]meta.FeatureSpec) error {
	if info.TotalEpisodes <= 0 {
		// Episode count is unknown: discover parquet files on disk instead.
		matches, err := filepath.Glob(filepath.Join(outputRoot, meta.DataDir, "*", "episode_*.parquet"))
		if err != nil {
			return err
		}
		if len(matches) == 0 {
			matches, err = filepath.Glob(filepath.Join(outputRoot, meta.DataDir, "*", "*.parquet"))
			if err != nil {
				return err
			}
		}
		if len(matches) == 0 {
			return fmt.Errorf("no v2.1 data parquet files under %s", outputRoot)
		}
		for _, p := range matches {
			if err := nonEmptyFile(p); err != nil {
				return err
			}
		}
	} else {
		for ep := 0; ep < info.TotalEpisodes; ep++ {
			p := filepath.Join(outputRoot, meta.V21DataPathFromInfo(info, ep))
			if err := nonEmptyFile(p); err != nil {
				return fmt.Errorf("episode %d: %w", ep, err)
			}
		}
	}
	if !hasVideoFeatures(features) {
		return nil
	}

	// Probe at least episode 0 even when the episode count is unknown.
	episodes := max(1, info.TotalEpisodes)
	for _, vk := range requiredVideoKeys(features) {
		baseKey := filepath.Base(vk)
		found := 0
		for ep := 0; ep < episodes; ep++ {
			p := filepath.Join(outputRoot, meta.V21VideoPathFromInfo(info, vk, ep))
			if err := nonEmptyFile(p); err == nil {
				found++
				continue
			}
			// Retry with only the last path element of the key.
			alt := filepath.Join(outputRoot, meta.V21VideoPathFromInfo(info, baseKey, ep))
			if err := nonEmptyFile(alt); err == nil {
				found++
			}
		}
		if found > 0 {
			continue
		}

		// Nothing at the expected paths: fall back to scanning the videos tree.
		glob, err := filepath.Glob(filepath.Join(outputRoot, "videos", "*", vk, "episode_*.mp4"))
		if err != nil {
			return fmt.Errorf("globbing video files for feature %q: %w", vk, err)
		}
		if len(glob) == 0 {
			glob, err = filepath.Glob(filepath.Join(outputRoot, "videos", "*", baseKey, "episode_*.mp4"))
			if err != nil {
				return fmt.Errorf("globbing video files for feature %q: %w", vk, err)
			}
		}
		if len(glob) == 0 {
			return fmt.Errorf("no merged video files for feature %q", vk)
		}
		for _, f := range glob {
			if err := nonEmptyFile(f); err != nil {
				return err
			}
		}
	}
	return nil
}

// nonEmptyFile reports whether path names an existing file with at least one
// byte of content.
//
// It returns nil on success. Otherwise it returns an error that:
//   - wraps the os.Stat error when the path cannot be stat'ed (so callers can
//     use errors.Is(err, os.ErrNotExist));
//   - reports a directory, which would otherwise slip through because
//     directories usually have a non-zero size;
//   - reports a zero-length file.
func nonEmptyFile(path string) error {
	fi, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("file missing %s: %w", path, err)
	}
	if fi.IsDir() {
		return fmt.Errorf("expected file but found directory: %s", path)
	}
	if fi.Size() == 0 {
		return fmt.Errorf("file empty: %s", path)
	}
	return nil
}

// max returns the larger of the two ints; when they are equal, that shared
// value is returned.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
68 changes: 68 additions & 0 deletions internal/v21/integrity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package v21

import (
"os"
"path/filepath"
"testing"

"github.com/ioai-tech/lerobot-go/internal/meta"
)

// TestValidateOutputIntegrityParquetOK verifies that validation succeeds for a
// one-episode dataset without video features once the episode's parquet file
// exists and is non-empty.
func TestValidateOutputIntegrityParquetOK(t *testing.T) {
	tmp := t.TempDir()
	specs := map[string]meta.FeatureSpec{
		"observation.state": {DType: "float32", Shape: []int{2}},
	}
	dsInfo := meta.NewDatasetInfo(meta.CodebaseV21, 30, specs, false, "test")
	dsInfo.TotalEpisodes = 1

	// Materialize a small, non-empty stand-in for episode 0's parquet file.
	parquetPath := filepath.Join(tmp, meta.V21DataPathFromInfo(dsInfo, 0))
	mkErr := os.MkdirAll(filepath.Dir(parquetPath), 0o755)
	if mkErr != nil {
		t.Fatal(mkErr)
	}
	writeErr := os.WriteFile(parquetPath, []byte("par1"), 0o644)
	if writeErr != nil {
		t.Fatal(writeErr)
	}

	checkErr := ValidateOutputIntegrity(tmp, dsInfo, dsInfo.Features)
	if checkErr != nil {
		t.Fatalf("expected ok: %v", checkErr)
	}
}

// TestValidateOutputIntegrityMissingParquet verifies that validation fails
// when the dataset declares one episode but no parquet file was written under
// the (empty) output root.
func TestValidateOutputIntegrityMissingParquet(t *testing.T) {
	tmp := t.TempDir()
	specs := map[string]meta.FeatureSpec{
		"observation.state": {DType: "float32", Shape: []int{2}},
	}
	dsInfo := meta.NewDatasetInfo(meta.CodebaseV21, 30, specs, false, "test")
	dsInfo.TotalEpisodes = 1

	checkErr := ValidateOutputIntegrity(tmp, dsInfo, dsInfo.Features)
	if checkErr == nil {
		t.Fatal("expected error for missing parquet")
	}
}

// TestValidateOutputIntegrityVideoRequired verifies that a dataset declaring a
// video feature fails validation while the episode video is absent and passes
// once a non-empty video file has been written at the expected path.
func TestValidateOutputIntegrityVideoRequired(t *testing.T) {
root := t.TempDir()
features := map[string]meta.FeatureSpec{
"observation.state": {DType: "float32", Shape: []int{2}},

Check failure on line 43 in internal/v21/integrity_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
"observation.images.cam": {DType: "video", Shape: []int{4, 4, 3}},
}
info := meta.NewDatasetInfo(meta.CodebaseV21, 30, features, true, "test")
info.TotalEpisodes = 1
// Write a non-empty parquet file for episode 0 so that only the video check can fail.
pq := filepath.Join(root, meta.V21DataPathFromInfo(info, 0))
if err := os.MkdirAll(filepath.Dir(pq), 0o755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(pq, []byte("par1"), 0o644); err != nil {
t.Fatal(err)
}
// No video has been written yet: validation must report an error.
if err := ValidateOutputIntegrity(root, info, features); err == nil {
t.Fatal("expected error for missing video")
}
// Write a non-empty video file for episode 0 at the path derived from info.
vid := filepath.Join(root, meta.V21VideoPathFromInfo(info, "observation.images.cam", 0))
if err := os.MkdirAll(filepath.Dir(vid), 0o755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vid, []byte("mp4"), 0o644); err != nil {
t.Fatal(err)
}
// With both parquet and video present, validation must now succeed.
if err := ValidateOutputIntegrity(root, info, features); err != nil {
t.Fatalf("expected ok with video: %v", err)
}
}
8 changes: 7 additions & 1 deletion internal/v21/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ func Merge(ctx context.Context, cfg MergeConfig) error {
if err := meta.UpdateVideoFeaturesInfo(ctx, &info, cfg.OutputRoot, cfg.Locator); err != nil {
return err
}
return meta.WriteInfo(cfg.OutputRoot, info)
if err := meta.WriteInfo(cfg.OutputRoot, info); err != nil {
return err
}
if err := ValidateOutputIntegrity(cfg.OutputRoot, info, cfg.Features); err != nil {
return fmt.Errorf("merged output integrity: %w", err)
}
return nil
}

func copyFile(src, dst string) error {
Expand Down
9 changes: 8 additions & 1 deletion internal/v21/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,14 @@ func (w *StagingWriter) finalizeVideos(ctx context.Context) (map[string]string,
return videos, durations, nil
}
if w.cfg.H264Remux {
return w.finalizeH264RemuxVideos(ctx)
remuxVideos, remuxDurations, err := w.finalizeH264RemuxVideos(ctx)
if err != nil {
return nil, nil, err
}
if len(remuxVideos) > 0 {
return remuxVideos, remuxDurations, nil
}
// Decode-fallback path may have streamed RGB into per-episode encoders without SetH264Remux.
}
type encClose struct {
key string
Expand Down
65 changes: 65 additions & 0 deletions internal/v21/staging_h264_fallback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package v21

import (
"context"
"os"
"os/exec"
"path/filepath"
"testing"

"github.com/ioai-tech/lerobot-go/internal/manifest"
"github.com/ioai-tech/lerobot-go/internal/meta"
"github.com/ioai-tech/lerobot-go/internal/video"
)

func TestStagingH264DecodeFallbackPopulatesVideos(t *testing.T) {
if _, err := exec.LookPath("ffmpeg"); err != nil {
t.Skip("ffmpeg not installed")
}
dir := t.TempDir()
key := "observation.images.cam"
w, err := NewStagingWriter(StagingConfig{
Dir: dir,
Episode: 0,
FPS: 10,
UseVideos: true,
H264Remux: true,
Features: map[string]meta.FeatureSpec{
key: {DType: "video", Shape: []int{4, 4, 3}},
},
})
if err != nil {
t.Fatalf("NewStagingWriter: %v", err)
}
frame := video.VideoFrameRGB24{
Width: 4, Height: 4,
Data: make([]byte, 4*4*3),
}
for i := 0; i < 3; i++ {
if err := w.AppendRGBVideoFrame(context.Background(), key, frame); err != nil {
t.Fatalf("AppendRGBVideoFrame: %v", err)
}
if err := w.AddFrame(context.Background(), map[string]any{"task": "t"}); err != nil {
t.Fatalf("AddFrame: %v", err)
}
}
ep, err := w.SaveEpisode(context.Background())
if err != nil {
t.Fatalf("SaveEpisode: %v", err)
}
if len(ep.Videos) == 0 || ep.Videos[key] == "" {
t.Fatalf("expected videos map entry, got %#v", ep.Videos)
}
out := manifest.StagingMediaPath(dir, ep.Videos[key])
fi, err := os.Stat(out)
if err != nil {
t.Fatalf("missing staged mp4: %v", err)
}
if fi.Size() == 0 {
t.Fatal("empty staged mp4")
}
rel := filepath.Base(out)
if rel != "observation.images.cam.mp4" {
t.Fatalf("unexpected video rel basename: %s", rel)
}
}
12 changes: 11 additions & 1 deletion lerobot/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/ioai-tech/lerobot-go/internal/manifest"
"github.com/ioai-tech/lerobot-go/internal/meta"
v21 "github.com/ioai-tech/lerobot-go/internal/v21"
v30 "github.com/ioai-tech/lerobot-go/internal/v30"
"github.com/ioai-tech/lerobot-go/internal/video"
Expand Down Expand Up @@ -201,7 +202,16 @@ func (d *serialDataset) Root() string { return d.root }

// ValidateOutputIntegrity checks merged dataset has data parquet and video files.
//
// The validator is chosen from the dataset info loaded from root: a dataset
// whose CodebaseVersion is meta.CodebaseV21 is checked with the v2.1
// validator; every other version is checked with the v3.0 validator. When the
// info cannot be loaded, the v3.0 validator is used as the fallback and the
// load error itself is not reported.
func ValidateOutputIntegrity(root string, features map[string]FeatureSpec) error {
	info, err := meta.LoadInfo(root)
	if err != nil {
		// No readable dataset info: keep the previous v3.0-only behavior.
		return v30.ValidateOutputIntegrity(root, features)
	}
	switch info.CodebaseVersion {
	case meta.CodebaseV21:
		return v21.ValidateOutputIntegrity(root, info, features)
	default:
		return v30.ValidateOutputIntegrity(root, features)
	}
}

// Merge finalizes completed staging episodes into the official on-disk layout.
Expand Down
Loading