Skip to content

Commit 7d41411

Browse files
committed
Make separate calls for sync and async UDFs
1 parent 1953f97 commit 7d41411

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

singlestoredb/functions/ext/asgi.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -326,21 +326,26 @@ async def run_in_parallel(
326326
limit = get_concurrency_limit(func)
327327
is_async = asyncio.iscoroutinefunction(func)
328328

329-
async def call(batch: Sequence[Any]) -> Any:
330-
"""Loop over batches of parameters and call the function."""
329+
async def call_sync(batch: Sequence[Any]) -> Any:
330+
"""Loop over batches of parameters and call the sync function."""
331331
res = []
332332
for params in batch:
333333
cancel_on_event(cancel_event)
334-
if is_async:
335-
res.append(transformer(await func(*params)))
336-
else:
337-
res.append(transformer(func(*params)))
334+
res.append(transformer(func(*params)))
335+
return res
336+
337+
async def call_async(batch: Sequence[Any]) -> Any:
338+
"""Loop over batches of parameters and call the async function."""
339+
res = []
340+
for params in batch:
341+
cancel_on_event(cancel_event)
342+
res.append(transformer(await func(*params)))
338343
return res
339344

340345
async def thread_call(batch: Sequence[Any]) -> Any:
341346
if is_async:
342-
return await call(batch)
343-
return await to_thread(lambda: asyncio.run(call(batch)))
347+
return await call_async(batch)
348+
return await to_thread(lambda: asyncio.run(call_sync(batch)))
344349

345350
# Create tasks in chunks to limit concurrency
346351
tasks = [thread_call(batch) for batch in chunked(params_list, limit)]

0 commit comments

Comments
 (0)