diff --git a/Cargo.toml b/Cargo.toml index 6eafbe1..12a4926 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ thiserror = "1.0" libc = "0.2" crossbeam-utils = "0.8" serde = "1.0" +crossbeam-channel = "0.5" [dependencies.mlua] version = "0.6" diff --git a/examples/bench/init.lua b/examples/bench/init.lua index 18c42b0..d965c6c 100644 --- a/examples/bench/init.lua +++ b/examples/bench/init.lua @@ -1,5 +1,9 @@ local txapi = require('bench') -txapi.start({}) -- will use default +txapi.start({ + fibers = 16, + max_batch = 16, + runtime = { type = "cur_thread" }, +}) -- will use default -- txapi.start({buffer = 128 }) -- txapi.start({buffer = 128, runtime = { type = "cur_thread" }}) -- txapi.start({buffer = 128, runtime = { type = "multi_thread" }}) -- default diff --git a/examples/bench/src/lib.rs b/examples/bench/src/lib.rs index 134e68f..d5ac83b 100644 --- a/examples/bench/src/lib.rs +++ b/examples/bench/src/lib.rs @@ -5,9 +5,20 @@ use tokio::time::Instant; use xtm_rust::{run_module, Dispatcher, ModuleConfig}; async fn module_main(dispatcher: Dispatcher) { - let iterations = 10_000_000; + tokio::spawn({ + let dispatcher = dispatcher.try_clone().unwrap(); + async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + loop { + println!("task_queue: {:>3}", dispatcher.len()); + interval.tick().await; + } + } + }); + + let iterations = 4_000_000; - let worker_n = 6; + let worker_n = 16; let iterations_per_worker = iterations / worker_n; let mut workers = Vec::new(); @@ -57,6 +68,7 @@ fn bench(lua: &Lua) -> LuaResult { "start", lua.create_function_mut(|lua, (config,): (LuaValue,)| { let config: ModuleConfig = lua.from_value(config)?; + println!("{:?}", config); run_module(module_main, config, lua).map_err(LuaError::external) })?, diff --git a/src/config.rs b/src/config.rs index ca6f775..9de646a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,13 +1,14 @@ use serde::{Deserialize, Serialize}; use tokio::runtime; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(default)] pub struct ModuleConfig { pub buffer: usize, pub fibers: usize, - pub max_recv_retries: usize, + pub max_batch: usize, pub coio_timeout: f64, + pub fiber_standby_timeout: f64, pub runtime: RuntimeConfig, } @@ -16,14 +17,15 @@ impl Default for ModuleConfig { Self { buffer: 128, fibers: 16, - max_recv_retries: 100, - coio_timeout: 1.0, + max_batch: 16, + coio_timeout: 0.1, + fiber_standby_timeout: 1.0, runtime: RuntimeConfig::default(), } } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "type")] pub enum RuntimeConfig { #[serde(rename(deserialize = "cur_thread"))] diff --git a/src/fiber_pool.rs b/src/fiber_pool.rs new file mode 100644 index 0000000..30387f4 --- /dev/null +++ b/src/fiber_pool.rs @@ -0,0 +1,147 @@ +use std::{collections::LinkedList, rc::Rc, time::Duration}; + +use crossbeam_channel::{unbounded, TryRecvError}; +use mlua::Lua; +use tarantool::fiber; + +use crate::{ChannelError, Executor, ModuleConfig, Task}; + +struct SchedulerArgs<'a> { + lua: &'a Lua, + executor: Executor, + config: ModuleConfig, +} +fn scheduler_f(args: Box) -> i32 { + let SchedulerArgs { + lua, + executor, + config: + ModuleConfig { + max_batch, + coio_timeout, + fibers, + fiber_standby_timeout, + .. + }, + } = *args; + + let cond = Rc::new(fiber::Cond::new()); + let (tx, rx) = unbounded::(); + + let mut workers = LinkedList::new(); + for _ in 0..fibers { + let mut worker = fiber::Fiber::new("worker", &mut worker_f); + worker.set_joinable(true); + worker.start(WorkerArgs { + cond: cond.clone(), + lua, + rx: rx.clone(), + fiber_standby_timeout, + }); + workers.push_back(worker); + } + + let result = loop { + let tasks = match executor.pop_many(max_batch, coio_timeout) { + Ok(tasks) => tasks, + Err(ChannelError::RXChannelClosed) => break Ok(()), + Err(err) => break Err(err), + }; + + for task in tasks { + tx.send(task).unwrap(); // TODO: add error handling + cond.signal(); + } + }; + + // gracefully kill fibers + drop(tx); + cond.broadcast(); + + for worker in workers { + worker.join(); + } + + match result { + Ok(_) => 0, + Err(_) => -1, + } +} + +struct WorkerArgs<'a> { + cond: Rc, + lua: &'a Lua, + rx: crossbeam_channel::Receiver, + fiber_standby_timeout: f64, +} +fn worker_f(args: Box) -> i32 { + let WorkerArgs { + cond, + lua, + rx, + fiber_standby_timeout, + } = *args; + let fiber_standby_timeout = Duration::from_secs_f64(fiber_standby_timeout); + + let thread_func = lua + .create_function(move |lua, _: ()| { + loop { + match rx.try_recv() { + Ok(task) => match task(lua) { + Ok(()) => (), + Err(ChannelError::TXChannelClosed) => continue, + Err(err) => break Err(mlua::Error::external(err)), + }, + Err(TryRecvError::Disconnected) => break Ok(()), + Err(TryRecvError::Empty) => { + let signaled = cond.wait_timeout(fiber_standby_timeout); + // if !signaled { + // // kill fiber + // break Ok(()); + // } + } + } + } + }) + .unwrap(); + let thread = lua.create_thread(thread_func).unwrap(); + match thread.resume(()) { + Ok(()) => 0, + Err(_) => -1, + } +} + +pub(crate) struct FiberPool<'a> { + lua: &'a Lua, + executor: Executor, + config: ModuleConfig, + + scheduler: fiber::Fiber<'a, SchedulerArgs<'a>>, +} + +impl<'a> FiberPool<'a> { + pub fn new(lua: &'a Lua, executor: Executor, config: ModuleConfig) -> Self { + let mut scheduler = fiber::Fiber::new("scheduler", &mut scheduler_f); + scheduler.set_joinable(true); + Self { + lua, + executor, + config, + scheduler, + } + } + + pub fn run(&mut self) -> std::io::Result<()> { + self.scheduler.start(SchedulerArgs { + lua: self.lua, + executor: self.executor.try_clone()?, + config: self.config.clone(), + }); + Ok(()) + } + + // join will exit when all Dispatchers die + pub fn join(&self) { + self.scheduler.join(); + } +} diff --git a/src/lib.rs b/src/lib.rs index caf50df..4def8f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,12 +4,11 @@ use std::future::Future; use crossbeam_utils::thread; use mlua::Lua; use tokio::runtime; - pub use txapi::*; -use tarantool::fiber::Fiber; mod eventfd; mod txapi; +mod fiber_pool; mod config; pub use config::*; @@ -27,31 +26,8 @@ where { let (dispatcher, executor) = channel(config.buffer)?; - let executor_loop = &mut |args: Box<(&Lua, Executor)>| { - let (lua, executor) = *args; - - let thread_func = lua.create_function(move |lua, _: ()| { - Ok(loop { - match executor.exec(lua, config.max_recv_retries, config.coio_timeout) { - Ok(_) => continue, - Err(ChannelError::TXChannelClosed) => continue, - Err(ChannelError::RXChannelClosed) => break 0, - Err(_err) => break -1, - } - }) - }).unwrap(); - let thread = lua.create_thread(thread_func).unwrap(); - thread.resume(()).unwrap() - }; - - // UNSAFE: fibers must die inside the current function - let mut fibers = Vec::with_capacity(config.fibers); - for _ in 0..config.fibers { - let mut fiber = Fiber::new("xtm", executor_loop); - fiber.set_joinable(true); - fiber.start((lua, executor.try_clone()?)); - fibers.push(fiber); - } + let mut fiber_pool = fiber_pool::FiberPool::new(lua, executor, config.clone()); + fiber_pool.run()?; let result = thread::scope(|scope| { let module_thread = scope @@ -67,9 +43,7 @@ where }) .unwrap(); - for fiber in &fibers { - fiber.join(); - } + fiber_pool.join(); module_thread.join().unwrap().unwrap() }) .unwrap(); diff --git a/src/txapi.rs b/src/txapi.rs index 0785235..a22a9f8 100644 --- a/src/txapi.rs +++ b/src/txapi.rs @@ -7,7 +7,7 @@ use std::os::unix::io::{AsRawFd, RawFd}; use std::io; use mlua::Lua; -type Task = Box Result<(), ChannelError> + Send>; +pub type Task = Box Result<(), ChannelError> + Send>; type TaskSender = async_channel::Sender; type TaskReceiver = async_channel::Receiver; @@ -86,7 +86,7 @@ impl Executor { Self { task_rx, eventfd } } - pub fn exec(&self, lua: &Lua, max_recv_retries: usize, coio_timeout: f64) -> Result<(), ChannelError> { + pub fn exec(&self, lua: &Lua, coio_timeout: f64) -> Result<(), ChannelError> { loop { match self.task_rx.try_recv() { Ok(func) => return func(lua), @@ -94,17 +94,31 @@ impl Executor { Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed), }; - for _ in 0..max_recv_retries { - match self.task_rx.try_recv() { - Ok(func) => return func(lua), - Err(TryRecvError::Empty) => tarantool::fiber::sleep(0.), - Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed), - }; - } let _ = self.eventfd.coio_read(coio_timeout); } } + pub fn pop_many(&self, max_tasks: usize, coio_timeout: f64) -> Result, ChannelError> { + if self.task_rx.is_empty() { + let _ = self.eventfd.coio_read(coio_timeout); + } + + let mut tasks = Vec::with_capacity(max_tasks); + for _ in 0..max_tasks { + match self.task_rx.try_recv() { + Ok(func) => tasks.push(func), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed), + }; + + if self.task_rx.len() <= 1 { + break; + } + } + + Ok(tasks) + } + pub fn try_clone(&self) -> io::Result { Ok(Self { task_rx: self.task_rx.clone(),