Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions src/pool/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,10 @@ cont_discard_cb(daos_handle_t ih, vos_iter_entry_t *entry,
struct vos_iter_anchors anchor = { 0 };
daos_handle_t coh;
struct d_backoff_seq backoff_seq;
uint64_t start_hlc = d_hlc_get();
uint64_t last_warn_hlc = start_hlc;
uint64_t now_hlc;
unsigned int retry_cnt = 0;
int rc;

D_ASSERT(type == VOS_ITER_COUUID);
Expand Down Expand Up @@ -2619,15 +2623,28 @@ cont_discard_cb(daos_handle_t ih, vos_iter_entry_t *entry,
if (rc != -DER_BUSY && rc != -DER_INPROGRESS)
break;

D_DEBUG(DB_REBUILD, "retry by "DF_RC"/"DF_UUID"\n",
DP_RC(rc), DP_UUID(entry->ie_couuid));
retry_cnt++;
now_hlc = d_hlc_get();
if (now_hlc - last_warn_hlc >= d_sec2hlc(300)) {
D_WARN(DF_UUID "/" DF_UUID ": discard still retrying after " DF_U64
"s, retries=%u: " DF_RC "\n",
DP_UUID(arg->ca_po_uuid), DP_UUID(entry->ie_couuid),
d_hlc2sec(now_hlc - start_hlc), retry_cnt, DP_RC(rc));
last_warn_hlc = now_hlc;
}
D_DEBUG(DB_REBUILD,
DF_UUID "/" DF_UUID ": discard retry %u after " DF_U64 "s: " DF_RC "\n",
DP_UUID(arg->ca_po_uuid), DP_UUID(entry->ie_couuid), retry_cnt,
d_hlc2sec(now_hlc - start_hlc), DP_RC(rc));
dss_sleep(d_backoff_seq_next(&backoff_seq));
} while (1);

d_backoff_seq_fini(&backoff_seq);
vos_cont_close(coh);
D_DEBUG(DB_TRACE, DF_UUID "/" DF_UUID " discard cont done: " DF_RC "\n",
DP_UUID(arg->ca_po_uuid), DP_UUID(entry->ie_couuid), DP_RC(rc));
D_DEBUG(DB_TRACE,
DF_UUID "/" DF_UUID " discard cont done after " DF_U64 "s, retries=%u: " DF_RC "\n",
DP_UUID(arg->ca_po_uuid), DP_UUID(entry->ie_couuid),
d_hlc2sec(d_hlc_get() - start_hlc), retry_cnt, DP_RC(rc));

put:
ds_cont_child_put(cont);
Expand Down
27 changes: 20 additions & 7 deletions src/tests/ftest/osa/online_reintegration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
(C) Copyright 2020-2023 Intel Corporation.
(C) Copyright 2025 Hewlett Packard Enterprise Development LP
(C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP

SPDX-License-Identifier: BSD-2-Clause-Patent
"""
Expand Down Expand Up @@ -44,7 +44,7 @@ def daos_racer_thread(self):
self.daos_racer.run()

def run_online_reintegration_test(self, num_pool, ranks, racer=False, server_boot=False,
oclass=None):
oclass=None, include_pool_svc_leader=False):
"""Run the Online reintegration without data.

Args:
Expand All @@ -54,6 +54,8 @@ def run_online_reintegration_test(self, num_pool, ranks, racer=False, server_boo
Defaults to False.
server_boot (bool) : Perform system stop/start on a rank. Defaults to False.
oclass (str) : daos object class string (eg: "RP_2G8"). Defaults to None.
include_pool_svc_leader (bool): Ensure selected exclude/reintegrate ranks include
the current pool service leader. Defaults to False.
"""
if oclass is None:
oclass = self.ior_cmd.dfs_oclass.value
Expand Down Expand Up @@ -94,16 +96,26 @@ def run_online_reintegration_test(self, num_pool, ranks, racer=False, server_boo
self.pool.display_pool_daos_space("Pool space: Beginning")
pver_begin = self.pool.get_version(True)
self.log.info("Pool Version at the beginning %s", pver_begin)

# Preserve existing behavior unless explicitly requested by a test.
test_ranks = ranks
if include_pool_svc_leader:
join_ranks = not (len(ranks) == 1 and "," in str(ranks[0]))
total_ranks = len(ranks) if join_ranks else len(str(ranks[0]).split(","))
test_ranks = self.get_ranks_with_pool_svc_leader(
self.pool, total_ranks=total_ranks, join_ranks=join_ranks)
self.log.info("Using leader-inclusive test ranks: %s", test_ranks)

# Get initial total free space (scm+nvme)
initial_free_space = self.pool.get_total_free_space(refresh=True)
if server_boot is False:
output = self.pool.exclude(ranks)
output = self.pool.exclude(test_ranks)
else:
output = self.dmg_command.system_stop(ranks=ranks, force=True)
output = self.dmg_command.system_stop(ranks=test_ranks, force=True)
self.pool.wait_for_rebuild_to_start()
self.pool.wait_for_rebuild_to_end()
self.log.info(output)
output = self.dmg_command.system_start(ranks=ranks)
output = self.dmg_command.system_start(ranks=test_ranks)
self.pool.wait_for_rebuild_to_start()

self.print_and_assert_on_rebuild_failure(output)
Expand All @@ -117,7 +129,7 @@ def run_online_reintegration_test(self, num_pool, ranks, racer=False, server_boo
self.assertTrue(pver_exclude > (pver_begin + 8), "Pool Version Error: After exclude")
self.assertTrue(initial_free_space > free_space_after_exclude,
"Expected space after exclude is less than initial")
output = self.pool.reintegrate(ranks)
output = self.pool.reintegrate(test_ranks)
self.print_and_assert_on_rebuild_failure(output)
free_space_after_reintegration = self.pool.get_total_free_space(refresh=True)

Expand Down Expand Up @@ -236,4 +248,5 @@ def test_osa_online_reintegration_with_multiple_ranks(self):
"""
self.log.info("Online Reintegration : Multiple ranks")
ranks = self.get_random_test_ranks(join_ranks=False)
self.run_online_reintegration_test(num_pool=1, oclass="RP_3G1", ranks=ranks)
self.run_online_reintegration_test(
num_pool=1, oclass="RP_3G1", ranks=ranks, include_pool_svc_leader=True)
44 changes: 44 additions & 0 deletions src/tests/ftest/util/osa_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,50 @@ def get_random_test_ranks(self, total_ranks=2, join_ranks=True):
return list(map(str, self.random.sample(ranklist, k=total_ranks)))
return [",".join(map(str, self.random.sample(ranklist, k=total_ranks)))]

@fail_on(CommandFailure)
def get_ranks_with_pool_svc_leader(self, pool, total_ranks=2, join_ranks=True):
    """Pick test ranks so that the pool service leader is always one of them.

    The leader is read from a dmg pool query and placed first in the selection;
    any further ranks are sampled from the other server ranks.

    Args:
        pool (TestPool): Pool object whose identifier is used to query the
            current service leader.
        total_ranks (int, optional): Number of ranks to return, leader included.
            Defaults to 2.
        join_ranks (bool, optional): When True each rank is returned as its own
            string; when False all ranks are joined into a single
            comma-separated string held in a one-element list. Defaults to True.

    Returns:
        list: Rank strings with the pool service leader listed first.

    Raises:
        CommandFailure: if pool is None, total_ranks is below 1, or more ranks
            are requested than are available. NOTE(review): the fail_on
            decorator presumably converts this into a test failure - confirm.
    """
    if pool is None:
        raise CommandFailure("Pool is required to select ranks with pool service leader")

    query = self.dmg_command.pool_query(pool.identifier)
    svc_leader = int(query["response"]["svc_ldr"])
    all_ranks = list(self.server_managers[0].ranks.keys())
    available = len(all_ranks)

    if total_ranks < 1:
        raise CommandFailure("total_ranks must be at least 1")
    if total_ranks > available:
        raise CommandFailure(
            "Requested {} ranks but only {} are available".format(total_ranks, available))

    # One slot is reserved for the leader; the rest come from the other ranks.
    candidates = [rank for rank in all_ranks if rank != svc_leader]
    extra = total_ranks - 1
    if extra > len(candidates):
        raise CommandFailure(
            "Unable to select {} additional non-leader ranks from {}".format(
                extra, len(candidates)))

    chosen = [svc_leader]
    if extra > 0:
        chosen += self.random.sample(candidates, k=extra)

    self.log.info(
        "Selected ranks including pool service leader %s: %s", svc_leader, chosen)

    rank_strings = [str(rank) for rank in chosen]
    return rank_strings if join_ranks else [",".join(rank_strings)]

def assert_on_exception(self, out_queue=None):
"""Assert on exception while executing an application.

Expand Down
Loading