diff --git a/pkg/config/base.go b/pkg/config/base.go index 8979afc8..ee809fbf 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -59,7 +59,7 @@ type BaseConfig struct { S3AssumeRoleExternalID string `yaml:"s3_assume_role_external_id"` // if set, this external ID is used by default for S3 uploads // advanced - Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket + Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket, bypasses chrome LNA checks Debug DebugConfig `yaml:"debug"` // create dot file on internal error ChromeFlags map[string]interface{} `yaml:"chrome_flags"` // additional flags to pass to Chrome Latency LatencyConfig `yaml:"latency"` // gstreamer latencies, modifying these may break the service diff --git a/pkg/config/output.go b/pkg/config/output.go index 579b5c9e..b76d645f 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -222,6 +222,16 @@ func (p *PipelineConfig) updateOutputs(req *livekit.ExportReplayRequest) error { return errors.ErrInvalidInput("output") } + // Non-live pipelines produce data faster than realtime. Stream outputs + // (RTMP, WebSocket) cannot ingest faster than 1x playback speed. + if !p.Live { + for _, output := range req.Outputs { + if _, ok := output.Config.(*livekit.Output_Stream); ok { + return errors.ErrNotSupported("stream output for non-live pipeline") + } + } + } + var hasFile, hasStream, hasSegments bool var fileCount, streamCount, segmentCount int diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index f63924e3..f98b7476 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -57,10 +57,17 @@ type PipelineConfig struct { Info *livekit.EgressInfo `yaml:"-"` Manifest *Manifest `yaml:"-"` - IsReplay bool `yaml:"-"` + Live bool `yaml:"-"` StorageObserver StorageObserver `yaml:"-"` } +// IsReplay returns true when this is a replay/export pipeline. Use this for +// replay-specific integration points (IPC calls, storage access). For generic +// pipeline behavior (is-live, leaky queues, backpressure) use the Live field. +func (p *PipelineConfig) IsReplay() bool { + return !p.Live +} + type StorageObserver interface { OnStorageEvent(egressID, operation, path string, size, lifetimeDays int64) } @@ -153,6 +160,7 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin }, }, Outputs: make(map[types.EgressType][]OutputConfig), + Live: true, } if err := yaml.Unmarshal([]byte(confString), p); err != nil { @@ -179,6 +187,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest BaseConfig: conf.BaseConfig, TmpDir: path.Join(TmpDir, req.EgressId), Outputs: make(map[types.EgressType][]OutputConfig), + Live: true, } return p, p.Update(req) @@ -420,7 +429,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } case *rpc.StartEgressRequest_Replay: - p.IsReplay = true replayReq := req.Replay clone := proto.Clone(replayReq).(*livekit.ExportReplayRequest) p.Info.Request = &livekit.EgressInfo_Replay{ diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index fa323675..600880bf 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -135,7 +135,7 @@ func (h *Handler) Run() { } // Replay coordination: signal ready and get timing - if h.conf.IsReplay { + if h.conf.IsReplay() { rctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) resp, err := h.ipcServiceClient.ReplayReady(rctx, &rpc.EgressReadyRequest{ EgressId: h.conf.Info.EgressId, diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index e7084fa8..fbd13217 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -145,7 +145,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error return err } } else { - queue, err := gstreamer.BuildQueue(fmt.Sprintf("%s_queue", audioBinName), p.Latency.PipelineLatency, leakyQueue) + queue, err := gstreamer.BuildQueue(fmt.Sprintf("%s_queue", audioBinName), p.Latency.PipelineLatency, p.Live) if err != nil { return errors.ErrGstPipelineError(err) } @@ -220,8 +220,10 @@ func (b *AudioBin) buildSDKInput() error { return err } } - if err := b.addAudioTestSrcBin(); err != nil { - return err + if b.conf.Live { + if err := b.addAudioTestSrcBin(); err != nil { + return err + } } if err := b.addMixer(); err != nil { return err @@ -252,9 +254,14 @@ func (b *AudioBin) addAudioAppSrcBinLocked(ts *config.TrackSource) error { return false }) ts.AppSrc.SetArg("format", "time") - if err := ts.AppSrc.SetProperty("is-live", true); err != nil { + if err := ts.AppSrc.SetProperty("is-live", b.conf.Live); err != nil { return err } + if !b.conf.Live { + if err := ts.AppSrc.SetProperty("block", true); err != nil { + return err + } + } if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil { return err } diff --git a/pkg/pipeline/builder/file.go b/pkg/pipeline/builder/file.go index ed51a9f7..e4d4793a 100644 --- a/pkg/pipeline/builder/file.go +++ b/pkg/pipeline/builder/file.go @@ -58,6 +58,11 @@ func BuildFileBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstr if err = sink.SetProperty("sync", false); err != nil { return nil, errors.ErrGstPipelineError(err) } + if !p.Live { + if err = sink.SetProperty("async", false); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + } if err = b.AddElements(mux.GetElement(), sink); err != nil { return nil, err diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index c5641d90..b35130e0 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -49,9 +49,11 @@ type VideoBin struct { rawVideoTee *gst.Element } -// buildLeakyVideoQueue creates a leaky queue and attaches a monitor to track dropped buffers -func (b *VideoBin) buildLeakyVideoQueue(name string) (*gst.Element, error) { - queue, err := gstreamer.BuildQueue(name, b.conf.Latency.PipelineLatency, true) +// buildVideoQueue creates a queue for the video pipeline. For live sources the +// queue is leaky (drops old buffers when full) to handle real-time overrun. For +// non-live replay the queue is blocking so backpressure throttles the source. +func (b *VideoBin) buildVideoQueue(name string) (*gst.Element, error) { + queue, err := gstreamer.BuildQueue(name, b.conf.Latency.PipelineLatency, b.conf.Live) if err != nil { return nil, errors.ErrGstPipelineError(err) } @@ -97,7 +99,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error return tee.GetRequestPad("src_%u") } } else if len(p.GetEncodedOutputs()) > 0 { - queue, err := b.buildLeakyVideoQueue("video_queue") + queue, err := b.buildVideoQueue("video_queue") if err != nil { return err } @@ -281,7 +283,7 @@ func (b *VideoBin) buildWebInput() error { return errors.ErrGstPipelineError(err) } - videoQueue, err := b.buildLeakyVideoQueue("video_input_queue") + videoQueue, err := b.buildVideoQueue("video_input_queue") if err != nil { return err } @@ -384,9 +386,14 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstream return false }) ts.AppSrc.SetArg("format", "time") - if err := ts.AppSrc.SetProperty("is-live", true); err != nil { + if err := ts.AppSrc.SetProperty("is-live", b.conf.Live); err != nil { return nil, errors.ErrGstPipelineError(err) } + if !b.conf.Live { + if err := ts.AppSrc.SetProperty("block", true); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + } if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil { return nil, err } @@ -725,7 +732,7 @@ func (b *VideoBin) addDecodedVideoSink() error { } func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { - videoQueue, err := b.buildLeakyVideoQueue("video_input_queue") + videoQueue, err := b.buildVideoQueue("video_input_queue") if err != nil { return err } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index a3bdaa11..d82471a3 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -96,6 +96,11 @@ type controllerStats struct { droppedVideoBuffersByQueue map[string]uint64 } +// SourceBuilder constructs a pipeline source. It receives the controller's +// callbacks so the source can synchronize on GstReady; custom sources that +// don't need gst synchronization can ignore the argument. +type SourceBuilder func(callbacks *gstreamer.Callbacks) (source.Source, error) + var ( tracer = otel.Tracer("github.com/livekit/egress/pkg/pipeline") ) @@ -104,7 +109,55 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. ctx, span := tracer.Start(ctx, "Pipeline.New") defer span.End() - var err error + return NewWithSource(ctx, conf, ipcServiceClient, func(callbacks *gstreamer.Callbacks) (source.Source, error) { + return source.New(ctx, conf, callbacks) + }) +} + +// NewWithSource creates a Controller using the given SourceBuilder. The builder +// runs after the controller has been constructed and receives the controller's +// Callbacks, so the source can share GstReady with the pipeline. Use this when +// the source isn't the standard source.New (testfeeder, replay export, etc.). +func NewWithSource( + ctx context.Context, + conf *config.PipelineConfig, + ipcServiceClient ipc.EgressServiceClient, + srcBuilder SourceBuilder, +) (*Controller, error) { + c := newController(conf, ipcServiceClient) + + // initialize gst + go func() { + _, span := tracer.Start(ctx, "gst.Init") + defer span.End() + gst.Init(nil) + gst.SetLogFunction(c.gstLog) + close(c.callbacks.GstReady) + }() + + src, err := srcBuilder(c.callbacks) + if err != nil { + return nil, err + } + c.src = src + + // create pipeline + <-c.callbacks.GstReady + if err := c.BuildPipeline(); err != nil { + c.src.Close() + return nil, err + } + + return c, nil +} + +// Callbacks returns the pipeline callbacks. Sources that need to wait for +// GstReady before creating appsrc elements can use this. +func (c *Controller) Callbacks() *gstreamer.Callbacks { + return c.callbacks +} + +func newController(conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) *Controller { c := &Controller{ PipelineConfig: conf, ipcServiceClient: ipcServiceClient, @@ -129,30 +182,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. logger.Debugw("debug dot requested", "reason", reason) c.generateDotFile(reason) }) - - // initialize gst - go func() { - _, span := tracer.Start(ctx, "gst.Init") - defer span.End() - gst.Init(nil) - gst.SetLogFunction(c.gstLog) - close(c.callbacks.GstReady) - }() - - // create source - c.src, err = source.New(ctx, conf, c.callbacks) - if err != nil { - return nil, err - } - - // create pipeline - <-c.callbacks.GstReady - if err = c.BuildPipeline(); err != nil { - c.src.Close() - return nil, err - } - - return c, nil + return c } func (c *Controller) BuildPipeline() error { @@ -168,9 +198,9 @@ func (c *Controller) BuildPipeline() error { c.stopped.Break() return nil }) - if c.SourceType == types.SourceTypeSDK { + if sdkSrc, ok := c.src.(*source.SDKSource); ok { p.SetEOSFunc(func() bool { - c.src.(*source.SDKSource).CloseWriters() + sdkSrc.CloseWriters() return true }) } @@ -519,11 +549,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { case livekit.EgressStatus_EGRESS_ACTIVE: c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING) - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) + c.sendHandlerUpdate(ctx, c.Info) c.sendEOS() case livekit.EgressStatus_EGRESS_ENDING: - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) + c.sendHandlerUpdate(ctx, c.Info) c.sendEOS() case livekit.EgressStatus_EGRESS_LIMIT_REACHED: @@ -856,7 +886,7 @@ func (c *Controller) updateStartTime(startedAt int64) { if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE) - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), c.Info) + c.sendHandlerUpdate(context.Background(), c.Info) } } @@ -894,7 +924,13 @@ func (c *Controller) streamUpdated(ctx context.Context) { } } - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) + c.sendHandlerUpdate(ctx, c.Info) +} + +func (c *Controller) sendHandlerUpdate(ctx context.Context, info *livekit.EgressInfo) { + if c.ipcServiceClient != nil { + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, info) + } } func (c *Controller) updateEndTime() { diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 552b9fc6..74a127b3 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -232,7 +232,9 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { if message == msgStreamingNotNegotiated { // send eosSent to app src logger.Debugw("streaming stopped", "name", name) - c.src.(*source.SDKSource).StreamStopped(name) + if sdkSrc, ok := c.src.(*source.SDKSource); ok { + sdkSrc.StreamStopped(name) + } return nil } @@ -281,7 +283,9 @@ func (c *Controller) handleMessageStateChanged(msg *gst.Message) { trackID := s[4:] logger.Debugw("appsrc state change", "trackID", trackID, "oldState", oldState.String(), "newState", newState.String()) if newState == gst.StatePlaying { - c.src.(*source.SDKSource).Playing(trackID) + if sdkSrc, ok := c.src.(*source.SDKSource); ok { + sdkSrc.Playing(trackID) + } } return }