Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,12 +761,15 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
t.started = true
}
if !t.started {
status, _, vcodec, _, _ := GetCodecInfo(input.Fname)
// NeedsBypass is state where video is present in container & vithout any frames
videoMissing := status == CodecStatusNeedsBypass || vcodec == ""
if videoMissing {
// Audio-only segment, fail fast right here as we cannot handle them nicely
return nil, ErrTranscoderVid
inputIsPipe := strings.HasPrefix(input.Fname, "pipe:")
if !inputIsPipe {
status, _, vcodec, _, _ := GetCodecInfo(input.Fname)
// NeedsBypass is state where video is present in container & vithout any frames
videoMissing := status == CodecStatusNeedsBypass || vcodec == ""
if videoMissing {
// Audio-only segment, fail fast right here as we cannot handle them nicely
return nil, ErrTranscoderVid
}
}
// Stream is either OK or completely broken, let the transcoder handle it
t.started = true
Expand Down
101 changes: 101 additions & 0 deletions ffmpeg/piped_transcode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package ffmpeg

import (
"fmt"
"os"
)

type OutputReader struct {
pipeEndpoint *os.File
}

func (r *OutputReader) Read(b []byte) (n int, err error) {
return r.pipeEndpoint.Read(b)
}

func (r *OutputReader) Close() error {
return r.pipeEndpoint.Close()
}

type PipedOutput struct {
Read *os.File
Write *os.File
Options *TranscodeOptions
}

type PipedInput struct {
Read *os.File
Write *os.File
Options *TranscodeOptionsIn
}

type PipedTranscoding struct {
input PipedInput
outputs []PipedOutput
}

func (t *PipedTranscoding) ClosePipes() {
t.input.Read.Close()
t.input.Write.Close()
for i := 0; i < len(t.outputs); i++ {
t.outputs[i].Read.Close()
t.outputs[i].Write.Close()
}
}

func (t *PipedTranscoding) SetInput(input TranscodeOptionsIn) error {
t.input.Options = &input
var err error
t.input.Read, t.input.Write, err = os.Pipe()
if err != nil {
return err
}
// pass read pipe to ffmpeg
t.input.Options.Fname = fmt.Sprintf("pipe:%d", t.input.Read.Fd())
// keep write pipe for passing input chunks
return nil
}

func (t *PipedTranscoding) SetOutputs(outputs []TranscodeOptions) error {
t.outputs = make([]PipedOutput, len(outputs))
for i, options := range outputs {
t.outputs[i].Options = &TranscodeOptions{}
*t.outputs[i].Options = options
// Because output is pipe, format can't be deduced from file extension
t.outputs[i].Options.Muxer = ComponentOptions{Name: "mpegts"}
var err error
t.outputs[i].Read, t.outputs[i].Write, err = os.Pipe()
if err != nil {
return err
}
// pass write pipe to ffmpeg
t.outputs[i].Options.Oname = fmt.Sprintf("pipe:%d", t.outputs[i].Write.Fd())
// keep read pipe for receiving output chunks
}
return nil
}

func (t *PipedTranscoding) Transcode() (*TranscodeResults, error) {
out := make([]TranscodeOptions, len(t.outputs))
for i := 0; i < len(t.outputs); i++ {
out[i] = *t.outputs[i].Options
}
res, err := Transcode3(t.input.Options, out)
return res, err
}

func (t *PipedTranscoding) Write(b []byte) (n int, err error) {
return t.input.Write.Write(b)
}

func (t *PipedTranscoding) WriteClose() error {
return t.input.Write.Close()
}

func (t *PipedTranscoding) GetOutputs() []OutputReader {
readers := make([]OutputReader, len(t.outputs))
for i := 0; i < len(t.outputs); i++ {
readers[i].pipeEndpoint = t.outputs[i].Read
}
return readers
}
110 changes: 110 additions & 0 deletions ffmpeg/piped_transcode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//go:build nvidia
// +build nvidia

package ffmpeg

import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type SegmentInformation struct {
Acodec string
Vcodec string
Format PixelFormat
Valid bool
Cached []byte
}

func (s *SegmentInformation) ProcessChunk(chunk []byte) bool {
if s.Valid {
// We got info, skip further processing
return false
}
s.Cached = append(s.Cached, chunk...)
status, acodec, vcodec, pixelFormat, _ := GetCodecInfoBytes(s.Cached)
if status == CodecStatusOk {
s.Acodec = acodec
s.Vcodec = vcodec
s.Format = pixelFormat
s.Valid = true
return true
}
return false
}

func streamInput(t *testing.T, fname string, transcoder *PipedTranscoding) {
defer transcoder.WriteClose()
info := &SegmentInformation{}
data, err := ioutil.ReadFile(fname)
require.NoError(t, err)
for len(data) > 0 {
var chunkSize int = min(4096, len(data))
bytesWritten, err := transcoder.Write(data[:chunkSize])
if info.ProcessChunk(data[:bytesWritten]) {
fmt.Printf("D> got media info at %d A=%s; V=%s; pixfmt=%d;\n", len(info.Cached), info.Acodec, info.Vcodec, info.Format.RawValue)
}
require.NoError(t, err)
// handle partial write
data = data[bytesWritten:]
}
require.True(t, info.Valid, "GetCodecInfoBytes() failed to parse media")
}

func streamOutput(t *testing.T, fname string, reader *OutputReader) {
defer reader.Close()
file, err := os.Create(fname)
require.NoError(t, err)
defer file.Close()
buffer := make([]byte, 4096)
for {
byteCount, err := reader.Read(buffer)
if err == io.EOF {
break
}
require.NoError(t, err)
bytesWritten, err := file.Write(buffer[:byteCount])
require.Equal(t, byteCount, bytesWritten, "partial write to file")
}
}

func TestTranscoder_Pipe(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
outNames := []string{
path.Join(wd, "test-out-360.ts"),
path.Join(wd, "test-out-240.ts"),
path.Join(wd, "test-out-144.ts"),
}
inputFileName := path.Join(wd, "..", "samples", "sample_0_409_17041.ts")

hevc := VideoProfile{Name: "P240p30fps16x9", Bitrate: "600k", Framerate: 30, AspectRatio: "16:9", Resolution: "426x240", Encoder: H265}
transcoder := &PipedTranscoding{}
transcoder.SetInput(TranscodeOptionsIn{Accel: Nvidia})
transcoder.SetOutputs([]TranscodeOptions{
{Profile: P360p30fps16x9, Accel: Nvidia},
{Profile: hevc, Accel: Nvidia},
{Profile: P144p30fps16x9, Accel: Nvidia},
})
// stream input chunks
go streamInput(t, inputFileName, transcoder)
// read output streams
outputs := transcoder.GetOutputs()
for i := 0; i < len(outputs); i++ {
go streamOutput(t, outNames[i], &outputs[i])
}
// start transcode
res, err := transcoder.Transcode()
require.NoError(t, err)
for i := 0; i < 3; i++ {
// actual frame count should be 409 - is this a bug?
assert.Equal(t, res.Encoded[i].Frames, 512, "must produce 512 frame in output %d", i)
}
}
Binary file added samples/sample_0_409_17041.ts
Binary file not shown.