Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ lip_sync = { version = "0.1", git = "https://github.com/L-jasmine/lip_sync.git"
rand = "0.9.0"
uuid = { version = "1.14", features = [
"v4", # Lets you generate random UUIDs
"v5", # Lets you generate namespace-based UUIDs
"fast-rng",
] }
bytes = "1.10.0"
bytes = "1.11.0"
aho-corasick = "1.1.3"
lazy-regex = "3.4.2"

Expand Down
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ pub struct RecordConfig {
pub callback_url: Option<String>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EchokitCC {
pub url: String,
// pub output_optimization: TTSTextOptimizationConfig,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(untagged)]
pub enum AIConfig {
Expand All @@ -361,6 +367,11 @@ pub enum AIConfig {
tts: TTSConfig,
asr: ASRConfig,
},
Claude {
claude: EchokitCC,
asr: ASRConfig,
tts: TTSConfig,
},
GeminiAndTTS {
gemini: GeminiConfig,
tts: TTSConfig,
Expand Down
23 changes: 22 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use axum::{
Router,
Expand Down Expand Up @@ -163,6 +163,27 @@ async fn routes(
}
});
}
config::AIConfig::Claude { claude, asr, tts } => {
let session = Arc::new(RwLock::new(Default::default()));
let session_ = session.clone();

tokio::spawn(async move {
if let Err(e) = crate::services::ws::stable::claude::run_session_manager(
&tts, &asr, &claude, rx, session,
)
.await
{
log::error!("Claude session manager exited with error: {}", e);
}
});

router = router
.route(
"/proxy/state/{id}",
get(services::ws::stable::claude::has_notification),
)
.layer(axum::Extension(session_));
}
}

router = router
Expand Down
3 changes: 3 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ pub enum ServerEvent {

ASR { text: String },
Action { action: String },
Choices { message: String, items: Vec<String> },
StartAudio { text: String },
AudioChunk { data: Vec<u8> },
DisplayText { text: String },
AudioChunkWithVowel { data: Vec<u8>, vowel: u8 },
EndAudio,
StartVideo,
Expand Down Expand Up @@ -47,6 +49,7 @@ pub enum ClientCommand {
StartChat,
Submit,
Text { input: String },
Select { index: usize },
}

#[test]
Expand Down
48 changes: 46 additions & 2 deletions src/services/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub enum WsCommand {
Video(Vec<Vec<u8>>),
EndResponse,
EndVad,
Choices(String, Vec<String>),
DisplayText(String),
Close,
}
type WsTx = tokio::sync::mpsc::UnboundedSender<WsCommand>;
type WsRx = tokio::sync::mpsc::UnboundedReceiver<WsCommand>;
Expand Down Expand Up @@ -113,6 +116,7 @@ pub enum ClientMsg {
AudioChunk(Bytes),
Submit,
Text(String),
Select(usize),
}

pub struct ConnectConfig {
Expand Down Expand Up @@ -151,6 +155,11 @@ async fn process_socket_io(

match r {
Some(WsEvent::Command(cmd)) => {
if matches!(cmd, WsCommand::Close) {
log::info!("Received Close command, closing websocket");
return Ok(());
}

if config.enable_opus {
process_command_with_opus(
socket,
Expand Down Expand Up @@ -184,6 +193,12 @@ async fn process_socket_io(
.send(ClientMsg::Text(input))
.await
.map_err(|_| anyhow::anyhow!("audio_tx closed"))?,
ProcessMessageResult::Select(index) => {
audio_tx
.send(ClientMsg::Select(index))
.await
.map_err(|_| anyhow::anyhow!("audio_tx closed"))?;
}
ProcessMessageResult::Skip => {}
ProcessMessageResult::StartChat => {
audio_tx
Expand Down Expand Up @@ -285,6 +300,18 @@ async fn process_command(ws: &mut WebSocket, cmd: WsCommand) -> anyhow::Result<(
ws.send(Message::binary(audio_chunk)).await?;
}
}
WsCommand::Choices(message, items) => {
let choices =
rmp_serde::to_vec(&crate::protocol::ServerEvent::Choices { message, items })
.expect("Failed to serialize Choices ServerEvent");
ws.send(Message::binary(choices)).await?;
}
WsCommand::DisplayText(text) => {
let display_text =
rmp_serde::to_vec(&crate::protocol::ServerEvent::DisplayText { text })
.expect("Failed to serialize DisplayText ServerEvent");
ws.send(Message::binary(display_text)).await?;
}
WsCommand::EndAudio => {
log::trace!("EndAudio");
let end_audio = rmp_serde::to_vec(&crate::protocol::ServerEvent::EndAudio)
Expand All @@ -306,6 +333,7 @@ async fn process_command(ws: &mut WebSocket, cmd: WsCommand) -> anyhow::Result<(
.expect("Failed to serialize EndVad ServerEvent");
ws.send(Message::binary(end_vad)).await?;
}
WsCommand::Close => {}
}
Ok(())
}
Expand Down Expand Up @@ -341,12 +369,23 @@ async fn process_command_with_opus(
.expect("Failed to serialize ASR ServerEvent");
ws.send(Message::binary(asr)).await?;
}

WsCommand::Action { action } => {
let action = rmp_serde::to_vec(&crate::protocol::ServerEvent::Action { action })
.expect("Failed to serialize Action ServerEvent");
ws.send(Message::binary(action)).await?;
}
WsCommand::Choices(message, items) => {
let choices =
rmp_serde::to_vec(&crate::protocol::ServerEvent::Choices { message, items })
.expect("Failed to serialize Choices ServerEvent");
ws.send(Message::binary(choices)).await?;
}
WsCommand::DisplayText(text) => {
let display_text =
rmp_serde::to_vec(&crate::protocol::ServerEvent::DisplayText { text })
.expect("Failed to serialize DisplayText ServerEvent");
ws.send(Message::binary(display_text)).await?;
}
WsCommand::StartAudio(text) => {
log::trace!("StartAudio: {text:?}");
opus_encode
Expand Down Expand Up @@ -453,6 +492,7 @@ async fn process_command_with_opus(
.expect("Failed to serialize EndVad ServerEvent");
ws.send(Message::binary(end_vad)).await?;
}
WsCommand::Close => {}
}
Ok(())
}
Expand All @@ -461,6 +501,7 @@ enum ProcessMessageResult {
Audio(Bytes),
Submit,
Text(String),
Select(usize),
StartChat,
Close,
Skip,
Expand All @@ -478,13 +519,16 @@ fn process_message(msg: Message) -> ProcessMessageResult {
crate::protocol::ClientCommand::Text { input } => {
ProcessMessageResult::Text(input)
}
crate::protocol::ClientCommand::Select { index } => {
ProcessMessageResult::Select(index)
}
}
} else {
ProcessMessageResult::Skip
}
}
Message::Binary(d) => {
log::debug!("Received binary message of size: {}", d.len());
log::trace!("Received binary message of size: {}", d.len());
ProcessMessageResult::Audio(d)
}
Message::Close(c) => {
Expand Down
4 changes: 4 additions & 0 deletions src/services/ws/stable/asr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl WhisperASRSession {
vad_started |= self.vad_session.detect(&audio_chunk)?;
}
}
ClientMsg::Select(..) => {}
}
}
}
Expand Down Expand Up @@ -205,6 +206,7 @@ impl WhisperASRSession {
log::warn!("`{id}` received a Unexpected Submit during Stream ASR");
return Err(anyhow::anyhow!("Unexpected Submit during Stream ASR"));
}
ClientMsg::Select(..) => {}
}
}

Expand Down Expand Up @@ -384,6 +386,7 @@ impl ParaformerASRSession {

continue;
}
ClientMsg::Select(..) => {}
}
}

Expand Down Expand Up @@ -518,6 +521,7 @@ impl ParaformerASRSession {
}
start_submit = true;
}
ClientMsg::Select(..) => {}
}
} else {
log::warn!("`{}` client rx channel closed unexpectedly", session.id);
Expand Down
Loading