From 4437177e91d82c7fbf80699cb9b867533b351c08 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 22:25:02 +0700 Subject: [PATCH 01/11] Change fragment duration to 10s, add per-tier CRF --- .golangci.yml | 68 +++++++++++++++++---------- client/client.go | 3 +- encoder/pool.go | 2 +- go.mod | 2 +- internal/testservices/testservices.go | 2 +- ladder/arguments.go | 10 ++-- ladder/defaults.go | 38 ++------------- ladder/defaults.yml | 35 ++++++++++++++ ladder/ladder.go | 5 ++ ladder/ladder_test.go | 9 +++- library/validator.go | 2 +- library/walker.go | 4 +- pkg/retriever/retriever.go | 2 +- 13 files changed, 108 insertions(+), 74 deletions(-) create mode 100644 ladder/defaults.yml diff --git a/.golangci.yml b/.golangci.yml index eb6f773..2c0b19b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,36 +1,52 @@ -linters-settings: - errcheck: - check-type-assertions: true - gci: - sections: - - standard - - prefix(github.com/OdyseeTeam) - - prefix(github.com/lbryio) - - default - custom-order: true - no-lex-order: true - goconst: - min-len: 2 - min-occurrences: 3 - +version: "2" linters: - disable-all: true + default: none enable: - dupl - - gci - - gocritic - goconst + - gocritic - gocyclo - - gofmt - - goimports - - gosimple - - govet - gosec + - govet - ineffassign - misspell - - staticcheck - sqlclosecheck + - staticcheck - unused - -run: - timeout: 10m + settings: + errcheck: + check-type-assertions: true + goconst: + min-len: 2 + min-occurrences: 3 + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gci + - gofmt + - goimports + settings: + gci: + sections: + - standard + - prefix(github.com/OdyseeTeam) + - prefix(github.com/lbryio) + - default + custom-order: true + no-lex-order: true + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/client/client.go b/client/client.go index dd516fb..93f2506 100644 --- a/client/client.go +++ b/client/client.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "io" - "math" "net" "net/http" "net/url" @@ -96,7 +95,7 @@ type streamLocation struct { func Configure() *Configuration { return &Configuration{ remoteServer: defaultRemoteServer, - cacheSize: int64(math.Pow(1024, 3)), + cacheSize: 1024 * 1024 * 1024, itemsToPrune: 100, httpClient: &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { diff --git a/encoder/pool.go b/encoder/pool.go index b6f1bad..7b1a28d 100644 --- a/encoder/pool.go +++ b/encoder/pool.go @@ -29,5 +29,5 @@ func NewPool(encoder Encoder, parallel int) pool { // It works slightly different from encoder.Encode but the result should eventually be the same. // For how to obtain encoding progress, see poolSuite.TestEncode. func (p pool) Encode(in, out string) *dispatcher.Result { - return p.Dispatcher.Dispatch(encodeTask{in, out}) + return p.Dispatch(encodeTask{in, out}) } diff --git a/go.mod b/go.mod index 55224b0..b18e855 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/OdyseeTeam/transcoder -go 1.23 +go 1.24 require ( github.com/Pallinder/go-randomdata v1.2.0 diff --git a/internal/testservices/testservices.go b/internal/testservices/testservices.go index 85432f2..f84ad39 100644 --- a/internal/testservices/testservices.go +++ b/internal/testservices/testservices.go @@ -99,7 +99,7 @@ func Minio() (*S3Options, Teardown, error) { } return nil }); err != nil { - return nil, nil, fmt.Errorf("Could not connect to docker: %w", err) + return nil, nil, fmt.Errorf("could not connect to docker: %w", err) } opts := &S3Options{ diff --git a/ladder/arguments.go b/ladder/arguments.go index 55957e0..d8bf40f 100644 --- a/ladder/arguments.go +++ b/ladder/arguments.go @@ -27,11 +27,10 @@ type ArgumentSet struct { } var hlsDefaultArguments = map[string]string{ - "preset": preset, - "sc_threshold": "0", - "c:v": "libx264", - "pix_fmt": "yuv420p", - // "crf": constantRateFactor, + "preset": preset, + "sc_threshold": "0", + "c:v": "libx264", + "pix_fmt": "yuv420p", "c:a": "aac", "ac": "2", "ar": "44100", @@ -63,6 +62,7 @@ func (a *ArgumentSet) GetStrArguments() []string { ladArgs = append(ladArgs, "-map", "v:0", "-filter:v:"+s, "scale=-2:"+strconv.Itoa(tier.Height), + "-crf:v:"+s, strconv.Itoa(tier.CRF), "-b:v:"+s, vRate, "-maxrate:v:"+s, vRate, "-bufsize:v:"+s, vRate, diff --git a/ladder/defaults.go b/ladder/defaults.go index 9b9f99c..7ae0214 100644 --- a/ladder/defaults.go +++ b/ladder/defaults.go @@ -1,38 +1,10 @@ package ladder -var defaultLadderYaml = []byte(` -args: - sws_flags: bilinear - profile:v: main - crf: 23 - refs: 1 - preset: veryfast - force_key_frames: "expr:gte(t,n_forced*2)" - hls_time: 6 +import _ "embed" -tiers: - - definition: 1080p - bitrate: 3500_000 - # bitrate_cutoff: 6000_000 - audio_bitrate: 160k - width: 1920 - height: 1080 - - definition: 720p - bitrate: 2500_000 - audio_bitrate: 128k - width: 1280 - height: 720 - - definition: 360p - bitrate: 500_000 - audio_bitrate: 96k - width: 640 - height: 360 - - definition: 144p - width: 256 - height: 144 - bitrate: 100_000 - audio_bitrate: 96k - framerate: 15 -`) +//go:embed defaults.yml +var defaultLadderYaml []byte + +const DefaultCRF = 24 var Default, _ = Load(defaultLadderYaml) diff --git a/ladder/defaults.yml b/ladder/defaults.yml new file mode 100644 index 0000000..3e35ef6 --- /dev/null +++ b/ladder/defaults.yml @@ -0,0 +1,35 @@ +args: + sws_flags: bilinear + profile:v: main + refs: 1 + preset: veryfast + force_key_frames: "expr:gte(t,n_forced*2)" + hls_time: 10 + +tiers: + - definition: 1080p + bitrate: 3500_000 + # bitrate_cutoff: 6000_000 + audio_bitrate: 160k + width: 1920 + height: 1080 + crf: 23 + - definition: 720p + bitrate: 2500_000 + audio_bitrate: 128k + width: 1280 + height: 720 + crf: 24 + - definition: 360p + bitrate: 500_000 + audio_bitrate: 96k + width: 640 + height: 360 + crf: 25 + - definition: 144p + width: 256 + height: 144 + bitrate: 100_000 + audio_bitrate: 96k + framerate: 15 + crf: 26 diff --git a/ladder/ladder.go b/ladder/ladder.go index 488bd62..4ee6e5a 100644 --- a/ladder/ladder.go +++ b/ladder/ladder.go @@ -26,6 +26,7 @@ type Tier struct { Framerate decimal.Decimal `yaml:",omitempty"` KeepFramerate bool `yaml:"keep_framerate"` BitrateCutoff int `yaml:"bitrate_cutoff"` + CRF int } func Load(yamlLadder []byte) (Ladder, error) { @@ -66,6 +67,9 @@ func (x Ladder) Tweak(md *Metadata) (Ladder, error) { if t.Height == h { origResSeen = true } + if t.CRF == 0 { + t.CRF = DefaultCRF + } newLadder.Tiers = append(newLadder.Tiers, t) } @@ -75,6 +79,7 @@ func (x Ladder) Tweak(md *Metadata) (Ladder, error) { Width: w, VideoBitrate: nsRate(w, h), AudioBitrate: "128k", + CRF: DefaultCRF, }}, newLadder.Tiers...) } diff --git a/ladder/ladder_test.go b/ladder/ladder_test.go index 6aa2c6c..6ed1614 100644 --- a/ladder/ladder_test.go +++ b/ladder/ladder_test.go @@ -24,7 +24,6 @@ var baseExpectedArgs = []string{ "-pix_fmt yuv420p", "-c:a aac", "-strftime_mkdir 1", - "-crf 23", "-ac 2", "-master_pl_name master.m3u8", "-force_key_frames expr:gte(t,n_forced*2)", @@ -70,6 +69,7 @@ func TestTweak(t *testing.T) { // map v:0 must be present before every tier declaration, argument order (within the substring) is important "-map v:0 -filter:v:0 scale=-2:360", "-b:v:0 500000", + "-crf:v:0 25", "-maxrate:v:0 500000", "-bufsize:v:0 500000", "-r:v:0 30/1", @@ -77,6 +77,7 @@ func TestTweak(t *testing.T) { // Here again we are mapping video stream #0 to our transcoded tier "-map v:0 -filter:v:1 scale=-2:144", "-b:v:1 100000", + "-crf:v:1 26", "-maxrate:v:1 100000", "-bufsize:v:1 100000", "-r:v:1 15", @@ -97,11 +98,13 @@ func TestTweak(t *testing.T) { "-var_stream_map v:0,a:0 v:1,a:1 v:2,a:2", "-map v:0 -filter:v:0 scale=-2:480", "-b:v:0 1297298", + "-crf:v:0 24", "-maxrate:v:0 1297298", "-bufsize:v:0 1297298", "-r:v:0 30/1", "-g:v:0 60", "-map v:0 -filter:v:1 scale=-2:360", + "-crf:v:1 25", "-b:v:1 500000", "-maxrate:v:1 500000", "-bufsize:v:1 500000", @@ -109,6 +112,7 @@ func TestTweak(t *testing.T) { "-g:v:1 60", "-map v:0 -filter:v:2 scale=-2:144", "-b:v:2 100000", + "-crf:v:2 26", "-maxrate:v:2 100000", "-bufsize:v:2 100000", "-r:v:2 15", @@ -129,18 +133,21 @@ func TestTweak(t *testing.T) { "-var_stream_map v:0,a:0 v:1,a:1 v:2,a:2", "-map v:0 -filter:v:0 scale=-2:480", "-b:v:0 1297298", + "-crf:v:0 24", "-maxrate:v:0 1297298", "-bufsize:v:0 1297298", "-r:v:0 189941760/7981033", "-g:v:0 48", "-map v:0 -filter:v:1 scale=-2:360", "-b:v:1 500000", + "-crf:v:1 25", "-maxrate:v:1 500000", "-bufsize:v:1 500000", "-r:v:1 189941760/7981033", "-g:v:1 48", "-map v:0 -filter:v:2 scale=-2:144", "-b:v:2 100000", + "-crf:v:2 26", "-maxrate:v:2 100000", "-bufsize:v:2 100000", "-r:v:2 15", diff --git a/library/validator.go b/library/validator.go index 58b3e1e..1421a02 100644 --- a/library/validator.go +++ b/library/validator.go @@ -31,7 +31,7 @@ func ValidateStream(baseURL string, failFast bool, skipSegments bool) (*Validati url := strings.Join(p, "/") if path.Ext(p[len(p)-1]) == ".ts" { if skipSegments { - return nil, SkipSegment + return nil, ErrSkipSegment } r, err = http.Head(url) // #nosec G107 } else { diff --git a/library/walker.go b/library/walker.go index bd873c8..b479638 100644 --- a/library/walker.go +++ b/library/walker.go @@ -12,7 +12,7 @@ import ( type StreamGetter func(path ...string) (io.ReadCloser, error) type StreamProcessor func(fgName string, r io.ReadCloser) error -var SkipSegment = errors.New("skip fragment") +var ErrSkipSegment = errors.New("skip fragment") // WalkStream parses an HLS playlist, calling `getFn` to load and `processFn` // for the master playlist located in `baseURI`, subplaylists and all segments contained within. @@ -65,7 +65,7 @@ func WalkStream(baseURI string, getFn StreamGetter, processFn StreamProcessor) e continue } r, err := getFn(baseURI, seg.URI) - if errors.Is(err, SkipSegment) { + if errors.Is(err, ErrSkipSegment) { continue } if err != nil { diff --git a/pkg/retriever/retriever.go b/pkg/retriever/retriever.go index 9b45f71..53e5c1a 100644 --- a/pkg/retriever/retriever.go +++ b/pkg/retriever/retriever.go @@ -46,7 +46,7 @@ func NewPool(parallel int) pool { // It will block if all workers are busy. // Duplicate urls are not checked for. func (p pool) Retrieve(url, out string) *dispatcher.Result { - return p.Dispatcher.Dispatch(downloadTask{url, out}) + return p.Dispatch(downloadTask{url, out}) } func (w worker) Work(t dispatcher.Task) error { From 1a33b2d433cbf4ab47827c59b2470ee2fb3de809 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 22:38:14 +0700 Subject: [PATCH 02/11] Up transcoding client timeout --- client/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client_test.go b/client/client_test.go index fef72fc..724c664 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -75,7 +75,7 @@ func (s *clientSuite) TestPlayFragment() { ) // Request stream and wait until it's available. - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) wait := time.NewTicker(1000 * time.Millisecond) Waiting: for { From b5b3164dc76b30633890b85f5bf7d3cab31ba8fe Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 22:38:34 +0700 Subject: [PATCH 03/11] Update golangci-lint version --- .github/workflows/test.yaml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9845234..4b86f62 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -18,7 +18,7 @@ jobs: fetch-depth: 0 - uses: actions/setup-go@v5 with: - go-version: "1.23" + go-version: "1.24" - uses: FedericoCarboni/setup-ffmpeg@v3 id: setup-ffmpeg with: @@ -34,10 +34,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: "1.23" + go-version: "1.24" cache: false - name: golangci-lint - uses: golangci/golangci-lint-action@v6 - with: - version: v1.63 - args: -v + uses: golangci/golangci-lint-action@v7 From d0a859ca2ccd996bae41497e09f2c55b2ec5a043 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 22:45:37 +0700 Subject: [PATCH 04/11] Rename lint workflow --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 4b86f62..0b7b4bc 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -27,7 +27,7 @@ jobs: make test_prepare - name: Run tests run: go test -covermode=count -coverprofile=coverage.out ./... - golangci: + lint: name: lint runs-on: ubuntu-latest steps: From f4b266e9621dfca19f8f6cfc0d48fbbd53e3449c Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 22:57:57 +0700 Subject: [PATCH 05/11] Update test stream playlists --- client/client_test.go | 4 ++++ client/testdata/known-stream/master.m3u8 | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 724c664..1a8f780 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -21,6 +21,10 @@ import ( "github.com/stretchr/testify/suite" ) +// To regenerate: +// +// go run ./tccli transcode "lbry://@specialoperationstest#3/fear-of-death-inspirational#a" +// mv fear-of-death-inspirational/*.m3u8 client/testdata/known-stream var streamURL = "@specialoperationstest#3/fear-of-death-inspirational#a" var streamSDHash = "f12fb044f5805334a473bf9a81363d89bd1cb54c4065ac05be71a599a6c51efc6c6afb257208326af304324094105774" diff --git a/client/testdata/known-stream/master.m3u8 b/client/testdata/known-stream/master.m3u8 index f108ce4..1f2ff97 100644 --- a/client/testdata/known-stream/master.m3u8 +++ b/client/testdata/known-stream/master.m3u8 @@ -1,13 +1,14 @@ #EXTM3U #EXT-X-VERSION:6 -#EXT-X-STREAM-INF:BANDWIDTH=4026000,RESOLUTION=1920x1080,CODECS="avc1.4d4028,mp4a.40.2",CLOSED-CAPTIONS=NONE +#EXT-X-STREAM-INF:BANDWIDTH=2579692,AVERAGE-BANDWIDTH=2579692,RESOLUTION=1920x1080,CODECS="avc1.4d4028,mp4a.40.2" v0.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=2890800,RESOLUTION=1280x720,CODECS="avc1.4d401f,mp4a.40.2",CLOSED-CAPTIONS=NONE +#EXT-X-STREAM-INF:BANDWIDTH=863458,AVERAGE-BANDWIDTH=863458,RESOLUTION=1280x720,CODECS="avc1.4d401f,mp4a.40.2" v1.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=655600,RESOLUTION=640x360,CODECS="avc1.4d401e,mp4a.40.2",CLOSED-CAPTIONS=NONE +#EXT-X-STREAM-INF:BANDWIDTH=307708,AVERAGE-BANDWIDTH=307708,RESOLUTION=640x360,CODECS="avc1.4d401e,mp4a.40.2" v2.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=215600,RESOLUTION=256x144,CODECS="avc1.4d400b,mp4a.40.2",CLOSED-CAPTIONS=NONE +#EXT-X-STREAM-INF:BANDWIDTH=157920,AVERAGE-BANDWIDTH=157920,RESOLUTION=256x144,CODECS="avc1.4d400b,mp4a.40.2" v3.m3u8 + From 75703564835c74381caccd59189e9f6024751a5b Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Thu, 1 May 2025 23:10:36 +0700 Subject: [PATCH 06/11] Update GHA ffmpeg version --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 0b7b4bc..bf563c7 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -22,7 +22,7 @@ jobs: - uses: FedericoCarboni/setup-ffmpeg@v3 id: setup-ffmpeg with: - ffmpeg-version: "6.1.0" + ffmpeg-version: release - run: | make test_prepare - name: Run tests From f799f61d7e13e67a3a5b324c0f21a7e083a24ad0 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Fri, 2 May 2025 00:41:01 +0700 Subject: [PATCH 07/11] Optimize transcoded output tests across platforms --- Makefile | 8 +++++++- client/client_test.go | 22 ++++++++++++---------- client/testdata/known-stream/master.m3u8 | 8 ++++---- docker-compose.yml | 13 ++++++------- docker/Dockerfile-conductor | 2 +- docker/Dockerfile-ffmpeg | 2 +- docker/Dockerfile-ffprobe | 2 +- docker/Dockerfile-tccli | 8 ++++---- go.mod | 2 ++ readme.md | 13 ++++++++++++- tccli/main.go | 2 +- 11 files changed, 51 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index a2b8154..f4c4bf6 100644 --- a/Makefile +++ b/Makefile @@ -23,8 +23,14 @@ cworker_image: $(BUILD_DIR)/$(GOOS)_$(GOARCH)/transcoder ffmpeg_image: docker buildx build -f docker/Dockerfile-ffmpeg -t odyseeteam/transcoder-ffmpeg:git --platform linux/amd64 . -test_down: +tccli_image: + docker buildx build -f docker/Dockerfile-tccli -t odyseeteam/transcoder-tccli:latest --platform linux/amd64 . + +test_clean: docker-compose down + docker volume rm -f transcoder_minio-data + docker volume rm -f transcoder_redis-data + docker volume rm -f transcoder_db-data test_prepare: make transcoder diff --git a/client/client_test.go b/client/client_test.go index 1a8f780..361b90f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -54,10 +54,10 @@ var streamFragmentCases = []struct { {"v1.m3u8", 0}, {"v2.m3u8", 0}, {"v3.m3u8", 0}, - {"v0_s000000.ts", 2_000_000}, - {"v1_s000000.ts", 760_000}, - {"v2_s000000.ts", 300_000}, - {"v3_s000000.ts", 120_000}, + {"v0_s000000.ts", 1860636}, + {"v1_s000000.ts", 623032}, + {"v2_s000000.ts", 221840}, + {"v3_s000000.ts", 115620}, } func TestClientSuite(t *testing.T) { @@ -106,18 +106,20 @@ Waiting: sz, err := c.PlayFragment(streamURL, streamSDHash, tc.name, rr, httptest.NewRequest(http.MethodGet, "/", nil)) s.Require().NoError(err) s.Require().Equal(http.StatusOK, rr.Result().StatusCode) - rbody, err := io.ReadAll(rr.Result().Body) + receivedBodyRaw, err := io.ReadAll(rr.Result().Body) s.Require().NoError(err) if tc.size > 0 { // Different transcoding runs produce slightly different files. - s.InDelta(tc.size, len(rbody), float64(tc.size)*0.2) - s.EqualValues(sz, len(rbody)) + s.InDelta(tc.size, len(receivedBodyRaw), float64(tc.size)*0.2) + s.EqualValues(sz, len(receivedBodyRaw)) } else { - absPath, err := filepath.Abs(filepath.Join("./testdata", "known-stream", tc.name)) + expectedFile, err := filepath.Abs(filepath.Join("./testdata", "known-stream", tc.name)) s.Require().NoError(err) - tbody, err := os.ReadFile(absPath) + expectedBody, err := os.ReadFile(expectedFile) s.Require().NoError(err) - s.Equal(strings.TrimRight(string(tbody), "\n"), strings.TrimRight(string(rbody), "\n")) + receivedBody := strings.TrimRight(string(receivedBodyRaw), "\n") + receivedBody = strings.ReplaceAll(receivedBody, ",CLOSED-CAPTIONS=NONE", "") + s.Equal(strings.TrimRight(string(expectedBody), "\n"), receivedBody) } if tc.name == MasterPlaylistName { s.Equal(cacheHeaderHit, rr.Result().Header.Get(cacheHeaderName)) diff --git a/client/testdata/known-stream/master.m3u8 b/client/testdata/known-stream/master.m3u8 index 1f2ff97..6ff32d5 100644 --- a/client/testdata/known-stream/master.m3u8 +++ b/client/testdata/known-stream/master.m3u8 @@ -1,14 +1,14 @@ #EXTM3U #EXT-X-VERSION:6 -#EXT-X-STREAM-INF:BANDWIDTH=2579692,AVERAGE-BANDWIDTH=2579692,RESOLUTION=1920x1080,CODECS="avc1.4d4028,mp4a.40.2" +#EXT-X-STREAM-INF:BANDWIDTH=4026000,RESOLUTION=1920x1080,CODECS="avc1.4d4028,mp4a.40.2" v0.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=863458,AVERAGE-BANDWIDTH=863458,RESOLUTION=1280x720,CODECS="avc1.4d401f,mp4a.40.2" +#EXT-X-STREAM-INF:BANDWIDTH=2890800,RESOLUTION=1280x720,CODECS="avc1.4d401f,mp4a.40.2" v1.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=307708,AVERAGE-BANDWIDTH=307708,RESOLUTION=640x360,CODECS="avc1.4d401e,mp4a.40.2" +#EXT-X-STREAM-INF:BANDWIDTH=655600,RESOLUTION=640x360,CODECS="avc1.4d401e,mp4a.40.2" v2.m3u8 -#EXT-X-STREAM-INF:BANDWIDTH=157920,AVERAGE-BANDWIDTH=157920,RESOLUTION=256x144,CODECS="avc1.4d400b,mp4a.40.2" +#EXT-X-STREAM-INF:BANDWIDTH=215600,RESOLUTION=256x144,CODECS="avc1.4d400b,mp4a.40.2" v3.m3u8 diff --git a/docker-compose.yml b/docker-compose.yml index fe3fdad..6d04e2f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,12 +17,7 @@ services: depends_on: - redis - db - deploy: - restart_policy: - condition: on-failure - delay: 3s - max_attempts: 3 - window: 120s + restart: unless-stopped cworker: image: odyseeteam/transcoder-cworker:latest platform: linux/amd64 @@ -34,9 +29,12 @@ services: # build: # context: . # dockerfile: docker/Dockerfile-worker - depends_on: ["redis"] + depends_on: + - redis + - minio volumes: - ${PWD}/worker.ex.yml:/app/worker.yml + restart: unless-stopped # asynqmon: # image: hibiken/asynqmon:latest # container_name: asynqmon @@ -93,6 +91,7 @@ services: - minio-data:/data entrypoint: > /bin/sh -c " + sleep 3; /usr/bin/mc config host add myminio http://minio:9000 ody odyseetes3; /usr/bin/mc mb myminio/transcoded; /usr/bin/mc anonymous set download myminio/transcoded; diff --git a/docker/Dockerfile-conductor b/docker/Dockerfile-conductor index 2879ae8..0bea6e2 100644 --- a/docker/Dockerfile-conductor +++ b/docker/Dockerfile-conductor @@ -1,4 +1,4 @@ -FROM alpine:3 +FROM alpine:3.21 EXPOSE 8080 RUN apk add --no-cache libc6-compat diff --git a/docker/Dockerfile-ffmpeg b/docker/Dockerfile-ffmpeg index e61c7c2..9edda7c 100644 --- a/docker/Dockerfile-ffmpeg +++ b/docker/Dockerfile-ffmpeg @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 -FROM alpine:3 +FROM alpine:3.21 WORKDIR /build diff --git a/docker/Dockerfile-ffprobe b/docker/Dockerfile-ffprobe index 6b89b9a..75268d0 100644 --- a/docker/Dockerfile-ffprobe +++ b/docker/Dockerfile-ffprobe @@ -1,7 +1,7 @@ # syntax=docker/dockerfile:1 FROM odyseeteam/transcoder-ffmpeg:git AS ffmpeg -FROM alpine:3.19 +FROM alpine:3.21 COPY --from=ffmpeg /build/ffprobe /usr/local/bin/ diff --git a/docker/Dockerfile-tccli b/docker/Dockerfile-tccli index e47b904..3594cc3 100644 --- a/docker/Dockerfile-tccli +++ b/docker/Dockerfile-tccli @@ -2,7 +2,7 @@ # FROM odyseeteam/transcoder-gensprite:latest AS spritegen -FROM alpine:3.15 AS gather +FROM alpine:3.21 AS gather WORKDIR /build @@ -11,7 +11,7 @@ RUN tar -xf ffmpeg-git-arm64-static.tar.xz && mv ffmpeg-*-static/ffmpeg ffmpeg-* RUN chmod a+x ffmpeg ffprobe -FROM alpine:3.15 AS build +FROM alpine:3.21 AS build EXPOSE 8080 @@ -21,7 +21,7 @@ COPY --from=gather /build/ffmpeg /build/ffprobe /usr/local/bin/ WORKDIR /app -COPY ./dist/linux_arm64/tccli . +COPY ./dist/linux_amd64/tccli . COPY ./conductor.ex.yml ./conductor.yml -# ENTRYPOINT ["./tccli"] +ENTRYPOINT ["/app/tccli"] diff --git a/go.mod b/go.mod index b18e855..ac11407 100644 --- a/go.mod +++ b/go.mod @@ -150,3 +150,5 @@ replace github.com/draganm/miniotest v0.1.0 => github.com/anbsky/miniotest v0.1. replace github.com/lbryio/lbry.go/v3 => github.com/anbsky/lbry.go/v3 v3.0.6 replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 + +// replace github.com/nikooo777/lbry-blobs-downloader => ../lbry-blobs-downloader diff --git a/readme.md b/readme.md index 34fd31d..83b5e9d 100644 --- a/readme.md +++ b/readme.md @@ -5,7 +5,7 @@ ## Development -Requires go 1.22. +Requires go 1.24. ## Building @@ -42,6 +42,17 @@ This project is using [SemVer](https://semver.org) YY.MM.MINOR[.MICRO] for `clie git tag transcoder-v24.2.0 ``` +## Tools + +To download a regular stream and produce a transcoded copy locally: + +``` +docker run -v $(pwd):$(pwd) -w $(pwd) odyseeteam/transcoder-tccli transcode "lbry://@specialoperationstest#3/fear-of-dea +th-inspirational#a" +``` + +Check `./tccli/main.go` for more commands. + ## Contributing Please ensure that your code builds, passes `golanci-lint` and automated tests run successfully before pushing your branch. diff --git a/tccli/main.go b/tccli/main.go index 4202c8e..4dd123d 100644 --- a/tccli/main.go +++ b/tccli/main.go @@ -79,7 +79,7 @@ func main() { inPath = strings.TrimPrefix(CLI.Transcode.URL, "file://") outPath = inPath + "_out" } else { - tmpDir, err := os.MkdirTemp(".", "") + tmpDir, err := os.MkdirTemp("./", "") if err != nil { panic(err) } From ab5109484a7e4bee9e04092429e89727ab95b228 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Fri, 2 May 2025 00:50:25 +0700 Subject: [PATCH 08/11] Limit threads used per video --- ladder/arguments.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ladder/arguments.go b/ladder/arguments.go index d8bf40f..ae85f00 100644 --- a/ladder/arguments.go +++ b/ladder/arguments.go @@ -27,6 +27,7 @@ type ArgumentSet struct { } var hlsDefaultArguments = map[string]string{ + "threads": "2", "preset": preset, "sc_threshold": "0", "c:v": "libx264", From 6e357431bfc28069665ba42a07b82b37b80280bc Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Fri, 2 May 2025 01:23:56 +0700 Subject: [PATCH 09/11] Switch to patched blobs downloader to fix tmp dir issues --- client/client_test.go | 6 +++--- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 361b90f..9a4fb74 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -54,8 +54,8 @@ var streamFragmentCases = []struct { {"v1.m3u8", 0}, {"v2.m3u8", 0}, {"v3.m3u8", 0}, - {"v0_s000000.ts", 1860636}, - {"v1_s000000.ts", 623032}, + {"v0_s000000.ts", 2244532}, + {"v1_s000000.ts", 621528}, {"v2_s000000.ts", 221840}, {"v3_s000000.ts", 115620}, } @@ -110,7 +110,7 @@ Waiting: s.Require().NoError(err) if tc.size > 0 { // Different transcoding runs produce slightly different files. - s.InDelta(tc.size, len(receivedBodyRaw), float64(tc.size)*0.2) + s.InDelta(tc.size, len(receivedBodyRaw), float64(tc.size)*0.25) s.EqualValues(sz, len(receivedBodyRaw)) } else { expectedFile, err := filepath.Abs(filepath.Join("./testdata", "known-stream", tc.name)) diff --git a/go.mod b/go.mod index ac11407..17af342 100644 --- a/go.mod +++ b/go.mod @@ -151,4 +151,4 @@ replace github.com/lbryio/lbry.go/v3 => github.com/anbsky/lbry.go/v3 v3.0.6 replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 -// replace github.com/nikooo777/lbry-blobs-downloader => ../lbry-blobs-downloader +replace github.com/nikooo777/lbry-blobs-downloader => github.com/anbsky/lbry-blobs-downloader v1.4.2 diff --git a/go.sum b/go.sum index 1b60073..07e0168 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/alecthomas/kong v0.2.17 h1:URDISCI96MIgcIlQyoCAlhOmrSw6pZScBNkctg8r0W github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0ktSlqIydQQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/anbsky/lbry-blobs-downloader v1.4.2 h1:GnEPoKg3ydwqLWQY+NSciSCip+L5In02nkZT40QVSvU= +github.com/anbsky/lbry-blobs-downloader v1.4.2/go.mod h1:8Jb5lhN93C3zJS9w5N0LEmCR8SLLbOSB2szlTks0lDE= github.com/anbsky/transcoder v1.2.1 h1:fU8nlKswdJ9+iVcagFJhVSFdvSfb1BL2GEB0vC3nxSg= github.com/anbsky/transcoder v1.2.1/go.mod h1:lT+f8NEGaHP6AVeYLicas0EI0+TforD+DRuLziJg6bY= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= @@ -476,8 +478,6 @@ github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nelsam/hel/v2 v2.3.2/go.mod h1:1ZTGfU2PFTOd5mx22i5O0Lc2GY933lQ2wb/ggy+rL3w= github.com/nelsam/hel/v2 v2.3.3/go.mod h1:1ZTGfU2PFTOd5mx22i5O0Lc2GY933lQ2wb/ggy+rL3w= -github.com/nikooo777/lbry-blobs-downloader v1.3.0 h1:yVvFvFx0qydlC0V1YNTSMjdfNnVVq/KFVWY9/YCafQk= -github.com/nikooo777/lbry-blobs-downloader v1.3.0/go.mod h1:8Jb5lhN93C3zJS9w5N0LEmCR8SLLbOSB2szlTks0lDE= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From 7935cc4ba6b8d19dc589eb0a48ccd425f9fdb1bf Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Fri, 2 May 2025 14:29:34 +0700 Subject: [PATCH 10/11] Manifests: add released_at, fix version field --- go.mod | 3 +- go.sum | 10 +---- library/db/db.go | 2 + library/db/migrations/0001_initial.sql | 8 ++-- .../db/migrations/0004_add_released_at.sql | 8 ++++ library/db/models.go | 29 ++++++++++++- library/db/queries.sql | 4 +- library/db/queries.sql.go | 43 +++++++++++-------- library/library.go | 19 ++++---- library/stream.go | 9 +++- library/stream_test.go | 8 ++-- pkg/conductor/tasks/tasks.go | 5 ++- pkg/resolve/resolve.go | 14 ++++-- sqlc.yaml | 27 ++++++------ tccli/main.go | 2 +- 15 files changed, 126 insertions(+), 65 deletions(-) create mode 100644 library/db/migrations/0004_add_released_at.sql diff --git a/go.mod b/go.mod index 17af342..3762f60 100644 --- a/go.mod +++ b/go.mod @@ -26,11 +26,11 @@ require ( github.com/rubenv/sql-migrate v1.4.0 github.com/shopspring/decimal v1.3.1 github.com/spf13/viper v1.15.0 + github.com/sqlc-dev/pqtype v0.3.0 github.com/stretchr/testify v1.9.0 github.com/tabbed/pqtype v0.1.1 github.com/testcontainers/testcontainers-go v0.21.0 github.com/valyala/fasthttp v1.36.0 - go.etcd.io/etcd/api/v3 v3.5.6 go.uber.org/goleak v1.1.12 go.uber.org/zap v1.21.0 gopkg.in/yaml.v3 v3.0.1 @@ -56,7 +56,6 @@ require ( github.com/containerd/containerd v1.7.13 // indirect github.com/containerd/continuity v0.4.2 // indirect github.com/containerd/log v0.1.0 // indirect - github.com/coreos/go-semver v0.3.0 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index 07e0168..922f397 100644 --- a/go.sum +++ b/go.sum @@ -117,7 +117,6 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= @@ -129,7 +128,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZdaCyB6Is= github.com/containerd/containerd v1.7.13/go.mod h1:zT3up6yTRfEUa6+GsITYIJNgSVL9NQ4x4h1RPzk0Wu4= @@ -140,7 +138,6 @@ github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -179,7 +176,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= -github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fasthttp/router v1.4.9 h1:8s1HEqP+GvsC2B8vPdLAPHJegs4s28z7UsraPuHM1K8= github.com/fasthttp/router v1.4.9/go.mod h1:oWPrQCi9QOrzxKC+rZuliS1+JhYj2bpR01J6T8vUDUQ= @@ -602,6 +598,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= +github.com/sqlc-dev/pqtype v0.3.0 h1:b09TewZ3cSnO5+M1Kqq05y0+OjqIptxELaSayg7bmqk= +github.com/sqlc-dev/pqtype v0.3.0/go.mod h1:oyUjp5981ctiL9UYvj1bVvCKi8OXkCa0u645hce7CAs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -666,8 +664,6 @@ github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= -go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= -go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -677,7 +673,6 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -1089,7 +1084,6 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/library/db/db.go b/library/db/db.go index c3c034a..41b7a34 100644 --- a/library/db/db.go +++ b/library/db/db.go @@ -1,4 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 package db diff --git a/library/db/migrations/0001_initial.sql b/library/db/migrations/0001_initial.sql index cc83f41..475ede8 100644 --- a/library/db/migrations/0001_initial.sql +++ b/library/db/migrations/0001_initial.sql @@ -10,9 +10,9 @@ CREATE TYPE channel_priority AS ENUM ( CREATE TABLE videos ( id SERIAL NOT NULL PRIMARY KEY, - created_at timestamp NOT NULL DEFAULT NOW(), - updated_at timestamp, - accessed_at timestamp, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP, + accessed_at TIMESTAMP, access_count integer DEFAULT 0, tid text NOT NULL UNIQUE CHECK (tid <> ''), @@ -31,7 +31,7 @@ CREATE TABLE videos ( CREATE TABLE channels ( id SERIAL NOT NULL PRIMARY KEY, - created_at timestamp NOT NULL DEFAULT NOW(), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), url text NOT NULL UNIQUE CHECK (url <> ''), claim_id text NOT NULL UNIQUE CHECK (claim_id <> ''), diff --git a/library/db/migrations/0004_add_released_at.sql b/library/db/migrations/0004_add_released_at.sql new file mode 100644 index 0000000..3c89e6a --- /dev/null +++ b/library/db/migrations/0004_add_released_at.sql @@ -0,0 +1,8 @@ +-- +migrate Up + +ALTER TABLE videos + ADD COLUMN released_at TIMESTAMP; + +-- +migrate Down +ALTER TABLE videos + DROP COLUMN released_at; diff --git a/library/db/models.go b/library/db/models.go index 1111f77..eda8e8d 100644 --- a/library/db/models.go +++ b/library/db/models.go @@ -1,13 +1,16 @@ // Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 package db import ( "database/sql" + "database/sql/driver" "fmt" "time" - "github.com/tabbed/pqtype" + "github.com/sqlc-dev/pqtype" ) type ChannelPriority string @@ -31,6 +34,29 @@ func (e *ChannelPriority) Scan(src interface{}) error { return nil } +type NullChannelPriority struct { + ChannelPriority ChannelPriority + Valid bool // Valid is true if ChannelPriority is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullChannelPriority) Scan(value interface{}) error { + if value == nil { + ns.ChannelPriority, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.ChannelPriority.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullChannelPriority) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.ChannelPriority), nil +} + type Channel struct { ID int32 CreatedAt time.Time @@ -54,4 +80,5 @@ type Video struct { Size int64 Checksum sql.NullString Manifest pqtype.NullRawMessage + ReleasedAt sql.NullTime } diff --git a/library/db/queries.sql b/library/db/queries.sql index 0365e3b..163b8e5 100644 --- a/library/db/queries.sql +++ b/library/db/queries.sql @@ -1,8 +1,8 @@ -- name: AddVideo :one INSERT INTO videos ( - tid, sd_hash, url, channel, storage, path, size, checksum, manifest + tid, sd_hash, url, released_at, channel, storage, path, size, checksum, manifest ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) RETURNING *; diff --git a/library/db/queries.sql.go b/library/db/queries.sql.go index 8ff0301..0befed6 100644 --- a/library/db/queries.sql.go +++ b/library/db/queries.sql.go @@ -1,4 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 // source: queries.sql package db @@ -7,7 +9,7 @@ import ( "context" "database/sql" - "github.com/tabbed/pqtype" + "github.com/sqlc-dev/pqtype" ) const addChannel = `-- name: AddChannel :one @@ -40,23 +42,24 @@ func (q *Queries) AddChannel(ctx context.Context, arg AddChannelParams) (Channel const addVideo = `-- name: AddVideo :one INSERT INTO videos ( - tid, sd_hash, url, channel, storage, path, size, checksum, manifest + tid, sd_hash, url, released_at, channel, storage, path, size, checksum, manifest ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) -RETURNING id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest +RETURNING id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest, released_at ` type AddVideoParams struct { - TID string - SDHash string - URL string - Channel string - Storage string - Path string - Size int64 - Checksum sql.NullString - Manifest pqtype.NullRawMessage + TID string + SDHash string + URL string + ReleasedAt sql.NullTime + Channel string + Storage string + Path string + Size int64 + Checksum sql.NullString + Manifest pqtype.NullRawMessage } func (q *Queries) AddVideo(ctx context.Context, arg AddVideoParams) (Video, error) { @@ -64,6 +67,7 @@ func (q *Queries) AddVideo(ctx context.Context, arg AddVideoParams) (Video, erro arg.TID, arg.SDHash, arg.URL, + arg.ReleasedAt, arg.Channel, arg.Storage, arg.Path, @@ -87,6 +91,7 @@ func (q *Queries) AddVideo(ctx context.Context, arg AddVideoParams) (Video, erro &i.Size, &i.Checksum, &i.Manifest, + &i.ReleasedAt, ) return i, err } @@ -135,7 +140,7 @@ func (q *Queries) GetAllChannels(ctx context.Context) ([]Channel, error) { } const getAllVideos = `-- name: GetAllVideos :many -SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest FROM videos +SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest, released_at FROM videos ` func (q *Queries) GetAllVideos(ctx context.Context) ([]Video, error) { @@ -162,6 +167,7 @@ func (q *Queries) GetAllVideos(ctx context.Context) ([]Video, error) { &i.Size, &i.Checksum, &i.Manifest, + &i.ReleasedAt, ); err != nil { return nil, err } @@ -177,7 +183,7 @@ func (q *Queries) GetAllVideos(ctx context.Context) ([]Video, error) { } const getAllVideosForStorage = `-- name: GetAllVideosForStorage :many -SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest FROM videos +SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest, released_at FROM videos WHERE storage = $1 ` @@ -205,6 +211,7 @@ func (q *Queries) GetAllVideosForStorage(ctx context.Context, storage string) ([ &i.Size, &i.Checksum, &i.Manifest, + &i.ReleasedAt, ); err != nil { return nil, err } @@ -220,7 +227,7 @@ func (q *Queries) GetAllVideosForStorage(ctx context.Context, storage string) ([ } const getAllVideosForStorageLimit = `-- name: GetAllVideosForStorageLimit :many -SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest FROM videos +SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest, released_at FROM videos WHERE storage = $1 ORDER BY id ASC LIMIT $2 OFFSET $3 @@ -256,6 +263,7 @@ func (q *Queries) GetAllVideosForStorageLimit(ctx context.Context, arg GetAllVid &i.Size, &i.Checksum, &i.Manifest, + &i.ReleasedAt, ); err != nil { return nil, err } @@ -289,7 +297,7 @@ func (q *Queries) GetChannel(ctx context.Context, claimID string) (Channel, erro } const getVideo = `-- name: GetVideo :one -SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest FROM videos +SELECT id, created_at, updated_at, accessed_at, access_count, tid, url, sd_hash, channel, storage, path, size, checksum, manifest, released_at FROM videos WHERE sd_hash = $1 LIMIT 1 ` @@ -311,6 +319,7 @@ func (q *Queries) GetVideo(ctx context.Context, sdHash string) (Video, error) { &i.Size, &i.Checksum, &i.Manifest, + &i.ReleasedAt, ) return i, err } diff --git a/library/library.go b/library/library.go index 9f38e00..6be9411 100644 --- a/library/library.go +++ b/library/library.go @@ -14,7 +14,7 @@ import ( "github.com/c2h5oh/datasize" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" - "github.com/tabbed/pqtype" + "github.com/sqlc-dev/pqtype" ) const ( @@ -77,22 +77,25 @@ func (lib *Library) AddRemoteStream(stream Stream) error { if stream.Manifest == nil { return errors.New("cannot add remote stream, manifest is missing") } - m := stream.Manifest - bm, err := json.Marshal(m) + manifest := stream.Manifest + bm, err := json.Marshal(manifest) if err != nil { return errors.Wrap(err, "cannot marshal stream manifest") } p := db.AddVideoParams{ - TID: m.TID, - URL: m.URL, - SDHash: m.SDHash, - Channel: m.ChannelURL, + TID: manifest.TID, + URL: manifest.URL, + SDHash: manifest.SDHash, + Channel: manifest.ChannelURL, Storage: stream.RemoteStorage, - Path: m.TID, + Path: manifest.TID, Size: stream.Size(), Checksum: sql.NullString{String: stream.Checksum(), Valid: true}, Manifest: pqtype.NullRawMessage{RawMessage: bm, Valid: true}, } + if !manifest.ReleasedAt.IsZero() { + p.ReleasedAt = sql.NullTime{Time: manifest.ReleasedAt, Valid: true} + } _, err = lib.db.AddVideo(context.Background(), p) return err } diff --git a/library/stream.go b/library/stream.go index 10266f8..b1a864c 100644 --- a/library/stream.go +++ b/library/stream.go @@ -42,6 +42,7 @@ type Stream struct { type Manifest struct { URL string + ReleasedAt time.Time ChannelURL string `yaml:",omitempty" json:"channel_url"` SDHash string @@ -61,12 +62,18 @@ type Manifest struct { type StreamWalker func(fi fs.FileInfo, fullPath, name string) error -func WithTimestamp(ts time.Time) func(*Manifest) { +func WithTranscodedAt(ts time.Time) func(*Manifest) { return func(m *Manifest) { m.TranscodedAt = ts } } +func WithReleasedAt(ts time.Time) func(*Manifest) { + return func(m *Manifest) { + m.ReleasedAt = ts + } +} + func WithWorkerName(n string) func(*Manifest) { return func(m *Manifest) { m.TranscodedBy = n diff --git a/library/stream_test.go b/library/stream_test.go index 1b3fba5..19c169a 100644 --- a/library/stream_test.go +++ b/library/stream_test.go @@ -35,14 +35,14 @@ func TestStream(t *testing.T) { stream1 := InitStream(path.Join(dir, sdHash), "") require.NoError(t, - stream1.GenerateManifest(randomdata.SillyName(), randomdata.SillyName(), sdHash, WithTimestamp(ts)), + stream1.GenerateManifest(randomdata.SillyName(), randomdata.SillyName(), sdHash, WithTranscodedAt(ts)), ) stream2 := InitStream(path.Join(dir, sdHash), "") require.NoError(t, - stream2.GenerateManifest(stream1.URL(), stream1.Manifest.ChannelURL, stream1.SDHash(), WithTimestamp(ts)), + stream2.GenerateManifest(stream1.URL(), stream1.Manifest.ChannelURL, stream1.SDHash(), WithTranscodedAt(ts)), ) - err := stream1.GenerateManifest(randomdata.SillyName(), randomdata.SillyName(), sdHash, WithTimestamp(ts)) + err := stream1.GenerateManifest(randomdata.SillyName(), randomdata.SillyName(), sdHash, WithTranscodedAt(ts)) require.NoError(t, err) assert.Equal(t, stream1.Manifest.TID, stream2.Manifest.TID) @@ -61,7 +61,7 @@ func TestStream(t *testing.T) { require.NoError(t, stream.GenerateManifest( url, channelURL, sdHash, - WithTimestamp(ts), + WithTranscodedAt(ts), WithVersion(version), WithWorkerName(workerName), ), diff --git a/pkg/conductor/tasks/tasks.go b/pkg/conductor/tasks/tasks.go index 9d41651..0009f78 100644 --- a/pkg/conductor/tasks/tasks.go +++ b/pkg/conductor/tasks/tasks.go @@ -12,6 +12,7 @@ import ( "time" "github.com/OdyseeTeam/transcoder/encoder" + "github.com/OdyseeTeam/transcoder/internal/version" "github.com/OdyseeTeam/transcoder/library" "github.com/OdyseeTeam/transcoder/pkg/conductor/metrics" "github.com/OdyseeTeam/transcoder/pkg/logging" @@ -22,7 +23,6 @@ import ( "github.com/hibiken/asynq" "github.com/pkg/errors" redis "github.com/redis/go-redis/v9" - "go.etcd.io/etcd/api/v3/version" ) const ( @@ -193,9 +193,10 @@ func (r *EncoderRunner) Run(ctx context.Context, t *asynq.Task) error { stream = library.InitStream(encodedPath, r.storage.Name()) err = stream.GenerateManifest( payload.URL, resolved.ChannelURI, payload.SDHash, - library.WithTimestamp(time.Now()), + library.WithTranscodedAt(time.Now()), library.WithWorkerName(r.options.Name), library.WithVersion(version.Version), + library.WithReleasedAt(resolved.ReleaseTime), ) if err != nil { log.Error("failed to fill manifest", "err", err) diff --git a/pkg/resolve/resolve.go b/pkg/resolve/resolve.go index aa551ed..bcf7710 100644 --- a/pkg/resolve/resolve.go +++ b/pkg/resolve/resolve.go @@ -46,6 +46,7 @@ type ResolvedStream struct { URI, Name, ClaimID, SDHash, ChannelURI, ChannelClaimID, NormalizedName string ChannelSupportAmount int64 + ReleaseTime time.Time } func (wc *WriteCounter) Write(p []byte) (int, error) { @@ -78,11 +79,17 @@ func ResolveStream(uri string) (*ResolvedStream, error) { return nil, ErrNoSigningChannel } - src := claim.Value.GetStream().GetSource() - if src == nil { + stream := claim.Value.GetStream() + if stream == nil { + return nil, errors.New("claim doesn't have a stream") + } + releaseTime := time.Unix(stream.GetReleaseTime(), 0) + + streamSource := stream.GetSource() + if streamSource == nil { return nil, errors.New("stream doesn't have source data") } - h := hex.EncodeToString(src.SdHash) + h := hex.EncodeToString(streamSource.SdHash) ch := strings.Replace(strings.ToLower(claim.SigningChannel.CanonicalURL), "#", ":", 1) sup, _ := strconv.ParseFloat(claim.SigningChannel.Meta.SupportAmount, 64) @@ -96,6 +103,7 @@ func ResolveStream(uri string) (*ResolvedStream, error) { ChannelURI: ch, ChannelClaimID: claim.SigningChannel.ClaimID, ChannelSupportAmount: int64(math.Floor(sup)), + ReleaseTime: releaseTime, } return r, nil } diff --git a/sqlc.yaml b/sqlc.yaml index 696690f..17c5478 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -1,15 +1,18 @@ version: "2" -packages: - - path: "library/db" +sql: + - schema: "library/db/migrations/" + queries: "library/db/queries.sql" name: "db" engine: "postgresql" - schema: "library/db/migrations/" - queries: "library/db/queries.sql" -rename: - url: "URL" - sd_hash: "SDHash" - ulid: "ULID" - tid: "TID" -overrides: - - column: "videos.size" - go_type: "int64" + gen: + go: + package: db + out: library/db + rename: + url: "URL" + sd_hash: "SDHash" + ulid: "ULID" + tid: "TID" + overrides: + - column: "videos.size" + go_type: "int64" diff --git a/tccli/main.go b/tccli/main.go index 4dd123d..fc02091 100644 --- a/tccli/main.go +++ b/tccli/main.go @@ -118,7 +118,7 @@ func main() { ls := library.InitStream(CLI.Genstream.Path, "wasabi") err = ls.GenerateManifest( rr.URI, rr.ChannelURI, rr.SDHash, - library.WithTimestamp(time.Now()), + library.WithTranscodedAt(time.Now()), library.WithWorkerName("manual"), ) if err != nil { From 752edc7379f50c45b5838521d9d7ba0bab715709 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Fri, 2 May 2025 14:39:44 +0700 Subject: [PATCH 11/11] Add test for the extra manifest field --- library/testing.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/library/testing.go b/library/testing.go index 93bc0d2..acdb168 100644 --- a/library/testing.go +++ b/library/testing.go @@ -56,6 +56,8 @@ var PopulatedHLSPlaylistFiles = []string{ "s3_000071.ts", "s3_000072.ts", "s3_000073.ts", "s3_000074.ts", "s3_000075.ts", "s3_000076.ts", "s3_000077.ts", "stream_0.m3u8", "stream_1.m3u8", "stream_2.m3u8", "stream_3.m3u8"} +var DummyReleasedAt = time.Unix(1745861002, 0).UTC() + type LibraryTestHelper struct { DB *sql.DB DBCleanup migrator.TestDBCleanup @@ -165,6 +167,7 @@ func GenerateDummyStream(storage Storage) *Stream { RemoteStorage: storage.Name(), Manifest: &Manifest{ URL: randomdata.SillyName(), + ReleasedAt: DummyReleasedAt, ChannelURL: randomdata.SillyName(), SDHash: randomdata.Alphanumeric(96), TranscodedAt: time.Now(),