Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions examples/avatar_agents/audio_wave/agent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@ class AvatarConnectionInfo:
"""Token for avatar worker to join"""


async def launch_avatar_worker(
ctx: JobContext, avatar_dispatcher_url: str, avatar_identity: str
) -> None:
"""Wait for worker participant to join and start streaming"""
# create a token for the avatar worker
agent_identity = ctx.room.local_participant.identity
async def launch_avatar(ctx: JobContext, avatar_dispatcher_url: str, avatar_identity: str) -> None:
"""
Send a request to the avatar service for it to join the room

This function should be wrapped in an avatar plugin.
"""

# create a token for the avatar to join the room
token = (
api.AccessToken()
.with_identity(avatar_identity)
.with_name("Avatar Runner")
.with_grants(api.VideoGrants(room_join=True, room=ctx.room.name))
.with_kind("agent")
.with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: agent_identity})
.with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: ctx.token_claims().identity})
.to_jwt()
)

Expand All @@ -56,12 +58,6 @@ async def launch_avatar_worker(
response.raise_for_status()
logger.info("Avatar handshake completed")

# wait for the remote participant to join
await ctx.wait_for_participant(
identity=avatar_identity, kind=rtc.ParticipantKind.PARTICIPANT_KIND_AGENT
)
logger.info("Avatar runner joined")


async def entrypoint(ctx: JobContext, avatar_dispatcher_url: str):
await ctx.connect()
Expand All @@ -74,11 +70,13 @@ async def entrypoint(ctx: JobContext, avatar_dispatcher_url: str):
# tts=cartesia.TTS(),
)

# wait for the participant to join the room and the avatar worker to connect
await launch_avatar_worker(ctx, avatar_dispatcher_url, AVATAR_IDENTITY)

# connect the output audio to the avatar runner
session.output.audio = DataStreamAudioOutput(ctx.room, destination_identity=AVATAR_IDENTITY)
await launch_avatar(ctx, avatar_dispatcher_url, AVATAR_IDENTITY)
session.output.audio = DataStreamAudioOutput(
ctx.room,
destination_identity=AVATAR_IDENTITY,
# (optional) wait for the avatar to publish a video track before generating a reply
wait_remote_track=rtc.TrackKind.KIND_VIDEO,
)

# start agent with room input and room text output
await session.start(
Expand All @@ -92,6 +90,7 @@ async def entrypoint(ctx: JobContext, avatar_dispatcher_url: str):

@session.output.audio.on("playback_finished")
def on_playback_finished(ev: PlaybackFinishedEvent) -> None:
# the avatar should notify when the audio playback is finished
logger.info(
"playback_finished",
extra={
Expand All @@ -105,11 +104,8 @@ def on_playback_finished(ev: PlaybackFinishedEvent) -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--avatar-url", type=str, default="http://localhost:8089/launch")
args, remaining_args = parser.parse_known_args()
print(sys.argv, remaining_args)
sys.argv = sys.argv[:1] + remaining_args

# WorkerType.ROOM is the default worker type which will create an agent for every room.
# You can also use WorkerType.PUBLISHER to create a single agent for all participants that publish a track. # noqa: E501
cli.run_app(
WorkerOptions(
entrypoint_fnc=partial(entrypoint, avatar_dispatcher_url=args.avatar_url),
Expand Down
Loading
Loading