diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index c78ee0d466..f3b0455875 100755 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -761,12 +761,15 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions) t.started = true } if !t.started { - status, _, vcodec, _, _ := GetCodecInfo(input.Fname) - // NeedsBypass is state where video is present in container & vithout any frames - videoMissing := status == CodecStatusNeedsBypass || vcodec == "" - if videoMissing { - // Audio-only segment, fail fast right here as we cannot handle them nicely - return nil, ErrTranscoderVid + inputIsPipe := strings.HasPrefix(input.Fname, "pipe:") + if !inputIsPipe { + status, _, vcodec, _, _ := GetCodecInfo(input.Fname) + // NeedsBypass is state where video is present in container & vithout any frames + videoMissing := status == CodecStatusNeedsBypass || vcodec == "" + if videoMissing { + // Audio-only segment, fail fast right here as we cannot handle them nicely + return nil, ErrTranscoderVid + } } // Stream is either OK or completely broken, let the transcoder handle it t.started = true diff --git a/ffmpeg/piped_transcode.go b/ffmpeg/piped_transcode.go new file mode 100644 index 0000000000..83e2c3d3ec --- /dev/null +++ b/ffmpeg/piped_transcode.go @@ -0,0 +1,101 @@ +package ffmpeg + +import ( + "fmt" + "os" +) + +type OutputReader struct { + pipeEndpoint *os.File +} + +func (r *OutputReader) Read(b []byte) (n int, err error) { + return r.pipeEndpoint.Read(b) +} + +func (r *OutputReader) Close() error { + return r.pipeEndpoint.Close() +} + +type PipedOutput struct { + Read *os.File + Write *os.File + Options *TranscodeOptions +} + +type PipedInput struct { + Read *os.File + Write *os.File + Options *TranscodeOptionsIn +} + +type PipedTranscoding struct { + input PipedInput + outputs []PipedOutput +} + +func (t *PipedTranscoding) ClosePipes() { + t.input.Read.Close() + t.input.Write.Close() + for i := 0; i < len(t.outputs); i++ { + t.outputs[i].Read.Close() + t.outputs[i].Write.Close() + } +} + +func (t *PipedTranscoding) SetInput(input TranscodeOptionsIn) error { + t.input.Options = &input + var err error + t.input.Read, t.input.Write, err = os.Pipe() + if err != nil { + return err + } + // pass read pipe to ffmpeg + t.input.Options.Fname = fmt.Sprintf("pipe:%d", t.input.Read.Fd()) + // keep write pipe for passing input chunks + return nil +} + +func (t *PipedTranscoding) SetOutputs(outputs []TranscodeOptions) error { + t.outputs = make([]PipedOutput, len(outputs)) + for i, options := range outputs { + t.outputs[i].Options = &TranscodeOptions{} + *t.outputs[i].Options = options + // Because output is pipe, format can't be deduced from file extension + t.outputs[i].Options.Muxer = ComponentOptions{Name: "mpegts"} + var err error + t.outputs[i].Read, t.outputs[i].Write, err = os.Pipe() + if err != nil { + return err + } + // pass write pipe to ffmpeg + t.outputs[i].Options.Oname = fmt.Sprintf("pipe:%d", t.outputs[i].Write.Fd()) + // keep read pipe for receiving output chunks + } + return nil +} + +func (t *PipedTranscoding) Transcode() (*TranscodeResults, error) { + out := make([]TranscodeOptions, len(t.outputs)) + for i := 0; i < len(t.outputs); i++ { + out[i] = *t.outputs[i].Options + } + res, err := Transcode3(t.input.Options, out) + return res, err +} + +func (t *PipedTranscoding) Write(b []byte) (n int, err error) { + return t.input.Write.Write(b) +} + +func (t *PipedTranscoding) WriteClose() error { + return t.input.Write.Close() +} + +func (t *PipedTranscoding) GetOutputs() []OutputReader { + readers := make([]OutputReader, len(t.outputs)) + for i := 0; i < len(t.outputs); i++ { + readers[i].pipeEndpoint = t.outputs[i].Read + } + return readers +} diff --git a/ffmpeg/piped_transcode_test.go b/ffmpeg/piped_transcode_test.go new file mode 100644 index 0000000000..27297ea350 --- /dev/null +++ b/ffmpeg/piped_transcode_test.go @@ -0,0 +1,110 @@ +//go:build nvidia +// +build nvidia + +package ffmpeg + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type SegmentInformation struct { + Acodec string + Vcodec string + Format PixelFormat + Valid bool + Cached []byte +} + +func (s *SegmentInformation) ProcessChunk(chunk []byte) bool { + if s.Valid { + // We got info, skip further processing + return false + } + s.Cached = append(s.Cached, chunk...) + status, acodec, vcodec, pixelFormat, _ := GetCodecInfoBytes(s.Cached) + if status == CodecStatusOk { + s.Acodec = acodec + s.Vcodec = vcodec + s.Format = pixelFormat + s.Valid = true + return true + } + return false +} + +func streamInput(t *testing.T, fname string, transcoder *PipedTranscoding) { + defer transcoder.WriteClose() + info := &SegmentInformation{} + data, err := ioutil.ReadFile(fname) + require.NoError(t, err) + for len(data) > 0 { + var chunkSize int = min(4096, len(data)) + bytesWritten, err := transcoder.Write(data[:chunkSize]) + if info.ProcessChunk(data[:bytesWritten]) { + fmt.Printf("D> got media info at %d A=%s; V=%s; pixfmt=%d;\n", len(info.Cached), info.Acodec, info.Vcodec, info.Format.RawValue) + } + require.NoError(t, err) + // handle partial write + data = data[bytesWritten:] + } + require.True(t, info.Valid, "GetCodecInfoBytes() failed to parse media") +} + +func streamOutput(t *testing.T, fname string, reader *OutputReader) { + defer reader.Close() + file, err := os.Create(fname) + require.NoError(t, err) + defer file.Close() + buffer := make([]byte, 4096) + for { + byteCount, err := reader.Read(buffer) + if err == io.EOF { + break + } + require.NoError(t, err) + bytesWritten, err := file.Write(buffer[:byteCount]) + require.Equal(t, byteCount, bytesWritten, "partial write to file") + } +} + +func TestTranscoder_Pipe(t *testing.T) { + wd, err := os.Getwd() + require.NoError(t, err) + outNames := []string{ + path.Join(wd, "test-out-360.ts"), + path.Join(wd, "test-out-240.ts"), + path.Join(wd, "test-out-144.ts"), + } + inputFileName := path.Join(wd, "..", "samples", "sample_0_409_17041.ts") + + hevc := VideoProfile{Name: "P240p30fps16x9", Bitrate: "600k", Framerate: 30, AspectRatio: "16:9", Resolution: "426x240", Encoder: H265} + transcoder := &PipedTranscoding{} + transcoder.SetInput(TranscodeOptionsIn{Accel: Nvidia}) + transcoder.SetOutputs([]TranscodeOptions{ + {Profile: P360p30fps16x9, Accel: Nvidia}, + {Profile: hevc, Accel: Nvidia}, + {Profile: P144p30fps16x9, Accel: Nvidia}, + }) + // stream input chunks + go streamInput(t, inputFileName, transcoder) + // read output streams + outputs := transcoder.GetOutputs() + for i := 0; i < len(outputs); i++ { + go streamOutput(t, outNames[i], &outputs[i]) + } + // start transcode + res, err := transcoder.Transcode() + require.NoError(t, err) + for i := 0; i < 3; i++ { + // actual frame count should be 409 - is this a bug? + assert.Equal(t, res.Encoded[i].Frames, 512, "must produce 512 frame in output %d", i) + } +} diff --git a/samples/sample_0_409_17041.ts b/samples/sample_0_409_17041.ts new file mode 100644 index 0000000000..a71e736874 Binary files /dev/null and b/samples/sample_0_409_17041.ts differ