Skip to content

Commit 1cf97a9

Browse files
committed
Refactor parse modelrun and scriptrun commands following memory leaks.
1 parent 4c91847 commit 1cf97a9

File tree

6 files changed

+50
-113
lines changed

6 files changed

+50
-113
lines changed

src/DAG/dag.c

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -884,24 +884,3 @@ void DAG_ReplyAndUnblock(RedisAI_OnFinishCtx *ctx, void *private_data) {
884884
if (rinfo->client)
885885
RedisModule_UnblockClient(rinfo->client, rinfo);
886886
}
887-
888-
void Dag_PopulateOp(RAI_DagOp *currentOp, void *rctx, RedisModuleString **inkeys,
889-
RedisModuleString **outkeys, RedisModuleString *runkey) {
890-
891-
if (currentOp->commandType == REDISAI_DAG_CMD_MODELRUN) {
892-
currentOp->mctx = (RAI_ModelRunCtx *)rctx;
893-
currentOp->devicestr = currentOp->mctx->model->devicestr;
894-
} else {
895-
assert(currentOp->commandType == REDISAI_DAG_CMD_SCRIPTRUN);
896-
currentOp->sctx = (RAI_ScriptRunCtx *)rctx;
897-
currentOp->devicestr = currentOp->sctx->script->devicestr;
898-
}
899-
900-
// todo: temporary patch to fix leak, need refactor
901-
array_free(currentOp->inkeys);
902-
array_free(currentOp->outkeys);
903-
904-
currentOp->inkeys = inkeys;
905-
currentOp->outkeys = outkeys;
906-
currentOp->runkey = runkey;
907-
}

src/DAG/dag.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,4 @@ void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo);
147147
*/
148148
void RedisAI_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
149149

150-
/**
151-
* @brief Populate a DAG modelrun/scriptrun op with its params .
152-
* @param rinfo An existing DAG to populate.
153-
* @param rctx ModelRunCtx or ScriptRunCtx that represents the single MODELRUN op.
154-
* @param inkeys The DAG operation inkeys (the input tensors).
155-
* @param outkeys The DAG operation outkeys (the output tensors).
156-
* @param runkey The model key.
157-
* @param cmd The DAG command (modelrun/scriptrun).
158-
*/
159-
160-
void Dag_PopulateOp(RAI_DagOp *currentOp, void *rctx, RedisModuleString **inkeys,
161-
RedisModuleString **outkeys, RedisModuleString *runkey);
162-
163150
#endif /* SRC_DAG_H_ */

src/background_workers.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,8 @@ void *RedisAI_Run_ThreadMain(void *arg) {
494494

495495
// If there's nothing else to do for the DAG in the current worker or if an error
496496
// occurred in any worker, we just move on
497-
if (device_complete == 1 || device_complete_after_run == 1 || do_unblock == 1) {
497+
if (device_complete == 1 || device_complete_after_run == 1 || do_unblock == 1 ||
498+
run_error == 1) {
498499
for (long long i = 0; i < array_len(evicted_items); i++) {
499500
RedisModule_Free(evicted_items[i]);
500501
}

src/command_parser.c

Lines changed: 37 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -125,56 +125,41 @@ static int _ModelRunCtx_SetParams(RedisModuleCtx *ctx, RedisModuleString **inkey
125125
int ParseModelRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModuleString **argv,
126126
int argc) {
127127

128+
RAI_DagOp *currentOp;
129+
RAI_InitDagOp(&currentOp);
130+
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
131+
128132
// Build a ModelRunCtx from command.
129-
RAI_Error error = {0};
130133
RAI_Model *model;
131-
RedisModuleString **inkeys = array_new(RedisModuleString *, 1);
132-
RedisModuleString **outkeys = array_new(RedisModuleString *, 1);
133-
RedisModuleString *runkey = NULL;
134-
RAI_ModelRunCtx *mctx = NULL;
135-
RAI_DagOp *currentOp;
136134

137135
long long timeout = 0;
138-
if (_ModelRunCommand_ParseArgs(ctx, argv, argc, &model, &error, &inkeys, &outkeys, &runkey,
136+
if (_ModelRunCommand_ParseArgs(ctx, argv, argc, &model, currentOp->err, &currentOp->inkeys,
137+
&currentOp->outkeys, &currentOp->runkey,
139138
&timeout) == REDISMODULE_ERR) {
140-
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(&error));
139+
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(currentOp->err));
140+
goto cleanup;
141+
}
142+
143+
if (timeout > 0 && !rinfo->single_op_dag) {
144+
RedisModule_ReplyWithError(ctx, "ERR TIMEOUT not allowed within a DAG command");
141145
goto cleanup;
142146
}
143-
mctx = RAI_ModelRunCtxCreate(model);
144147

148+
RAI_ModelRunCtx *mctx = RAI_ModelRunCtxCreate(model);
145149
if (rinfo->single_op_dag) {
146150
rinfo->timeout = timeout;
147151
// Set params in ModelRunCtx, bring inputs from key space.
148-
if (_ModelRunCtx_SetParams(ctx, inkeys, outkeys, mctx) == REDISMODULE_ERR)
152+
if (_ModelRunCtx_SetParams(ctx, currentOp->inkeys, currentOp->outkeys, mctx) ==
153+
REDISMODULE_ERR)
149154
goto cleanup;
150155
}
151-
if (RAI_InitDagOp(&currentOp) == REDISMODULE_ERR) {
152-
RedisModule_ReplyWithError(
153-
ctx, "ERR Unable to allocate the memory and initialise the RAI_dagOp structure");
154-
goto cleanup;
155-
}
156+
156157
currentOp->commandType = REDISAI_DAG_CMD_MODELRUN;
157-
Dag_PopulateOp(currentOp, mctx, inkeys, outkeys, runkey);
158-
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
158+
currentOp->mctx = mctx;
159+
currentOp->devicestr = mctx->model->devicestr;
159160
return REDISMODULE_OK;
160161

161162
cleanup:
162-
if (error.detail) {
163-
RedisModule_Free(error.detail);
164-
RedisModule_Free(error.detail_oneline);
165-
}
166-
for (size_t i = 0; i < array_len(inkeys); i++) {
167-
RedisModule_FreeString(NULL, inkeys[i]);
168-
}
169-
array_free(inkeys);
170-
for (size_t i = 0; i < array_len(outkeys); i++) {
171-
RedisModule_FreeString(NULL, outkeys[i]);
172-
}
173-
array_free(outkeys);
174-
if (runkey)
175-
RedisModule_FreeString(NULL, runkey);
176-
if (mctx)
177-
RAI_ModelRunCtxFree(mctx);
178163
RAI_FreeRunInfo(rinfo);
179164
return REDISMODULE_ERR;
180165
}
@@ -287,56 +272,44 @@ static int _ScriptRunCtx_SetParams(RedisModuleCtx *ctx, RedisModuleString **inke
287272
int ParseScriptRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModuleString **argv,
288273
int argc) {
289274

275+
RAI_DagOp *currentOp;
276+
RAI_InitDagOp(&currentOp);
277+
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
278+
290279
// Build a ScriptRunCtx from command.
291-
RAI_Error error = {0};
292280
RAI_Script *script;
293-
RedisModuleString **inkeys = array_new(RedisModuleString *, 1);
294-
RedisModuleString **outkeys = array_new(RedisModuleString *, 1);
295-
RedisModuleString *runkey = NULL;
296281
const char *func_name = NULL;
297-
RAI_ScriptRunCtx *sctx = NULL;
298-
RAI_DagOp *currentOp;
299282

300283
long long timeout = 0;
301284
int variadic = -1;
302-
if (_ScriptRunCommand_ParseArgs(ctx, argv, argc, &script, &error, &inkeys, &outkeys, &runkey,
303-
&func_name, &timeout, &variadic) == REDISMODULE_ERR) {
304-
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(&error));
285+
if (_ScriptRunCommand_ParseArgs(ctx, argv, argc, &script, currentOp->err, &currentOp->inkeys,
286+
&currentOp->outkeys, &currentOp->runkey, &func_name, &timeout,
287+
&variadic) == REDISMODULE_ERR) {
288+
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(currentOp->err));
289+
goto cleanup;
290+
}
291+
if (timeout > 0 && !rinfo->single_op_dag) {
292+
RedisModule_ReplyWithError(ctx, "ERR TIMEOUT not allowed within a DAG command");
305293
goto cleanup;
306294
}
307-
sctx = RAI_ScriptRunCtxCreate(script, func_name);
295+
296+
RAI_ScriptRunCtx *sctx = RAI_ScriptRunCtxCreate(script, func_name);
308297
sctx->variadic = variadic;
309298

310299
if (rinfo->single_op_dag) {
311300
rinfo->timeout = timeout;
312301
// Set params in ScriptRunCtx, bring inputs from key space.
313-
if (_ScriptRunCtx_SetParams(ctx, inkeys, outkeys, sctx) == REDISMODULE_ERR)
302+
if (_ScriptRunCtx_SetParams(ctx, currentOp->inkeys, currentOp->outkeys, sctx) ==
303+
REDISMODULE_ERR)
314304
goto cleanup;
315305
}
316-
RAI_InitDagOp(&currentOp);
317-
306+
currentOp->sctx = sctx;
318307
currentOp->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
319-
Dag_PopulateOp(currentOp, sctx, inkeys, outkeys, runkey);
320-
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
308+
currentOp->devicestr = sctx->script->devicestr;
309+
321310
return REDISMODULE_OK;
322311

323312
cleanup:
324-
if (error.detail) {
325-
RedisModule_Free(error.detail);
326-
RedisModule_Free(error.detail_oneline);
327-
}
328-
for (size_t i = 0; i < array_len(inkeys); i++) {
329-
RedisModule_FreeString(NULL, inkeys[i]);
330-
}
331-
array_free(inkeys);
332-
for (size_t i = 0; i < array_len(outkeys); i++) {
333-
RedisModule_FreeString(NULL, outkeys[i]);
334-
}
335-
array_free(outkeys);
336-
if (runkey)
337-
RedisModule_FreeString(NULL, runkey);
338-
if (sctx)
339-
RAI_ScriptRunCtxFree(sctx);
340313
RAI_FreeRunInfo(rinfo);
341314
return REDISMODULE_ERR;
342315
}

src/model.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -504,19 +504,17 @@ RedisModuleType *RAI_ModelRedisType(void) { return RedisAI_ModelType; }
504504
int RAI_ModelRunAsync(RAI_ModelRunCtx *mctx, RAI_OnFinishCB ModelAsyncFinish, void *private_data) {
505505

506506
RedisAI_RunInfo *rinfo = NULL;
507-
if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
508-
return REDISMODULE_ERR;
509-
}
507+
RAI_InitRunInfo(&rinfo);
508+
510509
rinfo->single_op_dag = 1;
511510
rinfo->OnFinish = (RedisAI_OnFinishCB)ModelAsyncFinish;
512511
rinfo->private_data = private_data;
513512

514513
RAI_DagOp *op;
515-
if (RAI_InitDagOp(&op) == REDISMODULE_ERR) {
516-
return REDISMODULE_ERR;
517-
}
514+
RAI_InitDagOp(&op);
518515
op->commandType = REDISAI_DAG_CMD_MODELRUN;
519-
Dag_PopulateOp(op, mctx, NULL, NULL, NULL);
516+
op->devicestr = mctx->model->devicestr;
517+
op->mctx = mctx;
520518

521519
rinfo->dagOps = array_append(rinfo->dagOps, op);
522520
rinfo->dagOpCount = 1;

src/script.c

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -358,19 +358,18 @@ int RAI_ScriptRunAsync(RAI_ScriptRunCtx *sctx, RAI_OnFinishCB ScriptAsyncFinish,
358358
void *private_data) {
359359

360360
RedisAI_RunInfo *rinfo = NULL;
361-
if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
362-
return REDISMODULE_ERR;
363-
}
361+
RAI_InitRunInfo(&rinfo);
362+
364363
rinfo->single_op_dag = 1;
365364
rinfo->OnFinish = (RedisAI_OnFinishCB)ScriptAsyncFinish;
366365
rinfo->private_data = private_data;
367366

368367
RAI_DagOp *op;
369-
if (RAI_InitDagOp(&op) == REDISMODULE_ERR) {
370-
return REDISMODULE_ERR;
371-
}
368+
RAI_InitDagOp(&op);
369+
372370
op->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
373-
Dag_PopulateOp(op, sctx, NULL, NULL, NULL);
371+
op->devicestr = sctx->script->devicestr;
372+
op->sctx = sctx;
374373

375374
rinfo->dagOps = array_append(rinfo->dagOps, op);
376375
rinfo->dagOpCount = 1;

0 commit comments

Comments
 (0)