From a94c1f4c83cc1e32f8f3d19e1a21a5c399cf71ac Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 04:55:26 +0900 Subject: [PATCH 01/11] Implement restarter subcommand and drop restarter binary --- Cargo.lock | 1 + Cargo.toml | 7 +- {restarter => crates/restarter}/Cargo.toml | 5 +- crates/restarter/src/client.rs | 52 ++++++++ crates/restarter/src/event_handler.rs | 54 ++++++++ crates/restarter/src/event_handler/ready.rs | 10 ++ .../src/event_handler/voice_state_update.rs | 87 +++++++++++++ crates/restarter/src/lib.rs | 5 + crates/restarter/src/restarter.rs | 58 +++++++++ restarter/src/event_handler.rs | 121 ------------------ restarter/src/main.rs | 82 ------------ seitai/Cargo.toml | 5 +- seitai/src/cli.rs | 29 +++-- 13 files changed, 301 insertions(+), 215 deletions(-) rename {restarter => crates/restarter}/Cargo.toml (93%) create mode 100644 crates/restarter/src/client.rs create mode 100644 crates/restarter/src/event_handler.rs create mode 100644 crates/restarter/src/event_handler/ready.rs create mode 100644 crates/restarter/src/event_handler/voice_state_update.rs create mode 100644 crates/restarter/src/lib.rs create mode 100644 crates/restarter/src/restarter.rs delete mode 100644 restarter/src/event_handler.rs delete mode 100644 restarter/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 28413c71..c81d1035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2890,6 +2890,7 @@ dependencies = [ "mockall", "ordered-float 5.0.0", "regex-lite", + "restarter", "serde", "serde_json", "serenity", diff --git a/Cargo.toml b/Cargo.toml index 1d861ab9..80350728 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,16 @@ [workspace] -members = ["crates/database", "crates/logging", "crates/soundboard", "crates/voicevox", "restarter", "seitai"] +members = ["crates/database", "crates/logging", "crates/restarter", "crates/soundboard", "crates/voicevox", "seitai"] default-members = ["seitai"] resolver = "3" [workspace.dependencies.anyhow] version = "1.0.96" +[workspace.dependencies.futures] +version = "0.3.31" +default-features =false +features = ["std"] + [workspace.dependencies.http-body-util] version = "0.1.2" diff --git a/restarter/Cargo.toml b/crates/restarter/Cargo.toml similarity index 93% rename from restarter/Cargo.toml rename to crates/restarter/Cargo.toml index 78b44317..3b1b0382 100644 --- a/restarter/Cargo.toml +++ b/crates/restarter/Cargo.toml @@ -3,11 +3,14 @@ name = "restarter" version = "0.1.7" edition = "2024" +[lib] +doctest = false + [dependencies.anyhow] workspace = true [dependencies.futures] -version = "0.3.31" +workspace = true [dependencies.k8s-openapi] version = "0.24.0" diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs new file mode 100644 index 00000000..b3e354a2 --- /dev/null +++ b/crates/restarter/src/client.rs @@ -0,0 +1,52 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Result; +use futures::lock::Mutex; +use serenity::{all::GatewayIntents, Client as SerenityClient}; +use tokio::{signal::unix::{self, SignalKind}, sync::Notify, task::JoinHandle}; + +use crate::{event_handler::Handler, restarter::Restarter}; + +pub struct Client; + +impl Client { + #[tracing::instrument(skip_all)] + pub async fn start(token: String) -> Result<()> { + enable_graceful_shutdown(); + + let intents = GatewayIntents::GUILD_VOICE_STATES; + let mut client = SerenityClient::builder(token, intents) + .event_handler(Handler { + connected_channels: Arc::new(Mutex::new(HashMap::new())), + abort_controller: Arc::new(Notify::default()), + restarter: Restarter::new(), + }) + .await?; + + tracing::debug!("seitai client starts"); + if let Err(err) = client.start().await { + tracing::error!("failed to start client\nError: {err:?}"); + return Err(err.into()); + } + + Ok(()) + } +} + +fn enable_graceful_shutdown() -> JoinHandle> { + tokio::spawn(async move { + let mut sigint = unix::signal(SignalKind::interrupt())?; + let mut sigterm = unix::signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => { + tracing::info!("received SIGINT, shutting down"); + std::process::exit(130); + }, + _ = sigterm.recv() => { + tracing::info!("received SIGTERM, shutting down"); + std::process::exit(143); + }, + } + }) +} diff --git a/crates/restarter/src/event_handler.rs b/crates/restarter/src/event_handler.rs new file mode 100644 index 00000000..76760c38 --- /dev/null +++ b/crates/restarter/src/event_handler.rs @@ -0,0 +1,54 @@ +use std::{collections::HashMap, pin::Pin, sync::Arc}; + +use futures::{lock::Mutex, FutureExt}; +use serenity::{ + all::{ChannelId, GuildId, VoiceState}, + client::{Context, EventHandler}, + model::gateway::Ready, +}; +use tokio::sync::Notify; +use tracing::instrument; + +use crate::{event_handler, restarter::Restarter}; + +mod ready; +mod voice_state_update; + +pub struct Handler { + pub connected_channels: Arc>>, + pub abort_controller: Arc, + pub restarter: Restarter, +} + +impl EventHandler for Handler { + #[instrument(skip_all)] + fn ready<'s, 'async_trait>(&'s self, ctx: Context, ready: Ready) -> Pin + Send + 'async_trait)>> + where + Self: 'async_trait, + 's: 'async_trait, + { + async move { + if let Err(err) = event_handler::ready::handle(self, ctx, ready).await { + tracing::error!("failed to handle ready event\nError: {err:?}"); + } + }.boxed() + } + + #[instrument(skip_all)] + fn voice_state_update<'s, 'async_trait>( + &'s self, + context: Context, + old_state: Option, + new_state: VoiceState, + ) -> Pin + Send + 'async_trait)>> + where + Self: 'async_trait, + 's: 'async_trait, + { + async move { + if let Err(err) = event_handler::voice_state_update::handle(self, context, old_state, new_state).await { + tracing::error!("failed to handle voice state update event\nError: {err:?}"); + } + }.boxed() + } +} diff --git a/crates/restarter/src/event_handler/ready.rs b/crates/restarter/src/event_handler/ready.rs new file mode 100644 index 00000000..f887f4c1 --- /dev/null +++ b/crates/restarter/src/event_handler/ready.rs @@ -0,0 +1,10 @@ +use anyhow::Result; +use serenity::all::{Context, Ready}; + +use super::Handler; + +pub(crate) async fn handle(_handler: &Handler, _ctx: Context, ready: Ready) -> Result<()> { + tracing::info!("{} is ready", ready.user.name); + + Ok(()) +} diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs new file mode 100644 index 00000000..ef8d29ba --- /dev/null +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -0,0 +1,87 @@ +use std::time::Duration; + +use anyhow::Result; +use serenity::all::{ChannelId, Context, UserId, VoiceState}; + +use super::Handler; + +pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option, new_state: VoiceState) -> Result<()> { + let Some(guild_id) = new_state.guild_id else { + return Ok(()); + }; + + let bot_id = ctx.http.get_current_user().await?.id; + let action = VoiceStateAction::new(old_state, new_state); + + if !action.is_bot_action(bot_id) { + return Ok(()); + } + + match action.connection() { + VoiceStateConnection::Joined(channel_id) => { + handler + .connected_channels + .lock() + .await + .insert(guild_id, channel_id); + + dbg!(&handler.connected_channels.lock().await); + handler.restarter.abort(); + }, + VoiceStateConnection::Left => { + let mut connected_channels = handler.connected_channels.lock().await; + connected_channels.remove(&guild_id); + + if connected_channels.is_empty() { + handler.restarter.wait(Duration::from_secs(300)); + } + }, + VoiceStateConnection::NoAction => (), + } + + Ok(()) +} + +#[derive(Debug)] +struct VoiceStateAction { + old_state: Option, + new_state: VoiceState, +} + +#[allow(dead_code)] +enum VoiceStateConnection { + // including moved + Joined(ChannelId), + Left, + NoAction, +} + +impl VoiceStateAction { + fn new(old_state: Option, new_state: VoiceState) -> Self { + Self { + old_state, + new_state, + } + } + + fn connection(&self) -> VoiceStateConnection { + if self.old_state.is_some() { + return VoiceStateConnection::NoAction; + } + + match self.new_state.channel_id { + Some(channel_id) => { + tracing::debug!("joined voice channel {channel_id}"); + VoiceStateConnection::Joined(channel_id) + }, + None => { + tracing::debug!("left voice channel"); + VoiceStateConnection::Left + }, + } + } + + fn is_bot_action(&self, bot_id: UserId) -> bool { + self.new_state.user_id == bot_id + } +} diff --git a/crates/restarter/src/lib.rs b/crates/restarter/src/lib.rs new file mode 100644 index 00000000..c788e1b7 --- /dev/null +++ b/crates/restarter/src/lib.rs @@ -0,0 +1,5 @@ +pub use client::Client; + +pub mod event_handler; +pub mod client; +pub mod restarter; diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs new file mode 100644 index 00000000..1cb3bbe6 --- /dev/null +++ b/crates/restarter/src/restarter.rs @@ -0,0 +1,58 @@ +use std::time::Duration; + +use anyhow::Result; +use k8s_openapi::api::apps::v1::StatefulSet; +use kube::{Api, Client, Config}; +use tokio::sync::Notify; +use tracing::Instrument; + +#[derive(Debug, Clone)] +pub struct Restarter; + +impl Restarter { + pub(crate) fn new() -> Self { + Self + } + + #[tracing::instrument] + pub(crate) async fn restart() -> Result<()> { + let config = match Config::incluster() { + Ok(config) => config, + Err(_) => { + tracing::warn!("This app doesn't running in Kubernetes, so it didn't restart voicevox."); + return Ok(()) + }, + }; + let client = Client::try_from(config)?; + let stateful_sets: Api = Api::default_namespaced(client); + + stateful_sets.restart("voicevox").await?; + + tracing::info!("succeeded in restarting statefulsets/voicevox"); + + Ok(()) + } + + #[tracing::instrument(skip_all)] + pub(crate) fn wait(&self, duration: Duration) { + tokio::spawn(async move { + // Resets notify waiters because notified() is immediately received notify by notify_one() called before starting waiting. + let notify = Notify::new(); + + tokio::select! { + _ = tokio::time::sleep(duration) => { + if let Err(err) = Self::restart().await { + tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); + } + }, + _ = notify.notified() => { + tracing::info!("canceled waiting for restarting statefulsets/voicevox"); + }, + } + }.in_current_span()); + } + + pub(crate) fn abort(&self) { + Notify::new().notify_one(); + } +} diff --git a/restarter/src/event_handler.rs b/restarter/src/event_handler.rs deleted file mode 100644 index c3d2489f..00000000 --- a/restarter/src/event_handler.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::{pin::Pin, sync::Arc}; - -use anyhow::{bail, Result}; -use futures::lock::Mutex; -use k8s_openapi::api::apps::v1::StatefulSet; -use kube::{Api, Client, Config}; -use serenity::{ - all::VoiceState, - client::{Context, EventHandler}, - model::gateway::Ready, -}; -use tokio::sync::Notify; -use tracing::instrument; - -use crate::Data; - -pub struct Handler; - -impl EventHandler for Handler { - #[instrument(skip(self, context))] - fn ready<'s, 'async_trait>( - &'s self, - context: Context, - ready: Ready, - ) -> Pin + Send + 'async_trait)>> - where - Self: 'async_trait, - 's: 'async_trait, - { - tracing::info!("{} is ready", ready.user.name); - - Box::pin(async move { - let Some(data) = get_data(&context).await else { - tracing::error!("failed to get data"); - return; - }; - data.lock().await.bot_id = ready.user.id; - }) - } - - fn voice_state_update<'s, 'async_trait>( - &'s self, - context: Context, - _old: Option, - new: VoiceState, - ) -> Pin + Send + 'async_trait)>> - where - Self: 'async_trait, - 's: 'async_trait, - { - Box::pin(async move { - let Some(data) = get_data(&context).await else { - tracing::error!("failed to get data"); - return; - }; - let mut data = data.lock().await; - let bot_id = data.bot_id; - - if new.user_id == bot_id { - // When bot joined a voice channel - if let (Some(channel_id), Some(guild_id)) = (new.channel_id, new.guild_id) { - data.connected_channels.insert(guild_id, channel_id); - data.cancellation.notify_one(); - // When bot left a voice channel - } else if let (None, Some(guild_id)) = (new.channel_id, new.guild_id) { - data.connected_channels.remove(&guild_id); - if data.connected_channels.is_empty() { - wait_restart(&context).await; - } - }; - } - }) - } -} - -async fn get_data(context: &Context) -> Option>> { - let data = context.data.read().await; - data.get::().cloned() -} - -async fn wait_restart(context: &Context) { - let Some(data) = get_data(context).await else { - tracing::error!("failed to get data"); - return; - }; - - tokio::spawn(async move { - let cancellation = { - let mut data = data.lock().await; - data.cancellation = Arc::new(Notify::new()); - data.cancellation.clone() - }; - - tokio::select! { - _ = tokio::time::sleep(tokio::time::Duration::from_secs(300)) => { - if let Err(error) = restart().await { - tracing::error!("failed to restart statefulsets/voicevox\nError: {error:?}"); - return; - } - tracing::info!("succeeded in restarting statefulsets/voicevox"); - }, - _ = cancellation.notified() => { - tracing::info!("canceled restarting statefulsets/voicevox"); - }, - } - }); -} - -async fn restart() -> Result<()> { - let config = match Config::incluster() { - Ok(config) => config, - Err(_) => { - bail!("this app is not running in cluster of Kubernetes"); - }, - }; - let client = Client::try_from(config)?; - let stateful_sets: Api = Api::default_namespaced(client); - stateful_sets.restart("voicevox").await?; - - Ok(()) -} diff --git a/restarter/src/main.rs b/restarter/src/main.rs deleted file mode 100644 index 25670ef0..00000000 --- a/restarter/src/main.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::{collections::HashMap, env, process::exit, sync::Arc}; - -use futures::lock::Mutex; -use logging::initialize_logging; -use serenity::{ - all::{ChannelId, GuildId, UserId}, - client::Client, - model::gateway::GatewayIntents, - prelude::TypeMapKey, -}; -use tokio::{ - signal::unix::{signal, SignalKind}, - sync::Notify, -}; - -mod event_handler; - -struct Data { - bot_id: UserId, - connected_channels: HashMap, - cancellation: Arc, -} - -impl TypeMapKey for Data { - type Value = Arc>; -} - -#[tokio::main] -async fn main() { - initialize_logging(); - - let token = match env::var("DISCORD_TOKEN") { - Ok(token) => token, - Err(error) => { - tracing::error!("failed to fetch environment variable DISCORD_TOKEN\nError: {error:?}"); - exit(1); - }, - }; - - let intents = GatewayIntents::non_privileged() | GatewayIntents::MESSAGE_CONTENT; - let mut client = match Client::builder(token, intents) - .event_handler(event_handler::Handler) - .await - { - Ok(client) => client, - Err(error) => { - tracing::error!("failed to build serenity client\nError: {error:?}"); - exit(1); - }, - }; - - { - let mut data = client.data.write().await; - - data.insert::(Arc::new(Mutex::new(Data { - bot_id: UserId::default(), - connected_channels: HashMap::new(), - cancellation: Arc::new(Notify::default()), - }))); - } - - tokio::spawn(async move { - if let Err(error) = client.start().await { - tracing::error!("failed to start client\nError: {error:?}"); - exit(1); - } - }); - - let mut sigint = signal(SignalKind::interrupt()).unwrap(); - let mut sigterm = signal(SignalKind::terminate()).unwrap(); - - tokio::select! { - _ = sigint.recv() => { - tracing::info!("received SIGINT, shutting down"); - exit(130); - }, - _ = sigterm.recv() => { - tracing::info!("received SIGTERM, shutting down"); - exit(143); - }, - } -} diff --git a/seitai/Cargo.toml b/seitai/Cargo.toml index afbfb27b..32c4a01d 100644 --- a/seitai/Cargo.toml +++ b/seitai/Cargo.toml @@ -14,7 +14,7 @@ features = ["derive"] path = "../crates/database" [dependencies.futures] -version = "0.3.31" +workspace = true [dependencies.hashbrown] version = "0.15.2" @@ -45,6 +45,9 @@ version = "5.0.0" [dependencies.regex-lite] version = "0.1.6" +[dependencies.restarter] +path = "../crates/restarter" + [dependencies.serde] workspace = true diff --git a/seitai/src/cli.rs b/seitai/src/cli.rs index 08761a2b..ff0404ea 100644 --- a/seitai/src/cli.rs +++ b/seitai/src/cli.rs @@ -1,4 +1,4 @@ -use std::process; +use std::{env, process::exit}; use anyhow::Result; use clap::{error::ErrorKind, Parser}; @@ -18,7 +18,8 @@ pub struct Cli { #[derive(clap::Subcommand)] enum Subcommand { - Migration(MigrationCommand) + Migration(MigrationCommand), + Restarter, } impl Application { @@ -35,13 +36,23 @@ impl Application { }, }; - let migrator = Migrator::new(); - let pgpool = set_up_database().await?; - - #[allow(irrefutable_let_patterns)] - if let Subcommand::Migration(migration) = cli.subcommand { - migration.run(&mut *pgpool.acquire().await?, migrator.into_boxed_inner()).await?; - process::exit(0); + match cli.subcommand { + Subcommand::Migration(migration) => { + let migrator = Migrator::new(); + let pgpool = set_up_database().await?; + migration.run(&mut *pgpool.acquire().await?, migrator.into_boxed_inner()).await?; + }, + Subcommand::Restarter => { + let token = match env::var("DISCORD_TOKEN") { + Ok(token) => token, + Err(error) => { + tracing::error!("failed to fetch environment variable DISCORD_TOKEN\nError: {error:?}"); + exit(1); + }, + }; + + restarter::Client::start(token).await?; + }, } Ok(()) From 1e72edf7662dfc9044cedae3dc38ae9b799bbf4b Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 04:57:48 +0900 Subject: [PATCH 02/11] Remove building restarter binary from docker --- Dockerfile | 15 --------------- compose.yaml | 8 +------- docker-bake.hcl | 10 +--------- 3 files changed, 2 insertions(+), 31 deletions(-) diff --git a/Dockerfile b/Dockerfile index 95ec207b..2eb50b2e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,28 +13,13 @@ FROM runtime AS development FROM runtime AS builder RUN --mount=type=bind,source=crates,target=crates \ - --mount=type=bind,source=restarter,target=restarter \ --mount=type=bind,source=seitai,target=seitai \ --mount=type=bind,source=Cargo.toml,target=Cargo.toml \ --mount=type=bind,source=Cargo.lock,target=Cargo.lock \ --mount=type=cache,target=/usr/src/myapp/target \ cargo build --release --workspace \ - && cp target/release/restarter /restarter \ && cp target/release/seitai /seitai -FROM scratch AS restarter -LABEL io.github.hexium310.seitai.app=restarter -LABEL org.opencontainers.image.source=https://github.com/hexium310/seitai -COPY --from=runtime /etc/ssl/certs/ /etc/ssl/certs/ -COPY --from=runtime /lib/x86_64-linux-gnu/libc.so* /lib/x86_64-linux-gnu/ -COPY --from=runtime /lib/x86_64-linux-gnu/libcrypto.so* /lib/x86_64-linux-gnu/ -COPY --from=runtime /lib/x86_64-linux-gnu/libgcc_s.so* /lib/x86_64-linux-gnu/ -COPY --from=runtime /lib/x86_64-linux-gnu/libm.so* /lib/x86_64-linux-gnu/ -COPY --from=runtime /lib/x86_64-linux-gnu/libssl.so* /lib/x86_64-linux-gnu/ -COPY --from=runtime /lib64/ld-linux-x86-64.so* /lib64/ -COPY --from=builder /restarter / -CMD ["/restarter"] - FROM scratch AS seitai LABEL io.github.hexium310.seitai.app=seitai LABEL org.opencontainers.image.source=https://github.com/hexium310/seitai diff --git a/compose.yaml b/compose.yaml index a70829dd..89ae43f5 100644 --- a/compose.yaml +++ b/compose.yaml @@ -22,9 +22,6 @@ services: - type: bind source: seitai target: /usr/src/myapp/seitai - - type: bind - source: restarter - target: /usr/src/myapp/restarter command: /bin/sh -c 'cargo run -p seitai' environment: DISCORD_TOKEN: @@ -53,10 +50,7 @@ services: - type: bind source: seitai target: /usr/src/myapp/seitai - - type: bind - source: restarter - target: /usr/src/myapp/restarter - command: /bin/sh -c 'cargo run -p restarter' + command: /bin/sh -c 'cargo run -- restarter' environment: DISCORD_TOKEN: diff --git a/docker-bake.hcl b/docker-bake.hcl index 88fb6260..0746266a 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -1,19 +1,11 @@ group "default" { - targets = ["restarter", "seitai"] + targets = ["seitai"] } variable "VERSION" { default = "latest" } -target "restarter" { - target = "restarter" - tags = [ - "ghcr.io/hexium310/restarter:latest", - "ghcr.io/hexium310/restarter:${VERSION}", - ] -} - target "seitai" { target = "seitai" tags = [ From a45ca6815e744b88c41ec5db991cacab00c0c07c Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 07:00:04 +0900 Subject: [PATCH 03/11] Move voice state utility to new crate --- Cargo.lock | 9 ++++ Cargo.toml | 16 ++++++- crates/restarter/Cargo.toml | 3 ++ crates/restarter/src/client.rs | 3 +- crates/restarter/src/event_handler.rs | 8 ++-- .../src/event_handler/voice_state_update.rs | 47 +------------------ crates/restarter/src/lib.rs | 6 +-- crates/restarter/src/restarter.rs | 2 +- crates/serenity-utils/Cargo.toml | 10 ++++ crates/serenity-utils/src/lib.rs | 1 + crates/serenity-utils/src/voice_state.rs | 44 +++++++++++++++++ seitai/Cargo.toml | 2 +- 12 files changed, 93 insertions(+), 58 deletions(-) create mode 100644 crates/serenity-utils/Cargo.toml create mode 100644 crates/serenity-utils/src/lib.rs create mode 100644 crates/serenity-utils/src/voice_state.rs diff --git a/Cargo.lock b/Cargo.lock index c81d1035..fc89af14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2626,6 +2626,7 @@ dependencies = [ "kube", "logging", "serenity", + "serenity_utils", "tokio", "tracing", ] @@ -3084,6 +3085,14 @@ dependencies = [ "serde_repr", ] +[[package]] +name = "serenity_utils" +version = "0.1.0" +dependencies = [ + "serenity", + "tracing", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/Cargo.toml b/Cargo.toml index 80350728..af21a42f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,13 @@ [workspace] -members = ["crates/database", "crates/logging", "crates/restarter", "crates/soundboard", "crates/voicevox", "seitai"] +members = [ + "crates/database", + "crates/logging", + "crates/restarter", + "crates/serenity-utils", + "crates/soundboard", + "crates/voicevox", + "seitai", +] default-members = ["seitai"] resolver = "3" @@ -25,6 +33,9 @@ features = ["client", "client-legacy", "http1", "tokio"] [workspace.dependencies.logging] path = "crates/logging" +[workspace.dependencies.restarter] +path = "crates/restarter" + [workspace.dependencies.serde] version = "1.0.218" features = ["derive"] @@ -36,6 +47,9 @@ version = "1.0.139" version = "0.12.4" default-features = false +[workspace.dependencies.serenity_utils] +path = "crates/serenity-utils" + [workspace.dependencies.tokio] version = "1.43.0" features = ["macros", "net", "rt-multi-thread", "signal"] diff --git a/crates/restarter/Cargo.toml b/crates/restarter/Cargo.toml index 3b1b0382..3f1acb3f 100644 --- a/crates/restarter/Cargo.toml +++ b/crates/restarter/Cargo.toml @@ -33,3 +33,6 @@ workspace = true [dependencies.serenity] workspace = true features = ["cache", "client", "gateway", "model", "native_tls_backend"] + +[dependencies.serenity_utils] +workspace = true diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs index b3e354a2..21b0f7c6 100644 --- a/crates/restarter/src/client.rs +++ b/crates/restarter/src/client.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Result; use futures::lock::Mutex; use serenity::{all::GatewayIntents, Client as SerenityClient}; -use tokio::{signal::unix::{self, SignalKind}, sync::Notify, task::JoinHandle}; +use tokio::{signal::unix::{self, SignalKind}, task::JoinHandle}; use crate::{event_handler::Handler, restarter::Restarter}; @@ -18,7 +18,6 @@ impl Client { let mut client = SerenityClient::builder(token, intents) .event_handler(Handler { connected_channels: Arc::new(Mutex::new(HashMap::new())), - abort_controller: Arc::new(Notify::default()), restarter: Restarter::new(), }) .await?; diff --git a/crates/restarter/src/event_handler.rs b/crates/restarter/src/event_handler.rs index 76760c38..00bb5f7b 100644 --- a/crates/restarter/src/event_handler.rs +++ b/crates/restarter/src/event_handler.rs @@ -6,7 +6,6 @@ use serenity::{ client::{Context, EventHandler}, model::gateway::Ready, }; -use tokio::sync::Notify; use tracing::instrument; use crate::{event_handler, restarter::Restarter}; @@ -14,10 +13,9 @@ use crate::{event_handler, restarter::Restarter}; mod ready; mod voice_state_update; -pub struct Handler { - pub connected_channels: Arc>>, - pub abort_controller: Arc, - pub restarter: Restarter, +pub(crate) struct Handler { + pub(crate) connected_channels: Arc>>, + pub(crate) restarter: Restarter, } impl EventHandler for Handler { diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index ef8d29ba..1945ad05 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -1,7 +1,8 @@ use std::time::Duration; use anyhow::Result; -use serenity::all::{ChannelId, Context, UserId, VoiceState}; +use serenity::all::{Context, VoiceState}; +use serenity_utils::voice_state::{VoiceStateAction, VoiceStateConnection}; use super::Handler; @@ -41,47 +42,3 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option, - new_state: VoiceState, -} - -#[allow(dead_code)] -enum VoiceStateConnection { - // including moved - Joined(ChannelId), - Left, - NoAction, -} - -impl VoiceStateAction { - fn new(old_state: Option, new_state: VoiceState) -> Self { - Self { - old_state, - new_state, - } - } - - fn connection(&self) -> VoiceStateConnection { - if self.old_state.is_some() { - return VoiceStateConnection::NoAction; - } - - match self.new_state.channel_id { - Some(channel_id) => { - tracing::debug!("joined voice channel {channel_id}"); - VoiceStateConnection::Joined(channel_id) - }, - None => { - tracing::debug!("left voice channel"); - VoiceStateConnection::Left - }, - } - } - - fn is_bot_action(&self, bot_id: UserId) -> bool { - self.new_state.user_id == bot_id - } -} diff --git a/crates/restarter/src/lib.rs b/crates/restarter/src/lib.rs index c788e1b7..d099f828 100644 --- a/crates/restarter/src/lib.rs +++ b/crates/restarter/src/lib.rs @@ -1,5 +1,5 @@ pub use client::Client; -pub mod event_handler; -pub mod client; -pub mod restarter; +mod event_handler; +mod client; +mod restarter; diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 1cb3bbe6..3abe6762 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -7,7 +7,7 @@ use tokio::sync::Notify; use tracing::Instrument; #[derive(Debug, Clone)] -pub struct Restarter; +pub(crate) struct Restarter; impl Restarter { pub(crate) fn new() -> Self { diff --git a/crates/serenity-utils/Cargo.toml b/crates/serenity-utils/Cargo.toml new file mode 100644 index 00000000..8ec088f6 --- /dev/null +++ b/crates/serenity-utils/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "serenity_utils" +version = "0.1.0" +edition = "2024" + +[dependencies.serenity] +workspace = true + +[dependencies.tracing] +workspace = true diff --git a/crates/serenity-utils/src/lib.rs b/crates/serenity-utils/src/lib.rs new file mode 100644 index 00000000..cad7be11 --- /dev/null +++ b/crates/serenity-utils/src/lib.rs @@ -0,0 +1 @@ +pub mod voice_state; diff --git a/crates/serenity-utils/src/voice_state.rs b/crates/serenity-utils/src/voice_state.rs new file mode 100644 index 00000000..e9847e11 --- /dev/null +++ b/crates/serenity-utils/src/voice_state.rs @@ -0,0 +1,44 @@ +use serenity::all::{ChannelId, UserId, VoiceState}; + +#[derive(Debug)] +pub struct VoiceStateAction { + old_state: Option, + new_state: VoiceState, +} + +pub enum VoiceStateConnection { + // including moved + Joined(ChannelId), + Left, + NoAction, +} + +impl VoiceStateAction { + pub fn new(old_state: Option, new_state: VoiceState) -> Self { + Self { + old_state, + new_state, + } + } + + pub fn connection(&self) -> VoiceStateConnection { + if self.old_state.is_some() { + return VoiceStateConnection::NoAction; + } + + match self.new_state.channel_id { + Some(channel_id) => { + tracing::debug!("joined voice channel {channel_id}"); + VoiceStateConnection::Joined(channel_id) + }, + None => { + tracing::debug!("left voice channel"); + VoiceStateConnection::Left + }, + } + } + + pub fn is_bot_action(&self, bot_id: UserId) -> bool { + self.new_state.user_id == bot_id + } +} diff --git a/seitai/Cargo.toml b/seitai/Cargo.toml index 32c4a01d..0c8bfcd3 100644 --- a/seitai/Cargo.toml +++ b/seitai/Cargo.toml @@ -46,7 +46,7 @@ version = "5.0.0" version = "0.1.6" [dependencies.restarter] -path = "../crates/restarter" +workspace = true [dependencies.serde] workspace = true From 36f5037523984f48ff67e5be0cc9299dfe54d8e8 Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 07:09:16 +0900 Subject: [PATCH 04/11] Change manifest --- manifests/statefulset-seitai.yaml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/manifests/statefulset-seitai.yaml b/manifests/statefulset-seitai.yaml index 1d817aa0..ece3b4b5 100644 --- a/manifests/statefulset-seitai.yaml +++ b/manifests/statefulset-seitai.yaml @@ -16,8 +16,9 @@ spec: - name: seitai-migration image: ghcr.io/hexium310/seitai imagePullPolicy: Always - args: + command: - /seitai + args: - migration - apply env: @@ -64,8 +65,12 @@ spec: name: seitai.seitai-database.credentials.postgresql.acid.zalan.do key: password - name: restarter - image: ghcr.io/hexium310/restarter + image: ghcr.io/hexium310/seitai imagePullPolicy: Always + command: + - /seitai + args: + - restarter env: - name: NO_COLOR value: '1' From d5c94767c53dab9b79d35c5b45bfe8d12366995a Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 07:42:42 +0900 Subject: [PATCH 05/11] Use clap for parsing environment variable --- seitai/Cargo.toml | 2 +- seitai/src/cli.rs | 35 ++++++------------------- seitai/src/cli/args.rs | 1 + seitai/src/cli/args/discord.rs | 7 +++++ seitai/src/cli/subcommands.rs | 11 ++++++++ seitai/src/cli/subcommands/migration.rs | 21 +++++++++++++++ seitai/src/cli/subcommands/restarter.rs | 20 ++++++++++++++ 7 files changed, 69 insertions(+), 28 deletions(-) create mode 100644 seitai/src/cli/args.rs create mode 100644 seitai/src/cli/args/discord.rs create mode 100644 seitai/src/cli/subcommands.rs create mode 100644 seitai/src/cli/subcommands/migration.rs create mode 100644 seitai/src/cli/subcommands/restarter.rs diff --git a/seitai/Cargo.toml b/seitai/Cargo.toml index 0c8bfcd3..a9000758 100644 --- a/seitai/Cargo.toml +++ b/seitai/Cargo.toml @@ -8,7 +8,7 @@ workspace = true [dependencies.clap] version = "4.5.31" -features = ["derive"] +features = ["derive", "env"] [dependencies.database] path = "../crates/database" diff --git a/seitai/src/cli.rs b/seitai/src/cli.rs index ff0404ea..697e82b2 100644 --- a/seitai/src/cli.rs +++ b/seitai/src/cli.rs @@ -1,10 +1,11 @@ -use std::{env, process::exit}; - use anyhow::Result; use clap::{error::ErrorKind, Parser}; -use database::migrations::{MigrationCommand, Migrator}; +use subcommands::Subcommand; + +use crate::start_bot; -use crate::{set_up_database, start_bot}; +mod args; +mod subcommands; pub struct Application; @@ -16,12 +17,6 @@ pub struct Cli { subcommand: Subcommand, } -#[derive(clap::Subcommand)] -enum Subcommand { - Migration(MigrationCommand), - Restarter, -} - impl Application { pub async fn start() -> Result<()> { let cli = match Cli::try_parse() { @@ -31,28 +26,14 @@ impl Application { return Ok(()); }, Err(help) => { - println!("{help}"); + help.print()?; return Ok(()); }, }; match cli.subcommand { - Subcommand::Migration(migration) => { - let migrator = Migrator::new(); - let pgpool = set_up_database().await?; - migration.run(&mut *pgpool.acquire().await?, migrator.into_boxed_inner()).await?; - }, - Subcommand::Restarter => { - let token = match env::var("DISCORD_TOKEN") { - Ok(token) => token, - Err(error) => { - tracing::error!("failed to fetch environment variable DISCORD_TOKEN\nError: {error:?}"); - exit(1); - }, - }; - - restarter::Client::start(token).await?; - }, + Subcommand::Migration(migration) => migration.run().await?, + Subcommand::Restarter(restarter) => restarter.run().await?, } Ok(()) diff --git a/seitai/src/cli/args.rs b/seitai/src/cli/args.rs new file mode 100644 index 00000000..3228a62a --- /dev/null +++ b/seitai/src/cli/args.rs @@ -0,0 +1 @@ +pub mod discord; diff --git a/seitai/src/cli/args/discord.rs b/seitai/src/cli/args/discord.rs new file mode 100644 index 00000000..b1e4fd96 --- /dev/null +++ b/seitai/src/cli/args/discord.rs @@ -0,0 +1,7 @@ +use clap::Parser; + +#[derive(Debug, Parser)] +pub struct DiscordConfig { + #[arg(long, env, hide_env_values = true)] + pub discord_token: String, +} diff --git a/seitai/src/cli/subcommands.rs b/seitai/src/cli/subcommands.rs new file mode 100644 index 00000000..66ad2a9c --- /dev/null +++ b/seitai/src/cli/subcommands.rs @@ -0,0 +1,11 @@ +use migration::Migration; +use restarter::Restarter; + +mod migration; +mod restarter; + +#[derive(clap::Subcommand)] +pub enum Subcommand { + Migration(Migration), + Restarter(Restarter), +} diff --git a/seitai/src/cli/subcommands/migration.rs b/seitai/src/cli/subcommands/migration.rs new file mode 100644 index 00000000..4d1b74c2 --- /dev/null +++ b/seitai/src/cli/subcommands/migration.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use clap::Parser; +use database::migrations::{MigrationCommand, Migrator}; + +use crate::set_up_database; + +#[derive(Debug, Parser)] +pub struct Migration { + #[command(flatten)] + pub command: MigrationCommand, +} + +impl Migration { + pub async fn run(&self) -> Result<()> { + let migrator = Migrator::new(); + let pgpool = set_up_database().await?; + self.command.run(&mut *pgpool.acquire().await?, migrator.into_boxed_inner()).await?; + + Ok(()) + } +} diff --git a/seitai/src/cli/subcommands/restarter.rs b/seitai/src/cli/subcommands/restarter.rs new file mode 100644 index 00000000..c7fadd24 --- /dev/null +++ b/seitai/src/cli/subcommands/restarter.rs @@ -0,0 +1,20 @@ +use anyhow::Result; +use clap::Parser; + +use crate::cli::args::discord::DiscordConfig; + +#[derive(Debug, Parser)] +pub struct Restarter { + #[command(flatten)] + pub discord_args: DiscordConfig, +} + +impl Restarter { + pub async fn run(&self) -> Result<()> { + let token = self.discord_args.discord_token.clone(); + + restarter::Client::start(token).await?; + + Ok(()) + } +} From 1d0fdbff9e93900b92d3ccc1fc6a041306fdcf79 Mon Sep 17 00:00:00 2001 From: Hexin Date: Thu, 6 Mar 2025 08:08:49 +0900 Subject: [PATCH 06/11] Make duration until clients restarts voicevox configurable --- crates/restarter/src/client.rs | 4 ++-- .../src/event_handler/voice_state_update.rs | 5 +---- crates/restarter/src/restarter.rs | 14 +++++++++----- seitai/src/cli/subcommands/restarter.rs | 6 +++++- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs index 21b0f7c6..beac820d 100644 --- a/crates/restarter/src/client.rs +++ b/crates/restarter/src/client.rs @@ -11,14 +11,14 @@ pub struct Client; impl Client { #[tracing::instrument(skip_all)] - pub async fn start(token: String) -> Result<()> { + pub async fn start(token: String, restart_interval: u64) -> Result<()> { enable_graceful_shutdown(); let intents = GatewayIntents::GUILD_VOICE_STATES; let mut client = SerenityClient::builder(token, intents) .event_handler(Handler { connected_channels: Arc::new(Mutex::new(HashMap::new())), - restarter: Restarter::new(), + restarter: Restarter::new(restart_interval), }) .await?; diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index 1945ad05..c5e1c7b9 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use anyhow::Result; use serenity::all::{Context, VoiceState}; use serenity_utils::voice_state::{VoiceStateAction, VoiceStateConnection}; @@ -26,7 +24,6 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option { @@ -34,7 +31,7 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option (), diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 3abe6762..9c24df15 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -7,11 +7,13 @@ use tokio::sync::Notify; use tracing::Instrument; #[derive(Debug, Clone)] -pub(crate) struct Restarter; +pub(crate) struct Restarter { + interval: u64, +} impl Restarter { - pub(crate) fn new() -> Self { - Self + pub(crate) fn new(interval: u64) -> Self { + Self { interval } } #[tracing::instrument] @@ -34,13 +36,15 @@ impl Restarter { } #[tracing::instrument(skip_all)] - pub(crate) fn wait(&self, duration: Duration) { + pub(crate) fn wait(&self) { + let interval = Duration::from_secs(self.interval); + tokio::spawn(async move { // Resets notify waiters because notified() is immediately received notify by notify_one() called before starting waiting. let notify = Notify::new(); tokio::select! { - _ = tokio::time::sleep(duration) => { + _ = tokio::time::sleep(interval) => { if let Err(err) = Self::restart().await { tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); } diff --git a/seitai/src/cli/subcommands/restarter.rs b/seitai/src/cli/subcommands/restarter.rs index c7fadd24..0290f564 100644 --- a/seitai/src/cli/subcommands/restarter.rs +++ b/seitai/src/cli/subcommands/restarter.rs @@ -4,16 +4,20 @@ use clap::Parser; use crate::cli::args::discord::DiscordConfig; #[derive(Debug, Parser)] +#[command(about = "Launches client to restart voicevox statefulset")] pub struct Restarter { #[command(flatten)] pub discord_args: DiscordConfig, + + #[arg(long, default_value_t = 300, help = "secs until this client restarts voicevox after leaving from voice channel")] + pub duration: u64, } impl Restarter { pub async fn run(&self) -> Result<()> { let token = self.discord_args.discord_token.clone(); - restarter::Client::start(token).await?; + restarter::Client::start(token, self.duration).await?; Ok(()) } From 0563bbf058fe13683381cdd9b0ab2d68dc90f6a4 Mon Sep 17 00:00:00 2001 From: Hexin Date: Fri, 7 Mar 2025 06:38:13 +0900 Subject: [PATCH 07/11] Enable serenity cache for old voice state --- crates/restarter/src/client.rs | 2 +- .../src/event_handler/voice_state_update.rs | 4 +-- crates/serenity-utils/src/voice_state.rs | 32 +++++++++++-------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs index beac820d..3210555b 100644 --- a/crates/restarter/src/client.rs +++ b/crates/restarter/src/client.rs @@ -14,7 +14,7 @@ impl Client { pub async fn start(token: String, restart_interval: u64) -> Result<()> { enable_graceful_shutdown(); - let intents = GatewayIntents::GUILD_VOICE_STATES; + let intents = GatewayIntents::GUILDS | GatewayIntents::GUILD_VOICE_STATES; let mut client = SerenityClient::builder(token, intents) .event_handler(Handler { connected_channels: Arc::new(Mutex::new(HashMap::new())), diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index c5e1c7b9..80e113db 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -26,7 +26,7 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option { + VoiceStateConnection::Left(_) => { let mut connected_channels = handler.connected_channels.lock().await; connected_channels.remove(&guild_id); @@ -34,7 +34,7 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option (), + VoiceStateConnection::Moved(..) | VoiceStateConnection::NoAction => (), } Ok(()) diff --git a/crates/serenity-utils/src/voice_state.rs b/crates/serenity-utils/src/voice_state.rs index e9847e11..8040c9a6 100644 --- a/crates/serenity-utils/src/voice_state.rs +++ b/crates/serenity-utils/src/voice_state.rs @@ -7,9 +7,9 @@ pub struct VoiceStateAction { } pub enum VoiceStateConnection { - // including moved Joined(ChannelId), - Left, + Left(ChannelId), + Moved(ChannelId, ChannelId), NoAction, } @@ -22,18 +22,24 @@ impl VoiceStateAction { } pub fn connection(&self) -> VoiceStateConnection { - if self.old_state.is_some() { - return VoiceStateConnection::NoAction; - } - - match self.new_state.channel_id { - Some(channel_id) => { - tracing::debug!("joined voice channel {channel_id}"); - VoiceStateConnection::Joined(channel_id) + match &self.old_state { + Some(old_state) => match (old_state.channel_id, self.new_state.channel_id) { + (Some(old_channel_id), Some(new_channel_id)) if old_channel_id != new_channel_id => { + tracing::debug!("moved voice channel from {old_channel_id} to {new_channel_id}"); + VoiceStateConnection::Moved(old_channel_id, new_channel_id) + }, + (Some(channel_id), None) => { + tracing::debug!("left voice channel from {channel_id}"); + VoiceStateConnection::Left(channel_id) + }, + _ => VoiceStateConnection::NoAction, }, - None => { - tracing::debug!("left voice channel"); - VoiceStateConnection::Left + None => match self.new_state.channel_id { + Some(channel_id) => { + tracing::debug!("joined voice channel to {channel_id}"); + VoiceStateConnection::Joined(channel_id) + }, + None => VoiceStateConnection::NoAction, }, } } From 913e89fdc24608d0931c6f04fe28d2158e0b7dea Mon Sep 17 00:00:00 2001 From: Hexin Date: Fri, 7 Mar 2025 08:17:08 +0900 Subject: [PATCH 08/11] Fix issue where abort doens't work --- Cargo.lock | 1 + crates/restarter/Cargo.toml | 3 + crates/restarter/src/client.rs | 8 +- crates/restarter/src/event_handler/ready.rs | 4 +- .../src/event_handler/voice_state_update.rs | 14 ++-- crates/restarter/src/restarter.rs | 76 ++++++++++++++----- 6 files changed, 71 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc89af14..c9f4e9c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2628,6 +2628,7 @@ dependencies = [ "serenity", "serenity_utils", "tokio", + "tokio-util", "tracing", ] diff --git a/crates/restarter/Cargo.toml b/crates/restarter/Cargo.toml index 3f1acb3f..d3ab7da5 100644 --- a/crates/restarter/Cargo.toml +++ b/crates/restarter/Cargo.toml @@ -6,6 +6,9 @@ edition = "2024" [lib] doctest = false +[dependencies] +tokio-util = "0.7.13" + [dependencies.anyhow] workspace = true diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs index 3210555b..e9b67f16 100644 --- a/crates/restarter/src/client.rs +++ b/crates/restarter/src/client.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::Result; use futures::lock::Mutex; @@ -11,18 +11,18 @@ pub struct Client; impl Client { #[tracing::instrument(skip_all)] - pub async fn start(token: String, restart_interval: u64) -> Result<()> { + pub async fn start(token: String, restart_duration: u64) -> Result<()> { enable_graceful_shutdown(); let intents = GatewayIntents::GUILDS | GatewayIntents::GUILD_VOICE_STATES; let mut client = SerenityClient::builder(token, intents) .event_handler(Handler { connected_channels: Arc::new(Mutex::new(HashMap::new())), - restarter: Restarter::new(restart_interval), + restarter: Restarter::new(Duration::from_secs(restart_duration)), }) .await?; - tracing::debug!("seitai client starts"); + tracing::debug!("restarter client starts"); if let Err(err) = client.start().await { tracing::error!("failed to start client\nError: {err:?}"); return Err(err.into()); diff --git a/crates/restarter/src/event_handler/ready.rs b/crates/restarter/src/event_handler/ready.rs index f887f4c1..f4493b22 100644 --- a/crates/restarter/src/event_handler/ready.rs +++ b/crates/restarter/src/event_handler/ready.rs @@ -3,8 +3,10 @@ use serenity::all::{Context, Ready}; use super::Handler; -pub(crate) async fn handle(_handler: &Handler, _ctx: Context, ready: Ready) -> Result<()> { +pub(crate) async fn handle(handler: &Handler, _ctx: Context, ready: Ready) -> Result<()> { tracing::info!("{} is ready", ready.user.name); + handler.restarter.wait().await; + Ok(()) } diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index 80e113db..ef134f34 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -18,21 +18,17 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option { - handler - .connected_channels - .lock() - .await - .insert(guild_id, channel_id); + let mut connected_channels = handler.connected_channels.lock().await; + connected_channels.insert(guild_id, channel_id); - handler.restarter.abort(); + handler.restarter.send(connected_channels.len()).await?; }, VoiceStateConnection::Left(_) => { let mut connected_channels = handler.connected_channels.lock().await; connected_channels.remove(&guild_id); - if connected_channels.is_empty() { - handler.restarter.wait(); - } + handler.restarter.send(connected_channels.len()).await?; + }, VoiceStateConnection::Moved(..) | VoiceStateConnection::NoAction => (), } diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 9c24df15..7e8ae1b7 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -1,19 +1,31 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anyhow::Result; +use futures::lock::Mutex; use k8s_openapi::api::apps::v1::StatefulSet; use kube::{Api, Client, Config}; -use tokio::sync::Notify; +use tokio::sync::mpsc::{self, error::SendError, Receiver, Sender}; +use tokio_util::sync::CancellationToken; use tracing::Instrument; #[derive(Debug, Clone)] pub(crate) struct Restarter { - interval: u64, + duration: Duration, + waiting: bool, + tx: Sender, + rx: Arc>>, } impl Restarter { - pub(crate) fn new(interval: u64) -> Self { - Self { interval } + pub(crate) fn new(duration: Duration) -> Self { + let (tx, rx) = mpsc::channel(1); + + Self { + duration, + waiting: false, + tx, + rx: Arc::new(Mutex::new(rx)), + } } #[tracing::instrument] @@ -36,27 +48,49 @@ impl Restarter { } #[tracing::instrument(skip_all)] - pub(crate) fn wait(&self) { - let interval = Duration::from_secs(self.interval); + pub(crate) async fn wait(&self) { + let duration = self.duration; + let rx = self.rx.clone(); + let mut waiting = self.waiting; + let cancellation_token = CancellationToken::new(); tokio::spawn(async move { - // Resets notify waiters because notified() is immediately received notify by notify_one() called before starting waiting. - let notify = Notify::new(); - - tokio::select! { - _ = tokio::time::sleep(interval) => { - if let Err(err) = Self::restart().await { - tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); - } - }, - _ = notify.notified() => { - tracing::info!("canceled waiting for restarting statefulsets/voicevox"); - }, + let mut rx = rx.lock().await; + + while let Some(connection_count) = rx.recv().await { + match (connection_count, waiting) { + (0, false) => { + waiting = true; + let cancellation_token = cancellation_token.clone(); + + tracing::info!("statefulsets/voicevox is going to restart in {} secs", duration.as_secs()); + + tokio::spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + tracing::info!("cancelled restarting statefulsets/voicevox"); + }, + _ = tokio::time::sleep(duration) => { + if let Err(err) = Self::restart().await { + tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); + } + }, + } + }); + }, + (1.., true) => { + waiting = false; + cancellation_token.cancel(); + }, + (connection_count, waiting) => { + tracing::error!("unexpected connection count {connection_count} and waiting {waiting}"); + }, + } } }.in_current_span()); } - pub(crate) fn abort(&self) { - Notify::new().notify_one(); + pub(crate) async fn send(&self, count: usize) -> Result<(), SendError> { + self.tx.send(count).await } } From 8165823cd96973cdf4125b18134b0b24cd596c14 Mon Sep 17 00:00:00 2001 From: Hexin Date: Fri, 7 Mar 2025 08:29:16 +0900 Subject: [PATCH 09/11] Move associated function restart() to trait method --- crates/restarter/src/client.rs | 4 +-- crates/restarter/src/restarter.rs | 46 ++++++++++++++++++++----------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/crates/restarter/src/client.rs b/crates/restarter/src/client.rs index e9b67f16..5d44e26c 100644 --- a/crates/restarter/src/client.rs +++ b/crates/restarter/src/client.rs @@ -5,7 +5,7 @@ use futures::lock::Mutex; use serenity::{all::GatewayIntents, Client as SerenityClient}; use tokio::{signal::unix::{self, SignalKind}, task::JoinHandle}; -use crate::{event_handler::Handler, restarter::Restarter}; +use crate::{event_handler::Handler, restarter::{KubeRestarter, Restarter}}; pub struct Client; @@ -18,7 +18,7 @@ impl Client { let mut client = SerenityClient::builder(token, intents) .event_handler(Handler { connected_channels: Arc::new(Mutex::new(HashMap::new())), - restarter: Restarter::new(Duration::from_secs(restart_duration)), + restarter: Restarter::new(Duration::from_secs(restart_duration), KubeRestarter), }) .await?; diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 7e8ae1b7..9ff5eb46 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -9,27 +9,24 @@ use tokio_util::sync::CancellationToken; use tracing::Instrument; #[derive(Debug, Clone)] -pub(crate) struct Restarter { +pub(crate) struct Restarter { duration: Duration, waiting: bool, tx: Sender, rx: Arc>>, + restart: Restart, } -impl Restarter { - pub(crate) fn new(duration: Duration) -> Self { - let (tx, rx) = mpsc::channel(1); +#[derive(Debug, Clone)] +pub(crate) struct KubeRestarter; - Self { - duration, - waiting: false, - tx, - rx: Arc::new(Mutex::new(rx)), - } - } +pub(crate) trait Restart: Send { + fn restart(&self) -> impl Future> + Send; +} +impl Restart for KubeRestarter { #[tracing::instrument] - pub(crate) async fn restart() -> Result<()> { + async fn restart(&self) -> Result<()> { let config = match Config::incluster() { Ok(config) => config, Err(_) => { @@ -46,11 +43,26 @@ impl Restarter { Ok(()) } +} + +impl Restarter { + pub(crate) fn new(duration: Duration, restart: Restart) -> Self { + let (tx, rx) = mpsc::channel(1); + + Self { + duration, + waiting: false, + tx, + rx: Arc::new(Mutex::new(rx)), + restart, + } + } #[tracing::instrument(skip_all)] pub(crate) async fn wait(&self) { let duration = self.duration; let rx = self.rx.clone(); + let restart = self.restart.clone(); let mut waiting = self.waiting; let cancellation_token = CancellationToken::new(); @@ -59,9 +71,13 @@ impl Restarter { while let Some(connection_count) = rx.recv().await { match (connection_count, waiting) { + (0, true) => { + tracing::error!("unexpected connection count {connection_count} and waiting {waiting}"); + }, (0, false) => { waiting = true; let cancellation_token = cancellation_token.clone(); + let restart = restart.clone(); tracing::info!("statefulsets/voicevox is going to restart in {} secs", duration.as_secs()); @@ -71,7 +87,7 @@ impl Restarter { tracing::info!("cancelled restarting statefulsets/voicevox"); }, _ = tokio::time::sleep(duration) => { - if let Err(err) = Self::restart().await { + if let Err(err) = restart.restart().await { tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); } }, @@ -82,9 +98,7 @@ impl Restarter { waiting = false; cancellation_token.cancel(); }, - (connection_count, waiting) => { - tracing::error!("unexpected connection count {connection_count} and waiting {waiting}"); - }, + (1.., false) => (), } } }.in_current_span()); From cce1c8e5fc10600550461cd9572b7a66950df7d5 Mon Sep 17 00:00:00 2001 From: Hexin Date: Fri, 7 Mar 2025 19:54:25 +0900 Subject: [PATCH 10/11] Change process of waiting for restart --- crates/restarter/src/event_handler/ready.rs | 4 +- .../src/event_handler/voice_state_update.rs | 8 +- crates/restarter/src/restarter.rs | 77 +++++++------------ 3 files changed, 36 insertions(+), 53 deletions(-) diff --git a/crates/restarter/src/event_handler/ready.rs b/crates/restarter/src/event_handler/ready.rs index f4493b22..f887f4c1 100644 --- a/crates/restarter/src/event_handler/ready.rs +++ b/crates/restarter/src/event_handler/ready.rs @@ -3,10 +3,8 @@ use serenity::all::{Context, Ready}; use super::Handler; -pub(crate) async fn handle(handler: &Handler, _ctx: Context, ready: Ready) -> Result<()> { +pub(crate) async fn handle(_handler: &Handler, _ctx: Context, ready: Ready) -> Result<()> { tracing::info!("{} is ready", ready.user.name); - handler.restarter.wait().await; - Ok(()) } diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index ef134f34..5d1b6856 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -21,14 +21,18 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option { let mut connected_channels = handler.connected_channels.lock().await; connected_channels.remove(&guild_id); - handler.restarter.send(connected_channels.len()).await?; + let count = connected_channels.len(); + handler.restarter.set_connection_count(count).await; + if count == 0 { + handler.restarter.wait().await; + } }, VoiceStateConnection::Moved(..) | VoiceStateConnection::NoAction => (), } diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 9ff5eb46..4715005d 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -4,17 +4,14 @@ use anyhow::Result; use futures::lock::Mutex; use k8s_openapi::api::apps::v1::StatefulSet; use kube::{Api, Client, Config}; -use tokio::sync::mpsc::{self, error::SendError, Receiver, Sender}; -use tokio_util::sync::CancellationToken; use tracing::Instrument; #[derive(Debug, Clone)] pub(crate) struct Restarter { duration: Duration, - waiting: bool, - tx: Sender, - rx: Arc>>, + waiting: Arc>, restart: Restart, + connection_count: Arc>, } #[derive(Debug, Clone)] @@ -47,64 +44,48 @@ impl Restart for KubeRestarter { impl Restarter { pub(crate) fn new(duration: Duration, restart: Restart) -> Self { - let (tx, rx) = mpsc::channel(1); - Self { duration, - waiting: false, - tx, - rx: Arc::new(Mutex::new(rx)), + waiting: Arc::new(Mutex::new(false)), restart, + connection_count: Arc::new(Mutex::new(0)), } } #[tracing::instrument(skip_all)] pub(crate) async fn wait(&self) { + let waiting = self.waiting.clone(); + + if *waiting.lock().await { + return; + } + let duration = self.duration; - let rx = self.rx.clone(); let restart = self.restart.clone(); - let mut waiting = self.waiting; - let cancellation_token = CancellationToken::new(); + let connection_count = self.connection_count.clone(); + + tracing::info!("statefulsets/voicevox is going to restart in {} secs", duration.as_secs()); tokio::spawn(async move { - let mut rx = rx.lock().await; - - while let Some(connection_count) = rx.recv().await { - match (connection_count, waiting) { - (0, true) => { - tracing::error!("unexpected connection count {connection_count} and waiting {waiting}"); - }, - (0, false) => { - waiting = true; - let cancellation_token = cancellation_token.clone(); - let restart = restart.clone(); - - tracing::info!("statefulsets/voicevox is going to restart in {} secs", duration.as_secs()); - - tokio::spawn(async move { - tokio::select! { - _ = cancellation_token.cancelled() => { - tracing::info!("cancelled restarting statefulsets/voicevox"); - }, - _ = tokio::time::sleep(duration) => { - if let Err(err) = restart.restart().await { - tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); - } - }, - } - }); - }, - (1.., true) => { - waiting = false; - cancellation_token.cancel(); - }, - (1.., false) => (), - } + *waiting.lock().await = true; + + tokio::time::sleep(duration).await; + + *waiting.lock().await = false; + + if *connection_count.lock().await != 0 { + tracing::info!("cancelled restarting statefulsets/voicevox"); + + return; + } + + if let Err(err) = restart.restart().await { + tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); } }.in_current_span()); } - pub(crate) async fn send(&self, count: usize) -> Result<(), SendError> { - self.tx.send(count).await + pub(crate) async fn set_connection_count(&self, count: usize) { + *self.connection_count.lock().await = count; } } From 2b1939e43b797d28bc1cfbc9da68865bda9586eb Mon Sep 17 00:00:00 2001 From: Hexin Date: Sat, 8 Mar 2025 07:51:26 +0900 Subject: [PATCH 11/11] Add tests --- Cargo.lock | 2 +- crates/restarter/Cargo.toml | 8 ++- .../src/event_handler/voice_state_update.rs | 2 +- crates/restarter/src/restarter.rs | 32 ++++++--- crates/restarter/src/restarter/tests.rs | 66 +++++++++++++++++++ 5 files changed, 98 insertions(+), 12 deletions(-) create mode 100644 crates/restarter/src/restarter/tests.rs diff --git a/Cargo.lock b/Cargo.lock index c9f4e9c1..0810cc08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2625,10 +2625,10 @@ dependencies = [ "k8s-openapi", "kube", "logging", + "mockall", "serenity", "serenity_utils", "tokio", - "tokio-util", "tracing", ] diff --git a/crates/restarter/Cargo.toml b/crates/restarter/Cargo.toml index d3ab7da5..4c338708 100644 --- a/crates/restarter/Cargo.toml +++ b/crates/restarter/Cargo.toml @@ -7,7 +7,6 @@ edition = "2024" doctest = false [dependencies] -tokio-util = "0.7.13" [dependencies.anyhow] workspace = true @@ -39,3 +38,10 @@ features = ["cache", "client", "gateway", "model", "native_tls_backend"] [dependencies.serenity_utils] workspace = true + +[dev-dependencies.mockall] +version = "0.13.1" + +[dev-dependencies.tokio] +workspace = true +features = ["test-util"] diff --git a/crates/restarter/src/event_handler/voice_state_update.rs b/crates/restarter/src/event_handler/voice_state_update.rs index 5d1b6856..864a95ed 100644 --- a/crates/restarter/src/event_handler/voice_state_update.rs +++ b/crates/restarter/src/event_handler/voice_state_update.rs @@ -31,7 +31,7 @@ pub(crate) async fn handle(handler: &Handler, ctx: Context, old_state: Option (), diff --git a/crates/restarter/src/restarter.rs b/crates/restarter/src/restarter.rs index 4715005d..68bec921 100644 --- a/crates/restarter/src/restarter.rs +++ b/crates/restarter/src/restarter.rs @@ -1,9 +1,10 @@ -use std::{sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Result; use futures::lock::Mutex; use k8s_openapi::api::apps::v1::StatefulSet; use kube::{Api, Client, Config}; +use tokio::{sync::oneshot, task::JoinHandle}; use tracing::Instrument; #[derive(Debug, Clone)] @@ -42,7 +43,7 @@ impl Restart for KubeRestarter { } } -impl Restarter { +impl Restarter { pub(crate) fn new(duration: Duration, restart: Restart) -> Self { Self { duration, @@ -52,12 +53,13 @@ impl Restarter { } } - #[tracing::instrument(skip_all)] - pub(crate) async fn wait(&self) { + #[allow(clippy::async_yields_async)] + #[tracing::instrument(skip(self), fields(self.duration = ?self.duration, self.restart = ?self.restart))] + pub(crate) async fn schedule_restart(&self) -> JoinHandle<()> { let waiting = self.waiting.clone(); if *waiting.lock().await { - return; + return tokio::spawn(async {}); } let duration = self.duration; @@ -66,26 +68,38 @@ impl Restarter { tracing::info!("statefulsets/voicevox is going to restart in {} secs", duration.as_secs()); - tokio::spawn(async move { + let (tx, rx) = oneshot::channel(); + + let handle = tokio::spawn(async move { + tx.send(()).expect("failed to send notice that timer task spawned"); *waiting.lock().await = true; tokio::time::sleep(duration).await; *waiting.lock().await = false; - if *connection_count.lock().await != 0 { - tracing::info!("cancelled restarting statefulsets/voicevox"); + { + let connection_count = *connection_count.lock().await; + if connection_count != 0 { + tracing::info!("statefulsets/voicevox wasn't restart becase {connection_count} clients are connected"); - return; + return; + } } if let Err(err) = restart.restart().await { tracing::error!("failed to restart statefulsets/voicevox\nError: {err:?}"); } }.in_current_span()); + + rx.await.expect("failed to receive notice that timer task spawned"); + handle } pub(crate) async fn set_connection_count(&self, count: usize) { *self.connection_count.lock().await = count; } } + +#[cfg(test)] +mod tests; diff --git a/crates/restarter/src/restarter/tests.rs b/crates/restarter/src/restarter/tests.rs new file mode 100644 index 00000000..3dbb7617 --- /dev/null +++ b/crates/restarter/src/restarter/tests.rs @@ -0,0 +1,66 @@ +use std::time::Duration; + +use anyhow::Result; +use futures::FutureExt; + +use crate::restarter::{Restart, Restarter}; + +mockall::mock! { + #[derive(Debug)] + Restart {} + impl Clone for Restart { + fn clone(&self) -> Self; + } + impl Restart for Restart { + fn restart(&self) -> impl Future> + Send; + } +} + +#[tokio::test(start_paused = true)] +async fn restart_as_scheduled() { + let mut mock_restart = MockRestart::new(); + mock_restart + .expect_clone() + .once() + .return_once(move || { + let mut mock_restart = MockRestart::new(); + mock_restart + .expect_restart() + .once() + .returning(|| async { Ok(()) }.boxed()); + + mock_restart + }); + + let restarter = Restarter::new(Duration::from_secs(300), mock_restart); + + let wait = restarter.schedule_restart().await; + + restarter.set_connection_count(0).await; + + assert!(wait.await.is_ok()); +} + +#[tokio::test(start_paused = true)] +async fn cancel_restart() { + let mut mock_restart = MockRestart::new(); + mock_restart + .expect_clone() + .once() + .return_once(move || { + let mut mock_restart = MockRestart::new(); + mock_restart + .expect_restart() + .never(); + + mock_restart + }); + + let restarter = Restarter::new(Duration::from_secs(300), mock_restart); + + let wait = restarter.schedule_restart().await; + + restarter.set_connection_count(1).await; + + assert!(wait.await.is_ok()); +}