Skip to content

Commit ea58a4d

Browse files
committed
Gracefully handle OOM when cancelling the query
Previosly, if a query fails under resource group manager due to a lack of memory, the query is cancelled due to an error. The coordinator would still try to receive tuples from segments upon cancellation. Retrieving tuples requires more memory allocations, which may cause the same OOM error again. The query will get cancelled due to an error, entering into a recursion and leading to an eventual crash. This patch implements proper query cancellation by explicitly ignoring any libpq data from segments without extra memory allocations. Ticket: ADBDEV-7690
1 parent fd74279 commit ea58a4d

8 files changed

Lines changed: 468 additions & 25 deletions

File tree

src/backend/cdb/dispatcher/cdbdisp_async.c

Lines changed: 105 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ static bool processResults(CdbDispatchResult *dispatchResult);
135135
static void
136136
signalQEs(CdbDispatchCmdAsync *pParms);
137137

138+
static void
139+
signalQE(CdbDispatchResult *dispatchResult, DispatchWaitMode waitMode);
140+
138141
static void
139142
checkSegmentAlive(CdbDispatchCmdAsync *pParms);
140143

@@ -685,6 +688,8 @@ checkDispatchResult(CdbDispatcherState *ds, int timeout_sec)
685688
handlePollSuccess(pParms, fds);
686689
}
687690

691+
SIMPLE_FAULT_INJECTOR("check_dispatch_result_end");
692+
688693
pfree(fds);
689694
}
690695

@@ -820,6 +825,60 @@ handlePollError(CdbDispatchCmdAsync *pParms)
820825
return;
821826
}
822827

828+
/*
829+
* Completely discard results from a dispatchResult's connection without extra
830+
* memory allocations by abusing libpq state-machine hacks.
831+
*/
832+
static void
833+
resetConnAndResult(CdbDispatchResult *dispatchResult)
834+
{
835+
PGresult *res;
836+
PGnotify *notify;
837+
PGconn *conn = dispatchResult->segdbDesc->conn;
838+
839+
/* Free some memory and replace current result with a fatal error dummy. */
840+
pqSaveErrorResult(conn);
841+
842+
/* Make sure PQgetResult() calls are not blocking. */
843+
PQconsumeInput(conn);
844+
845+
/*
846+
* Discard anything that is unread. Since our result contains a fatal
847+
* error, we'll just consume the entire message without actually parsing
848+
* it.
849+
*/
850+
while (!PQisBusy(conn) && (res = PQgetResult(conn)) != NULL)
851+
{
852+
switch (PQresultStatus(res))
853+
{
854+
case PGRES_COPY_IN:
855+
case PGRES_COPY_OUT:
856+
case PGRES_COPY_BOTH:
857+
PQendcopy(conn);
858+
/* fallthrough */
859+
default:
860+
PQclear(res);
861+
}
862+
863+
pqSaveErrorResult(conn);
864+
}
865+
866+
/* Free notices. */
867+
while ((notify = PQnotifies(conn)) != NULL)
868+
PQfreemem(notify);
869+
870+
/* The result is not needed anymore. */
871+
pqClearAsyncResult(conn);
872+
873+
if (PQisBusy(conn) && PQstatus(conn) != CONNECTION_BAD)
874+
{
875+
/* Some work is still remaining until we can die. */
876+
return;
877+
}
878+
879+
dispatchResult->stillRunning = false;
880+
}
881+
823882
/*
824883
* Receive and process results from QEs.
825884
*/
@@ -866,6 +925,24 @@ handlePollSuccess(CdbDispatchCmdAsync *pParms,
866925
ELOG_DISPATCHER_DEBUG("PQsocket says there are results from %d of %d (%s)",
867926
i + 1, pParms->dispatchCount, segdbDesc->whoami);
868927

928+
929+
/*
930+
* Was dispatchCancel() the callee? We don't need to read the results then.
931+
*/
932+
if (pParms->waitMode == DISPATCH_WAIT_CANCEL)
933+
{
934+
/*
935+
* If we're cancelling the transaction due to an OOM, there
936+
* might not be enough memory to discard the result properly.
937+
* Let's get the big guns out.
938+
*/
939+
resetConnAndResult(dispatchResult);
940+
941+
forwardQENotices();
942+
943+
continue;
944+
}
945+
869946
/*
870947
* Receive and process results from this QE.
871948
*/
@@ -912,38 +989,41 @@ static void
912989
signalQEs(CdbDispatchCmdAsync *pParms)
913990
{
914991
int i;
915-
DispatchWaitMode waitMode = pParms->waitMode;
916992

917993
for (i = 0; i < pParms->dispatchCount; i++)
918-
{
919-
char errbuf[256];
920-
bool sent = false;
921-
CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
994+
signalQE(pParms->dispatchResultPtrArray[i], pParms->waitMode);
995+
}
922996

923-
Assert(dispatchResult != NULL);
924-
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
997+
/*
998+
* Send finish or cancel signal to QE if needed.
999+
*/
1000+
static void
1001+
signalQE(CdbDispatchResult *dispatchResult, DispatchWaitMode waitMode)
1002+
{
1003+
Assert(dispatchResult != NULL);
1004+
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
9251005

926-
/*
927-
* Don't send the signal if - QE is finished or canceled - the signal
928-
* was already sent - connection is dead
929-
*/
1006+
/*
1007+
* Don't send the signal if - QE is finished or canceled - the signal
1008+
* was already sent - connection is dead
1009+
*/
9301010

931-
if (!dispatchResult->stillRunning ||
932-
dispatchResult->wasCanceled ||
933-
(pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
934-
dispatchResult->receivedAckMsg) ||
935-
cdbconn_isBadConnection(segdbDesc))
936-
continue;
1011+
if (!dispatchResult->stillRunning ||
1012+
dispatchResult->wasCanceled ||
1013+
(waitMode == DISPATCH_WAIT_ACK_ROOT &&
1014+
dispatchResult->receivedAckMsg) ||
1015+
cdbconn_isBadConnection(segdbDesc))
1016+
{
1017+
return;
1018+
}
9371019

938-
memset(errbuf, 0, sizeof(errbuf));
1020+
char errbuf[256] = {0};
9391021

940-
sent = cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL);
941-
if (sent)
942-
dispatchResult->sentSignal = waitMode;
943-
else
944-
elog(LOG, "Unable to cancel: %s",
945-
strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
946-
}
1022+
if (cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL))
1023+
dispatchResult->sentSignal = waitMode;
1024+
else
1025+
elog(LOG, "Unable to cancel: %s",
1026+
strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
9471027
}
9481028

9491029
/*

src/test/isolation2/input/parallel_retrieve_cursor/fault_inject.source

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,25 @@ SELECT gp_inject_fault('fetch_tuples_from_endpoint', 'resume', dbid)
249249
WHERE content=2 AND role='p';
250250

251251
DROP TABLE t2;
252+
253+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', 2);
254+
SELECT gp_inject_fault('alloc_endpoint_slot_full_reset', 'reset', 2);
255+
256+
-- Test 8: QD shouldn't hang waiting.
257+
258+
DROP TABLE IF EXISTS t3;
259+
CREATE TABLE t3 AS SELECT generate_series(1, 10) AS a DISTRIBUTED by (a);
260+
261+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'error', dbid)
262+
FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
263+
264+
BEGIN;
265+
-- QE encounters error and destroys the endpoint immediately. QD successfully
266+
-- resets the query, sends ACKs, and discards transaction.
267+
DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM t3;
268+
ROLLBACK;
269+
270+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', dbid)
271+
FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
272+
273+
DROP TABLE t3;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- start_matchsubs
2+
-- m/ERROR: Out of memory.*/
3+
-- s/ERROR: Out of memory.*/ERROR: Out of memory/
4+
-- m/ERROR: Canceling query because of high VMEM usage.*/
5+
-- s/ERROR: Canceling query because of high VMEM usage.*/ERROR: Out of memory/
6+
-- end_matchsubs
7+
8+
-- start_ignore
9+
CREATE EXTENSION gp_inject_fault;
10+
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
11+
DROP ROLE role_oom_test;
12+
DROP RESOURCE GROUP rg_oom_test;
13+
-- end_ignore
14+
CREATE RESOURCE GROUP rg_oom_test
15+
WITH (cpu_rate_limit=20, memory_limit=20, memory_shared_quota=100);
16+
CREATE ROLE role_oom_test RESOURCE GROUP rg_oom_test;
17+
CREATE OR REPLACE FUNCTION gp_mock_cdbdispatchcommand(amount int)
18+
RETURNS SETOF text AS '@abs_srcdir@/../regress/regress.so',
19+
'gp_mock_cdbdispatchcommand' LANGUAGE C;
20+
21+
1: SET ROLE TO role_oom_test;
22+
23+
-- Freeze coordinator's session after it reads results from segments.
24+
SELECT gp_inject_fault('check_dispatch_result_end', 'suspend', dbid)
25+
FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
26+
27+
-- Send the heavy query.
28+
1&: SELECT count(*) FROM gp_mock_cdbdispatchcommand(1000000);
29+
30+
-- Wait until we receive everything.
31+
SELECT gp_wait_until_triggered_fault('check_dispatch_result_end', 1, dbid)
32+
FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
33+
34+
-- The query should've used ~135 MB of memory. Allow 15 MB error.
35+
WITH r AS (
36+
SELECT (memory_usage->'-1'->'used')::text::int AS mb
37+
FROM gp_toolkit.gp_resgroup_status WHERE rsgname = 'rg_oom_test'
38+
)
39+
SELECT r.mb < 150 AS "under 150 MB", r.mb > 120 AS "above 120 MB"
40+
FROM r;
41+
42+
SELECT gp_inject_fault('check_dispatch_result_end', 'reset', dbid) FROM
43+
gp_segment_configuration WHERE role = 'p' AND content = -1;
44+
45+
1<:
46+
47+
-- And finally, make sure we don't enter error recursion on fail.
48+
ALTER RESOURCE GROUP rg_oom_test SET memory_shared_quota 0;
49+
1: SELECT count(*) FROM gp_mock_cdbdispatchcommand(10000000);
50+
51+
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
52+
DROP ROLE role_oom_test;
53+
DROP RESOURCE GROUP rg_oom_test;

src/test/isolation2/isolation2_resgroup_schedule

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ test: resgroup/resgroup_large_group_id
6161

6262
test: resgroup/resgroup_startup_memory
6363

64+
test: resgroup/resgroup_oom
65+
6466
test: resgroup/disable_resgroup

src/test/isolation2/output/parallel_retrieve_cursor/fault_inject.source

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,3 +770,45 @@ ROLLBACK
770770

771771
DROP TABLE t2;
772772
DROP
773+
774+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', 2);
775+
gp_inject_fault
776+
-----------------
777+
Success:
778+
(1 row)
779+
SELECT gp_inject_fault('alloc_endpoint_slot_full_reset', 'reset', 2);
780+
gp_inject_fault
781+
-----------------
782+
Success:
783+
(1 row)
784+
785+
-- Test 8: QD shouldn't hang waiting.
786+
787+
DROP TABLE IF EXISTS t3;
788+
DROP
789+
CREATE TABLE t3 AS SELECT generate_series(1, 10) AS a DISTRIBUTED by (a);
790+
CREATE 10
791+
792+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'error', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
793+
gp_inject_fault
794+
-----------------
795+
Success:
796+
(1 row)
797+
798+
BEGIN;
799+
BEGIN
800+
-- QE encounters error and destroys the endpoint immediately. QD successfully
801+
-- resets the query, sends ACKs, and discards transaction.
802+
DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM t3;
803+
ERROR: fault triggered, fault name:'alloc_endpoint_slot_full' fault type:'error' (seg0 127.0.0.1:6002 pid=8916)
804+
ROLLBACK;
805+
ROLLBACK
806+
807+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
808+
gp_inject_fault
809+
-----------------
810+
Success:
811+
(1 row)
812+
813+
DROP TABLE t3;
814+
DROP
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
-- start_matchsubs
2+
-- m/ERROR: Out of memory.*/
3+
-- s/ERROR: Out of memory.*/ERROR: Out of memory/
4+
-- m/ERROR: Canceling query because of high VMEM usage.*/
5+
-- s/ERROR: Canceling query because of high VMEM usage.*/ERROR: Out of memory/
6+
-- end_matchsubs
7+
8+
CREATE RESOURCE GROUP rg_oom_test WITH (cpu_rate_limit=20, memory_limit=20, memory_shared_quota=100);
9+
CREATE
10+
CREATE ROLE role_oom_test RESOURCE GROUP rg_oom_test;
11+
CREATE
12+
CREATE OR REPLACE FUNCTION gp_mock_cdbdispatchcommand(amount int) RETURNS SETOF text AS '@abs_srcdir@/../regress/regress.so', 'gp_mock_cdbdispatchcommand' LANGUAGE C;
13+
CREATE
14+
15+
1: SET ROLE TO role_oom_test;
16+
SET
17+
18+
-- Freeze coordinator's session after it reads results from segments.
19+
SELECT gp_inject_fault('check_dispatch_result_end', 'suspend', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
20+
gp_inject_fault
21+
-----------------
22+
Success:
23+
(1 row)
24+
25+
-- Send the heavy query.
26+
1&: SELECT count(*) FROM gp_mock_cdbdispatchcommand(1000000); <waiting ...>
27+
28+
-- Wait until we receive everything.
29+
SELECT gp_wait_until_triggered_fault('check_dispatch_result_end', 1, dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
30+
gp_wait_until_triggered_fault
31+
-------------------------------
32+
Success:
33+
(1 row)
34+
35+
-- The query should've used ~135 MB of memory. Allow 15 MB error.
36+
WITH r AS ( SELECT (memory_usage->'-1'->'used')::text::int AS mb FROM gp_toolkit.gp_resgroup_status WHERE rsgname = 'rg_oom_test' ) SELECT r.mb < 150 AS "under 150 MB", r.mb > 120 AS "above 120 MB" FROM r;
37+
under 150 MB | above 120 MB
38+
--------------+--------------
39+
t | t
40+
(1 row)
41+
42+
SELECT gp_inject_fault('check_dispatch_result_end', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
43+
gp_inject_fault
44+
-----------------
45+
Success:
46+
(1 row)
47+
48+
1<: <... completed>
49+
count
50+
---------
51+
4000000
52+
(1 row)
53+
54+
-- And finally, make sure we don't enter error recursion on fail.
55+
ALTER RESOURCE GROUP rg_oom_test SET memory_shared_quota 0;
56+
ALTER
57+
1: SELECT count(*) FROM gp_mock_cdbdispatchcommand(10000000);
58+
ERROR: Out of memory
59+
DETAIL: Resource group memory limit reached
60+
61+
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
62+
DROP
63+
DROP ROLE role_oom_test;
64+
DROP
65+
DROP RESOURCE GROUP rg_oom_test;
66+
DROP

0 commit comments

Comments
 (0)