diff --git a/src/backend/cdb/dispatcher/cdbdisp.c b/src/backend/cdb/dispatcher/cdbdisp.c index d41ac72005a9..fcb6af4c1b5d 100644 --- a/src/backend/cdb/dispatcher/cdbdisp.c +++ b/src/backend/cdb/dispatcher/cdbdisp.c @@ -102,6 +102,19 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds) (pDispatchFuncs->waitDispatchFinish) (ds); } +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + if (pDispatchFuncs->checkResults != NULL) + (pDispatchFuncs->checkAckNotice) (ds, wait, message); +} + /* * cdbdisp_checkDispatchResult: * @@ -134,7 +147,7 @@ cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, } } -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 153bde8873e6..e0e53fb683d7 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -81,8 +81,24 @@ typedef struct CdbDispatchCmdAsync } CdbDispatchCmdAsync; +typedef struct SegAckInfo +{ + const char* expectedMessage; + const SegmentDatabaseDescriptor *segdbDesc; + bool ackNotice; + PQnoticeReceiver oldReceiver; +} SegAckInfo; + +typedef struct AckNoticeInfo +{ + int segCount; + SegAckInfo *segAckInfos; +} AckNoticeInfo; + static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len); +static void cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message); + static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); @@ -94,11 +110,16 @@ static void cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds); static bool cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds); static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds); +static AckNoticeInfo *initAckNotice(const struct CdbDispatcherState *ds, const char *message); +static void ackNoticeReceiver(void *arg, const PGresult *res); +static void endAckNotice(AckNoticeInfo **ackNoticeInfo); + DispatcherInternalFuncs DispatcherAsyncFuncs = { cdbdisp_checkForCancel_async, cdbdisp_getWaitSocketFd_async, cdbdisp_makeDispatchParams_async, + cdbdisp_checkAckNotice_async, cdbdisp_checkDispatchResult_async, cdbdisp_dispatchToGang_async, cdbdisp_waitDispatchFinish_async @@ -110,7 +131,7 @@ static void dispatchCommand(CdbDispatchResult *dispatchResult, int query_text_len); static void checkDispatchResult(CdbDispatcherState *ds, - bool wait); + bool wait, const AckNoticeInfo *ackNoticeInfo); static bool processResults(CdbDispatchResult *dispatchResult); @@ -137,7 +158,7 @@ cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds) { Assert(ds); - checkDispatchResult(ds, false); + checkDispatchResult(ds, false, NULL); return cdbdisp_checkResultsErrcode(ds->primaryResults); } @@ -306,6 +327,35 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds, } } +/* + * Check dispatch acknowledge NOTICE. + * + * Wait all dispatch work to get back specify acknowledge NOTICE, + * either success or fail. (Set stillRunning to true when + * one dispatch work is completed) + */ +static void +cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message) +{ + Assert(ds != NULL); + DispatchWaitMode prevWaitMode; + CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + + AckNoticeInfo *ackNoticeInfo = initAckNotice(ds, message); + + /* cdbdisp_destroyDispatcherState is called */ + if (pParms == NULL) + return; + + prevWaitMode = pParms->waitMode; + pParms->waitMode = DISPATCH_WAIT_NONE; + + checkDispatchResult(ds, wait, ackNoticeInfo); + + pParms->waitMode = prevWaitMode; + endAckNotice(&ackNoticeInfo); +} + /* * Check dispatch result. * @@ -330,7 +380,7 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds, if (waitMode != DISPATCH_WAIT_NONE) pParms->waitMode = waitMode; - checkDispatchResult(ds, true); + checkDispatchResult(ds, true, NULL); /* * It looks like everything went fine, make sure we don't miss a user @@ -366,6 +416,82 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query return (void *) pParms; } +/* + * Init the AckNoticeInfo struct which will be used to check the ACK message from QE. + */ +static AckNoticeInfo * +initAckNotice(const struct CdbDispatcherState *ds, const char *message) +{ + CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; + AckNoticeInfo * ackNoticeInfo = palloc(sizeof(struct AckNoticeInfo)); + + ackNoticeInfo->segAckInfos = + palloc(sizeof(SegAckInfo) * pParms->dispatchCount); + ackNoticeInfo->segCount = pParms->dispatchCount; + + for (int i = 0; i < pParms->dispatchCount; i++) + { + SegmentDatabaseDescriptor *segdbDesc = + (pParms->dispatchResultPtrArray)[i]->segdbDesc; + (ackNoticeInfo->segAckInfos[i]).segdbDesc = segdbDesc; + (ackNoticeInfo->segAckInfos[i]).ackNotice = false; + (ackNoticeInfo->segAckInfos[i]).oldReceiver = PQsetNoticeReceiver( + segdbDesc->conn, &ackNoticeReceiver, &ackNoticeInfo->segAckInfos[i]); + (ackNoticeInfo->segAckInfos[i]).expectedMessage = message; + } + return ackNoticeInfo; +} + +static void +ackNoticeReceiver(void *arg, const PGresult *res) +{ + Assert(arg); + SegAckInfo *segAckInfo= (SegAckInfo *) arg; + PGMessageField *pfield; + const char* line = NULL; + const char* sqlstate = "00000"; + + for (pfield = res->errFields; pfield != NULL; pfield = pfield->next) + { + switch (pfield->code) + { + case PG_DIAG_SQLSTATE: + sqlstate = pfield->contents; + break; + case PG_DIAG_MESSAGE_PRIMARY: + line = pfield->contents; + + break; + } + } + if (sqlstate_to_errcode(sqlstate) == ERRCODE_GP_ACK_DONE && + strcmp(segAckInfo->expectedMessage, line) == 0) + { + segAckInfo->ackNotice = true; + } + else + { + segAckInfo->oldReceiver(arg, res); + } +} + +static void +endAckNotice(AckNoticeInfo **ackNoticeInfo) +{ + Assert(ackNoticeInfo); + Assert(*ackNoticeInfo); + Assert((*ackNoticeInfo)->segAckInfos); + for (int i = 0; i < (*ackNoticeInfo)->segCount; i++) + { + SegAckInfo *segAckInfo = &(*ackNoticeInfo)->segAckInfos[i]; + /* Set the receiver back to the original one. */ + PQsetNoticeReceiver(segAckInfo->segdbDesc->conn, segAckInfo->oldReceiver, + &segAckInfo->segdbDesc); + } + pfree(*ackNoticeInfo); + *ackNoticeInfo = NULL; +} + /* * Receive and process results from all running QEs. * @@ -377,7 +503,7 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query */ static void checkDispatchResult(CdbDispatcherState *ds, - bool wait) + bool wait, const AckNoticeInfo *ackNoticeInfo) { CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams; CdbDispatchResults *meleeResults = ds->primaryResults; @@ -392,7 +518,6 @@ checkDispatchResult(CdbDispatcherState *ds, db_count = pParms->dispatchCount; fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd)); - /* * OK, we are finished submitting the command to the segdbs. Now, we have * to wait for them to finish. @@ -433,6 +558,8 @@ checkDispatchResult(CdbDispatcherState *ds, */ if (!dispatchResult->stillRunning) continue; + if (ackNoticeInfo && ackNoticeInfo->segAckInfos[i].ackNotice) + continue; Assert(!cdbconn_isBadConnection(segdbDesc)); @@ -544,7 +671,9 @@ checkDispatchResult(CdbDispatcherState *ds, } /* We have data waiting on one or more of the connections. */ else + { handlePollSuccess(pParms, fds); + } } pfree(fds); diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index 18d4fa9d5976..4162a0e824da 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -365,8 +365,19 @@ ChooseEndpointContentIDForParallelCursor(const struct Plan *planTree, } void -WaitEndpointReady(const struct Plan *planTree, const char *cursorName) +WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName) { + ErrorData *qeError = NULL; + + cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY); + if(cdbdisp_checkResultsErrcode(ds->primaryResults)) + { + cdbdisp_getDispatchResults(ds, &qeError); + cdbdisp_destroyDispatcherState(ds); + ReThrowError(qeError); + } + + // TODO: redesign the token dispatch logic, remove below code call_endpoint_udf_on_qd(planTree, cursorName, 'r'); } @@ -544,6 +555,8 @@ CreateTQDestReceiverForEndpoint(TupleDesc tupleDesc, const char *cursorName) activeSharedEndpoint = alloc_endpoint(cursorName, dsm_segment_handle(activeDsmSeg)); + SEND_ACK_NOTICE(ENDPOINT_READY); + // TODO: rely on above logic to tell qd endpoint is ready, remove bolow code /* Unblock the latch to finish declare statement. */ declare_parallel_retrieve_ready(cursorName); return CreateTupleQueueDestReceiver(shmMqHandle); diff --git a/src/backend/cdb/endpoint/cdbendpointinternal.h b/src/backend/cdb/endpoint/cdbendpointinternal.h index d8a120599011..69fcbf35fdde 100644 --- a/src/backend/cdb/endpoint/cdbendpointinternal.h +++ b/src/backend/cdb/endpoint/cdbendpointinternal.h @@ -36,6 +36,14 @@ */ #define ENDPOINT_NAME_LEN (NAMEDATALEN + 1 + 8 + 1 + 8) +#define SEND_ACK_NOTICE(MSG) \ + ereport(NOTICE, (errcode(ERRCODE_GP_ACK_DONE), \ + errmsg("%s", MSG))) + +/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ +#define ENDPOINT_READY "ENDPOINT_READY" + + /* * Endpoint attach status. */ diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index eedb66fb4018..2f8c26bd47b2 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -167,7 +167,9 @@ PerformCursorOpen(PlannedStmt *stmt, ParamListInfo params, if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE) { PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts); - WaitEndpointReady(stmt->planTree, portal->name); + WaitEndpointReady( + portal->queryDesc->estate->dispatcherState, + stmt->planTree, portal->name); } /* * We're done; the query won't actually be run until PerformPortalFetch is diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 8c6cc4baa8d9..03dfbf6b1be8 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -80,6 +80,7 @@ Section: Class 00 - Successful Completion 00000 S ERRCODE_SUCCESSFUL_COMPLETION successful_completion +00M01 S ERRCODE_GP_ACK_DONE gp_ack_done Section: Class 01 - Warning diff --git a/src/include/cdb/cdbconn.h b/src/include/cdb/cdbconn.h index ec46fe80dc84..8ada8d437c29 100644 --- a/src/include/cdb/cdbconn.h +++ b/src/include/cdb/cdbconn.h @@ -103,4 +103,6 @@ bool cdbconn_signalQE(SegmentDatabaseDescriptor *segdbDesc, char *errbuf, bool i extern void forwardQENotices(void); +extern bool checkACKQENotices(const char *message); + #endif /* CDBCONN_H */ diff --git a/src/include/cdb/cdbdisp.h b/src/include/cdb/cdbdisp.h index 65115d0dab08..65ce59778f78 100644 --- a/src/include/cdb/cdbdisp.h +++ b/src/include/cdb/cdbdisp.h @@ -51,6 +51,7 @@ typedef struct DispatcherInternalFuncs bool (*checkForCancel)(struct CdbDispatcherState *ds); int (*getWaitSocketFd)(struct CdbDispatcherState *ds); void* (*makeDispatchParams)(int maxSlices, int largestGangSize, char *queryText, int queryTextLen); + void (*checkAckNotice)(struct CdbDispatcherState *ds, bool wait, const char* message); void (*checkResults)(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); void (*dispatchToGang)(struct CdbDispatcherState *ds, struct Gang *gp, int sliceIndex); void (*waitDispatchFinish)(struct CdbDispatcherState *ds); @@ -100,6 +101,15 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds, void cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); +/* + * cdbdisp_checkDispatchAckNotice: + * + * Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang(). + * + */ +void +cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message); + /* * CdbCheckDispatchResult: * @@ -111,7 +121,7 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds); void cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, DispatchWaitMode waitMode); -/** +/* * Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished * This func should be called after calling cdbdisp_checkDispatchResult(). * diff --git a/src/include/cdb/cdbendpoint.h b/src/include/cdb/cdbendpoint.h index ca217537a668..d2ee499034b8 100644 --- a/src/include/cdb/cdbendpoint.h +++ b/src/include/cdb/cdbendpoint.h @@ -39,6 +39,11 @@ #include "nodes/parsenodes.h" #include "tcop/dest.h" #include "storage/lwlock.h" +#include "cdb/cdbdisp.h" + + +/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */ +#define ENDPOINT_READY "ENDPOINT_READY" /* * Roles that used in PARALLEL RETRIEVE CURSOR execution. @@ -81,7 +86,7 @@ extern enum EndPointExecPosition GetParallelCursorEndpointPosition( const struct Plan *planTree); extern List *ChooseEndpointContentIDForParallelCursor( const struct Plan *planTree, enum EndPointExecPosition *position); -extern void WaitEndpointReady(const struct Plan *planTree, const char *cursorName); +extern void WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName); /* * Below functions should run on Endpoints(QE/Entry DB).