From a8e975e7e8ffd2edfb8396651444ad1f14606905 Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 14:17:48 +0800 Subject: [PATCH 1/8] Check for acknowledge NOTICE form QEs --- src/backend/cdb/dispatcher/cdbdisp.c | 15 +++++- src/backend/cdb/dispatcher/cdbdisp_async.c | 51 ++++++++++++++++++- src/backend/cdb/dispatcher/cdbdisp_query.c | 13 ++++- src/backend/cdb/endpoint/cdbendpoint.c | 3 +- .../cdb/endpoint/cdbendpointinternal.h | 6 +++ src/backend/commands/portalcmds.c | 10 ++-- src/include/cdb/cdbconn.h | 2 + src/include/cdb/cdbdisp.h | 13 ++++- src/include/cdb/cdbendpoint.h | 3 ++ 9 files changed, 105 insertions(+), 11 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp.c b/src/backend/cdb/dispatcher/cdbdisp.c index d41ac72005a9..fcb6af4c1b5d 100644 --- a/src/backend/cdb/dispatcher/cdbdisp.c +++ b/src/backend/cdb/dispatcher/cdbdisp.c @@ -102,6 +102,19 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds) (pDispatchFuncs->waitDispatchFinish) (ds); } +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + if (pDispatchFuncs->checkResults != NULL) + (pDispatchFuncs->checkAckNotice) (ds, wait, message); +} + /* * cdbdisp_checkDispatchResult: * @@ -134,7 +147,7 @@ cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, } } -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 153bde8873e6..62c3c554dbbc 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -68,6 +68,8 @@ typedef struct CdbDispatchCmdAsync */ volatile DispatchWaitMode waitMode; + const char *ackNoticeMsg; + /* * Text information to dispatch: The format is type(1 byte) + length(size * of int) + content(n bytes) @@ -83,6 +85,8 @@ typedef struct CdbDispatchCmdAsync static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len); +static void cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message); + static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); @@ -99,6 +103,7 @@ DispatcherInternalFuncs DispatcherAsyncFuncs = cdbdisp_checkForCancel_async, cdbdisp_getWaitSocketFd_async, cdbdisp_makeDispatchParams_async, + cdbdisp_checkAckNotice_async, cdbdisp_checkDispatchResult_async, cdbdisp_dispatchToGang_async, cdbdisp_waitDispatchFinish_async @@ -306,6 +311,33 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds, } } +/* + * Check dispatch acknowledge NOTICE. + * + * Wait all dispatch work to get back specify acknowledge NOTICE, + * either success or fail. (Set stillRunning to true when + * one dispatch work is completed) + */ +static void +cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + Assert(ds != NULL); + DispatchWaitMode prevWaitMode; + CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + + /* cdbdisp_destroyDispatcherState is called */ + if (pParms == NULL) + return; + pParms->ackNoticeMsg = message; + + prevWaitMode = pParms->waitMode; + pParms->waitMode = DISPATCH_WAIT_ACK_NOTICE; + + checkDispatchResult(ds, wait); + + pParms->waitMode = prevWaitMode; +} + /* * Check dispatch result. * @@ -317,6 +349,8 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode) { Assert(ds != NULL); + /* DISPATCH_WAIT_ACK_NOTICE can not be set in cdbdisp_checkDispatchResult_async */ + Assert(waitMode != DISPATCH_WAIT_ACK_NOTICE); CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; /* cdbdisp_destroyDispatcherState is called */ @@ -338,7 +372,8 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, * * The waitMode argument is NONE when we are doing "normal work". */ - if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH) + if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH || + waitMode == DISPATCH_WAIT_ACK_NOTICE) CHECK_FOR_INTERRUPTS(); } @@ -360,6 +395,7 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size); pParms->dispatchCount = 0; pParms->waitMode = DISPATCH_WAIT_NONE; + pParms->ackNoticeMsg = NULL; pParms->query_text = queryText; pParms->query_text_len = len; @@ -389,10 +425,11 @@ checkDispatchResult(CdbDispatcherState *ds, bool sentSignal = false; struct pollfd *fds; uint8 ftsVersion = 0; + bool *ackNotices; db_count = pParms->dispatchCount; fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd)); - + ackNotices = palloc0(db_count * sizeof(bool)); /* * OK, we are finished submitting the command to the segdbs. Now, we have * to wait for them to finish. @@ -433,6 +470,8 @@ checkDispatchResult(CdbDispatcherState *ds, */ if (!dispatchResult->stillRunning) continue; + if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && ackNotices[i]) + continue; Assert(!cdbconn_isBadConnection(segdbDesc)); @@ -544,10 +583,18 @@ checkDispatchResult(CdbDispatcherState *ds, } /* We have data waiting on one or more of the connections. */ else + { handlePollSuccess(pParms, fds); + if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && + checkACKQENotices(pParms->ackNoticeMsg)) + { + ackNotices[i] = true; + } + } } pfree(fds); + pfree(ackNotices); } /* diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 665444baab4a..1810b7c616e1 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -44,6 +44,7 @@ #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" #include "cdb/cdbcopy.h" +#include "cdb/cdbendpoint.h" #include "executor/execUtils.h" #define QUERY_STRING_TRUNCATE_SIZE (1024) @@ -1153,7 +1154,6 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, pfree(sliceVector); cdbdisp_waitDispatchFinish(ds); - /* * If bailed before completely dispatched, stop QEs and throw error. */ @@ -1192,6 +1192,17 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, errmsg_internal("unable to dispatch plan"))); } + if (queryDesc->parallel_retrieve_cursor) + { + cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); + if(cdbdisp_checkResultsErrcode(ds->primaryResults)) + { + cdbdisp_getDispatchResults(ds, &qeError); + cdbdisp_destroyDispatcherState(ds); + ReThrowError(qeError); + } + } + if (DEBUG1 >= log_min_messages) { char msec_str[32]; diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index 18d4fa9d5976..6e202af9b906 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -544,8 +544,9 @@ CreateTQDestReceiverForEndpoint(TupleDesc tupleDesc, const char *cursorName) activeSharedEndpoint = alloc_endpoint(cursorName, dsm_segment_handle(activeDsmSeg)); + SEND_ACK_NOTICE(ENDPOINT_READY); /* Unblock the latch to finish declare statement. */ - declare_parallel_retrieve_ready(cursorName); +// declare_parallel_retrieve_ready(cursorName); return CreateTupleQueueDestReceiver(shmMqHandle); } diff --git a/src/backend/cdb/endpoint/cdbendpointinternal.h b/src/backend/cdb/endpoint/cdbendpointinternal.h index d8a120599011..35163b3b6e95 100644 --- a/src/backend/cdb/endpoint/cdbendpointinternal.h +++ b/src/backend/cdb/endpoint/cdbendpointinternal.h @@ -36,6 +36,12 @@ */ #define ENDPOINT_NAME_LEN (NAMEDATALEN + 1 + 8 + 1 + 8) +#define ACK_NOTICE_MSG_HEADER "^ACK_NOTICE^" + +#define SEND_ACK_NOTICE(MSG) \ + ereport(NOTICE, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), \ + errmsg("%s: %s", ACK_NOTICE_MSG_HEADER, MSG))) + /* * Endpoint attach status. */ diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index eedb66fb4018..526cf12b533e 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -164,11 +164,11 @@ PerformCursorOpen(PlannedStmt *stmt, ParamListInfo params, Assert(portal->strategy == PORTAL_ONE_SELECT); - if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) - { - PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); - WaitEndpointReady(stmt->planTree, portal->name); - } +// if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) +// { +// PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); +// WaitEndpointReady(stmt->planTree, portal->name); +// } /* * We're done; the query won't actually be run until PerformPortalFetch is * called. diff --git a/src/include/cdb/cdbconn.h b/src/include/cdb/cdbconn.h index ec46fe80dc84..8ada8d437c29 100644 --- a/src/include/cdb/cdbconn.h +++ b/src/include/cdb/cdbconn.h @@ -103,4 +103,6 @@ bool cdbconn_signalQE(SegmentDatabaseDescriptor *segdbDesc, char *errbuf, bool i extern void forwardQENotices(void); +extern bool checkACKQENotices(const char *message); + #endif /* CDBCONN_H */ diff --git a/src/include/cdb/cdbdisp.h b/src/include/cdb/cdbdisp.h index 65115d0dab08..80d83b9dba6a 100644 --- a/src/include/cdb/cdbdisp.h +++ b/src/include/cdb/cdbdisp.h @@ -32,6 +32,7 @@ enum GangType; typedef enum DispatchWaitMode { DISPATCH_WAIT_NONE = 0, /* wait until QE fully completes */ + DISPATCH_WAIT_ACK_NOTICE, /* wait until QE send ack NOTICE back */ DISPATCH_WAIT_FINISH, /* send query finish */ DISPATCH_WAIT_CANCEL /* send query cancel */ } DispatchWaitMode; @@ -51,6 +52,7 @@ typedef struct DispatcherInternalFuncs bool (*checkForCancel)(struct CdbDispatcherState *ds); int (*getWaitSocketFd)(struct CdbDispatcherState *ds); void* (*makeDispatchParams)(int maxSlices, int largestGangSize, char *queryText, int queryTextLen); + void (*checkAckNotice)(struct CdbDispatcherState *ds, bool wait, const char* message); void (*checkResults)(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); void (*dispatchToGang)(struct CdbDispatcherState *ds, struct Gang *gp, int sliceIndex); void (*waitDispatchFinish)(struct CdbDispatcherState *ds); @@ -100,6 +102,15 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds, void cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message); + /* * CdbCheckDispatchResult: * @@ -111,7 +122,7 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); void cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/include/cdb/cdbendpoint.h b/src/include/cdb/cdbendpoint.h index ca217537a668..a175c242e6b6 100644 --- a/src/include/cdb/cdbendpoint.h +++ b/src/include/cdb/cdbendpoint.h @@ -40,6 +40,9 @@ #include "tcop/dest.h" #include "storage/lwlock.h" +/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ +#define ENDPOINT_READY "ENDPOINT_READY" + /* * Roles that used in PARALLEL RETRIEVE CURSOR execution. * From 409374aa058de68cf896fa64477c6ced54ab9b10 Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 14:17:48 +0800 Subject: [PATCH 2/8] Check for acknowledge NOTICE form QEs --- src/backend/cdb/dispatcher/cdbdisp.c | 15 +++++- src/backend/cdb/dispatcher/cdbdisp_async.c | 51 ++++++++++++++++++- src/backend/cdb/dispatcher/cdbdisp_query.c | 13 ++++- src/backend/cdb/endpoint/cdbendpoint.c | 3 +- .../cdb/endpoint/cdbendpointinternal.h | 6 +++ src/backend/commands/portalcmds.c | 10 ++-- src/include/cdb/cdbconn.h | 2 + src/include/cdb/cdbdisp.h | 13 ++++- src/include/cdb/cdbendpoint.h | 3 ++ 9 files changed, 105 insertions(+), 11 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp.c b/src/backend/cdb/dispatcher/cdbdisp.c index d41ac72005a9..fcb6af4c1b5d 100644 --- a/src/backend/cdb/dispatcher/cdbdisp.c +++ b/src/backend/cdb/dispatcher/cdbdisp.c @@ -102,6 +102,19 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds) (pDispatchFuncs->waitDispatchFinish) (ds); } +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + if (pDispatchFuncs->checkResults != NULL) + (pDispatchFuncs->checkAckNotice) (ds, wait, message); +} + /* * cdbdisp_checkDispatchResult: * @@ -134,7 +147,7 @@ cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, } } -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 153bde8873e6..62c3c554dbbc 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -68,6 +68,8 @@ typedef struct CdbDispatchCmdAsync */ volatile DispatchWaitMode waitMode; + const char *ackNoticeMsg; + /* * Text information to dispatch: The format is type(1 byte) + length(size * of int) + content(n bytes) @@ -83,6 +85,8 @@ typedef struct CdbDispatchCmdAsync static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len); +static void cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message); + static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); @@ -99,6 +103,7 @@ DispatcherInternalFuncs DispatcherAsyncFuncs = cdbdisp_checkForCancel_async, cdbdisp_getWaitSocketFd_async, cdbdisp_makeDispatchParams_async, + cdbdisp_checkAckNotice_async, cdbdisp_checkDispatchResult_async, cdbdisp_dispatchToGang_async, cdbdisp_waitDispatchFinish_async @@ -306,6 +311,33 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds, } } +/* + * Check dispatch acknowledge NOTICE. + * + * Wait all dispatch work to get back specify acknowledge NOTICE, + * either success or fail. (Set stillRunning to true when + * one dispatch work is completed) + */ +static void +cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + Assert(ds != NULL); + DispatchWaitMode prevWaitMode; + CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + + /* cdbdisp_destroyDispatcherState is called */ + if (pParms == NULL) + return; + pParms->ackNoticeMsg = message; + + prevWaitMode = pParms->waitMode; + pParms->waitMode = DISPATCH_WAIT_ACK_NOTICE; + + checkDispatchResult(ds, wait); + + pParms->waitMode = prevWaitMode; +} + /* * Check dispatch result. * @@ -317,6 +349,8 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode) { Assert(ds != NULL); + /* DISPATCH_WAIT_ACK_NOTICE can not be set in cdbdisp_checkDispatchResult_async */ + Assert(waitMode != DISPATCH_WAIT_ACK_NOTICE); CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; /* cdbdisp_destroyDispatcherState is called */ @@ -338,7 +372,8 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, * * The waitMode argument is NONE when we are doing "normal work". */ - if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH) + if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH || + waitMode == DISPATCH_WAIT_ACK_NOTICE) CHECK_FOR_INTERRUPTS(); } @@ -360,6 +395,7 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size); pParms->dispatchCount = 0; pParms->waitMode = DISPATCH_WAIT_NONE; + pParms->ackNoticeMsg = NULL; pParms->query_text = queryText; pParms->query_text_len = len; @@ -389,10 +425,11 @@ checkDispatchResult(CdbDispatcherState *ds, bool sentSignal = false; struct pollfd *fds; uint8 ftsVersion = 0; + bool *ackNotices; db_count = pParms->dispatchCount; fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd)); - + ackNotices = palloc0(db_count * sizeof(bool)); /* * OK, we are finished submitting the command to the segdbs. Now, we have * to wait for them to finish. @@ -433,6 +470,8 @@ checkDispatchResult(CdbDispatcherState *ds, */ if (!dispatchResult->stillRunning) continue; + if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && ackNotices[i]) + continue; Assert(!cdbconn_isBadConnection(segdbDesc)); @@ -544,10 +583,18 @@ checkDispatchResult(CdbDispatcherState *ds, } /* We have data waiting on one or more of the connections. */ else + { handlePollSuccess(pParms, fds); + if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && + checkACKQENotices(pParms->ackNoticeMsg)) + { + ackNotices[i] = true; + } + } } pfree(fds); + pfree(ackNotices); } /* diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 665444baab4a..1810b7c616e1 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -44,6 +44,7 @@ #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" #include "cdb/cdbcopy.h" +#include "cdb/cdbendpoint.h" #include "executor/execUtils.h" #define QUERY_STRING_TRUNCATE_SIZE (1024) @@ -1153,7 +1154,6 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, pfree(sliceVector); cdbdisp_waitDispatchFinish(ds); - /* * If bailed before completely dispatched, stop QEs and throw error. */ @@ -1192,6 +1192,17 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, errmsg_internal("unable to dispatch plan"))); } + if (queryDesc->parallel_retrieve_cursor) + { + cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); + if(cdbdisp_checkResultsErrcode(ds->primaryResults)) + { + cdbdisp_getDispatchResults(ds, &qeError); + cdbdisp_destroyDispatcherState(ds); + ReThrowError(qeError); + } + } + if (DEBUG1 >= log_min_messages) { char msec_str[32]; diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index 18d4fa9d5976..6e202af9b906 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -544,8 +544,9 @@ CreateTQDestReceiverForEndpoint(TupleDesc tupleDesc, const char *cursorName) activeSharedEndpoint = alloc_endpoint(cursorName, dsm_segment_handle(activeDsmSeg)); + SEND_ACK_NOTICE(ENDPOINT_READY); /* Unblock the latch to finish declare statement. */ - declare_parallel_retrieve_ready(cursorName); +// declare_parallel_retrieve_ready(cursorName); return CreateTupleQueueDestReceiver(shmMqHandle); } diff --git a/src/backend/cdb/endpoint/cdbendpointinternal.h b/src/backend/cdb/endpoint/cdbendpointinternal.h index d8a120599011..35163b3b6e95 100644 --- a/src/backend/cdb/endpoint/cdbendpointinternal.h +++ b/src/backend/cdb/endpoint/cdbendpointinternal.h @@ -36,6 +36,12 @@ */ #define ENDPOINT_NAME_LEN (NAMEDATALEN + 1 + 8 + 1 + 8) +#define ACK_NOTICE_MSG_HEADER "^ACK_NOTICE^" + +#define SEND_ACK_NOTICE(MSG) \ + ereport(NOTICE, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), \ + errmsg("%s: %s", ACK_NOTICE_MSG_HEADER, MSG))) + /* * Endpoint attach status. */ diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index eedb66fb4018..526cf12b533e 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -164,11 +164,11 @@ PerformCursorOpen(PlannedStmt *stmt, ParamListInfo params, Assert(portal->strategy == PORTAL_ONE_SELECT); - if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) - { - PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); - WaitEndpointReady(stmt->planTree, portal->name); - } +// if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) +// { +// PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); +// WaitEndpointReady(stmt->planTree, portal->name); +// } /* * We're done; the query won't actually be run until PerformPortalFetch is * called. diff --git a/src/include/cdb/cdbconn.h b/src/include/cdb/cdbconn.h index ec46fe80dc84..8ada8d437c29 100644 --- a/src/include/cdb/cdbconn.h +++ b/src/include/cdb/cdbconn.h @@ -103,4 +103,6 @@ bool cdbconn_signalQE(SegmentDatabaseDescriptor *segdbDesc, char *errbuf, bool i extern void forwardQENotices(void); +extern bool checkACKQENotices(const char *message); + #endif /* CDBCONN_H */ diff --git a/src/include/cdb/cdbdisp.h b/src/include/cdb/cdbdisp.h index 65115d0dab08..80d83b9dba6a 100644 --- a/src/include/cdb/cdbdisp.h +++ b/src/include/cdb/cdbdisp.h @@ -32,6 +32,7 @@ enum GangType; typedef enum DispatchWaitMode { DISPATCH_WAIT_NONE = 0, /* wait until QE fully completes */ + DISPATCH_WAIT_ACK_NOTICE, /* wait until QE send ack NOTICE back */ DISPATCH_WAIT_FINISH, /* send query finish */ DISPATCH_WAIT_CANCEL /* send query cancel */ } DispatchWaitMode; @@ -51,6 +52,7 @@ typedef struct DispatcherInternalFuncs bool (*checkForCancel)(struct CdbDispatcherState *ds); int (*getWaitSocketFd)(struct CdbDispatcherState *ds); void* (*makeDispatchParams)(int maxSlices, int largestGangSize, char *queryText, int queryTextLen); + void (*checkAckNotice)(struct CdbDispatcherState *ds, bool wait, const char* message); void (*checkResults)(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); void (*dispatchToGang)(struct CdbDispatcherState *ds, struct Gang *gp, int sliceIndex); void (*waitDispatchFinish)(struct CdbDispatcherState *ds); @@ -100,6 +102,15 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds, void cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message); + /* * CdbCheckDispatchResult: * @@ -111,7 +122,7 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); void cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/include/cdb/cdbendpoint.h b/src/include/cdb/cdbendpoint.h index ca217537a668..a175c242e6b6 100644 --- a/src/include/cdb/cdbendpoint.h +++ b/src/include/cdb/cdbendpoint.h @@ -40,6 +40,9 @@ #include "tcop/dest.h" #include "storage/lwlock.h" +/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ +#define ENDPOINT_READY "ENDPOINT_READY" + /* * Roles that used in PARALLEL RETRIEVE CURSOR execution. * From 0de0c71d0e035ebf7d64814b9ef05e36e180fb02 Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 15:05:47 +0800 Subject: [PATCH 3/8] move cdbdisp_checkDispatchAckNotice to WaitEndpointReady --- src/backend/cdb/dispatcher/cdbdisp_query.c | 12 ------------ src/backend/cdb/endpoint/cdbendpoint.c | 13 ++++++++++++- src/backend/cdb/endpoint/cdbendpointinternal.h | 5 +++++ src/backend/commands/portalcmds.c | 12 +++++++----- src/include/cdb/cdbendpoint.h | 5 ++--- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 1810b7c616e1..e3f9c4c5f683 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -44,7 +44,6 @@ #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" #include "cdb/cdbcopy.h" -#include "cdb/cdbendpoint.h" #include "executor/execUtils.h" #define QUERY_STRING_TRUNCATE_SIZE (1024) @@ -1192,17 +1191,6 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, errmsg_internal("unable to dispatch plan"))); } - if (queryDesc->parallel_retrieve_cursor) - { - cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); - if(cdbdisp_checkResultsErrcode(ds->primaryResults)) - { - cdbdisp_getDispatchResults(ds, &qeError); - cdbdisp_destroyDispatcherState(ds); - ReThrowError(qeError); - } - } - if (DEBUG1 >= log_min_messages) { char msec_str[32]; diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index 6e202af9b906..37581e3e8e57 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -365,8 +365,19 @@ ChooseEndpointContentIDForParallelCursor(const struct Plan *planTree, } void -WaitEndpointReady(const struct Plan *planTree, const char *cursorName) +WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName) { + ErrorData *qeError = NULL; + + cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); + if(cdbdisp_checkResultsErrcode(ds->primaryResults)) + { + cdbdisp_getDispatchResults(ds, &qeError); + cdbdisp_destroyDispatcherState(ds); + ReThrowError(qeError); + } + + // TODO: redesign the token dispatch logic, remove below code call_endpoint_udf_on_qd(planTree, cursorName, 'r'); } diff --git a/src/backend/cdb/endpoint/cdbendpointinternal.h b/src/backend/cdb/endpoint/cdbendpointinternal.h index 35163b3b6e95..b3b86b24d428 100644 --- a/src/backend/cdb/endpoint/cdbendpointinternal.h +++ b/src/backend/cdb/endpoint/cdbendpointinternal.h @@ -42,6 +42,11 @@ ereport(NOTICE, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), \ errmsg("%s: %s", ACK_NOTICE_MSG_HEADER, MSG))) + +/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ +#define ENDPOINT_READY "ENDPOINT_READY" + + /* * Endpoint attach status. */ diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 526cf12b533e..2f8c26bd47b2 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -164,11 +164,13 @@ PerformCursorOpen(PlannedStmt *stmt, ParamListInfo params, Assert(portal->strategy == PORTAL_ONE_SELECT); -// if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) -// { -// PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); -// WaitEndpointReady(stmt->planTree, portal->name); -// } + if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) + { + PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); + WaitEndpointReady( + portal->queryDesc->estate->dispatcherState, + stmt->planTree, portal->name); + } /* * We're done; the query won't actually be run until PerformPortalFetch is * called. diff --git a/src/include/cdb/cdbendpoint.h b/src/include/cdb/cdbendpoint.h index a175c242e6b6..9108dc7fe1e1 100644 --- a/src/include/cdb/cdbendpoint.h +++ b/src/include/cdb/cdbendpoint.h @@ -39,9 +39,8 @@ #include "nodes/parsenodes.h" #include "tcop/dest.h" #include "storage/lwlock.h" +#include "cdb/cdbdisp.h" -/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ -#define ENDPOINT_READY "ENDPOINT_READY" /* * Roles that used in PARALLEL RETRIEVE CURSOR execution. @@ -84,7 +83,7 @@ extern enum EndPointExecPosition GetParallelCursorEndpointPosition( const struct Plan *planTree); extern List *ChooseEndpointContentIDForParallelCursor( const struct Plan *planTree, enum EndPointExecPosition *position); -extern void WaitEndpointReady(const struct Plan *planTree, const char *cursorName); +extern void WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName); /* * Below functions should run on Endpoints(QE/Entry DB). From a73f1255802faf1ad5bea022daf38762d20331b1 Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 15:28:56 +0800 Subject: [PATCH 4/8] remove useless code --- src/backend/cdb/dispatcher/cdbdisp_async.c | 3 +-- src/backend/cdb/dispatcher/cdbdisp_query.c | 13 +------------ src/backend/cdb/endpoint/cdbendpoint.c | 3 ++- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 62c3c554dbbc..a3a38635f28f 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -372,8 +372,7 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, * * The waitMode argument is NONE when we are doing "normal work". */ - if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH || - waitMode == DISPATCH_WAIT_ACK_NOTICE) + if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH) CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 1810b7c616e1..665444baab4a 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -44,7 +44,6 @@ #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" #include "cdb/cdbcopy.h" -#include "cdb/cdbendpoint.h" #include "executor/execUtils.h" #define QUERY_STRING_TRUNCATE_SIZE (1024) @@ -1154,6 +1153,7 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, pfree(sliceVector); cdbdisp_waitDispatchFinish(ds); + /* * If bailed before completely dispatched, stop QEs and throw error. */ @@ -1192,17 +1192,6 @@ cdbdisp_dispatchX(QueryDesc* queryDesc, errmsg_internal("unable to dispatch plan"))); } - if (queryDesc->parallel_retrieve_cursor) - { - cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); - if(cdbdisp_checkResultsErrcode(ds->primaryResults)) - { - cdbdisp_getDispatchResults(ds, &qeError); - cdbdisp_destroyDispatcherState(ds); - ReThrowError(qeError); - } - } - if (DEBUG1 >= log_min_messages) { char msec_str[32]; diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index 37581e3e8e57..4162a0e824da 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -556,8 +556,9 @@ CreateTQDestReceiverForEndpoint(TupleDesc tupleDesc, const char *cursorName) alloc_endpoint(cursorName, dsm_segment_handle(activeDsmSeg)); SEND_ACK_NOTICE(ENDPOINT_READY); + // TODO: rely on above logic to tell qd endpoint is ready, remove bolow code /* Unblock the latch to finish declare statement. */ -// declare_parallel_retrieve_ready(cursorName); + declare_parallel_retrieve_ready(cursorName); return CreateTupleQueueDestReceiver(shmMqHandle); } From 9ca428d5b98893e2fbde20a3da041e7c34c14743 Mon Sep 17 00:00:00 2001 From: Chen Mulong Date: Sat, 12 Oct 2019 16:56:13 +0800 Subject: [PATCH 5/8] Make notice callback work --- src/backend/cdb/dispatcher/cdbdisp_async.c | 101 ++++++++++++++++-- .../cdb/endpoint/cdbendpointinternal.h | 6 +- src/backend/utils/errcodes.txt | 1 + 3 files changed, 95 insertions(+), 13 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index a3a38635f28f..4078dfa4df1c 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -98,6 +98,10 @@ static void cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds); static bool cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds); static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds); +static void initAckNotice(const struct CdbDispatcherState *ds, const char *message); +static void ackNoticeReceiver(void *arg, const PGresult *res); +static void endAckNotice(void); + DispatcherInternalFuncs DispatcherAsyncFuncs = { cdbdisp_checkForCancel_async, @@ -325,6 +329,8 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha DispatchWaitMode prevWaitMode; CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + initAckNotice(ds, message); + /* cdbdisp_destroyDispatcherState is called */ if (pParms == NULL) return; @@ -336,6 +342,7 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha checkDispatchResult(ds, wait); pParms->waitMode = prevWaitMode; + endAckNotice(); } /* @@ -401,6 +408,90 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query return (void *) pParms; } +typedef struct AckNoticeInfo +{ + const CdbDispatchCmdAsync *pParms; + const char* expectedMessage; + bool *ackNotice; + PQnoticeReceiver *oldReceivers; +} AckNoticeInfo; + +static struct AckNoticeInfo *ackNoticeInfo = NULL; + + +static void +initAckNotice(const struct CdbDispatcherState *ds, const char *message) +{ + Assert(!ackNoticeInfo); + + CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + + ackNoticeInfo = palloc(sizeof(struct AckNoticeInfo)); + ackNoticeInfo->pParms = pParms; + ackNoticeInfo->ackNotice = palloc(sizeof(bool) * pParms->dispatchCount); + ackNoticeInfo->oldReceivers = palloc(sizeof(PQnoticeReceiver *) * pParms->dispatchCount); + ackNoticeInfo->expectedMessage = message; + for (int i = 0; i < pParms->dispatchCount; i++) + { + SegmentDatabaseDescriptor *segdbDesc = (pParms->dispatchResultPtrArray)[i]->segdbDesc; + ackNoticeInfo->ackNotice[i] = false; + ackNoticeInfo->oldReceivers[i] = PQsetNoticeReceiver(segdbDesc->conn, &ackNoticeReceiver, segdbDesc); + } +} + +static void +ackNoticeReceiver(void *arg, const PGresult *res) +{ + SegmentDatabaseDescriptor *segdbDesc = (SegmentDatabaseDescriptor *) arg; + PGMessageField *pfield; + const char* line = NULL; + const char* sqlstate = "00000"; + + Assert(ackNoticeInfo); + + for (pfield = res->errFields; pfield != NULL; pfield = pfield->next) + { + switch (pfield->code) + { + case PG_DIAG_SQLSTATE: + sqlstate = pfield->contents; + break; + case PG_DIAG_MESSAGE_PRIMARY: + line = pfield->contents; + + break; + } + } + for (int i = 0; i < ackNoticeInfo->pParms->dispatchCount; i++) + { + if ((ackNoticeInfo->pParms->dispatchResultPtrArray)[i] + ->segdbDesc->segindex == segdbDesc->segindex) + { + if (sqlstate_to_errcode(sqlstate) == ERRCODE_GP_ACK_DONE && + strcmp(ackNoticeInfo->expectedMessage, line) == 0) + { + ackNoticeInfo->ackNotice[i] = true; + } + else + { + ackNoticeInfo->oldReceivers[i](arg, res); + } + break; + } + } +} + +static void +endAckNotice(void) +{ + Assert(ackNoticeInfo); + Assert(ackNoticeInfo->ackNotice); + pfree(ackNoticeInfo->ackNotice); + pfree(ackNoticeInfo->oldReceivers); + pfree(ackNoticeInfo); + ackNoticeInfo = NULL; +} + /* * Receive and process results from all running QEs. * @@ -424,11 +515,9 @@ checkDispatchResult(CdbDispatcherState *ds, bool sentSignal = false; struct pollfd *fds; uint8 ftsVersion = 0; - bool *ackNotices; db_count = pParms->dispatchCount; fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd)); - ackNotices = palloc0(db_count * sizeof(bool)); /* * OK, we are finished submitting the command to the segdbs. Now, we have * to wait for them to finish. @@ -469,7 +558,7 @@ checkDispatchResult(CdbDispatcherState *ds, */ if (!dispatchResult->stillRunning) continue; - if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && ackNotices[i]) + if (ackNoticeInfo && ackNoticeInfo->ackNotice[i]) continue; Assert(!cdbconn_isBadConnection(segdbDesc)); @@ -584,16 +673,10 @@ checkDispatchResult(CdbDispatcherState *ds, else { handlePollSuccess(pParms, fds); - if (pParms->waitMode == DISPATCH_WAIT_ACK_NOTICE && - checkACKQENotices(pParms->ackNoticeMsg)) - { - ackNotices[i] = true; - } } } pfree(fds); - pfree(ackNotices); } /* diff --git a/src/backend/cdb/endpoint/cdbendpointinternal.h b/src/backend/cdb/endpoint/cdbendpointinternal.h index aa9d08066036..69fcbf35fdde 100644 --- a/src/backend/cdb/endpoint/cdbendpointinternal.h +++ b/src/backend/cdb/endpoint/cdbendpointinternal.h @@ -36,11 +36,9 @@ */ #define ENDPOINT_NAME_LEN (NAMEDATALEN + 1 + 8 + 1 + 8) -#define ACK_NOTICE_MSG_HEADER "^ACK_NOTICE^" - #define SEND_ACK_NOTICE(MSG) \ - ereport(NOTICE, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), \ - errmsg("%s: %s", ACK_NOTICE_MSG_HEADER, MSG))) + ereport(NOTICE, (errcode(ERRCODE_GP_ACK_DONE), \ + errmsg("%s", MSG))) /* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ #define ENDPOINT_READY "ENDPOINT_READY" diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 8c6cc4baa8d9..03dfbf6b1be8 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -80,6 +80,7 @@ Section: Class 00 - Successful Completion 00000 S ERRCODE_SUCCESSFUL_COMPLETION successful_completion +00M01 S ERRCODE_GP_ACK_DONE gp_ack_done Section: Class 01 - Warning From 9aa619397ecbac9746c4f37499c43fd42279c79b Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 17:26:29 +0800 Subject: [PATCH 6/8] Should wait if ack notice not received on qd --- src/backend/cdb/dispatcher/cdbdisp_async.c | 4 +--- src/include/cdb/cdbdisp.h | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 4078dfa4df1c..a1de19658dfe 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -337,7 +337,7 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha pParms->ackNoticeMsg = message; prevWaitMode = pParms->waitMode; - pParms->waitMode = DISPATCH_WAIT_ACK_NOTICE; + pParms->waitMode = DISPATCH_WAIT_NONE; checkDispatchResult(ds, wait); @@ -356,8 +356,6 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode) { Assert(ds != NULL); - /* DISPATCH_WAIT_ACK_NOTICE can not be set in cdbdisp_checkDispatchResult_async */ - Assert(waitMode != DISPATCH_WAIT_ACK_NOTICE); CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; /* cdbdisp_destroyDispatcherState is called */ diff --git a/src/include/cdb/cdbdisp.h b/src/include/cdb/cdbdisp.h index 80d83b9dba6a..65ce59778f78 100644 --- a/src/include/cdb/cdbdisp.h +++ b/src/include/cdb/cdbdisp.h @@ -32,7 +32,6 @@ enum GangType; typedef enum DispatchWaitMode { DISPATCH_WAIT_NONE = 0, /* wait until QE fully completes */ - DISPATCH_WAIT_ACK_NOTICE, /* wait until QE send ack NOTICE back */ DISPATCH_WAIT_FINISH, /* send query finish */ DISPATCH_WAIT_CANCEL /* send query cancel */ } DispatchWaitMode; From 99e218aeed5103b3c79c9373f73e54eb391d13dc Mon Sep 17 00:00:00 2001 From: "Junfeng(Jerome) Yang" Date: Sat, 12 Oct 2019 17:45:48 +0800 Subject: [PATCH 7/8] Remove useless code. --- src/backend/cdb/dispatcher/cdbdisp_async.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index a1de19658dfe..89ff220afc2f 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -68,8 +68,6 @@ typedef struct CdbDispatchCmdAsync */ volatile DispatchWaitMode waitMode; - const char *ackNoticeMsg; - /* * Text information to dispatch: The format is type(1 byte) + length(size * of int) + content(n bytes) @@ -334,7 +332,6 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha /* cdbdisp_destroyDispatcherState is called */ if (pParms == NULL) return; - pParms->ackNoticeMsg = message; prevWaitMode = pParms->waitMode; pParms->waitMode = DISPATCH_WAIT_NONE; @@ -399,7 +396,6 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size); pParms->dispatchCount = 0; pParms->waitMode = DISPATCH_WAIT_NONE; - pParms->ackNoticeMsg = NULL; pParms->query_text = queryText; pParms->query_text_len = len; From 83accf09747c213e80af6efe1ca985137067c68f Mon Sep 17 00:00:00 2001 From: Chen Mulong Date: Sat, 12 Oct 2019 18:01:10 +0800 Subject: [PATCH 8/8] Clean up code --- src/backend/cdb/dispatcher/cdbdisp_async.c | 118 +++++++++++---------- 1 file changed, 62 insertions(+), 56 deletions(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 89ff220afc2f..e0e53fb683d7 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -81,6 +81,20 @@ typedef struct CdbDispatchCmdAsync } CdbDispatchCmdAsync; +typedef struct SegAckInfo +{ + const char* expectedMessage; + const SegmentDatabaseDescriptor *segdbDesc; + bool ackNotice; + PQnoticeReceiver oldReceiver; +} SegAckInfo; + +typedef struct AckNoticeInfo +{ + int segCount; + SegAckInfo *segAckInfos; +} AckNoticeInfo; + static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len); static void cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message); @@ -96,9 +110,9 @@ static void cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds); static bool cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds); static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds); -static void initAckNotice(const struct CdbDispatcherState *ds, const char *message); +static AckNoticeInfo *initAckNotice(const struct CdbDispatcherState *ds, const char *message); static void ackNoticeReceiver(void *arg, const PGresult *res); -static void endAckNotice(void); +static void endAckNotice(AckNoticeInfo **ackNoticeInfo); DispatcherInternalFuncs DispatcherAsyncFuncs = { @@ -117,7 +131,7 @@ static void dispatchCommand(CdbDispatchResult *dispatchResult, int query_text_len); static void checkDispatchResult(CdbDispatcherState *ds, - bool wait); + bool wait, const AckNoticeInfo *ackNoticeInfo); static bool processResults(CdbDispatchResult *dispatchResult); @@ -144,7 +158,7 @@ cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds) { Assert(ds); - checkDispatchResult(ds, false); + checkDispatchResult(ds, false, NULL); return cdbdisp_checkResultsErrcode(ds->primaryResults); } @@ -327,7 +341,7 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha DispatchWaitMode prevWaitMode; CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; - initAckNotice(ds, message); + AckNoticeInfo *ackNoticeInfo = initAckNotice(ds, message); /* cdbdisp_destroyDispatcherState is called */ if (pParms == NULL) @@ -336,10 +350,10 @@ cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const cha prevWaitMode = pParms->waitMode; pParms->waitMode = DISPATCH_WAIT_NONE; - checkDispatchResult(ds, wait); + checkDispatchResult(ds, wait, ackNoticeInfo); pParms->waitMode = prevWaitMode; - endAckNotice(); + endAckNotice(&ackNoticeInfo); } /* @@ -366,7 +380,7 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, if (waitMode != DISPATCH_WAIT_NONE) pParms->waitMode = waitMode; - checkDispatchResult(ds, true); + checkDispatchResult(ds, true, NULL); /* * It looks like everything went fine, make sure we don't miss a user @@ -402,47 +416,41 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query return (void *) pParms; } -typedef struct AckNoticeInfo -{ - const CdbDispatchCmdAsync *pParms; - const char* expectedMessage; - bool *ackNotice; - PQnoticeReceiver *oldReceivers; -} AckNoticeInfo; - -static struct AckNoticeInfo *ackNoticeInfo = NULL; - - -static void +/* + * Init the AckNoticeInfo struct which will be used to check the ACK message from QE. + */ +static AckNoticeInfo * initAckNotice(const struct CdbDispatcherState *ds, const char *message) { - Assert(!ackNoticeInfo); - CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + AckNoticeInfo * ackNoticeInfo = palloc(sizeof(struct AckNoticeInfo)); + + ackNoticeInfo->segAckInfos = + palloc(sizeof(SegAckInfo) * pParms->dispatchCount); + ackNoticeInfo->segCount = pParms->dispatchCount; - ackNoticeInfo = palloc(sizeof(struct AckNoticeInfo)); - ackNoticeInfo->pParms = pParms; - ackNoticeInfo->ackNotice = palloc(sizeof(bool) * pParms->dispatchCount); - ackNoticeInfo->oldReceivers = palloc(sizeof(PQnoticeReceiver *) * pParms->dispatchCount); - ackNoticeInfo->expectedMessage = message; for (int i = 0; i < pParms->dispatchCount; i++) { - SegmentDatabaseDescriptor *segdbDesc = (pParms->dispatchResultPtrArray)[i]->segdbDesc; - ackNoticeInfo->ackNotice[i] = false; - ackNoticeInfo->oldReceivers[i] = PQsetNoticeReceiver(segdbDesc->conn, &ackNoticeReceiver, segdbDesc); + SegmentDatabaseDescriptor *segdbDesc = + (pParms->dispatchResultPtrArray)[i]->segdbDesc; + (ackNoticeInfo->segAckInfos[i]).segdbDesc = segdbDesc; + (ackNoticeInfo->segAckInfos[i]).ackNotice = false; + (ackNoticeInfo->segAckInfos[i]).oldReceiver = PQsetNoticeReceiver( + segdbDesc->conn, &ackNoticeReceiver, &ackNoticeInfo->segAckInfos[i]); + (ackNoticeInfo->segAckInfos[i]).expectedMessage = message; } + return ackNoticeInfo; } static void ackNoticeReceiver(void *arg, const PGresult *res) { - SegmentDatabaseDescriptor *segdbDesc = (SegmentDatabaseDescriptor *) arg; + Assert(arg); + SegAckInfo *segAckInfo= (SegAckInfo *) arg; PGMessageField *pfield; const char* line = NULL; const char* sqlstate = "00000"; - Assert(ackNoticeInfo); - for (pfield = res->errFields; pfield != NULL; pfield = pfield->next) { switch (pfield->code) @@ -456,34 +464,32 @@ ackNoticeReceiver(void *arg, const PGresult *res) break; } } - for (int i = 0; i < ackNoticeInfo->pParms->dispatchCount; i++) + if (sqlstate_to_errcode(sqlstate) == ERRCODE_GP_ACK_DONE && + strcmp(segAckInfo->expectedMessage, line) == 0) { - if ((ackNoticeInfo->pParms->dispatchResultPtrArray)[i] - ->segdbDesc->segindex == segdbDesc->segindex) - { - if (sqlstate_to_errcode(sqlstate) == ERRCODE_GP_ACK_DONE && - strcmp(ackNoticeInfo->expectedMessage, line) == 0) - { - ackNoticeInfo->ackNotice[i] = true; - } - else - { - ackNoticeInfo->oldReceivers[i](arg, res); - } - break; - } + segAckInfo->ackNotice = true; + } + else + { + segAckInfo->oldReceiver(arg, res); } } static void -endAckNotice(void) +endAckNotice(AckNoticeInfo **ackNoticeInfo) { Assert(ackNoticeInfo); - Assert(ackNoticeInfo->ackNotice); - pfree(ackNoticeInfo->ackNotice); - pfree(ackNoticeInfo->oldReceivers); - pfree(ackNoticeInfo); - ackNoticeInfo = NULL; + Assert(*ackNoticeInfo); + Assert((*ackNoticeInfo)->segAckInfos); + for (int i = 0; i < (*ackNoticeInfo)->segCount; i++) + { + SegAckInfo *segAckInfo = &(*ackNoticeInfo)->segAckInfos[i]; + /* Set the receiver back to the original one. */ + PQsetNoticeReceiver(segAckInfo->segdbDesc->conn, segAckInfo->oldReceiver, + &segAckInfo->segdbDesc); + } + pfree(*ackNoticeInfo); + *ackNoticeInfo = NULL; } /* @@ -497,7 +503,7 @@ endAckNotice(void) */ static void checkDispatchResult(CdbDispatcherState *ds, - bool wait) + bool wait, const AckNoticeInfo *ackNoticeInfo) { CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; CdbDispatchResults *meleeResults = ds->primaryResults; @@ -552,7 +558,7 @@ checkDispatchResult(CdbDispatcherState *ds, */ if (!dispatchResult->stillRunning) continue; - if (ackNoticeInfo && ackNoticeInfo->ackNotice[i]) + if (ackNoticeInfo && ackNoticeInfo->segAckInfos[i].ackNotice) continue; Assert(!cdbconn_isBadConnection(segdbDesc));