Description
I have some questions about how to give Lua code concurrent/parallel processing capability through the asynchronous methods that mlua exposes from Rust.
Here is my test code; I believe the implementation of the async function execute_tasks is the key part:
// Imports assume mlua with the "async" and "serialize" features enabled.
use mlua::prelude::*;

async fn test_lua() -> LuaResult<()> {
    let lua = Lua::new();

    // Async Lua-callable function: fetch a URL and return the JSON body as a Lua value.
    let fetch_json = lua.create_async_function(|lua, uri: String| async move {
        let resp = reqwest::get(&uri)
            .await
            .and_then(|resp| resp.error_for_status())
            .into_lua_err()?;
        let json = resp.json::<serde_json::Value>().await.into_lua_err()?;
        lua.to_value(&json)
    })?;

    // Simple debug-print helper for Lua.
    let dbg = lua.create_function(|_, value: LuaValue| {
        println!("{value:#?}");
        Ok(())
    })?;

    let globals = lua.globals();
    globals.set("execute_tasks", lua.create_async_function(execute_tasks)?)?;
    globals.set("fetch_json", fetch_json)?;
    globals.set("dbg", dbg)?;

    lua.load(
        r#"
        local tasks = {}
        for i = 1, 1000 do
            tasks[i] = function()
                dbg("do " .. i)
                return fetch_json(string.format("https://httpbin.org/anything?index=%d", i))
            end
        end
        local results = execute_tasks(tasks, 10)
        for i, result in ipairs(results) do
            dbg(result)
        end
        "#,
    )
    .exec_async()
    .await?;
    Ok(())
}
#[cfg(test)]
mod test {
    #[tokio::test(flavor = "multi_thread", worker_threads = 10)]
    async fn test_lua() {
        super::test_lua().await.unwrap();
    }
}
First, I tried the feature = "send" option. With send enabled, I can easily use tokio::spawn:
// This version assumes feature = "send": tasks are spawned onto the
// multi-threaded tokio runtime, bounded by a semaphore.
use std::sync::Arc;
use std::thread;
use futures::future::join_all;
use tokio::sync::Semaphore;

async fn execute_tasks(
    lua: Lua,
    (tasks, max_concurrency): (Table, usize),
) -> LuaResult<Vec<mlua::Value>> {
    let semaphore = Arc::new(Semaphore::new(max_concurrency));
    let mut handles = Vec::new();
    for pair in tasks.pairs::<mlua::Integer, LuaFunction>() {
        let (index, task_fn) = pair?;
        let semaphore = semaphore.clone();
        let task = tokio::spawn(async move {
            let _permit = semaphore.acquire().await.unwrap();
            println!("thread id: {:?}", thread::current().id());
            (index, task_fn.call_async::<mlua::Value>(()).await)
        });
        handles.push(task);
    }
    let results = join_all(handles).await;
    ...
}
Since I used the multi_thread flavor of the tokio runtime, the printed thread IDs show different threads, which is as expected.
Later, I saw this in the mlua documentation:
By default mlua is !Send. This can be changed by enabling feature = "send" that adds Send requirement to Rust functions and UserData types.
In this case Lua objects and their types can be sent or used from other threads. Internally, access to the Lua VM is synchronized using a reentrant mutex that can be locked many times within the same thread.
This suggests that, because of that reentrant mutex, the multi-threaded version may not actually execute Lua code in parallel, even though the tasks are spawned onto different worker threads.
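To illustrate what I mean, here is a rough, hypothetical micro-benchmark (not part of my original test; the busy-loop size and thread counts are made up). If the reentrant mutex serializes Lua execution, two CPU-bound Lua calls spawned onto different worker threads should take roughly the sum of their individual run times rather than overlapping:

// Hypothetical sketch, assuming the "async" and "send" features: both spawned
// tasks call into the same Lua VM, so the reentrant mutex should serialize
// the CPU-bound Lua work even on a multi-threaded runtime.
use std::time::Instant;
use mlua::prelude::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> LuaResult<()> {
    let lua = Lua::new();
    // A deliberately CPU-bound Lua function (no awaits, just a busy loop).
    let busy: LuaFunction = lua
        .load("return function() local s = 0; for i = 1, 5e7 do s = s + i end; return s end")
        .eval()?;

    let start = Instant::now();
    let a = tokio::spawn({
        let f = busy.clone();
        async move { f.call_async::<i64>(()).await }
    });
    let b = tokio::spawn({
        let f = busy.clone();
        async move { f.call_async::<i64>(()).await }
    });
    let _ = tokio::join!(a, b);
    // If Lua execution were truly parallel, the elapsed time would be close to
    // one call's run time; with serialized access it should be close to the sum.
    println!("elapsed: {:?}", start.elapsed());
    Ok(())
}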
However, I don't want to switch the whole runtime to single-threaded, so I thought of the LocalSet approach.
For this approach the send feature needs to be disabled, and it also works well:
// This version assumes the "send" feature is disabled; everything Lua-related
// stays on the current thread via a LocalSet.
use std::rc::Rc;
use futures::future::join_all;
use tokio::sync::Semaphore;
use tokio::task::LocalSet;

async fn execute_tasks(
    lua: Lua,
    (tasks, max_concurrency): (Table, usize),
) -> LuaResult<Vec<mlua::Value>> {
    let local_set = LocalSet::new();
    let semaphore = Rc::new(Semaphore::new(max_concurrency));
    let mut handles = Vec::new();
    for pair in tasks.pairs::<mlua::Integer, LuaFunction>() {
        let (index, task_fn) = pair?;
        let semaphore = semaphore.clone();
        // spawn_local keeps the !Send Lua handles on the current thread.
        let handle = local_set.spawn_local(async move {
            let _permit = semaphore.acquire().await.unwrap();
            let result = task_fn.call_async::<mlua::Value>(()).await;
            (index, result)
        });
        handles.push(handle);
    }
    let results = local_set
        .run_until(async move { join_all(handles).await })
        .await;
    ...
}
Both approaches currently work. My question: assuming the asynchronous tasks themselves behave well, is the second approach more efficient, since, as I understand it, mlua itself is inherently not thread-safe?
Or is there a more efficient way to provide concurrency/parallelism support for Lua?
Any suggestions and discussion are welcome.