Skip to content

Commit 0564228

Browse files
committed
Make separate calls for sync and async UDFs
1 parent 2625f9b commit 0564228

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

singlestoredb/functions/ext/asgi.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -329,21 +329,26 @@ async def run_in_parallel(
329329
limit = get_concurrency_limit(func)
330330
is_async = asyncio.iscoroutinefunction(func)
331331

332-
async def call(batch: Sequence[Any]) -> Any:
333-
"""Loop over batches of parameters and call the function."""
332+
async def call_sync(batch: Sequence[Any]) -> Any:
333+
"""Loop over batches of parameters and call the sync function."""
334334
res = []
335335
for params in batch:
336336
cancel_on_event(cancel_event)
337-
if is_async:
338-
res.append(transformer(await func(*params)))
339-
else:
340-
res.append(transformer(func(*params)))
337+
res.append(transformer(func(*params)))
338+
return res
339+
340+
async def call_async(batch: Sequence[Any]) -> Any:
341+
"""Loop over batches of parameters and call the async function."""
342+
res = []
343+
for params in batch:
344+
cancel_on_event(cancel_event)
345+
res.append(transformer(await func(*params)))
341346
return res
342347

343348
async def thread_call(batch: Sequence[Any]) -> Any:
344349
if is_async:
345-
return await call(batch)
346-
return await to_thread(lambda: asyncio.run(call(batch)))
350+
return await call_async(batch)
351+
return await to_thread(lambda: asyncio.run(call_sync(batch)))
347352

348353
# Create tasks in chunks to limit concurrency
349354
tasks = [thread_call(batch) for batch in chunked(params_list, limit)]

0 commit comments

Comments
 (0)