@@ -167,7 +167,8 @@ static void _BGThread_Execute(RunQueueInfo *run_queue_info, RedisAI_RunInfo **ba
167167
168168static RedisAI_RunInfo * * _BGThread_BatchOperations (RunQueueInfo * run_queue_info ,
169169 RedisAI_RunInfo * rinfo ,
170- RedisAI_RunInfo * * batch_rinfo ) {
170+ RedisAI_RunInfo * * batch_rinfo ,
171+ bool * batchReady ) {
171172 // Since the current op can be batched, then we collect info on batching, namely
172173 // - batchsize
173174 // - minbatchsize
@@ -260,6 +261,9 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
260261 }
261262 }
262263 }
264+ if (minbatchsize != 0 && current_batchsize < minbatchsize ) {
265+ * batchReady = false;
266+ }
263267 return batch_rinfo ;
264268}
265269
@@ -304,7 +308,18 @@ void *RedisAI_Run_ThreadMain(void *arg) {
304308 }
305309
306310 if (currentOpBatchable ) {
307- batch_rinfo = _BGThread_BatchOperations (run_queue_info , rinfo , batch_rinfo );
311+ bool batchReady = true;
312+ batch_rinfo =
313+ _BGThread_BatchOperations (run_queue_info , rinfo , batch_rinfo , & batchReady );
314+ if (!batchReady ) {
315+ // Batch is not ready - batch size didn't match the expectations from
316+ // minbatchsize
317+ for (int i = array_len (batch_rinfo ) - 1 ; i >= 0 ; i -- ) {
318+ queuePush (run_queue_info -> run_queue , batch_rinfo [i ]);
319+ }
320+ // Exit the loop, give a chance to new tasks to submit.
321+ break ;
322+ }
308323 }
309324 // Run the computation step (batched or not)
310325 // We're done with the queue here, items have been evicted so we can
0 commit comments