Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/backend/cdb/dispatcher/cdbdisp.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds)
(pDispatchFuncs->waitDispatchFinish) (ds);
}

/*
* cdbdisp_checkDispatchAckNotice:
*
* Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang().
*
*/
void
cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message)
{
if (pDispatchFuncs->checkResults != NULL)
(pDispatchFuncs->checkAckNotice) (ds, wait, message);
}

/*
* cdbdisp_checkDispatchResult:
*
Expand Down Expand Up @@ -134,7 +147,7 @@ cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds,
}
}

/**
/*
* Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished
* This func should be called after calling cdbdisp_checkDispatchResult().
*
Expand Down
139 changes: 134 additions & 5 deletions src/backend/cdb/dispatcher/cdbdisp_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,24 @@ typedef struct CdbDispatchCmdAsync

} CdbDispatchCmdAsync;

typedef struct SegAckInfo
{
const char* expectedMessage;
const SegmentDatabaseDescriptor *segdbDesc;
bool ackNotice;
PQnoticeReceiver oldReceiver;
} SegAckInfo;

typedef struct AckNoticeInfo
{
int segCount;
SegAckInfo *segAckInfos;
} AckNoticeInfo;

static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len);

static void cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message);

static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
DispatchWaitMode waitMode);

Expand All @@ -94,11 +110,16 @@ static void cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds);
static bool cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds);
static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds);

static AckNoticeInfo *initAckNotice(const struct CdbDispatcherState *ds, const char *message);
static void ackNoticeReceiver(void *arg, const PGresult *res);
static void endAckNotice(AckNoticeInfo **ackNoticeInfo);

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
cdbdisp_checkForCancel_async,
cdbdisp_getWaitSocketFd_async,
cdbdisp_makeDispatchParams_async,
cdbdisp_checkAckNotice_async,
cdbdisp_checkDispatchResult_async,
cdbdisp_dispatchToGang_async,
cdbdisp_waitDispatchFinish_async
Expand All @@ -110,7 +131,7 @@ static void dispatchCommand(CdbDispatchResult *dispatchResult,
int query_text_len);

static void checkDispatchResult(CdbDispatcherState *ds,
bool wait);
bool wait, const AckNoticeInfo *ackNoticeInfo);

static bool processResults(CdbDispatchResult *dispatchResult);

Expand All @@ -137,7 +158,7 @@ cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds)
{
Assert(ds);

checkDispatchResult(ds, false);
checkDispatchResult(ds, false, NULL);
return cdbdisp_checkResultsErrcode(ds->primaryResults);
}

Expand Down Expand Up @@ -306,6 +327,35 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
}
}

/*
* Check dispatch acknowledge NOTICE.
*
* Wait all dispatch work to get back specify acknowledge NOTICE,
* either success or fail. (Set stillRunning to true when
* one dispatch work is completed)
*/
static void
cdbdisp_checkAckNotice_async(struct CdbDispatcherState *ds, bool wait, const char *message)
{
Assert(ds != NULL);
DispatchWaitMode prevWaitMode;
CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;

AckNoticeInfo *ackNoticeInfo = initAckNotice(ds, message);

/* cdbdisp_destroyDispatcherState is called */
if (pParms == NULL)
return;

prevWaitMode = pParms->waitMode;
pParms->waitMode = DISPATCH_WAIT_NONE;

checkDispatchResult(ds, wait, ackNoticeInfo);

pParms->waitMode = prevWaitMode;
endAckNotice(&ackNoticeInfo);
}

/*
* Check dispatch result.
*
Expand All @@ -330,7 +380,7 @@ cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
if (waitMode != DISPATCH_WAIT_NONE)
pParms->waitMode = waitMode;

checkDispatchResult(ds, true);
checkDispatchResult(ds, true, NULL);

/*
* It looks like everything went fine, make sure we don't miss a user
Expand Down Expand Up @@ -366,6 +416,82 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query
return (void *) pParms;
}

/*
* Init the AckNoticeInfo struct which will be used to check the ACK message from QE.
*/
static AckNoticeInfo *
initAckNotice(const struct CdbDispatcherState *ds, const char *message)
{
CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
AckNoticeInfo * ackNoticeInfo = palloc(sizeof(struct AckNoticeInfo));

ackNoticeInfo->segAckInfos =
palloc(sizeof(SegAckInfo) * pParms->dispatchCount);
ackNoticeInfo->segCount = pParms->dispatchCount;

for (int i = 0; i < pParms->dispatchCount; i++)
{
SegmentDatabaseDescriptor *segdbDesc =
(pParms->dispatchResultPtrArray)[i]->segdbDesc;
(ackNoticeInfo->segAckInfos[i]).segdbDesc = segdbDesc;
(ackNoticeInfo->segAckInfos[i]).ackNotice = false;
(ackNoticeInfo->segAckInfos[i]).oldReceiver = PQsetNoticeReceiver(
segdbDesc->conn, &ackNoticeReceiver, &ackNoticeInfo->segAckInfos[i]);
(ackNoticeInfo->segAckInfos[i]).expectedMessage = message;
}
return ackNoticeInfo;
}

static void
ackNoticeReceiver(void *arg, const PGresult *res)
{
Assert(arg);
SegAckInfo *segAckInfo= (SegAckInfo *) arg;
PGMessageField *pfield;
const char* line = NULL;
const char* sqlstate = "00000";

for (pfield = res->errFields; pfield != NULL; pfield = pfield->next)
{
switch (pfield->code)
{
case PG_DIAG_SQLSTATE:
sqlstate = pfield->contents;
break;
case PG_DIAG_MESSAGE_PRIMARY:
line = pfield->contents;

break;
}
}
if (sqlstate_to_errcode(sqlstate) == ERRCODE_GP_ACK_DONE &&
strcmp(segAckInfo->expectedMessage, line) == 0)
{
segAckInfo->ackNotice = true;
}
else
{
segAckInfo->oldReceiver(arg, res);
}
}

static void
endAckNotice(AckNoticeInfo **ackNoticeInfo)
{
Assert(ackNoticeInfo);
Assert(*ackNoticeInfo);
Assert((*ackNoticeInfo)->segAckInfos);
for (int i = 0; i < (*ackNoticeInfo)->segCount; i++)
{
SegAckInfo *segAckInfo = &(*ackNoticeInfo)->segAckInfos[i];
/* Set the receiver back to the original one. */
PQsetNoticeReceiver(segAckInfo->segdbDesc->conn, segAckInfo->oldReceiver,
&segAckInfo->segdbDesc);
}
pfree(*ackNoticeInfo);
*ackNoticeInfo = NULL;
}

/*
* Receive and process results from all running QEs.
*
Expand All @@ -377,7 +503,7 @@ cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *query
*/
static void
checkDispatchResult(CdbDispatcherState *ds,
bool wait)
bool wait, const AckNoticeInfo *ackNoticeInfo)
{
CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
CdbDispatchResults *meleeResults = ds->primaryResults;
Expand All @@ -392,7 +518,6 @@ checkDispatchResult(CdbDispatcherState *ds,

db_count = pParms->dispatchCount;
fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd));

/*
* OK, we are finished submitting the command to the segdbs. Now, we have
* to wait for them to finish.
Expand Down Expand Up @@ -433,6 +558,8 @@ checkDispatchResult(CdbDispatcherState *ds,
*/
if (!dispatchResult->stillRunning)
continue;
if (ackNoticeInfo && ackNoticeInfo->segAckInfos[i].ackNotice)
continue;

Assert(!cdbconn_isBadConnection(segdbDesc));

Expand Down Expand Up @@ -544,7 +671,9 @@ checkDispatchResult(CdbDispatcherState *ds,
}
/* We have data waiting on one or more of the connections. */
else
{
handlePollSuccess(pParms, fds);
}
}

pfree(fds);
Expand Down
15 changes: 14 additions & 1 deletion src/backend/cdb/endpoint/cdbendpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,19 @@ ChooseEndpointContentIDForParallelCursor(const struct Plan *planTree,
}

void
WaitEndpointReady(const struct Plan *planTree, const char *cursorName)
WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName)
{
ErrorData *qeError = NULL;

cdbdisp_checkDispatchAckNotice(ds, true, ENDPOINT_READY);
if(cdbdisp_checkResultsErrcode(ds->primaryResults))
{
cdbdisp_getDispatchResults(ds, &qeError);
cdbdisp_destroyDispatcherState(ds);
ReThrowError(qeError);
}

// TODO: redesign the token dispatch logic, remove below code
call_endpoint_udf_on_qd(planTree, cursorName, 'r');
}

Expand Down Expand Up @@ -544,6 +555,8 @@ CreateTQDestReceiverForEndpoint(TupleDesc tupleDesc, const char *cursorName)
activeSharedEndpoint =
alloc_endpoint(cursorName, dsm_segment_handle(activeDsmSeg));

SEND_ACK_NOTICE(ENDPOINT_READY);
// TODO: rely on above logic to tell qd endpoint is ready, remove bolow code
/* Unblock the latch to finish declare statement. */
declare_parallel_retrieve_ready(cursorName);
return CreateTupleQueueDestReceiver(shmMqHandle);
Expand Down
8 changes: 8 additions & 0 deletions src/backend/cdb/endpoint/cdbendpointinternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
*/
#define ENDPOINT_NAME_LEN (NAMEDATALEN + 1 + 8 + 1 + 8)

#define SEND_ACK_NOTICE(MSG) \
ereport(NOTICE, (errcode(ERRCODE_GP_ACK_DONE), \
errmsg("%s", MSG)))

/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */
#define ENDPOINT_READY "ENDPOINT_READY"


/*
* Endpoint attach status.
*/
Expand Down
4 changes: 3 additions & 1 deletion src/backend/commands/portalcmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ PerformCursorOpen(PlannedStmt *stmt, ParamListInfo params,
if (portal->cursorOptions & CURSOR_OPT_PARALLEL_RETRIEVE)
{
PlannedStmt* stmt = (PlannedStmt *) linitial(portal->stmts);
WaitEndpointReady(stmt->planTree, portal->name);
WaitEndpointReady(
portal->queryDesc->estate->dispatcherState,
stmt->planTree, portal->name);
}
/*
* We're done; the query won't actually be run until PerformPortalFetch is
Expand Down
1 change: 1 addition & 0 deletions src/backend/utils/errcodes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
Section: Class 00 - Successful Completion

00000 S ERRCODE_SUCCESSFUL_COMPLETION successful_completion
00M01 S ERRCODE_GP_ACK_DONE gp_ack_done

Section: Class 01 - Warning

Expand Down
2 changes: 2 additions & 0 deletions src/include/cdb/cdbconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ bool cdbconn_signalQE(SegmentDatabaseDescriptor *segdbDesc, char *errbuf, bool i

extern void forwardQENotices(void);

extern bool checkACKQENotices(const char *message);

#endif /* CDBCONN_H */
12 changes: 11 additions & 1 deletion src/include/cdb/cdbdisp.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef struct DispatcherInternalFuncs
bool (*checkForCancel)(struct CdbDispatcherState *ds);
int (*getWaitSocketFd)(struct CdbDispatcherState *ds);
void* (*makeDispatchParams)(int maxSlices, int largestGangSize, char *queryText, int queryTextLen);
void (*checkAckNotice)(struct CdbDispatcherState *ds, bool wait, const char* message);
void (*checkResults)(struct CdbDispatcherState *ds, DispatchWaitMode waitMode);
void (*dispatchToGang)(struct CdbDispatcherState *ds, struct Gang *gp, int sliceIndex);
void (*waitDispatchFinish)(struct CdbDispatcherState *ds);
Expand Down Expand Up @@ -100,6 +101,15 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
void
cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds);

/*
* cdbdisp_checkDispatchAckNotice:
*
* Check for acknowledge NOTICE form QEs/EntryDB after cdbdisp_dispatchToGang().
*
*/
void
cdbdisp_checkDispatchAckNotice(struct CdbDispatcherState *ds, bool wait, const char *message);

/*
* CdbCheckDispatchResult:
*
Expand All @@ -111,7 +121,7 @@ cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds);
void
cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds, DispatchWaitMode waitMode);

/**
/*
* Check whether or not the PARALLEL RETRIEVE CURSOR Execution Finished
* This func should be called after calling cdbdisp_checkDispatchResult().
*
Expand Down
7 changes: 6 additions & 1 deletion src/include/cdb/cdbendpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
#include "storage/lwlock.h"
#include "cdb/cdbdisp.h"


/* ACK NOTICE MESSAGE FROM ENDPOINT QE/Entry DB to QD */
#define ENDPOINT_READY "ENDPOINT_READY"

/*
* Roles that used in PARALLEL RETRIEVE CURSOR execution.
Expand Down Expand Up @@ -81,7 +86,7 @@ extern enum EndPointExecPosition GetParallelCursorEndpointPosition(
const struct Plan *planTree);
extern List *ChooseEndpointContentIDForParallelCursor(
const struct Plan *planTree, enum EndPointExecPosition *position);
extern void WaitEndpointReady(const struct Plan *planTree, const char *cursorName);
extern void WaitEndpointReady(CdbDispatcherState* ds, const struct Plan *planTree, const char *cursorName);

/*
* Below functions should run on Endpoints(QE/Entry DB).
Expand Down