Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
affd3c0
Exclude temp tables from shrink + test unlogged tables
whitehawk Feb 10, 2026
ae5133e
Support shrink of matviews
whitehawk Feb 11, 2026
ff134f9
Add check for shrink of partitioned tables
whitehawk Feb 11, 2026
afec7ba
Support ext writable tables in shrink
whitehawk Feb 11, 2026
11024d2
Rework matviews handling, add more table types into tests, improve lo…
whitehawk Feb 13, 2026
8014899
Add more interruption points into the test with cluster restart
whitehawk Feb 13, 2026
1c3f8fe
Check table and db existence
whitehawk Feb 16, 2026
0848321
Updates for mat views
whitehawk Feb 16, 2026
7e1ecf2
Update segments stop procedure
whitehawk Feb 17, 2026
643f838
Fix the case when table is dropped in a parallel transaction
whitehawk Feb 18, 2026
ad6430e
Cosmetic changes
whitehawk Feb 18, 2026
3ff91f0
Merge branch 'feature/ADBDEV-6608' into GG-110
whitehawk Feb 18, 2026
0c4487d
Fix tests
whitehawk Feb 18, 2026
6fec958
Remove redundant test
whitehawk Feb 18, 2026
35bab85
Cosmetic changes
whitehawk Feb 18, 2026
af75169
Use CTAS approach for rebalancing the materialized view
whitehawk Feb 19, 2026
cabac88
Remove refresh step for MV handling
whitehawk Feb 20, 2026
aa00ef2
Reduce delta
whitehawk Feb 20, 2026
793a963
Use existing test steps instead of new ones
whitehawk Feb 20, 2026
57e45a9
Add timeout into wait for logs step
whitehawk Feb 20, 2026
cd8394d
Reduce delta accross files
whitehawk Feb 20, 2026
08481d6
Revert "Use CTAS approach for rebalancing the materialized view"
whitehawk Feb 24, 2026
2718221
Merge branch 'feature/ADBDEV-6608' into GG-110
whitehawk Feb 24, 2026
f090cb9
Use stderr
whitehawk Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions gpMgmt/bin/gppylib/fault_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@

GPMGMT_FAULT_POINT = 'GPMGMT_FAULT_POINT'
GPMGMT_FAULT_DELAY_MS = 'GPMGMT_FAULT_DELAY_MS'
GPMGMT_FAULT_TYPE = 'GPMGMT_FAULT_TYPE'
GPMGMT_FAULT_FILE_FLAG = 'GPMGMT_FAULT_FILE_FLAG'

GPMGMT_FAULT_TYPE_SYSPEND = 'suspend'

def inject_fault(fault_point):
if GPMGMT_FAULT_POINT in os.environ and fault_point == os.environ[GPMGMT_FAULT_POINT]:
if GPMGMT_FAULT_TYPE in os.environ and os.environ[GPMGMT_FAULT_TYPE] == GPMGMT_FAULT_TYPE_SYSPEND:
while GPMGMT_FAULT_FILE_FLAG in os.environ and os.path.exists(os.environ[GPMGMT_FAULT_FILE_FLAG]):
time.sleep(0.1)
return
if GPMGMT_FAULT_DELAY_MS in os.environ and int(os.environ[GPMGMT_FAULT_DELAY_MS]) > 0:
delay_ms = int(os.environ[GPMGMT_FAULT_DELAY_MS])

Expand Down
129 changes: 90 additions & 39 deletions gpMgmt/bin/gprebalance_modules/shrink.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def __init__(self, conn: dbconn.Connection,
self.gparray_dump_file = gpArrayDumpFilename
self.rebalance_schema = schema
self.shrink_plan = None
self.needs_repopulate = False
self.dumped_gparray = gparray.GpArray.initFromFile(self.gparray_dump_file) if os.path.exists(self.gparray_dump_file) else None

self.machine = Machine(model = self,
Expand Down Expand Up @@ -276,7 +275,6 @@ def get_state_after_interrupt(self, prev_state) -> str:
# means that target rebalance numsegments is reset, and new tables are created at old segment count
if bool(row[0]) is False:
self.logger.info("Cluster restarted after previous run, trying to repopulate the relation queue")
self.needs_repopulate = True
return 'STATE_BACKUP_CATALOG_AND_UPDATE_TARGET_SEGMENT_COUNT_STARTED'

return self.states_main_shrink_flow[prev_idx + 1]
Expand Down Expand Up @@ -444,29 +442,28 @@ def on_enter_STATE_SHRINK_SEGMENTS_STOP_STARTED(self) -> None:
gp_array = self.gparray

segments_to_stop = gp_array.get_segment_count() - self.shrink_plan.getTargetSegmentCount()
segments_to_stop = segments_to_stop * 2 # consider mirrors
self.workers_for_segment_stop = WorkerPool(numWorkers=min(segments_to_stop, self.options.batch_size))

for seg_pair in gp_array.getSegmentList():
primary_seg = seg_pair.primaryDB
mirror_seg = seg_pair.mirrorDB
if primary_seg.getSegmentContentId() >= self.shrink_plan.getTargetSegmentCount():
if primary_seg.isSegmentUp():
cmd = self.SegmentStopAfterShrink(self, primary_seg)
# Stop primaries first, and mirrors after primaries,
# to avoid hanging replication processes
seg_roles = [gparray.ROLE_PRIMARY, gparray.ROLE_MIRROR]
for seg_role in seg_roles:
self.logger.info(f"Prepare to stop segments with role '{seg_role}'")
for seg in gp_array.getSegDbList():
if (seg.getSegmentContentId() >= self.shrink_plan.getTargetSegmentCount() and
seg.getSegmentRole() == seg_role and seg.isSegmentUp()):
cmd = self.SegmentStopAfterShrink(self, seg)
self.workers_for_segment_stop.addCommand(cmd)

if mirror_seg != None and mirror_seg.isSegmentUp():
cmd = self.SegmentStopAfterShrink(self, mirror_seg)
self.workers_for_segment_stop.addCommand(cmd)

print_progress(self.workers_for_segment_stop, interval=1)
if self.shutdown_requested:
break
print_progress(self.workers_for_segment_stop, interval=1)

self.workers_for_segment_stop.haltWork()
self.workers_for_segment_stop.joinWorkers()

for task in self.workers_for_segment_stop.getCompletedItems():
if not task.was_successful():
raise Exception('Failed to stop segments')
self.logger.warning('Failed to stop segments')

self.workers_for_segment_stop = None

Expand Down Expand Up @@ -566,20 +563,28 @@ def __init__(self, shrink: 'GGShrink', segment: Segment) -> None:
# decorator to inject a fault before running SegmentStopAfterShrink for a specific dbid
def wrap_segment_stop_with_faults(fun):
def func_with_faults(self):
inject_fault(f'fault_segment_stop_dbid_{self.segment.getSegmentDbId()}')
try:
inject_fault(f'fault_segment_stop_dbid_{self.segment.getSegmentDbId()}')
except Exception as e:
os.kill(os.getpid(), signal.SIGINT)
return
fun(self)
return func_with_faults

@wrap_segment_stop_with_faults
def run(self) -> None:
self.shrink.logger.info(f'Stopping shrinked segment dbid {self.segment.getSegmentDbId()} @ host={self.remoteHost}, datadir={self.segment.getSegmentDataDirectory()}')
self.shrink.logger.info(f'Stopping shrinked segment {str(self.segment)}')
self.checkRunningSegment.run()
if self.checkRunningSegment.is_shutdown():
self.shrink.logger.info(f'Segment dbid {self.segment.getSegmentDbId()} is already down @ host={self.remoteHost}, datadir={self.segment.getSegmentDataDirectory()} ')
self.shrink.logger.info(f'Segment {str(self.segment)} is already down')
self.set_results(CommandResult(0, b'', b'', True, False))
else:
SegmentStop.run(self)
self.shrink.logger.info(f'Stopped shrinked segment dbid {self.segment.getSegmentDbId()} @ host={self.remoteHost}, datadir={self.segment.getSegmentDataDirectory()}')
try:
SegmentStop.run(self, validateAfter = True)
except ExecutionError:
self.shrink.logger.info(f'Failed to stop shrinked segment {str(self.segment)}')
return
self.shrink.logger.info(f'Stopped shrinked segment {str(self.segment)}')

class TableRebalanceTask(SQLCommand):
def __init__(self,
Expand All @@ -599,28 +604,70 @@ def __init__(self,

# decorator to inject a fault before running TableRebalanceTask for a specific {db_name, schema_name, rel_name}
def wrap_table_rebalance_with_faults(fun):
def func_with_faults(self):
def func_with_faults(self, attempt: int):
inject_fault(f'fault_rebalance_table_{self.db_name}.{self.schema_name}.{self.rel_name}')
fun(self)
fun(self, attempt)
return func_with_faults

def table_exists(self, conn: dbconn.Connection, schema_name: str, rel_name: str) -> bool:
    """Return True if relation schema_name.rel_name exists in the connected database.

    NOTE(review): schema_name/rel_name are interpolated into the SQL as
    literals; a name containing a single quote would break the query.
    Confirm callers only pass catalog-derived names, or parameterize if
    dbconn supports it.
    """
    # The JOIN condition already enforces c.relnamespace = n.oid, so the
    # WHERE clause only needs the relation and schema name matches.
    count = dbconn.querySingleton(conn, f"""
        SELECT count(1)
        FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid
        WHERE c.relname = '{rel_name}' AND n.nspname = '{schema_name}'
        """)
    return count > 0

def db_exists(self, conn: dbconn.Connection, db_name: str) -> bool:
    """Return True if a database named db_name exists in pg_database.

    NOTE(review): db_name is interpolated into the SQL as a literal;
    a name containing a single quote would break the query.
    """
    return dbconn.querySingleton(
        conn, f"""SELECT count(*) FROM pg_database WHERE datname = '{db_name}'""") > 0

@wrap_table_rebalance_with_faults
def run(self) -> None:
self.shrink.logger.info(f'Start table rebalance for "{self.db_name}"."{self.schema_name}"."{self.rel_name}" to {self.target_segment_count} segments')
dburl = dbconn.DbURL(dbname=self.db_name, port=self.shrink.gpEnv.getCoordinatorPort())
with closing(dbconn.connect(dburl, encoding='UTF8')) as conn:
dbconn.execSQL(conn, 'BEGIN')
dbconn.execSQL(conn,
f'''ALTER TABLE "{self.schema_name}"."{self.rel_name}"
REBALANCE {self.target_segment_count}''')
self.shrink.rebalance_schema.setStatusForTableToRebalance(self.db_name, self.schema_name, self.rel_name, self.table_status_after_rebalance)
if self.shrink.options.analyze:
dbconn.execSQL(conn,
f'''ANALYZE "{self.schema_name}"."{self.rel_name}"''')
dbconn.execSQL(conn, 'COMMIT')
def process_table(self, attempt: int) -> None:
    # Rebalance one relation to the target segment count inside a single
    # transaction, then record the new status in the rebalance schema.
    # `attempt` is only used for logging; retries are driven by run().
    self.shrink.logger.info(f'Start table rebalance for "{self.db_name}"."{self.schema_name}"."{self.rel_name}" to {self.target_segment_count} segments (attempt {attempt})')
    if self.db_exists(self.shrink.rebalance_schema.conn, self.db_name):
        dburl = dbconn.DbURL(dbname=self.db_name, port=self.shrink.gpEnv.getCoordinatorPort())
        with closing(dbconn.connect(dburl, encoding='UTF8')) as conn:
            dbconn.execSQL(conn, 'BEGIN')

            # Re-check existence inside the transaction: a concurrent session
            # may have dropped the table after the rebalance list was built.
            table_exists = self.table_exists(conn, self.schema_name, self.rel_name)
            if table_exists:
                dbconn.execSQL(conn,
                    f'''ALTER TABLE "{self.schema_name}"."{self.rel_name}"
                    REBALANCE {self.target_segment_count}''')
                if self.shrink.options.analyze:
                    dbconn.execSQL(conn,
                        f'''ANALYZE "{self.schema_name}"."{self.rel_name}"''')
            else:
                self.shrink.logger.info(f'''Table "{self.db_name}"."{self.schema_name}"."{self.rel_name}" doesn't exist, skipping actual rebalance''')

            # The status update shares the transaction with the rebalance,
            # so both commit or roll back together.
            self.shrink.rebalance_schema.setStatusForTableToRebalance(self.db_name, self.schema_name, self.rel_name, self.table_status_after_rebalance)
            dbconn.execSQL(conn, 'COMMIT')
    else:
        self.shrink.logger.info(f'''DB "{self.db_name}" doesn't exist, skipping actual rebalance for "{self.schema_name}"."{self.rel_name}"''')
    self.shrink.logger.info(f'Complete table rebalance for "{self.db_name}"."{self.schema_name}"."{self.rel_name}"')
    # Report success to the worker pool that runs this command.
    self.set_results(CommandResult(0, b'', b'', True, False))

def run(self) -> None:
    """Process the table, retrying once on failure.

    Two attempts are needed because, for example, another session may
    open a transaction after the rebalance table list was created, drop
    the table before we start to rebalance it, and commit while the
    rebalance is in progress; the second attempt then sees the final
    catalog state and skips the vanished table.

    Raises:
        Exception: if every attempt to process the db object fails.
    """
    attempt_max_cnt = 2
    for attempt in range(1, attempt_max_cnt + 1):
        try:
            self.process_table(attempt)
            return
        except Exception as e:
            if attempt < attempt_max_cnt:
                # Transient failure (e.g. concurrent DROP) — retry.
                # Fixed: was bare `logger`, inconsistent with the
                # `self.shrink.logger` used everywhere else in this file.
                self.shrink.logger.warning(str(e))
            else:
                self.shrink.logger.error(str(e))
                # Chain the cause so the original traceback is preserved.
                raise Exception(f'Failed to process the db object for {attempt_max_cnt} attempts') from e

def prepare_shrink_schema(self, is_rollback: bool) -> None:
status = 'done' if is_rollback else 'none'
cmp = '<=' if is_rollback else '>'
Expand All @@ -642,14 +689,18 @@ def prepare_shrink_schema(self, is_rollback: bool) -> None:
dburl = dbconn.DbURL(dbname=db, port=self.gpEnv.getCoordinatorPort())
with closing(dbconn.connect(dburl, encoding='UTF8')) as conn:
cursor = dbconn.query(conn,
f'''SELECT n.nspname, c.relname
f'''SELECT n.nspname, c.relname, c.relkind, pe.writable is not null as external_writable
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
JOIN gp_distribution_policy p ON c.oid = p.localoid
WHERE c.relkind IN ('r', 'p') AND c.relispartition = FALSE AND
LEFT JOIN pg_exttable pe on (c.oid=pe.reloid and pe.writable)
WHERE c.relkind IN ('r', 'p', 'm', 'f') AND c.relispartition = FALSE AND
c.relpersistence != 't' AND
p.numsegments {cmp} {self.shrink_plan.getTargetSegmentCount()} AND
n.nspname NOT IN ('pg_catalog', 'information_schema', '{self.rebalance_schema.getSchemaName()}')''')
for schema_name, rel_name in cursor:
for schema_name, rel_name, rel_kind, external_writable in cursor:
if rel_kind == 'f' and not external_writable:
continue
self.rebalance_schema.addTableToRebalance(db, schema_name, rel_name, status)

dbconn.execSQL(self.conn, 'COMMIT')
Expand Down
Loading