Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/pool/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -2516,6 +2516,10 @@ cont_discard_cb(daos_handle_t ih, vos_iter_entry_t *entry,
struct vos_iter_anchors anchor = { 0 };
daos_handle_t coh;
struct d_backoff_seq backoff_seq;
uint64_t start_hlc = d_hlc_get();
uint64_t last_warn_hlc = start_hlc;
uint64_t now_hlc;
unsigned int retry_cnt = 0;
int rc;

D_ASSERT(type == VOS_ITER_COUUID);
Expand Down Expand Up @@ -2556,16 +2560,28 @@ cont_discard_cb(daos_handle_t ih, vos_iter_entry_t *entry,
if (rc != -DER_BUSY && rc != -DER_INPROGRESS)
break;

D_DEBUG(DB_REBUILD, "retry by "DF_RC"/"DF_UUID"\n",
DP_RC(rc), DP_UUID(entry->ie_couuid));
retry_cnt++;
now_hlc = d_hlc_get();
if (now_hlc - last_warn_hlc >= d_sec2hlc(300)) {
D_WARN(DF_UUID "/" DF_UUID ": discard still retrying after " DF_U64
"s, retries=%u: " DF_RC "\n",
DP_UUID(arg->tgt_discard->pool_uuid), DP_UUID(entry->ie_couuid),
d_hlc2sec(now_hlc - start_hlc), retry_cnt, DP_RC(rc));
last_warn_hlc = now_hlc;
}
D_DEBUG(DB_REBUILD,
DF_UUID "/" DF_UUID ": discard retry %u after " DF_U64 "s: " DF_RC "\n",
DP_UUID(arg->tgt_discard->pool_uuid), DP_UUID(entry->ie_couuid), retry_cnt,
d_hlc2sec(now_hlc - start_hlc), DP_RC(rc));
dss_sleep(d_backoff_seq_next(&backoff_seq));
} while (1);

d_backoff_seq_fini(&backoff_seq);
vos_cont_close(coh);
D_DEBUG(DB_TRACE, DF_UUID"/"DF_UUID" discard cont done: "DF_RC"\n",
D_DEBUG(DB_TRACE,
DF_UUID "/" DF_UUID " discard cont done after " DF_U64 "s, retries=%u: " DF_RC "\n",
DP_UUID(arg->tgt_discard->pool_uuid), DP_UUID(entry->ie_couuid),
DP_RC(rc));
d_hlc2sec(d_hlc_get() - start_hlc), retry_cnt, DP_RC(rc));

put:
ds_cont_child_put(cont);
Expand Down
48 changes: 32 additions & 16 deletions src/tests/ftest/osa/online_reintegration.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,26 @@ def daos_racer_thread(self):
self.daos_racer.get_params(self)
self.daos_racer.run()

def run_online_reintegration_test(self, num_pool, racer=False, server_boot=False, oclass=None,
num_ranks=1):
def run_online_reintegration_test(self, num_pool, ranks, racer=False, server_boot=False,
oclass=None, include_pool_svc_leader=False):
"""Run the Online reintegration without data.

Args:
num_pool (int) : total pools to create for testing purposes.
ranks (list) : list of ranks to reintegrate.
racer (bool) : whether pool has no data or to create some data in pool.
Defaults to False.
server_boot (bool) : Perform system stop/start on a rank. Defaults to False.
oclass (str) : daos object class string (eg: "RP_2G8"). Defaults to None.
num_ranks (int): Number of ranks to drain. Defaults to 1.
include_pool_svc_leader (bool): Ensure selected exclude/reintegrate ranks include
the current pool service leader. Defaults to False.
"""
if oclass is None:
oclass = self.ior_cmd.dfs_oclass.value
test_seq = self.ior_test_sequence[0]
# Create a pool
pool = {}

ranklist = list(self.server_managers[0].ranks.keys())
rank = ",".join(map(str, self.random.sample(ranklist, k=num_ranks)))

# Start the daos_racer thread
if racer is True:
daos_racer_thread = threading.Thread(target=self.daos_racer_thread)
Expand Down Expand Up @@ -97,16 +96,26 @@ def run_online_reintegration_test(self, num_pool, racer=False, server_boot=False
self.pool.display_pool_daos_space("Pool space: Beginning")
pver_begin = self.pool.get_version(True)
self.log.info("Pool Version at the beginning %s", pver_begin)

# Preserve existing behavior unless explicitly requested by a test.
test_ranks = ranks
if include_pool_svc_leader:
join_ranks = not (len(ranks) == 1 and "," in str(ranks[0]))
total_ranks = len(ranks) if join_ranks else len(str(ranks[0]).split(","))
test_ranks = self.get_ranks_with_pool_svc_leader(
self.pool, total_ranks=total_ranks, join_ranks=join_ranks)
self.log.info("Using leader-inclusive test ranks: %s", test_ranks)

# Get initial total free space (scm+nvme)
initial_free_space = self.pool.get_total_free_space(refresh=True)
if server_boot is False:
output = self.pool.exclude(rank)
output = self.pool.exclude(test_ranks)
else:
output = self.dmg_command.system_stop(ranks=rank, force=True)
output = self.dmg_command.system_stop(ranks=test_ranks, force=True)
self.pool.wait_for_rebuild_to_start()
self.pool.wait_for_rebuild_to_end()
self.log.info(output)
output = self.dmg_command.system_start(ranks=rank)
output = self.dmg_command.system_start(ranks=test_ranks)
self.pool.wait_for_rebuild_to_start()

self.print_and_assert_on_rebuild_failure(output)
Expand All @@ -120,7 +129,7 @@ def run_online_reintegration_test(self, num_pool, racer=False, server_boot=False
self.assertTrue(pver_exclude > (pver_begin + 8), "Pool Version Error: After exclude")
self.assertTrue(initial_free_space > free_space_after_exclude,
"Expected space after exclude is less than initial")
output = self.pool.reintegrate(rank)
output = self.pool.reintegrate(test_ranks)
self.print_and_assert_on_rebuild_failure(output)
free_space_after_reintegration = self.pool.get_total_free_space(refresh=True)

Expand Down Expand Up @@ -164,7 +173,8 @@ def test_osa_online_reintegration(self):
:avocado: tags=OSAOnlineReintegration,test_osa_online_reintegration
"""
self.log.info("Online Reintegration : Basic test")
self.run_online_reintegration_test(1)
ranks = self.get_random_test_ranks(total_ranks=1)
self.run_online_reintegration_test(num_pool=1, ranks=ranks)

def test_osa_online_reintegration_server_stop(self):
"""Test ID: DAOS-5920.
Expand All @@ -177,7 +187,8 @@ def test_osa_online_reintegration_server_stop(self):
:avocado: tags=OSAOnlineReintegration,test_osa_online_reintegration_server_stop
"""
self.log.info("Online Reintegration : System stop/start")
self.run_online_reintegration_test(1, server_boot=True)
ranks = self.get_random_test_ranks(total_ranks=1)
self.run_online_reintegration_test(num_pool=1, server_boot=True, ranks=ranks)

def test_osa_online_reintegration_without_csum(self):
"""Test ID: DAOS-5075.
Expand All @@ -191,7 +202,8 @@ def test_osa_online_reintegration_without_csum(self):
"""
self.log.info("Online Reintegration : No Checksum")
self.test_with_checksum = self.params.get("test_with_checksum", "/run/checksum/*")
self.run_online_reintegration_test(1)
ranks = self.get_random_test_ranks(total_ranks=1)
self.run_online_reintegration_test(num_pool=1, ranks=ranks)

def test_osa_online_reintegration_with_aggregation(self):
"""Test ID: DAOS-6715.
Expand All @@ -206,7 +218,8 @@ def test_osa_online_reintegration_with_aggregation(self):
self.test_during_aggregation = self.params.get("test_with_aggregation",
'/run/aggregation/*')
self.log.info("Online Reintegration : Aggregation")
self.run_online_reintegration_test(1)
ranks = self.get_random_test_ranks(total_ranks=1)
self.run_online_reintegration_test(num_pool=1, ranks=ranks)

def test_osa_online_reintegration_oclass(self):
"""Test ID: DAOS-6715.
Expand All @@ -219,8 +232,9 @@ def test_osa_online_reintegration_oclass(self):
:avocado: tags=OSAOnlineReintegration,test_osa_online_reintegration_oclass
"""
self.log.info("Online Reintegration : Object Class")
ranks = self.get_random_test_ranks(total_ranks=1)
for oclass in self.test_oclass:
self.run_online_reintegration_test(1, oclass=oclass)
self.run_online_reintegration_test(num_pool=1, oclass=oclass, ranks=ranks)

def test_osa_online_reintegration_with_multiple_ranks(self):
"""Test ID: DAOS-4753.
Expand All @@ -233,4 +247,6 @@ def test_osa_online_reintegration_with_multiple_ranks(self):
:avocado: tags=OSAOnlineReintegration,test_osa_online_reintegration_with_multiple_ranks
"""
self.log.info("Online Reintegration : Multiple ranks")
self.run_online_reintegration_test(1, oclass="RP_3G1", num_ranks=2)
ranks = self.get_random_test_ranks(join_ranks=False)
self.run_online_reintegration_test(
num_pool=1, oclass="RP_3G1", ranks=ranks, include_pool_svc_leader=True)
66 changes: 63 additions & 3 deletions src/tests/ftest/util/osa_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""
(C) Copyright 2020-2024 Intel Corporation.
(C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP

SPDX-License-Identifier: BSD-2-Clause-Patent
"""
Expand Down Expand Up @@ -287,9 +288,70 @@ def set_cont_class_properties(self, oclass="S1"):
"Detected container redundancy factor: %s",
self.container.properties.value)
self.ior_cmd.dfs_oclass.update(None, "ior.dfs_oclass")
self.ior_cmd.dfs_dir_oclass.update(None, "ior.dfs_dir_oclass")
self.container.oclass.update(None)

def get_random_test_ranks(self, total_ranks=2, join_ranks=True):
"""Get random list of ranks for OSA tests.

Args:
total_ranks (list): Random rank list for testing. Defaults to 2.
join_ranks (bool): Stop ranks individual ranks. Defaults to True.

Returns:
list: a list of random ranks either as individual strings,
or one comma-separated string.

"""
# Get a random rank(s) based on num_ranks input.
ranklist = list(self.server_managers[0].ranks.keys())
if join_ranks is True:
return list(map(str, self.random.sample(ranklist, k=total_ranks)))
return [",".join(map(str, self.random.sample(ranklist, k=total_ranks)))]

@fail_on(CommandFailure)
def get_ranks_with_pool_svc_leader(self, pool, total_ranks=2, join_ranks=True):
"""Get test ranks that always include the current pool service leader.

Args:
pool (TestPool): Pool object to query for the current service leader.
total_ranks (int, optional): Number of ranks to return. Defaults to 2.
join_ranks (bool, optional): Return as individual rank strings when True,
else return one comma-separated rank string in a list.

Returns:
list: Rank selection including the current pool service leader.
"""
if pool is None:
raise CommandFailure("Pool is required to select ranks with pool service leader")

output = self.dmg_command.pool_query(pool.identifier)
leader = int(output["response"]["svc_ldr"])
ranklist = list(self.server_managers[0].ranks.keys())

if total_ranks < 1:
raise CommandFailure("total_ranks must be at least 1")
if total_ranks > len(ranklist):
raise CommandFailure(
"Requested {} ranks but only {} are available".format(total_ranks, len(ranklist)))

remaining = [rank for rank in ranklist if rank != leader]
need = total_ranks - 1
if need > len(remaining):
raise CommandFailure(
"Unable to select {} additional non-leader ranks from {}".format(
need, len(remaining)))

selected = [leader]
if need > 0:
selected.extend(self.random.sample(remaining, k=need))

self.log.info(
"Selected ranks including pool service leader %s: %s",
leader, selected)
if join_ranks:
return list(map(str, selected))
return [",".join(map(str, selected))]

def assert_on_exception(self, out_queue=None):
"""Assert on exception while executing an application.

Expand Down Expand Up @@ -377,7 +439,6 @@ def ior_thread(self, pool, oclass, test, flags, single_cont_read=True, fail_on_w
self.ior_cmd.set_daos_params(self.pool, None)
self.log.info("Redundancy Factor : %s", self.test_with_rf)
self.ior_cmd.dfs_oclass.update(oclass)
self.ior_cmd.dfs_dir_oclass.update(oclass)
if single_cont_read is True:
# Prepare the containers created and use in a specific
# way defined in prepare_cont_ior_write.
Expand All @@ -402,7 +463,6 @@ def ior_thread(self, pool, oclass, test, flags, single_cont_read=True, fail_on_w
self.log.info(
"Detected container redundancy factor: %s", self.container.properties.value)
self.ior_cmd.dfs_oclass.update(None, "ior.dfs_oclass")
self.ior_cmd.dfs_dir_oclass.update(None, "ior.dfs_dir_oclass")
# Run run_ior_with_pool without invoking the pool query method for
# displaying pool space information (display_space=False)
self.run_ior_with_pool(create_pool=False, create_cont=False,
Expand Down
Loading