From 1685ef62af8caf1952ab3262684f74da91f259c0 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Sat, 24 Jan 2026 09:27:34 +0800 Subject: [PATCH 01/14] DAOS-18487 object: control EC rebuild resource consumption A degraded EC read will allocate and register an extra buffer to recover data, which may cause ENOMEM in some cases. this workaround does not prevent dynamic buffer allocation and registration, it does provide relatively precise control over the resources consumed by degraded EC reads. Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 854d03f5ead..85013a6c453 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -2022,6 +2022,21 @@ migrate_one_ult(void *arg) } data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); + if (daos_oclass_is_ec(&mrone->mo_oca)) { + /* NB: this is a workaround for EC object: + * The fetch buffer is taken from a pre-registered (R)DMA buffer; + * however, a degraded EC read will allocate and register an extra + * buffer to recover data. + * + * Currently, the resource manager cannot control this extra allocation, + * which can lead to increased memory consumption. + * + * While this workaround does not prevent dynamic buffer allocation and + * registration, it does provide relatively precise control over the + * resources consumed by degraded EC reads. 
+ */ + data_size *= MIN(8, obj_ec_data_tgt_nr(&mrone->mo_oca)); + } data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); From fc7efdca4f9a2a48841163e853ac221c0f7d3bae Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 26 Jan 2026 20:52:47 +0800 Subject: [PATCH 02/14] DAOS-18487 object: degraded buffer size only impact resource control Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 85013a6c453..d3a532277d1 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -2010,6 +2010,7 @@ migrate_one_ult(void *arg) struct migrate_one *mrone = arg; struct migrate_pool_tls *tls; daos_size_t data_size; + daos_size_t degraded_size = 0; int rc = 0; while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG)) @@ -2035,7 +2036,7 @@ migrate_one_ult(void *arg) * registration, it does provide relatively precise control over the * resources consumed by degraded EC reads. 
*/ - data_size *= MIN(8, obj_ec_data_tgt_nr(&mrone->mo_oca)); + degraded_size = data_size * MIN(8, obj_ec_data_tgt_nr(&mrone->mo_oca)); } data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); @@ -2045,13 +2046,13 @@ migrate_one_ult(void *arg) D_ASSERT(data_size != (daos_size_t)-1); - rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL); + rc = migrate_res_hold(tls, MIGR_DATA, data_size + degraded_size, NULL); if (rc) D_GOTO(out, rc); rc = migrate_dkey(tls, mrone, data_size); - migrate_res_release(tls, MIGR_DATA, data_size); + migrate_res_release(tls, MIGR_DATA, data_size + degraded_size); D_DEBUG(DB_REBUILD, DF_RB ": " DF_UOID " layout %u migrate dkey " DF_KEY " inflight_size " DF_U64 From cf0d064b05bfc72fc087ee3ce13163a5ed9d9717 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Wed, 28 Jan 2026 10:39:49 +0800 Subject: [PATCH 03/14] DAOS-18487 object: amplify credits also for data from parity shard Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index d3a532277d1..3a39b842ead 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -2023,6 +2023,7 @@ migrate_one_ult(void *arg) } data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); + data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); if (daos_oclass_is_ec(&mrone->mo_oca)) { /* NB: this is a workaround for EC object: * The fetch buffer is taken from a pre-registered (R)DMA buffer; @@ -2038,8 +2039,6 @@ migrate_one_ult(void *arg) */ degraded_size = data_size * MIN(8, obj_ec_data_tgt_nr(&mrone->mo_oca)); } - data_size += daos_iods_len(mrone->mo_iods_from_parity, - mrone->mo_iods_num_from_parity); D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity); From 
b000ff0b188b669c2a2c58314cb0fd7344ad5504 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Thu, 29 Jan 2026 16:19:49 +0800 Subject: [PATCH 04/14] DAOS-18487 object: try to wake up more ULTs For data migration, after being waken up, the ULT should try to wake up another ULT if there is still available resource. Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 3a39b842ead..561153805b9 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -1945,12 +1945,16 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y *yielded = waited; /* per-pool counters for rebuild status tracking */ - if (res_type == MIGR_OBJ) + if (res_type == MIGR_OBJ) { tls->mpt_tgt_obj_ult_cnt++; - else if (res_type == MIGR_KEY) + } else if (res_type == MIGR_KEY) { tls->mpt_tgt_dkey_ult_cnt++; - else + } else { tls->mpt_inflight_size += units; + /* remaining resource may be sufficient for more waiters */ + if (waited && res->res_units < res->res_limit) + ABT_cond_signal(res->res_cond); + } D_DEBUG(DB_REBUILD, "res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB From 9086fc34324c1955061bbf016bbe121c7177592f Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Tue, 3 Feb 2026 21:09:19 +0800 Subject: [PATCH 05/14] DAOS-18487 object: decrease upper limit of rebuild resource Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 561153805b9..7c6c85ba81d 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -32,10 +32,10 @@ #endif /* Max in-flight transfer size per xstream */ -/* Set the total in-flight size to be 50% of MAX DMA size for +/* Set the total in-flight size to be 1/3 of MAX DMA size for * the moment, will adjust 
it later if needed. */ -#define MIGR_TGT_INF_DATA (1 << 29) +#define MIGR_TGT_INF_DATA (300 << 20) /* Threshold for very large transfers. * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. @@ -51,10 +51,10 @@ /* Number of migration ULTs per target */ #define MIGR_TGT_ULTS_MIN 100 -#define MIGR_TGT_ULTS_DEF 500 -#define MIGR_TGT_ULTS_MAX 2000 +#define MIGR_TGT_ULTS_DEF 300 +#define MIGR_TGT_ULTS_MAX 1000 -/* 1/3 object ults, 2/3 key ULTs */ +/* 1/3 object ults (100), 2/3 key ULTs (200) */ #define MIGR_OBJ_ULT_PERCENT 33 #define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100) @@ -715,16 +715,16 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ */ DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), DP_UOID(mrone->mo_oid)); if (rc == -DER_NOMEM) { - /* sleep 10 seconds before retry, give other layers a chance to + /* sleep a few seconds before retry, give other layers a chance to * release resources. */ - dss_sleep(10 * 1000); + dss_sleep((10 + rand() % 20) * 1000); if (waited != 0 && waited % 3600 == 0) { DL_ERROR(rc, DF_RB ": waited memory for %d hour(s)", DP_RB_MRO(mrone), waited / 3600); } } - waited += 10; + waited += 20; D_GOTO(retry, rc); } @@ -2041,7 +2041,7 @@ migrate_one_ult(void *arg) * registration, it does provide relatively precise control over the * resources consumed by degraded EC reads. 
*/ - degraded_size = data_size * MIN(8, obj_ec_data_tgt_nr(&mrone->mo_oca)); + degraded_size = data_size * MIN(16, obj_ec_data_tgt_nr(&mrone->mo_oca)); } D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone, From c0f60ee8b0ad5e4392c15f6273d25ac1487a4325 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Thu, 12 Feb 2026 16:03:53 +0800 Subject: [PATCH 06/14] DAOS-18487 object: add resource bucket for rebuild - Add resource bucket so overall resource consumption wouldn't grow on system configured with more targets - Track demanded resource and waitq for blocked ULT, and wakeup as many waiters as resource(being released) allowed - Code cleanup Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 568 ++++++++++++++++++++++++----------- 1 file changed, 396 insertions(+), 172 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 772434390cb..76341884a64 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -35,7 +35,7 @@ /* Set the total in-flight size to be 1/3 of MAX DMA size for * the moment, will adjust it later if needed. */ -#define MIGR_TGT_INF_DATA (300 << 20) +#define MIGR_TGT_INF_DATA (320 << 20) /* Threshold for very large transfers. * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. @@ -47,23 +47,30 @@ */ #define MIGR_INF_DATA_LWM (1 << 28) -#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT" +#define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" -/* Number of migration ULTs per target */ -#define MIGR_TGT_ULTS_MIN 100 -#define MIGR_TGT_ULTS_DEF 300 -#define MIGR_TGT_ULTS_MAX 1000 - -/* 1/3 object ults (100), 2/3 key ULTs (200) */ -#define MIGR_OBJ_ULT_PERCENT 33 +/* Number of concurrent objects being migrated per engine, consider we have 4 resource + * buckets, this number divided by 4 is the per-bucket concurrency. 
+ * + * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which + * has high enough concurrency and wouldn't cause network resource exhaustion. + */ +enum { + MIGR_OBJ_CONCUR_MIN = 400, + MIGR_OBJ_CONCUR_DEF = 1600, + MIGR_OBJ_CONCUR_MAX = 4000, +}; -#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100) -#define MIGR_TGT_KEY_ULTS(ults) (ults - MIGR_TGT_OBJ_ULTS(ults)) +enum { + MIGR_TGT_UNITS_MIN = 50, + MIGR_TGT_UNITS_DEF = 200, +}; enum { MIGR_OBJ = 0, MIGR_KEY, MIGR_DATA, + MIGR_CC, /* rebuild concurrency of the entire engine */ MIGR_MAX, }; @@ -74,25 +81,70 @@ struct migr_resource { long res_limit; /* resource amount in "unit" */ long res_units; + /* last time logging starveling */ + uint64_t res_log_since; + /* resource type */ + int res_type; + /* active ULTs */ + int res_holders; /* number of waiters on this resource */ int res_waiters; /* Only used by MIGR_DATA, it always allows exactly one ULT to use unbounded * buffer for super large value (rare). */ int res_hulk; + /* list head of all waiters */ + d_list_t res_waitq; /* ABT_cond for waiters */ ABT_cond res_cond; }; +/* anchor point of resource waiter */ +struct migr_res_waiter { + struct migrate_pool_tls *rw_tls; + /* link chain on resource manager */ + d_list_t rw_link; + /* quantity of resource being demanded */ + uint64_t rw_units; + /* start to wait since... 
*/ + uint64_t rw_wait_since; +}; + /* migration resources manager */ struct migr_res_manager { ABT_mutex rmg_mutex; + /* total number of occurred memory errors */ + unsigned long rmg_mem_err; + /* waited more than 10 minutes, serious errors */ + unsigned long rmg_mem_ser_err; + /* number of revived ULTs after running into memory error */ + unsigned long rmg_mem_revived; + /* number of waiting ULTs */ + unsigned long rmg_mem_waiting; struct migr_resource rmg_resources[MIGR_MAX]; }; +/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. + */ +#define MIGR_RES_BUCKETS 4 + +enum migr_bucket_type { + MIGR_BUCKET_PRIV, /* private */ + MIGR_BUCKET_MAP, /* shared, N to 1 map */ + MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ +}; + struct migr_engine_res { - /* total ULTs per target, it a tunable which can be set by admin */ - unsigned int er_max_ults; + /* type of bucket, see migr_bucket_type */ + enum migr_bucket_type er_bucket_type; + /* number of resource sharing buckets */ + int er_bucket_nr; + /* number of targets sharing the same resource bucket */ + int er_bucket_size; + /* round-robin bucket selector */ + ATOMIC int er_bucket_selector; /* dss_tgt_nr resource managers */ struct migr_res_manager *er_rmgs; }; @@ -166,6 +218,8 @@ struct iter_cont_arg { uuid_t pool_hdl_uuid; uuid_t cont_uuid; uuid_t cont_hdl_uuid; + daos_handle_t cont_hdl; + struct cont_props cont_props; struct tree_cache_root *cont_root; unsigned int yield_freq; uint64_t *snaps; @@ -181,8 +235,13 @@ struct iter_obj_arg { uuid_t cont_uuid; daos_unit_oid_t oid; daos_handle_t ioa_oh; + daos_handle_t ioa_coh; int ioa_obj_ref; struct daos_oclass_attr ioa_oca; + /* RPC fanout of degraded I/O and enumeration, it should be deemed as + * amplification factor for resource management + */ + int ioa_fanout; daos_epoch_t epoch; daos_epoch_t punched_epoch; 
unsigned int shard; @@ -685,14 +744,23 @@ mrone_recx_vos2_daos(struct migrate_one *mrone, int shard, daos_iod_t *iods, int mrone_recx_daos_vos_internal(mrone, false, shard, iods, iods_num); } +enum { + MEM_NO_WAIT, + MEM_WAIT, + MEM_LONG_WAIT, +}; + static int mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls, daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags, d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls) { + struct migr_res_manager *rmg = tls->mpt_rmg; uint32_t *extra_arg = NULL; - int waited = 0; + uint64_t then = 0; + uint64_t now; int rc; + int wait = MEM_NO_WAIT; /* pass rebuild epoch by extra_arg */ if (flags & DIOF_FETCH_EPOCH_EC_AGG_BOUNDARY) { @@ -700,6 +768,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ mrone->mo_epoch); extra_arg = (uint32_t *)mrone->mo_epoch; } + D_ASSERT(rmg != NULL); retry: rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls, NULL, flags, extra_arg, csum_iov_fetch); @@ -713,21 +782,40 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ /* If pool map does not change, then let's retry for timeout, instead of * fail out. */ - DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), DP_UOID(mrone->mo_oid)); - if (rc == -DER_NOMEM) { - /* sleep a few seconds before retry, give other layers a chance to - * release resources. - */ - dss_sleep((10 + rand() % 20) * 1000); - if (waited != 0 && waited % 3600 == 0) { - DL_ERROR(rc, DF_RB ": waited memory for %d hour(s)", - DP_RB_MRO(mrone), waited / 3600); - } + if (rc != -DER_NOMEM) { + DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), + DP_UOID(mrone->mo_oid)); + dss_sleep(1); + D_GOTO(retry, rc); + } + + now = daos_gettime_coarse(); + if (wait == MEM_NO_WAIT) { + wait = MEM_WAIT; + rmg->rmg_mem_waiting++; + rmg->rmg_mem_err++; + then = now; + } + /* sleep a few seconds before retry, give other layers a chance to + * release resources. 
+ */ + dss_sleep((10 + rand() % 20) * 1000); + if (wait != MEM_LONG_WAIT && now - then >= 600) { + wait = MEM_LONG_WAIT; /* flagged as long waiter */ + rmg->rmg_mem_ser_err++; /* counted as serious error */ + DL_ERROR(rc, + DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," + " total waiters: %lu, total revived: %lu\n", + DP_RB_MRO(mrone), rmg->rmg_mem_ser_err, rmg->rmg_mem_err, + rmg->rmg_mem_waiting, rmg->rmg_mem_revived); } - waited += 20; D_GOTO(retry, rc); } - + if (wait != MEM_NO_WAIT) { + D_ASSERT(rmg->rmg_mem_waiting > 0); + rmg->rmg_mem_revived++; + rmg->rmg_mem_waiting--; + } return rc; } @@ -1885,9 +1973,85 @@ migrate_one_destroy(struct migrate_one *mrone) } static bool -migr_res_is_hulk(int res_type, long units) +migr_res_is_hulk(struct migr_resource *res, long units) { - return res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; + return res->res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; +} + +static bool +migr_res_has_starveling(struct migr_resource *res, uint64_t now) +{ + static unsigned int starving_threshold = 30; /* 30 seconds */ + struct migr_res_waiter *waiter; + + if (res->res_waiters == 0) + return false; + + D_ASSERT(!d_list_empty(&res->res_waitq)); + waiter = d_list_entry(res->res_waitq.next, struct migr_res_waiter, rw_link); + + if (now - waiter->rw_wait_since < starving_threshold) + return false; + + /* If someone has already waited for more than 30 seconds, we should prioritize it and + * queue the current ULT. Log this waiter every 10 minutes so we can find out if it's + * blocked permentally. 
+ */ + if (now - res->res_log_since > 600) { + D_DEBUG(DB_REBUILD, + DF_RB + " starving: res=%s, since=%lu, ask:limit=%lu:%lu, holders:waiters=%d:%d\n", + DP_RB_MPT(waiter->rw_tls), res->res_name, waiter->rw_wait_since, + waiter->rw_units, res->res_limit, res->res_holders, res->res_waiters); + res->res_log_since = now; + } + return true; +} + +static void +migrate_res_wakeup(struct migr_resource *res, uint64_t units) +{ + struct migr_res_waiter *waiter, *tmp; + + d_list_for_each_entry_safe(waiter, tmp, &res->res_waitq, rw_link) { + if (waiter->rw_units > units) + break; + + if (units != -1ULL) + units -= waiter->rw_units; + + /* NB: for -1ULL case we should just wakeup who have the same uuid of the pool, + * however, ABT_cond can't support removing waiter from the queue, so we just + * wakeup everyone. + */ + res->res_waiters--; + d_list_del_init(&waiter->rw_link); + ABT_cond_signal(res->res_cond); + } +} + +static struct migr_res_manager * +migr_type2rmg(struct dss_module_info *dmi, int res_type, bool *shared_res) +{ + struct migr_res_manager *rmg; + int idx; + + if (res_type != MIGR_CC || migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) { + rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; + *shared_res = false; + return rmg; + } + + if (migr_eng_res.er_bucket_type == MIGR_BUCKET_MAP) { + idx = dmi->dmi_tgt_id / migr_eng_res.er_bucket_size; + D_ASSERT(idx < migr_eng_res.er_bucket_nr); + } else /* rotation */ { + idx = atomic_fetch_add(&migr_eng_res.er_bucket_selector, 1); + idx %= migr_eng_res.er_bucket_nr; + } + rmg = &migr_eng_res.er_rmgs[idx]; + *shared_res = true; + return rmg; } static int @@ -1896,116 +2060,152 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y struct dss_module_info *dmi = dss_get_module_info(); struct migr_res_manager *rmg; struct migr_resource *res; + struct migr_res_waiter waiter; + uint64_t now; bool is_hulk; - bool waited = false; + bool shared_res; + bool locked = false; int rc = 0; 
D_ASSERT(dmi->dmi_xs_id != 0); - - rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; - if (tls->mpt_rmg == NULL) { + if (res_type == MIGR_OBJ) + res_type = MIGR_CC; /* require the global concurrent permission first */ +again: + rmg = migr_type2rmg(dmi, res_type, &shared_res); + if (tls->mpt_rmg == NULL && !shared_res) tls->mpt_rmg = rmg; - } else { - D_ASSERTF(tls->mpt_rmg == rmg, "target=%d, rmg_off=%d\n", dmi->dmi_tgt_id, - (int)(tls->mpt_rmg - &migr_eng_res.er_rmgs[0])); + + if (shared_res) { + ABT_mutex_lock(rmg->rmg_mutex); + locked = true; } + waiter.rw_tls = tls; + waiter.rw_units = units; + waiter.rw_wait_since = 0; res = &rmg->rmg_resources[res_type]; - is_hulk = migr_res_is_hulk(res_type, units); + is_hulk = migr_res_is_hulk(res, units); while (1) { if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); D_GOTO(out, rc); } + now = daos_gettime_coarse(); if (is_hulk && res->res_hulk == 0 && res->res_units < MIGR_INF_DATA_LWM) { /* skip the limit check and allow (only) one hulk transfer at a time */ res->res_units += units; res->res_hulk = 1; break; - } else if (!is_hulk && res->res_units + units <= res->res_limit) { + } else if (!is_hulk && !migr_res_has_starveling(res, now) && + res->res_units + units <= res->res_limit) { res->res_units += units; break; } - ABT_mutex_lock(rmg->rmg_mutex); - res->res_waiters++; - if (res->res_waiters >= 100 && res->res_waiters % 100 == 0) { - D_DEBUG(DB_REBUILD, - "%d waiters are waiting on res=%s (target=%d, unit=%lu)\n", - res->res_waiters, res->res_name, dmi->dmi_tgt_id, units); + if (waiter.rw_wait_since == 0) + waiter.rw_wait_since = now; + + if (!shared_res) { + ABT_mutex_lock(rmg->rmg_mutex); + locked = true; } + res->res_waiters++; + d_list_add_tail(&waiter.rw_link, &res->res_waitq); + ABT_cond_wait(res->res_cond, rmg->rmg_mutex); - res->res_waiters--; - ABT_mutex_unlock(rmg->rmg_mutex); - waited = true; + + D_ASSERT(d_list_empty(&waiter.rw_link)); + if (!shared_res) + ABT_mutex_unlock(rmg->rmg_mutex); } - if 
(yielded) - *yielded = waited; + res->res_holders++; /* per-pool counters for rebuild status tracking */ - if (res_type == MIGR_OBJ) { + if (res_type == MIGR_OBJ) tls->mpt_tgt_obj_ult_cnt++; - } else if (res_type == MIGR_KEY) { + else if (res_type == MIGR_KEY) tls->mpt_tgt_dkey_ult_cnt++; - } else { + else if (res_type == MIGR_DATA) tls->mpt_inflight_size += units; - /* remaining resource may be sufficient for more waiters */ - if (waited && res->res_units < res->res_limit) - ABT_cond_signal(res->res_cond); - } D_DEBUG(DB_REBUILD, - "res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, waited, DP_RB_MPT(tls), + DF_RB " " + "res=%s units:total:limit=%lu:%lu:%lu waiters:holders=%d:%d, waited=%s, " + " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", + DP_RB_MPT(tls), res->res_name, units, res->res_units, res->res_limit, + res->res_waiters, res->res_holders, !!waiter.rw_wait_since ? "yes" : "no", tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); + + if (shared_res) { + ABT_mutex_unlock(rmg->rmg_mutex); + shared_res = false; + } + + if (res_type == MIGR_CC) { + /* go back and require local thread object resource. + * NB: MIGR_CC and MIGR_OBJ have the same unit. 
+ */ + res_type = MIGR_OBJ; + goto again; + } out: + if (yielded && !*yielded) + *yielded = locked; return rc; } static void migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) { + struct dss_module_info *dmi = dss_get_module_info(); struct migr_res_manager *rmg; struct migr_resource *res; + bool shared_res; - rmg = tls->mpt_rmg; - D_ASSERT(rmg != NULL); +again: + rmg = migr_type2rmg(dmi, res_type, &shared_res); + D_ASSERT(shared_res || rmg == tls->mpt_rmg); + if (shared_res) + ABT_mutex_lock(rmg->rmg_mutex); res = &rmg->rmg_resources[res_type]; - D_DEBUG(DB_REBUILD, - "%s: release=%lu, used=%lu, limit=%lu\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, DP_RB_MPT(tls), - tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); - if (res_type == MIGR_OBJ) { D_ASSERT(tls->mpt_tgt_obj_ult_cnt > 0); tls->mpt_tgt_obj_ult_cnt--; } else if (res_type == MIGR_KEY) { D_ASSERT(tls->mpt_tgt_dkey_ult_cnt > 0); tls->mpt_tgt_dkey_ult_cnt--; - } else { + } else if (res_type == MIGR_DATA) { D_ASSERT(tls->mpt_inflight_size >= units); tls->mpt_inflight_size -= units; } D_ASSERT(res->res_units >= units); res->res_units -= units; + D_ASSERT(res->res_holders > 0); + res->res_holders--; - if (migr_res_is_hulk(res_type, units)) { + if (migr_res_is_hulk(res, units)) { D_ASSERT(res->res_hulk == 1); res->res_hulk = 0; } - if (res->res_waiters > 0) { - ABT_mutex_lock(rmg->rmg_mutex); - ABT_cond_signal(res->res_cond); + if (res->res_waiters > 0) + migrate_res_wakeup(res, units); + + if (shared_res) { ABT_mutex_unlock(rmg->rmg_mutex); + shared_res = false; + } + + if (res_type == MIGR_OBJ) { + /* NB: MIGR_CC and MIGR_OBJ have the same unit. 
*/ + res_type = MIGR_CC; + goto again; } } @@ -2013,9 +2213,9 @@ static void migrate_one_ult(void *arg) { struct migrate_one *mrone = arg; + struct iter_obj_arg *ioa = mrone->mo_obj_arg; struct migrate_pool_tls *tls; - daos_size_t data_size; - daos_size_t degraded_size = 0; + daos_size_t data_size; int rc = 0; while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG)) @@ -2029,34 +2229,19 @@ migrate_one_ult(void *arg) data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); - if (daos_oclass_is_ec(&mrone->mo_oca)) { - /* NB: this is a workaround for EC object: - * The fetch buffer is taken from a pre-registered (R)DMA buffer; - * however, a degraded EC read will allocate and register an extra - * buffer to recover data. - * - * Currently, the resource manager cannot control this extra allocation, - * which can lead to increased memory consumption. - * - * While this workaround does not prevent dynamic buffer allocation and - * registration, it does provide relatively precise control over the - * resources consumed by degraded EC reads. 
- */ - degraded_size = data_size * MIN(16, obj_ec_data_tgt_nr(&mrone->mo_oca)); - } D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity); D_ASSERT(data_size != (daos_size_t)-1); - rc = migrate_res_hold(tls, MIGR_DATA, data_size + degraded_size, NULL); + rc = migrate_res_hold(tls, MIGR_DATA, data_size * ioa->ioa_fanout, NULL); if (rc) D_GOTO(out, rc); rc = migrate_dkey(tls, mrone, data_size); - migrate_res_release(tls, MIGR_DATA, data_size + degraded_size); + migrate_res_release(tls, MIGR_DATA, data_size * ioa->ioa_fanout); D_DEBUG(DB_REBUILD, DF_RB ": " DF_UOID " layout %u migrate dkey " DF_KEY " inflight_size " DF_U64 @@ -2080,7 +2265,7 @@ migrate_one_ult(void *arg) tls->mpt_fini = 1; } out: - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, ioa->ioa_fanout); migrate_one_destroy(mrone); } @@ -2873,7 +3058,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) continue; } - rc = migrate_res_hold(tls, MIGR_KEY, 1, NULL); + rc = migrate_res_hold(tls, MIGR_KEY, arg->ioa_fanout, NULL); if (rc) break; d_list_del_init(&mrone->mo_list); @@ -2887,7 +3072,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) { - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, arg->ioa_fanout); migrate_one_destroy(mrone); break; } @@ -2903,8 +3088,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) * Iterate akeys/dkeys of the object */ static int -migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, - struct iter_obj_arg *arg) +migrate_obj_epoch(struct migrate_pool_tls *tls, struct iter_obj_arg *arg, daos_epoch_range_t *epr) { daos_anchor_t anchor; daos_anchor_t dkey_anchor; @@ -3175,10 +3359,9 @@ migrate_fini_one_ult(void *data) struct migr_res_manager *rmg = tls->mpt_rmg; int i; - /* NB: no big deal 
but ULTs of all pools will be waken up */ ABT_mutex_lock(rmg->rmg_mutex); for (i = 0; i < MIGR_MAX; i++) - ABT_cond_broadcast(rmg->rmg_resources[i].res_cond); + migrate_res_wakeup(&rmg->rmg_resources[i], -1ULL); ABT_mutex_unlock(rmg->rmg_mutex); } @@ -3253,8 +3436,7 @@ migrate_obj_ult(void *data) struct migrate_pool_tls *tls = NULL; daos_epoch_range_t epr; daos_epoch_t stable_epoch = 0; - daos_handle_t coh = DAOS_HDL_INVAL; - struct cont_props props; + daos_handle_t coh = arg->ioa_coh; int i; int rc = 0; @@ -3265,27 +3447,6 @@ migrate_obj_ult(void *data) D_GOTO(free_notls, rc); } - /* Only reintegrating targets/pool needs to discard the object, - * if sp_need_discard is 0, either the target does not need to - * discard, or discard has been done. spc_discard_done means - * discarding has been done in the current VOS target. - */ - if (tls->mpt_pool->spc_pool->sp_need_discard) { - while(!tls->mpt_pool->spc_discard_done) { - D_DEBUG(DB_REBUILD, DF_RB ": wait for discard to finish.\n", - DP_RB_MPT(tls)); - dss_sleep(2 * 1000); - if (tls->mpt_fini) - D_GOTO(free_notls, rc); - } - if (tls->mpt_pool->spc_pool->sp_discard_status) { - rc = tls->mpt_pool->spc_pool->sp_discard_status; - D_DEBUG(DB_REBUILD, DF_RB ": discard failure: " DF_RC "\n", DP_RB_MPT(tls), - DP_RC(rc)); - D_GOTO(out, rc); - } - } - if (tls->mpt_reintegrating) { struct ds_cont_child *cont_child = NULL; @@ -3310,33 +3471,12 @@ migrate_obj_ult(void *data) ds_cont_child_put(cont_child); } - rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL, - tls->mpt_pool->spc_pool->sp_map, &tls->mpt_svc_list, &tls->mpt_pool_hdl); - if (rc) { - DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - - rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh); - if (rc) { - DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &arg->ioa_oh); if (rc) { DL_ERROR(rc, DF_RB ": dsc_obj_open failed", 
DP_RB_MPT(tls)); D_GOTO(out, rc); } - dsc_cont_get_props(coh, &props); - rc = dsc_obj_id2oc_attr(arg->oid.id_pub, &props, &arg->ioa_oca); - if (rc) { - DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), - daos_obj_id2class(arg->oid.id_pub)); - D_GOTO(out, rc); - } - for (i = 0; i < arg->snap_cnt; i++) { daos_epoch_t lower_epoch = 0; @@ -3356,7 +3496,7 @@ migrate_obj_ult(void *data) epr.epr_hi = arg->snaps[i]; D_DEBUG(DB_REBUILD, DF_RB ": rebuild_snap %d " DF_X64 "-" DF_X64 "\n", DP_RB_MPT(tls), i, epr.epr_lo, epr.epr_hi); - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); if (rc) D_GOTO(free, rc); } @@ -3372,7 +3512,7 @@ migrate_obj_ult(void *data) D_ASSERT(tls->mpt_max_eph != 0); epr.epr_hi = tls->mpt_max_eph; if (arg->epoch > 0) { - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); } else { /* The obj has been punched for this range */ D_DEBUG(DB_REBUILD, @@ -3411,7 +3551,7 @@ migrate_obj_ult(void *data) DP_RB_MPT(tls), DP_UOID(arg->oid), arg->shard, tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_obj_count, DP_RC(rc)); free_notls: - migrate_res_release(tls, MIGR_OBJ, 1); + migrate_res_release(tls, MIGR_OBJ, arg->ioa_fanout); migrate_obj_put(arg); } @@ -3424,10 +3564,9 @@ struct migrate_obj_val { /* This is still running on the main migration ULT */ static int -migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, - unsigned int shard, unsigned int tgt_idx, void *data) +migrate_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, unsigned int shard, + unsigned int tgt_idx, struct iter_cont_arg *cont_arg, bool *yielded) { - struct iter_cont_arg *cont_arg = data; struct iter_obj_arg *obj_arg; struct migrate_pool_tls *tls = cont_arg->pool_tls; daos_handle_t toh = tls->mpt_migrated_root_hdl; @@ -3454,6 +3593,7 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e 
uuid_copy(obj_arg->cont_uuid, cont_arg->cont_uuid); obj_arg->version = cont_arg->pool_tls->mpt_version; obj_arg->generation = cont_arg->pool_tls->mpt_generation; + obj_arg->ioa_coh = cont_arg->cont_hdl; if (cont_arg->snaps) { D_ALLOC(obj_arg->snaps, sizeof(*cont_arg->snaps) * cont_arg->snap_cnt); @@ -3465,6 +3605,26 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e sizeof(*obj_arg->snaps) * cont_arg->snap_cnt); } + rc = dsc_obj_id2oc_attr(oid.id_pub, &cont_arg->cont_props, &obj_arg->ioa_oca); + if (rc) { + DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), + daos_obj_id2class(oid.id_pub)); + D_GOTO(free, rc); + } + + if (daos_oclass_is_ec(&obj_arg->ioa_oca)) /* RPC fanout has to be considered */ + obj_arg->ioa_fanout = MIN(16, obj_ec_data_tgt_nr(&obj_arg->ioa_oca)); + else + obj_arg->ioa_fanout = 1; + + *yielded = false; + rc = migrate_res_hold(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout, yielded); + if (rc != 0) { + DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(cont_arg->cont_uuid)); + migrate_res_release(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout); + goto free; + } + D_ASSERT(tgt_idx == dss_get_module_info()->dmi_tgt_id); rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) @@ -3509,18 +3669,10 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * DF_RB ": obj migrate " DF_UUID "/" DF_UOID " %" PRIx64 " eph " DF_U64 " start\n", DP_RB_MPT(arg->pool_tls), DP_UUID(arg->cont_uuid), DP_UOID(*oid), ih.cookie, epoch); - rc = migrate_res_hold(arg->pool_tls, MIGR_OBJ, 1, &yielded); - if (rc) { - DL_ERROR(rc, DF_RB ": " DF_UUID " enter migrate failed.", DP_RB_MPT(arg->pool_tls), - DP_UUID(arg->cont_uuid)); - return rc; - } - - rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg); + rc = migrate_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg, &yielded); if (rc != 0) { DL_ERROR(rc, DF_RB ": obj " DF_UOID " 
migration failed", DP_RB_MPT(arg->pool_tls), DP_UOID(*oid)); - migrate_res_release(arg->pool_tls, MIGR_OBJ, 1); return rc; } @@ -3649,6 +3801,14 @@ migrate_cont_iter_cb(daos_handle_t ih, d_iov_t *key_iov, arg.snap_cnt = fetch_arg.snap_cnt; arg.pool_tls = tls; uuid_copy(arg.cont_uuid, cont_uuid); + + rc = migrate_cont_open(tls, cont_uuid, 0, &arg.cont_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); + D_GOTO(free, rc); + } + + dsc_cont_get_props(arg.cont_hdl, &arg.cont_props); while (!dbtree_is_empty(root->tcr_root_hdl)) { if (tls->mpt_fini) break; @@ -3710,20 +3870,51 @@ static void migrate_ult(void *arg) { struct migrate_pool_tls *pool_tls = arg; - int rc; + struct ds_pool_child *pool; + int rc = 0; D_ASSERT(pool_tls != NULL); + pool = pool_tls->mpt_pool; + /* Only reintegrating targets/pool needs to discard the object, if sp_need_discard is 0, + * either the target does not need to discard, or discard has been done. + * spc_discard_done means discarding has been done in the current VOS target. 
+ */ + if (pool->spc_pool->sp_need_discard) { + while (!pool->spc_discard_done) { + D_DEBUG(DB_REBUILD, DF_RB " wait for discard to finish.\n", + DP_RB_MPT(pool_tls)); + dss_sleep(2 * 1000); + if (pool_tls->mpt_fini) + D_GOTO(out, rc); + } + if (pool->spc_pool->sp_discard_status) { + rc = pool->spc_pool->sp_discard_status; + D_DEBUG(DB_REBUILD, DF_RB " discard failure: " DF_RC, DP_RB_MPT(pool_tls), + DP_RC(rc)); + D_GOTO(out, rc); + } + } + + rc = + dsc_pool_open(pool_tls->mpt_pool_uuid, pool_tls->mpt_poh_uuid, 0, NULL, + pool->spc_pool->sp_map, &pool_tls->mpt_svc_list, &pool_tls->mpt_pool_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(pool_tls)); + D_GOTO(out, rc); + } + while (!dbtree_is_empty(pool_tls->mpt_root_hdl) && !pool_tls->mpt_fini) { rc = dbtree_iterate(pool_tls->mpt_root_hdl, DAOS_INTENT_PURGE, false, migrate_cont_iter_cb, pool_tls); if (rc < 0) { DL_ERROR(rc, DF_RB ": dbtree iterate failed", DP_RB_MPT(pool_tls)); - if (pool_tls->mpt_status == 0) - pool_tls->mpt_status = rc; - break; + goto out; } } +out: + if (pool_tls->mpt_status == 0) + pool_tls->mpt_status = rc; pool_tls->mpt_ult_running = 0; migrate_pool_tls_put(pool_tls); @@ -4547,12 +4738,15 @@ ds_object_migrate_send(struct ds_pool *pool, uuid_t pool_hdl_uuid, uuid_t cont_h } static int -migr_res_init(struct migr_resource *res, const char *name, long limit) +migr_res_init(struct migr_res_manager *rmg, int type, const char *name, long limit) { + struct migr_resource *res = &rmg->rmg_resources[type]; int rc; memset(res, 0, sizeof(*res)); + D_INIT_LIST_HEAD(&res->res_waitq); res->res_name = name; + res->res_type = type; res->res_limit = limit; rc = ABT_cond_create(&res->res_cond); @@ -4562,6 +4756,7 @@ migr_res_init(struct migr_resource *res, const char *name, long limit) static void migr_res_fini(struct migr_resource *res) { + D_ASSERT(!res->res_name || d_list_empty(&res->res_waitq)); /* unset or drained */ if (res->res_cond) ABT_cond_free(&res->res_cond); } @@ -4569,23 
+4764,47 @@ migr_res_fini(struct migr_resource *res) int obj_migrate_init(void) { - unsigned int ults = MIGR_TGT_ULTS_DEF; + unsigned int obj_units = MIGR_OBJ_CONCUR_DEF; + unsigned int tgt_units = MIGR_TGT_UNITS_DEF; + unsigned int bkt_units; int i; int rc = 0; D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM); D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); + D_ASSERT(dss_tgt_nr > 0); - d_getenv_uint(ENV_MIGRATE_ULT_CNT, &ults); - if (ults < MIGR_TGT_ULTS_MIN) - ults = MIGR_TGT_ULTS_MIN; - if (ults > MIGR_TGT_ULTS_MAX) - ults = MIGR_TGT_ULTS_MAX; + if (dss_tgt_nr <= MIGR_RES_BUCKETS) { + migr_eng_res.er_bucket_type = MIGR_BUCKET_PRIV; + migr_eng_res.er_bucket_nr = dss_tgt_nr; + migr_eng_res.er_bucket_size = 1; + } else { + migr_eng_res.er_bucket_nr = MIGR_RES_BUCKETS; + migr_eng_res.er_bucket_size = dss_tgt_nr / MIGR_RES_BUCKETS; + if (dss_tgt_nr % MIGR_RES_BUCKETS != 0) + migr_eng_res.er_bucket_type = MIGR_BUCKET_ROTATE; + else + migr_eng_res.er_bucket_type = MIGR_BUCKET_MAP; + } + + rc = d_getenv_uint(ENV_MIGRATE_OBJ_CONCUR, &obj_units); + if (rc == 0) { + if (obj_units < MIGR_OBJ_CONCUR_MIN) + obj_units = MIGR_OBJ_CONCUR_MIN; + if (obj_units > MIGR_OBJ_CONCUR_MAX) + obj_units = MIGR_OBJ_CONCUR_MAX; + } + + bkt_units = obj_units / migr_eng_res.er_bucket_nr; + if (migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) + tgt_units = bkt_units; + if (tgt_units > bkt_units) + tgt_units = bkt_units; + if (tgt_units * migr_eng_res.er_bucket_size < bkt_units) + tgt_units = bkt_units / migr_eng_res.er_bucket_size; memset(&migr_eng_res, 0, sizeof(migr_eng_res)); - migr_eng_res.er_max_ults = ults; - D_ASSERT(dss_tgt_nr > 0); D_ALLOC(migr_eng_res.er_rmgs, sizeof(struct migr_res_manager) * dss_tgt_nr); if (!migr_eng_res.er_rmgs) return -DER_NOMEM; @@ -4597,15 +4816,20 @@ obj_migrate_init(void) if (rc != ABT_SUCCESS) D_GOTO(out, rc = dss_abterr2der(rc)); - rc = migr_res_init(&rmg->rmg_resources[MIGR_OBJ], "OBJ", MIGR_TGT_OBJ_ULTS(ults)); + /* NB: only a setset of CONCUR RMGs 
are actually used */ + rc = migr_res_init(rmg, MIGR_CC, "CONCUR", bkt_units); + if (rc) + D_GOTO(out, rc); + + rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", tgt_units); if (rc) D_GOTO(out, rc); - rc = migr_res_init(&rmg->rmg_resources[MIGR_KEY], "KEY", MIGR_TGT_KEY_ULTS(ults)); + rc = migr_res_init(rmg, MIGR_KEY, "KEY", tgt_units); if (rc) D_GOTO(out, rc); - rc = migr_res_init(&rmg->rmg_resources[MIGR_DATA], "DATA", MIGR_TGT_INF_DATA); + rc = migr_res_init(rmg, MIGR_DATA, "DATA", MIGR_TGT_INF_DATA); if (rc) D_GOTO(out, rc); } From fb8b053848686b4cf83d33ac37b3cb6e8bf283a7 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Fri, 13 Feb 2026 17:29:23 +0800 Subject: [PATCH 07/14] DAOS-18487 object: forget to unlock Increase default values Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 52 ++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 76341884a64..529f9e832c4 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -48,6 +48,7 @@ #define MIGR_INF_DATA_LWM (1 << 28) #define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" +#define ENV_MIGRATE_KEY_CONCUR "D_MIGRATE_KEY_CONCUR" /* Number of concurrent objects being migrated per engine, consider we have 4 resource * buckets, this number divided by 4 is the per-bucket concurrency. @@ -55,15 +56,22 @@ * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which * has high enough concurrency and wouldn't cause network resource exhaustion. */ +; + enum { - MIGR_OBJ_CONCUR_MIN = 400, + MIGR_OBJ_CONCUR_MIN = 800, MIGR_OBJ_CONCUR_DEF = 1600, - MIGR_OBJ_CONCUR_MAX = 4000, + MIGR_OBJ_CONCUR_MAX = 3200, }; enum { - MIGR_TGT_UNITS_MIN = 50, - MIGR_TGT_UNITS_DEF = 200, + /* OBJ_CUR + KEY_CUR is less than 1/3 of HW limit, the reason of being conservative + * is because we don't have precise control and some data pattern (fragmented RDMA) + * can lead to peak * workload. 
+ */ + MIGR_KEY_CONCUR_MIN = (1600 - MIGR_OBJ_CONCUR_MIN), + MIGR_KEY_CONCUR_DEF = (4800 - MIGR_OBJ_CONCUR_DEF), + MIGR_KEY_CONCUR_MAX = (6400 - MIGR_OBJ_CONCUR_MAX), }; enum { @@ -2088,6 +2096,8 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y while (1) { if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); + if (shared_res) + ABT_mutex_lock(rmg->rmg_mutex); D_GOTO(out, rc); } @@ -4765,8 +4775,11 @@ int obj_migrate_init(void) { unsigned int obj_units = MIGR_OBJ_CONCUR_DEF; - unsigned int tgt_units = MIGR_TGT_UNITS_DEF; - unsigned int bkt_units; + unsigned int key_units = MIGR_KEY_CONCUR_DEF; + unsigned int bkt_obj_units; + unsigned int bkt_key_units; + unsigned int tgt_obj_units; + unsigned int tgt_key_units; int i; int rc = 0; @@ -4795,13 +4808,19 @@ obj_migrate_init(void) obj_units = MIGR_OBJ_CONCUR_MAX; } - bkt_units = obj_units / migr_eng_res.er_bucket_nr; - if (migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) - tgt_units = bkt_units; - if (tgt_units > bkt_units) - tgt_units = bkt_units; - if (tgt_units * migr_eng_res.er_bucket_size < bkt_units) - tgt_units = bkt_units / migr_eng_res.er_bucket_size; + rc = d_getenv_uint(ENV_MIGRATE_KEY_CONCUR, &key_units); + if (rc == 0) { + if (key_units < MIGR_KEY_CONCUR_MIN) + key_units = MIGR_KEY_CONCUR_MIN; + if (key_units > MIGR_KEY_CONCUR_MAX) + key_units = MIGR_KEY_CONCUR_MAX; + } + + bkt_obj_units = obj_units / migr_eng_res.er_bucket_nr; + bkt_key_units = key_units / migr_eng_res.er_bucket_nr; + + tgt_obj_units = bkt_obj_units / MIN(2, migr_eng_res.er_bucket_size); + tgt_key_units = bkt_key_units / MIN(2, migr_eng_res.er_bucket_size); memset(&migr_eng_res, 0, sizeof(migr_eng_res)); @@ -4816,16 +4835,15 @@ obj_migrate_init(void) if (rc != ABT_SUCCESS) D_GOTO(out, rc = dss_abterr2der(rc)); - /* NB: only a setset of CONCUR RMGs are actually used */ - rc = migr_res_init(rmg, MIGR_CC, "CONCUR", bkt_units); + rc = migr_res_init(rmg, MIGR_CC, "CONCUR", bkt_obj_units); if (rc) 
D_GOTO(out, rc); - rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", tgt_units); + rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", tgt_obj_units); if (rc) D_GOTO(out, rc); - rc = migr_res_init(rmg, MIGR_KEY, "KEY", tgt_units); + rc = migr_res_init(rmg, MIGR_KEY, "KEY", tgt_key_units); if (rc) D_GOTO(out, rc); From 71814b40e3cb9e8c55cabb42c62587f997d762be Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Fri, 13 Feb 2026 17:32:16 +0800 Subject: [PATCH 08/14] DAOS-18487 object: fix a typo Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 529f9e832c4..1c2e1f01a63 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -2097,7 +2097,7 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); if (shared_res) - ABT_mutex_lock(rmg->rmg_mutex); + ABT_mutex_unlock(rmg->rmg_mutex); D_GOTO(out, rc); } From 135d93e73ecb233b75f6e92014dc02b502e7631c Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 16 Feb 2026 17:49:13 +0800 Subject: [PATCH 09/14] DAOS-18487 object: shared resource waiters can miss the wakeup Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 136 +++++++++++++++++------------------ 1 file changed, 68 insertions(+), 68 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 1c2e1f01a63..fa26445c44d 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -47,11 +47,23 @@ */ #define MIGR_INF_DATA_LWM (1 << 28) +/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. 
+ */ +#define MIGR_RES_BUCKETS 8 + +enum migr_bucket_type { + MIGR_BUCKET_PRIV, /* private */ + MIGR_BUCKET_MAP, /* shared, N to 1 map */ + MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ +}; + #define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" #define ENV_MIGRATE_KEY_CONCUR "D_MIGRATE_KEY_CONCUR" -/* Number of concurrent objects being migrated per engine, consider we have 4 resource - * buckets, this number divided by 4 is the per-bucket concurrency. +/* Number of concurrent objects being migrated per engine, consider we have 8 resource + * buckets, this number divided by 8 is the per-bucket concurrency. * * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which * has high enough concurrency and wouldn't cause network resource exhaustion. @@ -78,7 +90,6 @@ enum { MIGR_OBJ = 0, MIGR_KEY, MIGR_DATA, - MIGR_CC, /* rebuild concurrency of the entire engine */ MIGR_MAX, }; @@ -87,8 +98,8 @@ struct migr_resource { const char *res_name; /* upper limit of the resource */ long res_limit; - /* resource amount in "unit" */ - long res_units; + /* used resource amount in "unit" */ + long res_used; /* last time logging starveling */ uint64_t res_log_since; /* resource type */ @@ -132,18 +143,6 @@ struct migr_res_manager { struct migr_resource rmg_resources[MIGR_MAX]; }; -/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, - * units owned by each bucket can remain the same for engine configuration with different - * number of targets. 
- */ -#define MIGR_RES_BUCKETS 4 - -enum migr_bucket_type { - MIGR_BUCKET_PRIV, /* private */ - MIGR_BUCKET_MAP, /* shared, N to 1 map */ - MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ -}; - struct migr_engine_res { /* type of bucket, see migr_bucket_type */ enum migr_bucket_type er_bucket_type; @@ -2044,7 +2043,7 @@ migr_type2rmg(struct dss_module_info *dmi, int res_type, bool *shared_res) struct migr_res_manager *rmg; int idx; - if (res_type != MIGR_CC || migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) { + if (res_type == MIGR_DATA || migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) { rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; *shared_res = false; return rmg; @@ -2076,9 +2075,6 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y int rc = 0; D_ASSERT(dmi->dmi_xs_id != 0); - if (res_type == MIGR_OBJ) - res_type = MIGR_CC; /* require the global concurrent permission first */ -again: rmg = migr_type2rmg(dmi, res_type, &shared_res); if (tls->mpt_rmg == NULL && !shared_res) tls->mpt_rmg = rmg; @@ -2102,27 +2098,25 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y } now = daos_gettime_coarse(); - if (is_hulk && res->res_hulk == 0 && res->res_units < MIGR_INF_DATA_LWM) { + if (is_hulk && res->res_hulk == 0 && res->res_used < MIGR_INF_DATA_LWM) { /* skip the limit check and allow (only) one hulk transfer at a time */ - res->res_units += units; + res->res_used += units; res->res_hulk = 1; break; } else if (!is_hulk && !migr_res_has_starveling(res, now) && - res->res_units + units <= res->res_limit) { - res->res_units += units; + res->res_used + units <= res->res_limit) { + res->res_used += units; break; } - if (waiter.rw_wait_since == 0) - waiter.rw_wait_since = now; - if (!shared_res) { ABT_mutex_lock(rmg->rmg_mutex); locked = true; } res->res_waiters++; + waiter.rw_wait_since = now; d_list_add_tail(&waiter.rw_link, &res->res_waitq); ABT_cond_wait(res->res_cond, rmg->rmg_mutex); @@ -2145,7 
+2139,7 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y DF_RB " " "res=%s units:total:limit=%lu:%lu:%lu waiters:holders=%d:%d, waited=%s, " " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - DP_RB_MPT(tls), res->res_name, units, res->res_units, res->res_limit, + DP_RB_MPT(tls), res->res_name, units, res->res_used, res->res_limit, res->res_waiters, res->res_holders, !!waiter.rw_wait_since ? "yes" : "no", tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); @@ -2153,14 +2147,6 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y ABT_mutex_unlock(rmg->rmg_mutex); shared_res = false; } - - if (res_type == MIGR_CC) { - /* go back and require local thread object resource. - * NB: MIGR_CC and MIGR_OBJ have the same unit. - */ - res_type = MIGR_OBJ; - goto again; - } out: if (yielded && !*yielded) *yielded = locked; @@ -2175,7 +2161,6 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) struct migr_resource *res; bool shared_res; -again: rmg = migr_type2rmg(dmi, res_type, &shared_res); D_ASSERT(shared_res || rmg == tls->mpt_rmg); if (shared_res) @@ -2194,8 +2179,8 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) tls->mpt_inflight_size -= units; } - D_ASSERT(res->res_units >= units); - res->res_units -= units; + D_ASSERT(res->res_used >= units); + res->res_used -= units; D_ASSERT(res->res_holders > 0); res->res_holders--; @@ -2205,18 +2190,12 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) } if (res->res_waiters > 0) - migrate_res_wakeup(res, units); + migrate_res_wakeup(res, res->res_limit - res->res_used); if (shared_res) { ABT_mutex_unlock(rmg->rmg_mutex); shared_res = false; } - - if (res_type == MIGR_OBJ) { - /* NB: MIGR_CC and MIGR_OBJ have the same unit. 
*/ - res_type = MIGR_CC; - goto again; - } } static void @@ -3345,13 +3324,17 @@ struct migrate_stop_arg { unsigned int generation; unsigned int stop_count; ABT_mutex stop_lock; + bool set_fini_only; }; static int migrate_fini_one_ult(void *data) { + struct dss_module_info *dmi = dss_get_module_info(); + struct migr_res_manager *rmg; struct migrate_stop_arg *arg = data; struct migrate_pool_tls *tls; + int i; int rc; tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); @@ -3359,20 +3342,26 @@ migrate_fini_one_ult(void *data) return 0; D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); - tls->mpt_fini = 1; + /* This function should be called for twice: + * the first call can set mpt_fini on all xstream, the second call wakeup all ULTs, + * otherwise some UTLs could miss the mpt_fini check because they are waiting on + * other xstreams. + */ + if (arg->set_fini_only) { + tls->mpt_fini = 1; + return 0; + } + D_ASSERT(tls->mpt_fini); ABT_mutex_lock(arg->stop_lock); arg->stop_count++; ABT_mutex_unlock(arg->stop_lock); - if (tls->mpt_rmg) { - struct migr_res_manager *rmg = tls->mpt_rmg; - int i; - - ABT_mutex_lock(rmg->rmg_mutex); - for (i = 0; i < MIGR_MAX; i++) + rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; + for (i = 0; i < MIGR_MAX; i++) { + /* private resource has to be processed by ULT on the owner xstream */ + if (migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV || i == MIGR_DATA) migrate_res_wakeup(&rmg->rmg_resources[i], -1ULL); - ABT_mutex_unlock(rmg->rmg_mutex); } migrate_pool_tls_put(tls); /* lookup */ @@ -3396,7 +3385,8 @@ void ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generation) { struct migrate_stop_arg arg; - int rc; + int i; + int rc; uuid_copy(arg.pool_uuid, pool->sp_uuid); arg.version = version; @@ -3408,8 +3398,27 @@ ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generat return; } + /* 1st round, set fini flag */ + arg.set_fini_only = true; + 
ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); + /* wakeup ULTs waiting on shared resources */ + for (i = 0; i < dss_tgt_nr; i++) { + struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; + int j; + + for (j = 0; j < MIGR_MAX; j++) { + if (migr_eng_res.er_bucket_type != MIGR_BUCKET_PRIV && j != MIGR_DATA) { + ABT_mutex_lock(rmg->rmg_mutex); + migrate_res_wakeup(&rmg->rmg_resources[j], -1ULL); + ABT_mutex_unlock(rmg->rmg_mutex); + } + } + } + + /* second round, xstreams wakeup local waiters and finalize other things */ + arg.set_fini_only = false; rc = ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); - if (rc) + if (rc != 0) D_ERROR(DF_UUID" migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); D_ASSERT(atomic_load(&pool->sp_rebuilding) >= arg.stop_count); @@ -4778,8 +4787,6 @@ obj_migrate_init(void) unsigned int key_units = MIGR_KEY_CONCUR_DEF; unsigned int bkt_obj_units; unsigned int bkt_key_units; - unsigned int tgt_obj_units; - unsigned int tgt_key_units; int i; int rc = 0; @@ -4819,9 +4826,6 @@ obj_migrate_init(void) bkt_obj_units = obj_units / migr_eng_res.er_bucket_nr; bkt_key_units = key_units / migr_eng_res.er_bucket_nr; - tgt_obj_units = bkt_obj_units / MIN(2, migr_eng_res.er_bucket_size); - tgt_key_units = bkt_key_units / MIN(2, migr_eng_res.er_bucket_size); - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); D_ALLOC(migr_eng_res.er_rmgs, sizeof(struct migr_res_manager) * dss_tgt_nr); @@ -4835,15 +4839,11 @@ obj_migrate_init(void) if (rc != ABT_SUCCESS) D_GOTO(out, rc = dss_abterr2der(rc)); - rc = migr_res_init(rmg, MIGR_CC, "CONCUR", bkt_obj_units); - if (rc) - D_GOTO(out, rc); - - rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", tgt_obj_units); + rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", bkt_obj_units); if (rc) D_GOTO(out, rc); - rc = migr_res_init(rmg, MIGR_KEY, "KEY", tgt_key_units); + rc = migr_res_init(rmg, MIGR_KEY, "KEY", bkt_key_units); if (rc) D_GOTO(out, rc); From 
672a738e2bd8ac0ab84d0db312245da09059cf4f Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Wed, 18 Feb 2026 23:04:02 +0800 Subject: [PATCH 10/14] DAOS-18487 object: remove the incorrect error handling Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index fa26445c44d..68b0a71dcc2 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -3640,7 +3640,6 @@ migrate_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, rc = migrate_res_hold(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout, yielded); if (rc != 0) { DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(cont_arg->cont_uuid)); - migrate_res_release(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout); goto free; } From 6f7b3ea0a13767e1cfbe09d234be28417570419d Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 23 Feb 2026 01:33:30 +0800 Subject: [PATCH 11/14] DAOS-18487 object: code reorganization Fix a reference leak in migrate_fini_one_ult() Signed-off-by: Liang Zhen --- src/object/srv_internal.h | 6 +- src/object/srv_obj_migrate.c | 487 ++++++++++++++++++----------------- 2 files changed, 258 insertions(+), 235 deletions(-) diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 3d3fbc15447..4c6d286eb25 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -1,6 +1,6 @@ /** * (C) Copyright 2016-2024 Intel Corporation. 
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -26,7 +26,7 @@ extern struct dss_module_key obj_module_key; -struct migr_res_manager; +struct migr_resource; /* Per pool attached to the migrate tls(per xstream) */ struct migrate_pool_tls { @@ -80,7 +80,7 @@ struct migrate_pool_tls { /* The current in-flight data size */ uint64_t mpt_inflight_size; - struct migr_res_manager *mpt_rmg; + struct migr_resource *mpt_data_res; /* reference count for the structure */ uint64_t mpt_refcount; diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 68b0a71dcc2..ddd5d0759ed 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -31,36 +31,14 @@ #pragma GCC diagnostic ignored "-Wframe-larger-than=" #endif -/* Max in-flight transfer size per xstream */ -/* Set the total in-flight size to be 1/3 of MAX DMA size for - * the moment, will adjust it later if needed. - */ -#define MIGR_TGT_INF_DATA (320 << 20) - -/* Threshold for very large transfers. - * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. - * Only one such transfer is allowed at a time. - */ -#define MIGR_INF_DATA_HULK (1 << 28) - -/* Low water mark for DMA buffer usage, hulk transfer is allowed in this case. - */ -#define MIGR_INF_DATA_LWM (1 << 28) - -/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, - * units owned by each bucket can remain the same for engine configuration with different - * number of targets. 
- */ -#define MIGR_RES_BUCKETS 8 - -enum migr_bucket_type { - MIGR_BUCKET_PRIV, /* private */ - MIGR_BUCKET_MAP, /* shared, N to 1 map */ - MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ +enum migr_res_type { + MIGR_OBJ = 0, + MIGR_KEY, + MIGR_DATA, + MIGR_MAX, }; -#define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" -#define ENV_MIGRATE_KEY_CONCUR "D_MIGRATE_KEY_CONCUR" +#define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" /* Number of concurrent objects being migrated per engine, consider we have 8 resource * buckets, this number divided by 8 is the per-bucket concurrency. @@ -68,54 +46,83 @@ enum migr_bucket_type { * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which * has high enough concurrency and wouldn't cause network resource exhaustion. */ -; - enum { MIGR_OBJ_CONCUR_MIN = 800, MIGR_OBJ_CONCUR_DEF = 1600, MIGR_OBJ_CONCUR_MAX = 3200, }; +#define ENV_MIGRATE_KEY_CONCUR "D_MIGRATE_KEY_CONCUR" + +/* OBJ_CUR + KEY_CUR is less than 1/3 of HW limit, the reason of being conservative + * is because we don't have precise control and some data pattern (fragmented RDMA) + * can lead to peak * workload. + */ enum { - /* OBJ_CUR + KEY_CUR is less than 1/3 of HW limit, the reason of being conservative - * is because we don't have precise control and some data pattern (fragmented RDMA) - * can lead to peak * workload. - */ MIGR_KEY_CONCUR_MIN = (1600 - MIGR_OBJ_CONCUR_MIN), MIGR_KEY_CONCUR_DEF = (4800 - MIGR_OBJ_CONCUR_DEF), MIGR_KEY_CONCUR_MAX = (6400 - MIGR_OBJ_CONCUR_MAX), }; -enum { - MIGR_OBJ = 0, - MIGR_KEY, - MIGR_DATA, - MIGR_MAX, -}; +/* Max in-flight transfer size per xstream */ +/* Set the total in-flight size to be 1/3 of MAX DMA size for + * the moment, will adjust it later if needed. + */ +#define MIGR_TGT_INF_DATA (320 << 20) + +/* Threshold for very large transfers. + * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. + * Only one such transfer is allowed at a time. 
+ */ +#define MIGR_INF_DATA_HULK (1 << 28) + +/* Low water mark for DMA buffer usage, hulk transfer is allowed in this case. + */ +#define MIGR_INF_DATA_LWM (1 << 28) + +/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. + */ + +struct migr_res_manager; /* resource consumed by migration */ struct migr_resource { - const char *res_name; + /* back reference to res_manager */ + struct migr_res_manager *res_rmg; /* upper limit of the resource */ - long res_limit; + long res_limit; /* used resource amount in "unit" */ - long res_used; + long res_used; /* last time logging starveling */ - uint64_t res_log_since; - /* resource type */ - int res_type; + uint64_t res_log_since; /* active ULTs */ - int res_holders; + int res_holders; /* number of waiters on this resource */ - int res_waiters; - /* Only used by MIGR_DATA, it always allows exactly one ULT to use unbounded - * buffer for super large value (rare). - */ - int res_hulk; + int res_waiters; /* list head of all waiters */ - d_list_t res_waitq; + d_list_t res_waitq; /* ABT_cond for waiters */ - ABT_cond res_cond; + ABT_cond res_cond; + /* serialization */ + ABT_mutex res_mutex; + /* + * Begin: members specific for MIGR_DATA + */ + /* total number of occurred memory errors */ + unsigned long res_mem_err; + /* waited more than 10 minutes, serious errors */ + unsigned long res_mem_ser_err; + /* number of revived ULTs after running into memory error */ + unsigned long res_mem_revived; + /* number of waiting ULTs */ + unsigned long res_mem_waiting; + /* allows exactly one ULT to use unbounded buffer for super large value (rare). 
*/ + int res_hulk; + /* + * End: members for MIGR_DATA + */ }; /* anchor point of resource waiter */ @@ -129,34 +136,32 @@ struct migr_res_waiter { uint64_t rw_wait_since; }; -/* migration resources manager */ -struct migr_res_manager { - ABT_mutex rmg_mutex; - /* total number of occurred memory errors */ - unsigned long rmg_mem_err; - /* waited more than 10 minutes, serious errors */ - unsigned long rmg_mem_ser_err; - /* number of revived ULTs after running into memory error */ - unsigned long rmg_mem_revived; - /* number of waiting ULTs */ - unsigned long rmg_mem_waiting; - struct migr_resource rmg_resources[MIGR_MAX]; +#define MIGR_RES_BUCKETS 8 + +enum migr_bucket_type { + MIGR_BUCKET_PRIV, /* private */ + MIGR_BUCKET_MAP, /* shared, N to 1 map */ + MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ }; -struct migr_engine_res { +/* migration resources manager */ +struct migr_res_manager { + enum migr_res_type rmg_res_type; /* type of bucket, see migr_bucket_type */ - enum migr_bucket_type er_bucket_type; + enum migr_bucket_type rmg_bkt_type; /* number of resource sharing buckets */ - int er_bucket_nr; + int rmg_bkt_nr; /* number of targets sharing the same resource bucket */ - int er_bucket_size; + int rmg_bkt_size; /* round-robin bucket selector */ - ATOMIC int er_bucket_selector; - /* dss_tgt_nr resource managers */ - struct migr_res_manager *er_rmgs; + ATOMIC int rmg_bkt_selector; + /* resource name */ + char *rmg_name; + /* all resource buckets */ + struct migr_resource *rmg_res_buckets; }; -static struct migr_engine_res migr_eng_res; +static struct migr_res_manager migr_res_managers[MIGR_MAX]; struct migrate_one { struct migrate_pool_tls *mo_tls; @@ -762,7 +767,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags, d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls) { - struct migr_res_manager *rmg = tls->mpt_rmg; + struct migr_resource *res = 
tls->mpt_data_res; uint32_t *extra_arg = NULL; uint64_t then = 0; uint64_t now; @@ -775,7 +780,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ mrone->mo_epoch); extra_arg = (uint32_t *)mrone->mo_epoch; } - D_ASSERT(rmg != NULL); + D_ASSERT(res != NULL); retry: rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls, NULL, flags, extra_arg, csum_iov_fetch); @@ -799,8 +804,8 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ now = daos_gettime_coarse(); if (wait == MEM_NO_WAIT) { wait = MEM_WAIT; - rmg->rmg_mem_waiting++; - rmg->rmg_mem_err++; + res->res_mem_waiting++; + res->res_mem_err++; then = now; } /* sleep a few seconds before retry, give other layers a chance to @@ -809,19 +814,19 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ dss_sleep((10 + rand() % 20) * 1000); if (wait != MEM_LONG_WAIT && now - then >= 600) { wait = MEM_LONG_WAIT; /* flagged as long waiter */ - rmg->rmg_mem_ser_err++; /* counted as serious error */ + res->res_mem_ser_err++; /* counted as serious error */ DL_ERROR(rc, DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," " total waiters: %lu, total revived: %lu\n", - DP_RB_MRO(mrone), rmg->rmg_mem_ser_err, rmg->rmg_mem_err, - rmg->rmg_mem_waiting, rmg->rmg_mem_revived); + DP_RB_MRO(mrone), res->res_mem_ser_err, res->res_mem_err, + res->res_mem_waiting, res->res_mem_revived); } D_GOTO(retry, rc); } if (wait != MEM_NO_WAIT) { - D_ASSERT(rmg->rmg_mem_waiting > 0); - rmg->rmg_mem_revived++; - rmg->rmg_mem_waiting--; + D_ASSERT(res->res_mem_waiting > 0); + res->res_mem_revived++; + res->res_mem_waiting--; } return rc; } @@ -1979,10 +1984,16 @@ migrate_one_destroy(struct migrate_one *mrone) D_FREE(mrone); } -static bool +static inline bool +migr_res_is_private(struct migr_resource *res) +{ + return res->res_rmg->rmg_bkt_type == MIGR_BUCKET_PRIV; +} + +static inline bool migr_res_is_hulk(struct migr_resource *res, long 
units) { - return res->res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; + return res->res_rmg->rmg_res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; } static bool @@ -2008,7 +2019,7 @@ migr_res_has_starveling(struct migr_resource *res, uint64_t now) D_DEBUG(DB_REBUILD, DF_RB " starving: res=%s, since=%lu, ask:limit=%lu:%lu, holders:waiters=%d:%d\n", - DP_RB_MPT(waiter->rw_tls), res->res_name, waiter->rw_wait_since, + DP_RB_MPT(waiter->rw_tls), res->res_rmg->rmg_name, waiter->rw_wait_since, waiter->rw_units, res->res_limit, res->res_holders, res->res_waiters); res->res_log_since = now; } @@ -2037,63 +2048,66 @@ migrate_res_wakeup(struct migr_resource *res, uint64_t units) } } -static struct migr_res_manager * -migr_type2rmg(struct dss_module_info *dmi, int res_type, bool *shared_res) +static struct migr_resource * +migr_type2res(struct migrate_pool_tls *tls, int res_type) { + struct dss_module_info *dmi = dss_get_module_info(); struct migr_res_manager *rmg; + struct migr_resource *res; int idx; - if (res_type == MIGR_DATA || migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV) { - rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; - *shared_res = false; - return rmg; + D_ASSERT(dmi->dmi_xs_id != 0); + + rmg = &migr_res_managers[res_type]; + if (rmg->rmg_bkt_type == MIGR_BUCKET_PRIV) { + D_ASSERT(dmi->dmi_tgt_id < rmg->rmg_bkt_nr); + + res = &rmg->rmg_res_buckets[dmi->dmi_tgt_id]; + if (res_type == MIGR_DATA) { + if (tls->mpt_data_res == NULL) + tls->mpt_data_res = res; + else + D_ASSERT(tls->mpt_data_res == res); + } + return res; } + D_ASSERT(res_type != MIGR_DATA); - if (migr_eng_res.er_bucket_type == MIGR_BUCKET_MAP) { - idx = dmi->dmi_tgt_id / migr_eng_res.er_bucket_size; - D_ASSERT(idx < migr_eng_res.er_bucket_nr); + if (rmg->rmg_bkt_type == MIGR_BUCKET_MAP) { + idx = dmi->dmi_tgt_id / rmg->rmg_bkt_size; + D_ASSERT(idx < rmg->rmg_bkt_nr); } else /* rotation */ { - idx = atomic_fetch_add(&migr_eng_res.er_bucket_selector, 1); - idx %= migr_eng_res.er_bucket_nr; + 
idx = atomic_fetch_add(&rmg->rmg_bkt_selector, 1); + idx %= rmg->rmg_bkt_nr; } - rmg = &migr_eng_res.er_rmgs[idx]; - *shared_res = true; - return rmg; + return &rmg->rmg_res_buckets[idx]; } static int migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *yielded) { - struct dss_module_info *dmi = dss_get_module_info(); - struct migr_res_manager *rmg; struct migr_resource *res; struct migr_res_waiter waiter; uint64_t now; bool is_hulk; - bool shared_res; bool locked = false; int rc = 0; - D_ASSERT(dmi->dmi_xs_id != 0); - rmg = migr_type2rmg(dmi, res_type, &shared_res); - if (tls->mpt_rmg == NULL && !shared_res) - tls->mpt_rmg = rmg; - - if (shared_res) { - ABT_mutex_lock(rmg->rmg_mutex); + res = migr_type2res(tls, res_type); + if (!migr_res_is_private(res)) { + ABT_mutex_lock(res->res_mutex); locked = true; } waiter.rw_tls = tls; waiter.rw_units = units; waiter.rw_wait_since = 0; - res = &rmg->rmg_resources[res_type]; is_hulk = migr_res_is_hulk(res, units); while (1) { if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); - if (shared_res) - ABT_mutex_unlock(rmg->rmg_mutex); + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); D_GOTO(out, rc); } @@ -2110,8 +2124,8 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y break; } - if (!shared_res) { - ABT_mutex_lock(rmg->rmg_mutex); + if (migr_res_is_private(res)) { + ABT_mutex_lock(res->res_mutex); locked = true; } @@ -2119,11 +2133,11 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y waiter.rw_wait_since = now; d_list_add_tail(&waiter.rw_link, &res->res_waitq); - ABT_cond_wait(res->res_cond, rmg->rmg_mutex); + ABT_cond_wait(res->res_cond, res->res_mutex); D_ASSERT(d_list_empty(&waiter.rw_link)); - if (!shared_res) - ABT_mutex_unlock(rmg->rmg_mutex); + if (migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); } res->res_holders++; @@ -2139,14 +2153,12 @@ migrate_res_hold(struct migrate_pool_tls *tls, 
int res_type, long units, bool *y DF_RB " " "res=%s units:total:limit=%lu:%lu:%lu waiters:holders=%d:%d, waited=%s, " " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - DP_RB_MPT(tls), res->res_name, units, res->res_used, res->res_limit, + DP_RB_MPT(tls), res->res_rmg->rmg_name, units, res->res_used, res->res_limit, res->res_waiters, res->res_holders, !!waiter.rw_wait_since ? "yes" : "no", tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); - if (shared_res) { - ABT_mutex_unlock(rmg->rmg_mutex); - shared_res = false; - } + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); out: if (yielded && !*yielded) *yielded = locked; @@ -2156,17 +2168,11 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y static void migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) { - struct dss_module_info *dmi = dss_get_module_info(); - struct migr_res_manager *rmg; - struct migr_resource *res; - bool shared_res; - - rmg = migr_type2rmg(dmi, res_type, &shared_res); - D_ASSERT(shared_res || rmg == tls->mpt_rmg); - if (shared_res) - ABT_mutex_lock(rmg->rmg_mutex); + struct migr_resource *res; - res = &rmg->rmg_resources[res_type]; + res = migr_type2res(tls, res_type); + if (!migr_res_is_private(res)) + ABT_mutex_lock(res->res_mutex); if (res_type == MIGR_OBJ) { D_ASSERT(tls->mpt_tgt_obj_ult_cnt > 0); @@ -2192,10 +2198,8 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) if (res->res_waiters > 0) migrate_res_wakeup(res, res->res_limit - res->res_used); - if (shared_res) { - ABT_mutex_unlock(rmg->rmg_mutex); - shared_res = false; - } + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); } static void @@ -3324,17 +3328,19 @@ struct migrate_stop_arg { unsigned int generation; unsigned int stop_count; ABT_mutex stop_lock; - bool set_fini_only; }; static int migrate_fini_one_ult(void *data) { struct dss_module_info *dmi = dss_get_module_info(); - struct 
migr_res_manager *rmg; struct migrate_stop_arg *arg = data; struct migrate_pool_tls *tls; + struct migr_res_manager *rmg; + struct migr_resource *res; + bool last_one; int i; + int j; int rc; tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); @@ -3347,21 +3353,32 @@ migrate_fini_one_ult(void *data) * otherwise some UTLs could miss the mpt_fini check because they are waiting on * other xstreams. */ - if (arg->set_fini_only) { - tls->mpt_fini = 1; - return 0; - } - D_ASSERT(tls->mpt_fini); + tls->mpt_fini = 1; ABT_mutex_lock(arg->stop_lock); arg->stop_count++; + last_one = (arg->stop_count == dss_tgt_nr); ABT_mutex_unlock(arg->stop_lock); - rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; for (i = 0; i < MIGR_MAX; i++) { + rmg = &migr_res_managers[i]; + /* private resource has to be processed by ULT on the owner xstream */ - if (migr_eng_res.er_bucket_type == MIGR_BUCKET_PRIV || i == MIGR_DATA) - migrate_res_wakeup(&rmg->rmg_resources[i], -1ULL); + if (rmg->rmg_bkt_type == MIGR_BUCKET_PRIV) { + /* locking is not required by private resource */ + migrate_res_wakeup(&rmg->rmg_res_buckets[dmi->dmi_tgt_id], -1ULL); + + } else if (last_one) { /* shared resource */ + /* on behalf of all target xstreams, wakeup ULTs for shared resources. 
*/ + for (j = 0; j < rmg->rmg_bkt_nr; j++) { + res = &rmg->rmg_res_buckets[j]; + + /* locking is required by shared resource */ + ABT_mutex_lock(res->res_mutex); + migrate_res_wakeup(res, -1ULL); + ABT_mutex_unlock(res->res_mutex); + } + } } migrate_pool_tls_put(tls); /* lookup */ @@ -3385,7 +3402,6 @@ void ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generation) { struct migrate_stop_arg arg; - int i; int rc; uuid_copy(arg.pool_uuid, pool->sp_uuid); @@ -3398,25 +3414,6 @@ ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generat return; } - /* 1st round, set fini flag */ - arg.set_fini_only = true; - ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); - /* wakeup ULTs waiting on shared resources */ - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; - int j; - - for (j = 0; j < MIGR_MAX; j++) { - if (migr_eng_res.er_bucket_type != MIGR_BUCKET_PRIV && j != MIGR_DATA) { - ABT_mutex_lock(rmg->rmg_mutex); - migrate_res_wakeup(&rmg->rmg_resources[j], -1ULL); - ABT_mutex_unlock(rmg->rmg_mutex); - } - } - } - - /* second round, xstreams wakeup local waiters and finalize other things */ - arg.set_fini_only = false; rc = ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); if (rc != 0) D_ERROR(DF_UUID" migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); @@ -4756,17 +4753,17 @@ ds_object_migrate_send(struct ds_pool *pool, uuid_t pool_hdl_uuid, uuid_t cont_h } static int -migr_res_init(struct migr_res_manager *rmg, int type, const char *name, long limit) +migr_res_init(struct migr_resource *res, struct migr_res_manager *rmg, long limit) { - struct migr_resource *res = &rmg->rmg_resources[type]; int rc; memset(res, 0, sizeof(*res)); D_INIT_LIST_HEAD(&res->res_waitq); - res->res_name = name; - res->res_type = type; + res->res_rmg = rmg; res->res_limit = limit; rc = ABT_cond_create(&res->res_cond); + if (rc == 0) + rc = 
ABT_mutex_create(&res->res_mutex); return (rc != ABT_SUCCESS) ? dss_abterr2der(rc) : 0; } @@ -4774,38 +4771,93 @@ migr_res_init(struct migr_res_manager *rmg, int type, const char *name, long lim static void migr_res_fini(struct migr_resource *res) { - D_ASSERT(!res->res_name || d_list_empty(&res->res_waitq)); /* unset or drained */ + D_ASSERT(!res->res_rmg || d_list_empty(&res->res_waitq)); /* unset or drained */ if (res->res_cond) ABT_cond_free(&res->res_cond); + if (res->res_mutex) + ABT_mutex_free(&res->res_mutex); +} + +static int +migr_rmg_init(int type, int concur_max, uint64_t units) +{ + struct migr_res_manager *rmg; + char *name; + int bkt_nr; + int bkt_size; + int bkt_type; + int i; + int rc; + + if (type == MIGR_OBJ) { + name = "OBJ"; + } else if (type == MIGR_KEY) { + name = "KEY"; + } else if (type == MIGR_DATA) { + name = "DATA"; + } else { + name = "UNKNOWN"; + D_GOTO(failed, rc = -DER_INVAL); + } + rmg = &migr_res_managers[type]; + + if (concur_max >= dss_tgt_nr) { /* concurrency can't be higher than target number */ + bkt_nr = dss_tgt_nr; + bkt_type = MIGR_BUCKET_PRIV; + bkt_size = 1; + } else { + bkt_nr = concur_max; + bkt_size = dss_tgt_nr / concur_max; + bkt_type = (dss_tgt_nr % concur_max == 0) ? 
MIGR_BUCKET_MAP : MIGR_BUCKET_ROTATE; + } + + D_ALLOC(rmg->rmg_res_buckets, bkt_nr * sizeof(*rmg->rmg_res_buckets)); + if (!rmg->rmg_res_buckets) + D_GOTO(failed, rc = -DER_NOMEM); + + rmg->rmg_name = name; + rmg->rmg_res_type = type; + rmg->rmg_bkt_nr = bkt_nr; + rmg->rmg_bkt_type = bkt_type; + rmg->rmg_bkt_size = bkt_size; + + for (i = 0; i < bkt_nr; i++) { + rc = migr_res_init(&rmg->rmg_res_buckets[i], rmg, units / bkt_nr); + if (rc) + D_GOTO(failed, rc); + } + return 0; +failed: + D_ERROR("Failed to initialize resource %s, error=%d\n", name, rc); + return rc; +} + +static void +migr_rmg_fini(int type) +{ + struct migr_res_manager *rmg; + int i; + + rmg = &migr_res_managers[type]; + if (!rmg->rmg_res_buckets) + return; + + for (i = 0; i < rmg->rmg_bkt_nr; i++) + migr_res_fini(&rmg->rmg_res_buckets[i]); + memset(rmg, 0, sizeof(*rmg)); } int obj_migrate_init(void) { - unsigned int obj_units = MIGR_OBJ_CONCUR_DEF; - unsigned int key_units = MIGR_KEY_CONCUR_DEF; - unsigned int bkt_obj_units; - unsigned int bkt_key_units; - int i; - int rc = 0; + unsigned obj_units = MIGR_OBJ_CONCUR_DEF; + unsigned key_units = MIGR_KEY_CONCUR_DEF; + int rc; D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM); D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); D_ASSERT(dss_tgt_nr > 0); - if (dss_tgt_nr <= MIGR_RES_BUCKETS) { - migr_eng_res.er_bucket_type = MIGR_BUCKET_PRIV; - migr_eng_res.er_bucket_nr = dss_tgt_nr; - migr_eng_res.er_bucket_size = 1; - } else { - migr_eng_res.er_bucket_nr = MIGR_RES_BUCKETS; - migr_eng_res.er_bucket_size = dss_tgt_nr / MIGR_RES_BUCKETS; - if (dss_tgt_nr % MIGR_RES_BUCKETS != 0) - migr_eng_res.er_bucket_type = MIGR_BUCKET_ROTATE; - else - migr_eng_res.er_bucket_type = MIGR_BUCKET_MAP; - } - rc = d_getenv_uint(ENV_MIGRATE_OBJ_CONCUR, &obj_units); if (rc == 0) { if (obj_units < MIGR_OBJ_CONCUR_MIN) @@ -4813,6 +4865,9 @@ obj_migrate_init(void) if (obj_units > MIGR_OBJ_CONCUR_MAX) obj_units = MIGR_OBJ_CONCUR_MAX; } + rc = migr_rmg_init(MIGR_OBJ, 
MIGR_RES_BUCKETS, obj_units); + if (rc) + D_GOTO(out, rc); rc = d_getenv_uint(ENV_MIGRATE_KEY_CONCUR, &key_units); if (rc == 0) { @@ -4821,35 +4876,14 @@ obj_migrate_init(void) if (key_units > MIGR_KEY_CONCUR_MAX) key_units = MIGR_KEY_CONCUR_MAX; } + rc = migr_rmg_init(MIGR_KEY, MIGR_RES_BUCKETS, key_units); + if (rc) + D_GOTO(out, rc); - bkt_obj_units = obj_units / migr_eng_res.er_bucket_nr; - bkt_key_units = key_units / migr_eng_res.er_bucket_nr; - - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); - - D_ALLOC(migr_eng_res.er_rmgs, sizeof(struct migr_res_manager) * dss_tgt_nr); - if (!migr_eng_res.er_rmgs) - return -DER_NOMEM; - - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; - - rc = ABT_mutex_create(&rmg->rmg_mutex); - if (rc != ABT_SUCCESS) - D_GOTO(out, rc = dss_abterr2der(rc)); - - rc = migr_res_init(rmg, MIGR_OBJ, "OBJ", bkt_obj_units); - if (rc) - D_GOTO(out, rc); - - rc = migr_res_init(rmg, MIGR_KEY, "KEY", bkt_key_units); - if (rc) - D_GOTO(out, rc); + rc = migr_rmg_init(MIGR_DATA, dss_tgt_nr, MIGR_TGT_INF_DATA * dss_tgt_nr); + if (rc) + D_GOTO(out, rc); - rc = migr_res_init(rmg, MIGR_DATA, "DATA", MIGR_TGT_INF_DATA); - if (rc) - D_GOTO(out, rc); - } return 0; out: obj_migrate_fini(); @@ -4860,18 +4894,7 @@ void obj_migrate_fini(void) { int i; - int j; - - if (migr_eng_res.er_rmgs) { - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; - for (j = 0; j < MIGR_MAX; j++) - migr_res_fini(&rmg->rmg_resources[j]); - if (rmg->rmg_mutex) - ABT_mutex_free(&rmg->rmg_mutex); - } - D_FREE(migr_eng_res.er_rmgs); - } - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); + for (i = 0; i < MIGR_MAX; i++) + migr_rmg_fini(i); } From 3d242d2fa1839ba694f3811b84d4af6a600a90d9 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 23 Feb 2026 21:53:44 +0800 Subject: [PATCH 12/14] DAOS-18487 object: code cleanup Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 69 
+++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 53eb2a9b14a..22d0ebba0bf 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -80,11 +80,6 @@ enum { */ #define MIGR_INF_DATA_LWM (1 << 28) -/* Distribute the global resource into 4 buckets to avoid high lock contention, meanwhile, - * units owned by each bucket can remain the same for engine configuration with different - * number of targets. - */ - struct migr_res_manager; /* resource consumed by migration */ @@ -107,22 +102,25 @@ struct migr_resource { ABT_cond res_cond; /* serialization */ ABT_mutex res_mutex; - /* - * Begin: members specific for MIGR_DATA - */ - /* total number of occurred memory errors */ - unsigned long res_mem_err; - /* waited more than 10 minutes, serious errors */ - unsigned long res_mem_ser_err; - /* number of revived ULTs after running into memory error */ - unsigned long res_mem_revived; - /* number of waiting ULTs */ - unsigned long res_mem_waiting; - /* allows exactly one ULT to use unbounded buffer for super large value (rare). */ - int res_hulk; - /* - * End: members for MIGR_DATA - */ + /* members for specific resource type */ + union { + /* MIGR_DATA only */ + struct { + /* total number of occurred memory errors */ + unsigned long mem_err; + /* waited more than 10 minutes, serious errors */ + unsigned long mem_ser_err; + /* number of revived ULTs after running into memory error */ + unsigned long mem_revived; + /* number of waiting ULTs */ + unsigned long mem_waiting; + /* allows exactly one ULT to use unbounded buffer for super large + * value (rare). 
+ */ + int mem_hulk; + } res_data; + /* may add other members for MIGR_OBJ and MIGR_KEY */ + }; }; /* anchor point of resource waiter */ @@ -136,6 +134,10 @@ struct migr_res_waiter { uint64_t rw_wait_since; }; +/* Distribute the global resource into 8 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. + */ #define MIGR_RES_BUCKETS 8 enum migr_bucket_type { @@ -161,6 +163,7 @@ struct migr_res_manager { struct migr_resource *rmg_res_buckets; }; +/* per-engine resources */ static struct migr_res_manager migr_res_managers[MIGR_MAX]; struct migrate_one { @@ -804,8 +807,8 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ now = daos_gettime_coarse(); if (wait == MEM_NO_WAIT) { wait = MEM_WAIT; - res->res_mem_waiting++; - res->res_mem_err++; + res->res_data.mem_waiting++; + res->res_data.mem_err++; then = now; } /* sleep a few seconds before retry, give other layers a chance to @@ -814,19 +817,19 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ dss_sleep((10 + rand() % 20) * 1000); if (wait != MEM_LONG_WAIT && now - then >= 600) { wait = MEM_LONG_WAIT; /* flagged as long waiter */ - res->res_mem_ser_err++; /* counted as serious error */ + res->res_data.mem_ser_err++; /* counted as serious error */ DL_ERROR(rc, DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," " total waiters: %lu, total revived: %lu\n", - DP_RB_MRO(mrone), res->res_mem_ser_err, res->res_mem_err, - res->res_mem_waiting, res->res_mem_revived); + DP_RB_MRO(mrone), res->res_data.mem_ser_err, res->res_data.mem_err, + res->res_data.mem_waiting, res->res_data.mem_revived); } D_GOTO(retry, rc); } if (wait != MEM_NO_WAIT) { - D_ASSERT(res->res_mem_waiting > 0); - res->res_mem_revived++; - res->res_mem_waiting--; + D_ASSERT(res->res_data.mem_waiting > 0); + res->res_data.mem_revived++; + res->res_data.mem_waiting--; } return 
rc; } @@ -2110,10 +2113,10 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y } now = daos_gettime_coarse(); - if (is_hulk && res->res_hulk == 0 && res->res_used < MIGR_INF_DATA_LWM) { + if (is_hulk && res->res_data.mem_hulk == 0 && res->res_used < MIGR_INF_DATA_LWM) { /* skip the limit check and allow (only) one hulk transfer at a time */ res->res_used += units; - res->res_hulk = 1; + res->res_data.mem_hulk = 1; break; } else if (!is_hulk && !migr_res_has_starveling(res, now) && @@ -2189,8 +2192,8 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) res->res_holders--; if (migr_res_is_hulk(res, units)) { - D_ASSERT(res->res_hulk == 1); - res->res_hulk = 0; + D_ASSERT(res->res_data.mem_hulk == 1); + res->res_data.mem_hulk = 0; } if (res->res_waiters > 0) From be0f4bed9168e3f8bef7bc60bd65a450c12bdb29 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 23 Feb 2026 22:04:04 +0800 Subject: [PATCH 13/14] DAOS-18487 object: clang fix Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 22d0ebba0bf..a0af9ef803d 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -770,12 +770,12 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags, d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls) { - struct migr_resource *res = tls->mpt_data_res; + struct migr_resource *res = tls->mpt_data_res; uint32_t *extra_arg = NULL; - uint64_t then = 0; - uint64_t now; + uint64_t then = 0; + uint64_t now; int rc; - int wait = MEM_NO_WAIT; + int wait = MEM_NO_WAIT; /* pass rebuild epoch by extra_arg */ if (flags & DIOF_FETCH_EPOCH_EC_AGG_BOUNDARY) { @@ -816,7 +816,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ */ 
dss_sleep((10 + rand() % 20) * 1000); if (wait != MEM_LONG_WAIT && now - then >= 600) { - wait = MEM_LONG_WAIT; /* flagged as long waiter */ + wait = MEM_LONG_WAIT; /* flagged as long waiter */ res->res_data.mem_ser_err++; /* counted as serious error */ DL_ERROR(rc, DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," @@ -2087,12 +2087,12 @@ migr_type2res(struct migrate_pool_tls *tls, int res_type) static int migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *yielded) { - struct migr_resource *res; - struct migr_res_waiter waiter; - uint64_t now; - bool is_hulk; - bool locked = false; - int rc = 0; + struct migr_resource *res; + struct migr_res_waiter waiter; + uint64_t now; + bool is_hulk; + bool locked = false; + int rc = 0; res = migr_type2res(tls, res_type); if (!migr_res_is_private(res)) { From 92a00230b72af8190c7c7c728c37d29c643bf9f0 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Mon, 2 Mar 2026 11:15:50 +0800 Subject: [PATCH 14/14] DAOS-18487 rebuild: remove obsolete comment Signed-off-by: Liang Zhen --- src/object/srv_obj_migrate.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index a0af9ef803d..80dafbffb55 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -3344,16 +3344,11 @@ migrate_fini_one_ult(void *data) int j; int rc; + D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); if (tls == NULL) return 0; - D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); - /* This function should be called for twice: - * the first call can set mpt_fini on all xstream, the second call wakeup all ULTs, - * otherwise some UTLs could miss the mpt_fini check because they are waiting on - * other xstreams. 
- */ tls->mpt_fini = 1; ABT_mutex_lock(arg->stop_lock); @@ -3392,9 +3387,8 @@ migrate_fini_one_ult(void *data) rc = 0; } - migrate_pool_tls_put(tls); /* destroy */ - D_INFO("migrate fini one ult "DF_UUID"\n", DP_UUID(arg->pool_uuid)); + migrate_pool_tls_put(tls); /* destroy */ return rc; }