Skip to content
4 changes: 2 additions & 2 deletions cmd/lk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func createAgent(ctx context.Context, cmd *cli.Command) error {
return err
} else if viewLogs {
fmt.Println("Tailing runtime logs...safe to exit at any time")
return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, os.Stdout, resp.ServerRegions[0])
return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, "", os.Stdout, resp.ServerRegions[0])
}
}
return nil
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func getLogs(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("no agent deployments found")
}

return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, os.Stdout, response.Agents[0].AgentDeployments[0].ServerRegion)
return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, "", os.Stdout, response.Agents[0].AgentDeployments[0].ServerRegion)
}

func deleteAgent(ctx context.Context, cmd *cli.Command) error {
Expand Down
48 changes: 38 additions & 10 deletions cmd/lk/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ var (
"can be used multiple times to publish multiple files. " +
"can publish from Unix or TCP socket using the format '<codec>://<socket_name>' or '<codec>://<host:address>' respectively. Valid codecs are \"h264\", \"h265\", \"vp8\", \"opus\"",
},
&cli.BoolFlag{
Name: "attach-frame-metadata",
Usage: "Parse H264/H265 SEI for LKTS frame metadata (user timestamp and frame ID) and re-attach the packet trailer to each encoded frame",
Hidden: true,
},
&cli.FloatFlag{
Name: "fps",
Usage: "if video files are published, indicates FPS of video",
Expand Down Expand Up @@ -171,6 +176,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error {
if cmd.StringSlice("publish") != nil {
fps := cmd.Float("fps")
h26xStreamingFormat := cmd.String("h26x-streaming-format")
attachFrameMetadata := cmd.Bool("attach-frame-metadata")
for _, pub := range cmd.StringSlice("publish") {
onPublishComplete := func(pub *lksdk.LocalTrackPublication) {
if cmd.Bool("exit-after-publish") {
Expand All @@ -182,7 +188,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error {
_ = room.LocalParticipant.UnpublishTrack(pub.SID())
}
}
if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handlePublish(room, pub, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete); err != nil {
return err
}
}
Expand All @@ -196,16 +202,17 @@ func handlePublish(room *lksdk.Room,
name string,
fps float64,
h26xStreamingFormat string,
attachFrameMetadata bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
if isSocketFormat(name) {
mimeType, socketType, address, err := parseSocketFromName(name)
if err != nil {
return err
}
return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, onPublishComplete)
return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete)
}
return publishFile(room, name, fps, h26xStreamingFormat, onPublishComplete)
return publishFile(room, name, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete)
}

func publishDemo(room *lksdk.Room) error {
Expand Down Expand Up @@ -238,6 +245,7 @@ func publishFile(room *lksdk.Room,
filename string,
fps float64,
h26xStreamingFormat string,
attachFrameMetadata bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
// Configure provider
Expand Down Expand Up @@ -273,14 +281,19 @@ func publishFile(room *lksdk.Room,
return fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat)
}
}
if attachFrameMetadata {
opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true))
}

// Create track and publish
track, err := lksdk.NewLocalFileTrack(filename, opts...)
if err != nil {
return err
}
pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: filename,
Name: filename,
AttachUserTimestamp: attachFrameMetadata,
AttachFrameId: attachFrameMetadata,
})
return err
}
Expand Down Expand Up @@ -325,6 +338,7 @@ func publishSocket(room *lksdk.Room,
address string,
fps float64,
h26xStreamingFormat string,
attachFrameMetadata bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
var mime string
Expand All @@ -348,7 +362,7 @@ func publishSocket(room *lksdk.Room,
}

// Publish to room
err = publishReader(room, sock, mime, fps, h26xStreamingFormat, onPublishComplete)
err = publishReader(room, sock, mime, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete)
return err
}

Expand All @@ -357,6 +371,7 @@ func publishReader(room *lksdk.Room,
mime string,
fps float64,
h26xStreamingFormat string,
attachFrameMetadata bool,
onPublishComplete func(pub *lksdk.LocalTrackPublication),
) error {
// Configure provider
Expand Down Expand Up @@ -385,12 +400,19 @@ func publishReader(room *lksdk.Room,
}
}

if attachFrameMetadata {
opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true))
}

// Create track and publish
track, err := lksdk.NewLocalReaderTrack(in, mime, opts...)
if err != nil {
return err
}
pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{})
pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
AttachUserTimestamp: attachFrameMetadata,
AttachFrameId: attachFrameMetadata,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -442,7 +464,7 @@ func parseSimulcastURL(url string) (*simulcastURLParts, error) {
}

// createSimulcastVideoTrack creates a simulcast video track from a TCP or Unix socket H.264/H.265 streams
func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, onComplete func()) (*lksdk.LocalTrack, error) {
func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, attachFrameMetadata bool, onComplete func()) (*lksdk.LocalTrack, error) {
conn, err := net.Dial(urlParts.network, urlParts.address)
if err != nil {
return nil, fmt.Errorf("failed to connect to %s://%s: %w", urlParts.network, urlParts.address, err)
Expand Down Expand Up @@ -470,6 +492,10 @@ func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.Vide
return nil, fmt.Errorf("unsupported h26x streaming format: %s", h26xStreamingFormat)
}

if attachFrameMetadata {
opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true))
}

// Configure simulcast layer
opts = append(opts, lksdk.ReaderTrackWithSampleOptions(lksdk.WithSimulcast("simulcast", &livekit.VideoLayer{
Quality: quality,
Expand All @@ -493,7 +519,7 @@ type simulcastLayer struct {
}

// handleSimulcastPublish handles publishing multiple H.264 streams as a simulcast track
func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, onPublishComplete func(*lksdk.LocalTrackPublication)) error {
func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, attachFrameMetadata bool, onPublishComplete func(*lksdk.LocalTrackPublication)) error {
// Parse all URLs
var layers []simulcastLayer
for _, url := range urls {
Expand Down Expand Up @@ -564,7 +590,7 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt
}

for _, layer := range layers {
track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, signalCompletion)
track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, attachFrameMetadata, signalCompletion)
if err != nil {
// Clean up any tracks we've already created
for _, t := range tracks {
Expand All @@ -580,7 +606,9 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt
// Publish simulcast track
var err error
pub, err = room.LocalParticipant.PublishSimulcastTrack(tracks, &lksdk.TrackPublicationOptions{
Name: "simulcast",
Name: "simulcast",
AttachUserTimestamp: attachFrameMetadata,
AttachFrameId: attachFrameMetadata,
})
if err != nil {
// Clean up tracks on publish failure
Expand Down
9 changes: 7 additions & 2 deletions cmd/lk/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ var (
Usage: "Format to use when reading H.264 from file or socket, \"annex-b\" OR \"length-prefixed\"",
Value: "annex-b",
},
&cli.BoolFlag{
Name: "attach-frame-metadata",
Usage: "When publishing H.264/H.265, parse SEI user_data_unregistered for LKTS frame metadata (user timestamp and frame ID) and re-attach the packet trailer to each encoded frame",
},
&cli.BoolFlag{
Name: "exit-after-publish",
Usage: "When publishing, exit after file or stream is complete",
Expand Down Expand Up @@ -997,6 +1001,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
}

exitAfterPublish := cmd.Bool("exit-after-publish")
attachFrameMetadata := cmd.Bool("attach-frame-metadata")

// Handle publishing
if len(publishUrls) > 0 {
Expand All @@ -1015,7 +1020,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
}
}

if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete); err != nil {
return err
}
} else {
Expand All @@ -1033,7 +1038,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error {
_ = room.LocalParticipant.UnpublishTrack(pub.SID())
}
}
if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil {
if err = handlePublish(room, pub, fps, h26xStreamingFormat, attachFrameMetadata, onPublishComplete); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ require (
github.com/google/go-querystring v1.2.0
github.com/joho/godotenv v1.5.1
github.com/livekit/protocol v1.45.9-0.20260508203311-a249893d6a5d
github.com/livekit/server-sdk-go/v2 v2.16.2
github.com/livekit/server-sdk-go/v2 v2.16.3
github.com/mattn/go-isatty v0.0.21
github.com/moby/patternmatcher v0.6.1
github.com/modelcontextprotocol/go-sdk v1.4.1
github.com/pelletier/go-toml v1.9.5
github.com/pion/rtcp v1.2.16
github.com/pion/rtp v1.10.1
github.com/pion/webrtc/v4 v4.2.9
github.com/pion/webrtc/v4 v4.2.11
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -189,12 +189,12 @@ require (
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/pion/datachannel v1.6.0 // indirect
github.com/pion/dtls/v3 v3.1.2 // indirect
github.com/pion/ice/v4 v4.2.1 // indirect
github.com/pion/ice/v4 v4.2.2 // indirect
github.com/pion/interceptor v0.1.44 // indirect
github.com/pion/logging v0.2.4 // indirect
github.com/pion/mdns/v2 v2.1.0 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.9.3 // indirect
github.com/pion/sctp v1.9.4 // indirect
github.com/pion/sdp/v3 v3.0.18 // indirect
github.com/pion/srtp/v3 v3.0.10 // indirect
github.com/pion/stun/v3 v3.1.1 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ github.com/livekit/protocol v1.45.9-0.20260508203311-a249893d6a5d h1:mE0/AjgGnvs
github.com/livekit/protocol v1.45.9-0.20260508203311-a249893d6a5d/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k=
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=
github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk=
github.com/livekit/server-sdk-go/v2 v2.16.2 h1:eQe24cka3X+5zUivezyL72nwtAJTWFXgibeiyJ/Jm+Y=
github.com/livekit/server-sdk-go/v2 v2.16.2/go.mod h1:/HOUG9AXJeCbMCdtw0dr37AB+3xXUlj/OLeXS/0p7rA=
github.com/livekit/server-sdk-go/v2 v2.16.3 h1:WFR7TQDNTVFZX0UIvZInYggC0dvcbLLwC0/BOHH89+E=
github.com/livekit/server-sdk-go/v2 v2.16.3/go.mod h1:Ua6WRLYw8U+27pm+FPN68ogW+KsMXTQ9tPVGfTPjDCg=
github.com/lucasb-eyer/go-colorful v1.4.0 h1:UtrWVfLdarDgc44HcS7pYloGHJUjHV/4FwW4TvVgFr4=
github.com/lucasb-eyer/go-colorful v1.4.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/magefile/mage v1.17.0 h1:dS4tkq997Ism03akafC8509iqDjeE7TNTexI25Y7sXM=
Expand Down Expand Up @@ -452,8 +452,8 @@ github.com/pion/datachannel v1.6.0 h1:XecBlj+cvsxhAMZWFfFcPyUaDZtd7IJvrXqlXD/53i
github.com/pion/datachannel v1.6.0/go.mod h1:ur+wzYF8mWdC+Mkis5Thosk+u/VOL287apDNEbFpsIk=
github.com/pion/dtls/v3 v3.1.2 h1:gqEdOUXLtCGW+afsBLO0LtDD8GnuBBjEy6HRtyofZTc=
github.com/pion/dtls/v3 v3.1.2/go.mod h1:Hw/igcX4pdY69z1Hgv5x7wJFrUkdgHwAn/Q/uo7YHRo=
github.com/pion/ice/v4 v4.2.1 h1:XPRYXaLiFq3LFDG7a7bMrmr3mFr27G/gtXN3v/TVfxY=
github.com/pion/ice/v4 v4.2.1/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c=
github.com/pion/ice/v4 v4.2.2 h1:dQJzzcgTFHDYyV3BoCfjPeX+JEtr58BWPi4PGyo6Vjg=
github.com/pion/ice/v4 v4.2.2/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c=
github.com/pion/interceptor v0.1.44 h1:sNlZwM8dWXU9JQAkJh8xrarC0Etn8Oolcniukmuy0/I=
github.com/pion/interceptor v0.1.44/go.mod h1:4atVlBkcgXuUP+ykQF0qOCGU2j7pQzX2ofvPRFsY5RY=
github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8=
Expand All @@ -466,8 +466,8 @@ github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo=
github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo=
github.com/pion/rtp v1.10.1 h1:xP1prZcCTUuhO2c83XtxyOHJteISg6o8iPsE2acaMtA=
github.com/pion/rtp v1.10.1/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM=
github.com/pion/sctp v1.9.3 h1:tjuOX9K/U4udMR2I7QDqr4sLE0tFzegtou7OF4a7p8A=
github.com/pion/sctp v1.9.3/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw=
github.com/pion/sctp v1.9.4 h1:cMxEu0F5tbP4qH07bKf1Zjf4rUih9LIo0qQt424e258=
github.com/pion/sctp v1.9.4/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw=
github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI=
github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8=
github.com/pion/srtp/v3 v3.0.10 h1:tFirkpBb3XccP5VEXLi50GqXhv5SKPxqrdlhDCJlZrQ=
Expand All @@ -480,8 +480,8 @@ github.com/pion/transport/v4 v4.0.1 h1:sdROELU6BZ63Ab7FrOLn13M6YdJLY20wldXW2Cu2k
github.com/pion/transport/v4 v4.0.1/go.mod h1:nEuEA4AD5lPdcIegQDpVLgNoDGreqM/YqmEx3ovP4jM=
github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ=
github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ=
github.com/pion/webrtc/v4 v4.2.9 h1:DZIh1HAhPIL3RvwEDFsmL5hfPSLEpxsQk9/Jir2vkJE=
github.com/pion/webrtc/v4 v4.2.9/go.mod h1:9EmLZve0H76eTzf8v2FmchZ6tcBXtDgpfTEu+drW6SY=
github.com/pion/webrtc/v4 v4.2.11 h1:QUX1QZKlNIn4O7U5JxLPGP0sV5RTncZkzu9SPR3jVNU=
github.com/pion/webrtc/v4 v4.2.11/go.mod h1:s/rAiyy77GyRFrZMx+Ls6aua26dIBPudH8/ZHYbIRWY=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
Loading