Skip to content

Commit aadb79c

Browse files
committed
Create shallow copies for every device and keep the original DAG runInfo in a designated field (so it won't be modified). Free the copy for every device that finishes, and use the original for calling OnFinish.
1 parent 36fa926 commit aadb79c

File tree

4 files changed

+21
-18
lines changed

4 files changed

+21
-18
lines changed

src/background_workers.c

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,11 @@ void *RedisAI_Run_ThreadMain(void *arg) {
175175
if (timedOut == 1) {
176176
queueEvict(run_queue_info->run_queue, item);
177177

178+
RedisAI_RunInfo *orig = rinfo->orig_copy;
178179
long long dagRefCount = RAI_DagRunInfoFreeShallowCopy(rinfo);
179180
if (dagRefCount == 0) {
180-
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)rinfo;
181-
rinfo->OnFinish(finish_ctx, rinfo->private_data);
181+
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)orig;
182+
orig->OnFinish(finish_ctx, orig->private_data);
182183
}
183184

184185
queueItem *evicted_item = item;
@@ -413,10 +414,11 @@ void *RedisAI_Run_ThreadMain(void *arg) {
413414
// If there was an error and the reference count for the dag
414415
// has gone to zero and the client is still around, we unblock
415416
if (dagError) {
417+
RedisAI_RunInfo *orig = rinfo->orig_copy;
416418
long long dagRefCount = RAI_DagRunInfoFreeShallowCopy(rinfo);
417419
if (dagRefCount == 0) {
418-
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)rinfo;
419-
rinfo->OnFinish(finish_ctx, rinfo->private_data);
420+
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)orig;
421+
orig->OnFinish(finish_ctx, orig->private_data);
420422
}
421423
} else {
422424
rinfo->dagDeviceCompleteOpCount += 1;
@@ -433,22 +435,22 @@ void *RedisAI_Run_ThreadMain(void *arg) {
433435
int dag_complete_after_run = RedisAI_DagComplete(batch_rinfo[0]);
434436

435437
long long dagRefCount = -1;
436-
438+
RedisAI_RunInfo *orig;
437439
if (device_complete == 1 || device_complete_after_run == 1) {
438-
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
439-
// We decrease and get the reference count for the DAG
440+
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
441+
orig = evicted_rinfo->orig_copy;
442+
// We decrease and get the reference count for the DAG.
440443
dagRefCount = RAI_DagRunInfoFreeShallowCopy(evicted_rinfo);
441444
}
442445

443446
// If the DAG was complete, then it's time to unblock the client
444447
if (do_unblock == 1 || dag_complete_after_run == 1) {
445-
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
446448

447449
// If the reference count for the DAG is zero and the client is still around,
448450
// then we actually unblock the client
449451
if (dagRefCount == 0) {
450-
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)evicted_rinfo;
451-
evicted_rinfo->OnFinish(finish_ctx, evicted_rinfo->private_data);
452+
RedisAI_OnFinishCtx finish_ctx = (RedisAI_RunInfo *)orig;
453+
orig->OnFinish(finish_ctx, orig->private_data);
452454
}
453455
}
454456

src/dag.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ static bool DAG_InsertDAGToQueue(RedisAI_RunInfo *rinfo) {
13021302

13031303
void DAG_ReplyAndUnblock(RedisAI_OnFinishCtx ctx, void *private_data) {
13041304

1305-
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)private_data;
1305+
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)ctx;
13061306
if (rinfo->client)
13071307
RedisModule_UnblockClient(rinfo->client, rinfo);
13081308
}
@@ -1323,8 +1323,6 @@ int RedisAI_ProcessDagRunCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
13231323
// Block the client before adding rinfo to the run queues (sync call).
13241324
rinfo->client = RedisModule_BlockClient(ctx, RedisAI_DagRun_Reply, NULL, RedisAI_FreeData, 0);
13251325
RedisModule_SetDisconnectCallback(rinfo->client, RedisAI_Disconnected);
1326-
// Use the entire rinfo obj as the on finish's private data.
1327-
rinfo->private_data = rinfo;
13281326
rinfo->OnFinish = DAG_ReplyAndUnblock;
13291327
return DAG_InsertDAGToQueue(rinfo);
13301328
}

src/run_info.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ int RAI_InitRunInfo(RedisAI_RunInfo **result) {
130130
rinfo->dagCompleteOpCount = RedisModule_Calloc(1, sizeof(long long));
131131
rinfo->dagDeviceOpCount = 0;
132132
rinfo->dagDeviceCompleteOpCount = 0;
133+
rinfo->orig_copy = rinfo;
133134
pthread_rwlock_init(rinfo->dagLock, NULL);
134135
rinfo->timedOut = RedisModule_Calloc(1, sizeof(int));
135136
*result = rinfo;
@@ -206,9 +207,7 @@ long long RAI_DagRunInfoFreeShallowCopy(RedisAI_RunInfo *rinfo) {
206207
if (rinfo->dagDeviceOps) {
207208
array_free(rinfo->dagDeviceOps);
208209
}
209-
// If this is the last run info copy we do not free it, the OnFinish callback may free.
210-
if (ref_count > 0)
211-
RedisModule_Free(rinfo);
210+
RedisModule_Free(rinfo);
212211
return ref_count;
213212
}
214213

src/run_info.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ typedef void (*RAI_OnFinishCB)(RedisAI_OnFinishCtx ctx, void *private_data);
7979
* Note that not all the context structure is always filled with actual values
8080
* but only the fields needed in a given operation.
8181
*/
82-
typedef struct RedisAI_RunInfo {
82+
83+
typedef struct RedisAI_RunInfo RedisAI_RunInfo;
84+
85+
struct RedisAI_RunInfo {
8386
RedisModuleBlockedClient *client;
8487
int single_op_dag;
8588
int single_device_dag;
@@ -106,8 +109,9 @@ typedef struct RedisAI_RunInfo {
106109
int *timedOut;
107110
struct timeval queuingTime;
108111
RAI_OnFinishCB OnFinish;
112+
RedisAI_RunInfo *orig_copy;
109113
void *private_data; // This is going to be sent to the OnFinish callback.
110-
} RedisAI_RunInfo;
114+
};
111115

112116
/**
113117
* Allocate the memory and initialise the RedisAI_RunInfo.

0 commit comments

Comments
 (0)