Skip to content
Merged
2 changes: 1 addition & 1 deletion pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type BaseConfig struct {
S3AssumeRoleExternalID string `yaml:"s3_assume_role_external_id"` // if set, this external ID is used by default for S3 uploads

// advanced
Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket
Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket, bypasses chrome LNA checks
Debug DebugConfig `yaml:"debug"` // create dot file on internal error
ChromeFlags map[string]interface{} `yaml:"chrome_flags"` // additional flags to pass to Chrome
Latency LatencyConfig `yaml:"latency"` // gstreamer latencies, modifying these may break the service
Expand Down
10 changes: 10 additions & 0 deletions pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ func (p *PipelineConfig) updateOutputs(req *livekit.ExportReplayRequest) error {
return errors.ErrInvalidInput("output")
}

// Non-live pipelines produce data faster than realtime. Stream outputs
// (RTMP, WebSocket) cannot ingest faster than 1x playback speed.
if !p.Live {
for _, output := range req.Outputs {
if _, ok := output.Config.(*livekit.Output_Stream); ok {
return errors.ErrNotSupported("stream output for non-live pipeline")
}
}
}

var hasFile, hasStream, hasSegments bool
var fileCount, streamCount, segmentCount int

Expand Down
12 changes: 10 additions & 2 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,17 @@ type PipelineConfig struct {

Info *livekit.EgressInfo `yaml:"-"`
Manifest *Manifest `yaml:"-"`
IsReplay bool `yaml:"-"`
Live bool `yaml:"-"`
StorageObserver StorageObserver `yaml:"-"`
}

// IsReplay returns true when this is a replay/export pipeline. Use this for
// replay-specific integration points (IPC calls, storage access). For generic
// pipeline behavior (is-live, leaky queues, backpressure) use the Live field.
func (p *PipelineConfig) IsReplay() bool {
return !p.Live
}

type StorageObserver interface {
OnStorageEvent(egressID, operation, path string, size, lifetimeDays int64)
}
Expand Down Expand Up @@ -153,6 +160,7 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin
},
},
Outputs: make(map[types.EgressType][]OutputConfig),
Live: true,
}

if err := yaml.Unmarshal([]byte(confString), p); err != nil {
Expand All @@ -179,6 +187,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest
BaseConfig: conf.BaseConfig,
TmpDir: path.Join(TmpDir, req.EgressId),
Outputs: make(map[types.EgressType][]OutputConfig),
Live: true,
}

return p, p.Update(req)
Expand Down Expand Up @@ -420,7 +429,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
}

case *rpc.StartEgressRequest_Replay:
p.IsReplay = true
replayReq := req.Replay
clone := proto.Clone(replayReq).(*livekit.ExportReplayRequest)
p.Info.Request = &livekit.EgressInfo_Replay{
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (h *Handler) Run() {
}

// Replay coordination: signal ready and get timing
if h.conf.IsReplay {
if h.conf.IsReplay() {
rctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
resp, err := h.ipcServiceClient.ReplayReady(rctx, &rpc.EgressReadyRequest{
EgressId: h.conf.Info.EgressId,
Expand Down
15 changes: 11 additions & 4 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
return err
}
} else {
queue, err := gstreamer.BuildQueue(fmt.Sprintf("%s_queue", audioBinName), p.Latency.PipelineLatency, leakyQueue)
queue, err := gstreamer.BuildQueue(fmt.Sprintf("%s_queue", audioBinName), p.Latency.PipelineLatency, p.Live)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -220,8 +220,10 @@ func (b *AudioBin) buildSDKInput() error {
return err
}
}
if err := b.addAudioTestSrcBin(); err != nil {
return err
if b.conf.Live {
if err := b.addAudioTestSrcBin(); err != nil {
return err
}
}
if err := b.addMixer(); err != nil {
return err
Expand Down Expand Up @@ -252,9 +254,14 @@ func (b *AudioBin) addAudioAppSrcBinLocked(ts *config.TrackSource) error {
return false
})
ts.AppSrc.SetArg("format", "time")
if err := ts.AppSrc.SetProperty("is-live", true); err != nil {
if err := ts.AppSrc.SetProperty("is-live", b.conf.Live); err != nil {
return err
}
if !b.conf.Live {
if err := ts.AppSrc.SetProperty("block", true); err != nil {
return err
}
}
if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/builder/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func BuildFileBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstr
if err = sink.SetProperty("sync", false); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if !p.Live {
if err = sink.SetProperty("async", false); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}

if err = b.AddElements(mux.GetElement(), sink); err != nil {
return nil, err
Expand Down
21 changes: 14 additions & 7 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ type VideoBin struct {
rawVideoTee *gst.Element
}

// buildLeakyVideoQueue creates a leaky queue and attaches a monitor to track dropped buffers
func (b *VideoBin) buildLeakyVideoQueue(name string) (*gst.Element, error) {
queue, err := gstreamer.BuildQueue(name, b.conf.Latency.PipelineLatency, true)
// buildVideoQueue creates a queue for the video pipeline. For live sources the
// queue is leaky (drops old buffers when full) to handle real-time overrun. For
// non-live replay the queue is blocking so backpressure throttles the source.
func (b *VideoBin) buildVideoQueue(name string) (*gst.Element, error) {
queue, err := gstreamer.BuildQueue(name, b.conf.Latency.PipelineLatency, b.conf.Live)
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -97,7 +99,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
return tee.GetRequestPad("src_%u")
}
} else if len(p.GetEncodedOutputs()) > 0 {
queue, err := b.buildLeakyVideoQueue("video_queue")
queue, err := b.buildVideoQueue("video_queue")
if err != nil {
return err
}
Expand Down Expand Up @@ -281,7 +283,7 @@ func (b *VideoBin) buildWebInput() error {
return errors.ErrGstPipelineError(err)
}

videoQueue, err := b.buildLeakyVideoQueue("video_input_queue")
videoQueue, err := b.buildVideoQueue("video_input_queue")
if err != nil {
return err
}
Expand Down Expand Up @@ -384,9 +386,14 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstream
return false
})
ts.AppSrc.SetArg("format", "time")
if err := ts.AppSrc.SetProperty("is-live", true); err != nil {
if err := ts.AppSrc.SetProperty("is-live", b.conf.Live); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if !b.conf.Live {
if err := ts.AppSrc.SetProperty("block", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}
if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil {
return nil, err
}
Expand Down Expand Up @@ -725,7 +732,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
}

func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
videoQueue, err := b.buildLeakyVideoQueue("video_input_queue")
videoQueue, err := b.buildVideoQueue("video_input_queue")
if err != nil {
return err
}
Expand Down
98 changes: 67 additions & 31 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ type controllerStats struct {
droppedVideoBuffersByQueue map[string]uint64
}

// SourceBuilder constructs a pipeline source. It receives the controller's
// callbacks so the source can synchronize on GstReady; custom sources that
// don't need gst synchronization can ignore the argument.
type SourceBuilder func(callbacks *gstreamer.Callbacks) (source.Source, error)

var (
tracer = otel.Tracer("github.com/livekit/egress/pkg/pipeline")
)
Expand All @@ -104,7 +109,55 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
ctx, span := tracer.Start(ctx, "Pipeline.New")
defer span.End()

var err error
return NewWithSource(ctx, conf, ipcServiceClient, func(callbacks *gstreamer.Callbacks) (source.Source, error) {
return source.New(ctx, conf, callbacks)
})
}

// NewWithSource creates a Controller using the given SourceBuilder. The builder
// runs after the controller has been constructed and receives the controller's
// Callbacks, so the source can share GstReady with the pipeline. Use this when
// the source isn't the standard source.New (testfeeder, replay export, etc.).
func NewWithSource(
ctx context.Context,
conf *config.PipelineConfig,
ipcServiceClient ipc.EgressServiceClient,
srcBuilder SourceBuilder,
) (*Controller, error) {
c := newController(conf, ipcServiceClient)

// initialize gst
go func() {
_, span := tracer.Start(ctx, "gst.Init")
defer span.End()
gst.Init(nil)
gst.SetLogFunction(c.gstLog)
close(c.callbacks.GstReady)
}()

src, err := srcBuilder(c.callbacks)
if err != nil {
return nil, err
}
c.src = src

// create pipeline
<-c.callbacks.GstReady
if err := c.BuildPipeline(); err != nil {
c.src.Close()
return nil, err
}

return c, nil
}

// Callbacks returns the pipeline callbacks. Sources that need to wait for
// GstReady before creating appsrc elements can use this.
func (c *Controller) Callbacks() *gstreamer.Callbacks {
return c.callbacks
}

func newController(conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) *Controller {
c := &Controller{
PipelineConfig: conf,
ipcServiceClient: ipcServiceClient,
Expand All @@ -129,30 +182,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
logger.Debugw("debug dot requested", "reason", reason)
c.generateDotFile(reason)
})

// initialize gst
go func() {
_, span := tracer.Start(ctx, "gst.Init")
defer span.End()
gst.Init(nil)
gst.SetLogFunction(c.gstLog)
close(c.callbacks.GstReady)
}()

// create source
c.src, err = source.New(ctx, conf, c.callbacks)
if err != nil {
return nil, err
}

// create pipeline
<-c.callbacks.GstReady
if err = c.BuildPipeline(); err != nil {
c.src.Close()
return nil, err
}

return c, nil
return c
}

func (c *Controller) BuildPipeline() error {
Expand All @@ -168,9 +198,9 @@ func (c *Controller) BuildPipeline() error {
c.stopped.Break()
return nil
})
if c.SourceType == types.SourceTypeSDK {
if sdkSrc, ok := c.src.(*source.SDKSource); ok {
p.SetEOSFunc(func() bool {
c.src.(*source.SDKSource).CloseWriters()
sdkSrc.CloseWriters()
return true
})
}
Expand Down Expand Up @@ -519,11 +549,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING)
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendHandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_ENDING:
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendHandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_LIMIT_REACHED:
Expand Down Expand Up @@ -856,7 +886,7 @@ func (c *Controller) updateStartTime(startedAt int64) {

if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING {
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE)
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), c.Info)
c.sendHandlerUpdate(context.Background(), c.Info)
}
}

Expand Down Expand Up @@ -894,7 +924,13 @@ func (c *Controller) streamUpdated(ctx context.Context) {
}
}

_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendHandlerUpdate(ctx, c.Info)
}

func (c *Controller) sendHandlerUpdate(ctx context.Context, info *livekit.EgressInfo) {
if c.ipcServiceClient != nil {
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, info)
}
}

func (c *Controller) updateEndTime() {
Expand Down
8 changes: 6 additions & 2 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error {
if message == msgStreamingNotNegotiated {
// send eosSent to app src
logger.Debugw("streaming stopped", "name", name)
c.src.(*source.SDKSource).StreamStopped(name)
if sdkSrc, ok := c.src.(*source.SDKSource); ok {
sdkSrc.StreamStopped(name)
}
return nil
}

Expand Down Expand Up @@ -281,7 +283,9 @@ func (c *Controller) handleMessageStateChanged(msg *gst.Message) {
trackID := s[4:]
logger.Debugw("appsrc state change", "trackID", trackID, "oldState", oldState.String(), "newState", newState.String())
if newState == gst.StatePlaying {
c.src.(*source.SDKSource).Playing(trackID)
if sdkSrc, ok := c.src.(*source.SDKSource); ok {
sdkSrc.Playing(trackID)
}
}
return
}
Expand Down
Loading