Skip to content

Commit 730dc56

Browse files
authored
Introduce Realtime LipSync Client (#3)
* Introduce Realtime LipSync Client * README.md: Fix ruff linting command line
1 parent 6e85756 commit 730dc56

8 files changed

Lines changed: 1154 additions & 20 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ uv sync --all-extras
8282
uv run pytest
8383

8484
# Run linting
85-
uv run ruff check decart/
85+
uv run ruff check decart/ tests/ examples/
8686

8787
# Format code
8888
uv run black decart/ tests/ examples/

decart/lipsync/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .client import RealtimeLipsyncClient
2+
3+
__all__ = ["RealtimeLipsyncClient"]

decart/lipsync/client.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import asyncio
2+
import websockets
3+
from typing import Optional, Tuple
4+
from .messages import (
5+
LipsyncClientMessage,
6+
LipsyncServerMessage,
7+
LipsyncServerMessageAdapter,
8+
LipsyncConfigMessage,
9+
LipsyncConfigAckMessage,
10+
LipsyncAudioInputMessage,
11+
LipsyncVideoInputMessage,
12+
LipsyncInterruptAudioMessage,
13+
LipsyncSyncedOutputMessage,
14+
LipsyncErrorMessage,
15+
)
16+
import fractions
17+
import time
18+
import logging
19+
import cv2
20+
import numpy as np
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
class RealtimeLipsyncClient:
    """Async WebSocket client for the Decart realtime lipsync endpoint.

    Typical use: ``await connect()``, push input with ``send_audio()`` and
    ``send_video_frame()``, pull results with ``get_synced_output()``, then
    ``await disconnect()``.
    """

    DECART_LIPSYNC_ENDPOINT = "/router/lipsync/ws"
    VIDEO_FPS = 25

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.decart.ai",
        audio_sample_rate: int = 16000,
        video_fps: int = VIDEO_FPS,
        sync_latency: float = 0.0,
    ):
        """
        Args:
            api_key: The API key for the Decart Lipsync server
            base_url: Base HTTP(S) URL of the Decart server; it is rewritten to
                the matching ws(s):// URL and the lipsync endpoint is appended
            audio_sample_rate: The sample rate of the audio
            video_fps: The FPS of the video
            sync_latency: Seconds added to the playout time of the first output
                frame, to account for variable latency

        Raises:
            ValueError: If ``video_fps`` is not positive.
        """
        if video_fps <= 0:
            raise ValueError(f"video_fps must be positive, got {video_fps}")

        self._url = f"{base_url}{self.DECART_LIPSYNC_ENDPOINT}".replace(
            "https://", "wss://"
        ).replace("http://", "ws://")
        self._api_key = api_key
        self._audio_sample_rate = audio_sample_rate
        self._video_fps = video_fps
        self._sync_latency = sync_latency

        self._websocket: Optional[websockets.ClientConnection] = None
        # Synced outputs received by the background reader, consumed by
        # get_synced_output().
        self._out_queue = asyncio.Queue()
        self._response_handling_task: Optional[asyncio.Task] = None

        # Output pacing state: frame i is due at start_time + i * interval.
        self._video_frame_interval = fractions.Fraction(1, video_fps)
        self._video_out_frame_index = 0
        self._video_out_start_time = 0.0

    def _require_websocket(self):
        """Return the open websocket, or raise RuntimeError if not connected."""
        if self._websocket is None:
            raise RuntimeError("Lipsync client is not connected; call connect() first")
        return self._websocket

    async def _recv(self) -> "LipsyncServerMessage":
        """Receive one frame from the server and parse it into a server message."""
        response = await self._require_websocket().recv()
        return LipsyncServerMessageAdapter.validate_json(response)

    async def _send(self, message: "LipsyncClientMessage"):
        """Serialize ``message`` to JSON and send it to the server."""
        msg = message.model_dump_json()
        await self._require_websocket().send(msg)

    async def _handle_server_responses(self):
        """Background reader: queue synced outputs until the connection ends.

        A server error message is logged and re-raised as ``Exception``; since
        this runs as a task, that exception surfaces when the task is awaited
        (which ``disconnect()`` does).
        """
        try:
            while self._websocket is not None:
                response = await self._recv()
                if isinstance(response, LipsyncSyncedOutputMessage):
                    await self._out_queue.put(response)
                elif isinstance(response, LipsyncErrorMessage):
                    logger.error("Lipsync server error: %s", response.message)
                    raise Exception(response.message)
                else:
                    logger.error("Unknown response from lipsync server: %s", response)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path triggered by disconnect().
            pass
        except websockets.exceptions.ConnectionClosedOK:
            logger.debug("Connection closed by server")

    async def _decode_video_frame(self, video_frame: bytes) -> np.ndarray:
        """Decode an encoded image into an array, off the event loop.

        Raises:
            ValueError: If OpenCV cannot decode the data.
        """

        def _decode_video_frame_sync(encoded: bytes) -> np.ndarray:
            buffer = np.frombuffer(encoded, np.uint8)
            image = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
            # imdecode signals undecodable input by returning None, not raising.
            if image is None:
                raise ValueError("Failed to decode video frame")
            return image

        return await asyncio.to_thread(_decode_video_frame_sync, video_frame)

    async def _encode_video_frame(self, image: np.ndarray) -> bytes:
        """Encode an image array as JPEG bytes, off the event loop.

        Raises:
            Exception: If OpenCV reports that encoding failed.
        """

        def _encode_video_frame_sync(image: np.ndarray) -> bytes:
            success, encoded_image = cv2.imencode(".jpeg", image)
            if not success:
                raise Exception("Failed to encode video frame as JPEG")
            return encoded_image.tobytes()

        return await asyncio.to_thread(_encode_video_frame_sync, image)

    async def _decode_audio_frame(self, audio_frame: bytes) -> bytes:
        """Return the audio payload unchanged (no decoding is performed)."""
        return audio_frame

    def _next_frame_delay(self) -> float:
        """Return seconds to wait before releasing the next output frame.

        The first call anchors the playout clock at ``now + sync_latency``;
        each call then advances the frame index so that frame ``i`` is due at
        ``start + i / video_fps``. Frames that are already late get ``0.0``.
        """
        now = time.time()
        if self._video_out_frame_index == 0:
            self._video_out_start_time = now + self._sync_latency
        due = self._video_out_start_time + (
            self._video_out_frame_index * self._video_frame_interval
        )
        # Advance the index; without this every frame is treated as frame 0
        # and the pacing schedule never progresses.
        self._video_out_frame_index += 1
        return max(0.0, float(due - now))

    async def connect(self):
        """Open the websocket, perform the config handshake, start the reader.

        Raises:
            Exception: If the server's first reply is not a config
                acknowledgement. The websocket is closed before re-raising.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        logger.debug("Connecting to lipsync server at %s", self._url)
        # Percent-encode the key so reserved characters cannot corrupt the query.
        encoded_key = quote(self._api_key, safe="")
        self._websocket = await websockets.connect(f"{self._url}?api_key={encoded_key}")
        logger.debug("WebSocket connected")
        try:
            # Initial handshake
            await self._send(
                LipsyncConfigMessage(
                    video_fps=self._video_fps,
                    audio_sample_rate=self._audio_sample_rate,
                )
            )
            logger.debug("Configuration sent")
            response = await self._recv()
            if not isinstance(response, LipsyncConfigAckMessage):
                raise Exception(f"Configuration not acknowledged by server: {response}")
        except BaseException:
            # Do not leak the socket when the handshake fails or is cancelled.
            websocket, self._websocket = self._websocket, None
            if websocket is not None:
                await websocket.close()
            raise
        logger.debug("Configuration acknowledged")

        # A fresh connection starts a fresh playout schedule.
        self._video_out_frame_index = 0
        self._response_handling_task = asyncio.create_task(self._handle_server_responses())

        logger.debug("Connected to lipsync server")

    async def disconnect(self):
        """Close the websocket and stop the background reader.

        Safe to call when not connected. If the reader task ended with an
        exception (for example a server error message), that exception is
        re-raised here; internal state is reset regardless.
        """
        # Detach state first so the client is left clean even if closing or
        # awaiting the reader raises.
        websocket, self._websocket = self._websocket, None
        task, self._response_handling_task = self._response_handling_task, None
        try:
            if websocket is not None:
                await websocket.close()
        finally:
            if task is not None:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

    async def send_audio(self, audio_data: bytes):
        """Send a chunk of audio bytes to the server."""
        await self._send(LipsyncAudioInputMessage(audio_data=audio_data))

    async def send_video_frame_bytes(self, video_frame_bytes: bytes):
        """Send an already-encoded video frame to the server."""
        await self._send(LipsyncVideoInputMessage(video_frame=video_frame_bytes))

    async def send_video_frame(self, image: np.ndarray):
        """JPEG-encode ``image`` and send it to the server."""
        encoded_image = await self._encode_video_frame(image)
        await self.send_video_frame_bytes(encoded_image)

    async def interrupt_audio(self):
        """Send an interrupt-audio message to the server."""
        await self._send(LipsyncInterruptAudioMessage())

    async def get_synced_output(
        self, timeout: Optional[float] = None
    ) -> Tuple[np.ndarray, bytes]:
        """Return the next synced (video frame, audio frame) pair.

        Waits up to ``timeout`` seconds for an output to arrive, decodes it,
        then sleeps as needed so frames are released at ``video_fps``.

        Args:
            timeout: Maximum seconds to wait for an output; ``None`` waits
                indefinitely. The pacing sleep is not counted against it.

        Returns:
            The decoded video frame array and the raw audio bytes.

        Raises:
            asyncio.TimeoutError: If no output arrives within ``timeout``.
            ValueError: If the received video frame cannot be decoded.
        """
        synced_output = await asyncio.wait_for(self._out_queue.get(), timeout=timeout)

        video_frame = await self._decode_video_frame(synced_output.video_frame)
        audio_frame = await self._decode_audio_frame(synced_output.audio_frame)

        delay = self._next_frame_delay()
        if delay > 0:
            await asyncio.sleep(delay)

        return video_frame, audio_frame

decart/lipsync/messages.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from pydantic import BaseModel, Field, ConfigDict, TypeAdapter
2+
from typing import Literal, Union, Annotated
3+
4+
5+
class LipsyncMessage(BaseModel):
    """Common base for all lipsync protocol messages.

    ``bytes`` fields are written to and read from JSON as base64 text.
    """

    model_config = ConfigDict(ser_json_bytes="base64", val_json_bytes="base64")
7+
8+
9+
class LipsyncConfigMessage(LipsyncMessage):
    """Client message carrying the stream configuration."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["config"] = "config"
    video_fps: int
    audio_sample_rate: int
13+
14+
15+
class LipsyncConfigAckMessage(LipsyncMessage):
    """Server message acknowledging a configuration message."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["config_ack"] = "config_ack"
17+
18+
19+
class LipsyncAudioInputMessage(LipsyncMessage):
    """Client message carrying input audio bytes."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["audio_input"] = "audio_input"
    audio_data: bytes
22+
23+
24+
class LipsyncVideoInputMessage(LipsyncMessage):
    """Client message carrying one encoded input video frame."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["video_input"] = "video_input"
    video_frame: bytes
27+
28+
29+
class LipsyncInterruptAudioMessage(LipsyncMessage):
    """Client message requesting an audio interrupt; it has no payload."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["interrupt_audio"] = "interrupt_audio"
31+
32+
33+
class LipsyncSyncedOutputMessage(LipsyncMessage):
    """Server message carrying one video frame together with its audio frame."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["synced_result"] = "synced_result"
    video_frame: bytes
    audio_frame: bytes
37+
38+
39+
class LipsyncErrorMessage(LipsyncMessage):
    """Server message reporting an error as human-readable text."""

    # Discriminator value identifying this message kind on the wire.
    type: Literal["error"] = "error"
    message: str
42+
43+
44+
# Every message kind the client may send, discriminated by its ``type`` field.
LipsyncClientMessage = Annotated[
    Union[
        LipsyncConfigMessage,
        LipsyncAudioInputMessage,
        LipsyncVideoInputMessage,
        LipsyncInterruptAudioMessage,
    ],
    Field(discriminator="type"),
]

# Every message kind the server may send, discriminated by its ``type`` field.
LipsyncServerMessage = Annotated[
    Union[
        LipsyncConfigAckMessage,
        LipsyncSyncedOutputMessage,
        LipsyncErrorMessage,
    ],
    Field(discriminator="type"),
]

# Reusable validator that parses raw JSON into the matching server message class.
LipsyncServerMessageAdapter = TypeAdapter(LipsyncServerMessage)

examples/lipsync_file.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/usr/bin/env python
2+
"""
3+
Example of using Decart's Realtime Lipsync API to synchronize audio with video.
4+
5+
This example loads a video file and an audio file, processes them through the
6+
Decart Lipsync API, and saves the lipsynced result to a new video file.
7+
8+
Usage:
9+
python lipsync_file.py <video_file> <audio_file> <output_file>
10+
11+
Example:
12+
python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4
13+
python lipsync_file.py input.mp4 speech.mp3 output_lipsynced.mp4
14+
"""
15+
16+
import asyncio
17+
import os
18+
import sys
19+
import cv2
20+
from pathlib import Path
21+
22+
from decart.lipsync import RealtimeLipsyncClient
23+
24+
25+
async def process_lipsync(video_path: str, audio_path: str, output_path: str):
    """Process video and audio through Decart's lipsync API.

    Sends the whole audio file and every video frame to the server, then
    writes the returned video frames to ``output_path``. Returned audio frames
    are not written to the output file.

    Args:
        video_path: Path of the input video, read with OpenCV.
        audio_path: Path of the input audio file; its raw bytes are sent as-is.
        output_path: Path of the video file to write (``mp4v`` codec).

    Returns:
        None. Returns early, after printing an error, when ``DECART_API_KEY``
        is unset or no frame can be read from the video.
    """
    # Get API key
    api_key = os.getenv("DECART_API_KEY")
    if not api_key:
        print("Error: Please set DECART_API_KEY environment variable")
        return

    # Initialize client
    client = RealtimeLipsyncClient(api_key=api_key)

    print(f"Processing: {video_path} + {audio_path} -> {output_path}")

    # Connect to server
    await client.connect()
    print("Connected to Decart Lipsync server")

    try:
        # Load the audio file and send it in one message
        with open(audio_path, "rb") as f:
            audio_data = f.read()
        await client.send_audio(audio_data)

        # Stream every video frame to the server
        frame_count = 0
        frame_size = None  # (width, height) of the frames, once one is read
        cap = cv2.VideoCapture(video_path)
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1

                # Convert from BGR (OpenCV default) to RGB
                # NOTE(review): the client JPEG-encodes with cv2.imencode, which
                # treats input as BGR; the swap is undone on output below, so the
                # file is correct, but confirm which order the server expects.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_size = (rgb_frame.shape[1], rgb_frame.shape[0])
                await client.send_video_frame(rgb_frame)
        finally:
            # Release the capture even if sending a frame fails.
            cap.release()

        if frame_size is None:
            # Without a frame there is no size to create the writer with.
            print(f"Error: No video frames could be read from {video_path}")
            return

        # Receive lipsynced output
        out = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            client._video_fps,
            frame_size,
        )
        try:
            for i in range(frame_count):
                try:
                    video_frame, _audio_frame = await client.get_synced_output(timeout=1.0)
                except asyncio.TimeoutError:
                    print(f"Warning: Timeout at frame {i}")
                    break
                bgr_frame = cv2.cvtColor(video_frame, cv2.COLOR_RGB2BGR)
                out.write(bgr_frame)
        finally:
            # Finalize the output file even if receiving or writing fails.
            out.release()

    finally:
        await client.disconnect()
        print("Disconnected from server")
87+
88+
89+
async def main():
    """Main entry point.

    Expects exactly three command-line arguments: input video, input audio and
    output path. Prints a message to stderr and exits with status 1 when the
    argument count is wrong or an input file does not exist.
    """
    if len(sys.argv) != 4:
        # Diagnostics go to stderr so they do not mix with normal output.
        print(
            "Usage: python lipsync_file.py <video_file> <audio_file> <output_file>",
            file=sys.stderr,
        )
        print(
            "Example: python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4",
            file=sys.stderr,
        )
        sys.exit(1)

    video_path, audio_path, output_path = sys.argv[1:]

    # Check input files exist
    if not Path(video_path).exists():
        print(f"Error: Video file not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    if not Path(audio_path).exists():
        print(f"Error: Audio file not found: {audio_path}", file=sys.stderr)
        sys.exit(1)

    # Process the files
    await process_lipsync(video_path, audio_path, output_path)


if __name__ == "__main__":
    asyncio.run(main())

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ dependencies = [
3535
"aiohttp>=3.9.0",
3636
"aiofiles>=23.0.0",
3737
"pydantic>=2.0.0",
38+
"websockets>=15.0.1",
39+
"numpy>=2.0.2",
40+
"opencv-python>=4.11.0.86",
3841
]
3942

4043
[project.optional-dependencies]

0 commit comments

Comments
 (0)