diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 3d3fbc15447..4c6d286eb25 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -1,6 +1,6 @@ /** * (C) Copyright 2016-2024 Intel Corporation. - * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -26,7 +26,7 @@ extern struct dss_module_key obj_module_key; -struct migr_res_manager; +struct migr_resource; /* Per pool attached to the migrate tls(per xstream) */ struct migrate_pool_tls { @@ -80,7 +80,7 @@ struct migrate_pool_tls { /* The current in-flight data size */ uint64_t mpt_inflight_size; - struct migr_res_manager *mpt_rmg; + struct migr_resource *mpt_data_res; /* reference count for the structure */ uint64_t mpt_refcount; diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 2ccda79e4c2..80dafbffb55 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -31,73 +31,140 @@ #pragma GCC diagnostic ignored "-Wframe-larger-than=" #endif +enum migr_res_type { + MIGR_OBJ = 0, + MIGR_KEY, + MIGR_DATA, + MIGR_MAX, +}; + +#define ENV_MIGRATE_OBJ_CONCUR "D_MIGRATE_OBJ_CONCUR" + +/* Number of concurrent objects being migrated per engine, consider we have 8 resource + * buckets, this number divided by 8 is the per-bucket concurrency. + * + * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which + * has high enough concurrency and wouldn't cause network resource exhaustion. + */ +enum { + MIGR_OBJ_CONCUR_MIN = 800, + MIGR_OBJ_CONCUR_DEF = 1600, + MIGR_OBJ_CONCUR_MAX = 3200, +}; + +#define ENV_MIGRATE_KEY_CONCUR "D_MIGRATE_KEY_CONCUR" + +/* OBJ_CUR + KEY_CUR is less than 1/3 of HW limit, the reason of being conservative + * is because we don't have precise control and some data pattern (fragmented RDMA) + * can lead to peak * workload. + */ +enum { + MIGR_KEY_CONCUR_MIN = (1600 - MIGR_OBJ_CONCUR_MIN), + MIGR_KEY_CONCUR_DEF = (4800 - MIGR_OBJ_CONCUR_DEF), + MIGR_KEY_CONCUR_MAX = (6400 - MIGR_OBJ_CONCUR_MAX), +}; + /* Max in-flight transfer size per xstream */ -/* Set the total in-flight size to be 50% of MAX DMA size for +/* Set the total in-flight size to be 1/3 of MAX DMA size for * the moment, will adjust it later if needed. */ -#define MIGR_TGT_INF_DATA (1 << 29) +#define MIGR_TGT_INF_DATA (320 << 20) /* Threshold for very large transfers. * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. * Only one such transfer is allowed at a time. */ -#define MIGR_INF_DATA_HULK (1 << 28) +#define MIGR_INF_DATA_HULK (1 << 28) /* Low water mark for DMA buffer usage, hulk transfer is allowed in this case. */ -#define MIGR_INF_DATA_LWM (1 << 28) - -#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT" - -/* Number of migration ULTs per target */ -#define MIGR_TGT_ULTS_MIN 100 -#define MIGR_TGT_ULTS_DEF 500 -#define MIGR_TGT_ULTS_MAX 2000 +#define MIGR_INF_DATA_LWM (1 << 28) -/* 1/3 object ults, 2/3 key ULTs */ -#define MIGR_OBJ_ULT_PERCENT 33 - -#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100) -#define MIGR_TGT_KEY_ULTS(ults) (ults - MIGR_TGT_OBJ_ULTS(ults)) - -enum { - MIGR_OBJ = 0, - MIGR_KEY, - MIGR_DATA, - MIGR_MAX, -}; +struct migr_res_manager; /* resource consumed by migration */ struct migr_resource { - const char *res_name; + /* back reference to res_manager */ + struct migr_res_manager *res_rmg; /* upper limit of the resource */ - long res_limit; - /* resource amount in "unit" */ - long res_units; + long res_limit; + /* used resource amount in "unit" */ + long res_used; + /* last time logging starveling */ + uint64_t res_log_since; + /* active ULTs */ + int res_holders; /* number of waiters on this resource */ - int res_waiters; - /* Only used by MIGR_DATA, it always allows exactly one ULT to use unbounded - * buffer for super large value (rare). - */ - int res_hulk; + int res_waiters; + /* list head of all waiters */ + d_list_t res_waitq; /* ABT_cond for waiters */ - ABT_cond res_cond; + ABT_cond res_cond; + /* serialization */ + ABT_mutex res_mutex; + /* members for specific resource type */ + union { + /* MIGR_DATA only */ + struct { + /* total number of occurred memory errors */ + unsigned long mem_err; + /* waited more than 10 minutes, serious errors */ + unsigned long mem_ser_err; + /* number of revived ULTs after running into memory error */ + unsigned long mem_revived; + /* number of waiting ULTs */ + unsigned long mem_waiting; + /* allows exactly one ULT to use unbounded buffer for super large + * value (rare). + */ + int mem_hulk; + } res_data; + /* may add other members for MIGR_OBJ and MIGR_KEY */ + }; }; -/* migration resources manager */ -struct migr_res_manager { - ABT_mutex rmg_mutex; - struct migr_resource rmg_resources[MIGR_MAX]; +/* anchor point of resource waiter */ +struct migr_res_waiter { + struct migrate_pool_tls *rw_tls; + /* link chain on resource manager */ + d_list_t rw_link; + /* quantity of resource being demanded */ + uint64_t rw_units; + /* start to wait since... */ + uint64_t rw_wait_since; +}; + +/* Distribute the global resource into 8 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. + */ +#define MIGR_RES_BUCKETS 8 + +enum migr_bucket_type { + MIGR_BUCKET_PRIV, /* private */ + MIGR_BUCKET_MAP, /* shared, N to 1 map */ + MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ }; -struct migr_engine_res { - /* total ULTs per target, it a tunable which can be set by admin */ - unsigned int er_max_ults; - /* dss_tgt_nr resource managers */ - struct migr_res_manager *er_rmgs; +/* migration resources manager */ +struct migr_res_manager { + enum migr_res_type rmg_res_type; + /* type of bucket, see migr_bucket_type */ + enum migr_bucket_type rmg_bkt_type; + /* number of resource sharing buckets */ + int rmg_bkt_nr; + /* number of targets sharing the same resource bucket */ + int rmg_bkt_size; + /* round-robin bucket selector */ + ATOMIC int rmg_bkt_selector; + /* resource name */ + char *rmg_name; + /* all resource buckets */ + struct migr_resource *rmg_res_buckets; }; -static struct migr_engine_res migr_eng_res; +/* per-engine resources */ +static struct migr_res_manager migr_res_managers[MIGR_MAX]; struct migrate_one { struct migrate_pool_tls *mo_tls; @@ -166,6 +233,8 @@ struct iter_cont_arg { uuid_t pool_hdl_uuid; uuid_t cont_uuid; uuid_t cont_hdl_uuid; + daos_handle_t cont_hdl; + struct cont_props cont_props; struct tree_cache_root *cont_root; unsigned int yield_freq; uint64_t *snaps; @@ -181,8 +250,13 @@ struct iter_obj_arg { uuid_t cont_uuid; daos_unit_oid_t oid; daos_handle_t ioa_oh; + daos_handle_t ioa_coh; int ioa_obj_ref; struct daos_oclass_attr ioa_oca; + /* RPC fanout of degraded I/O and enumeration, it should be deemed as + * amplification factor for resource management + */ + int ioa_fanout; daos_epoch_t epoch; daos_epoch_t punched_epoch; unsigned int shard; @@ -685,14 +759,23 @@ mrone_recx_vos2_daos(struct migrate_one *mrone, int shard, daos_iod_t *iods, int mrone_recx_daos_vos_internal(mrone, false, shard, iods, iods_num); } +enum { + MEM_NO_WAIT, + MEM_WAIT, + MEM_LONG_WAIT, +}; + static int mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls, daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags, d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls) { + struct migr_resource *res = tls->mpt_data_res; uint32_t *extra_arg = NULL; - int waited = 0; + uint64_t then = 0; + uint64_t now; int rc; + int wait = MEM_NO_WAIT; /* pass rebuild epoch by extra_arg */ if (flags & DIOF_FETCH_EPOCH_EC_AGG_BOUNDARY) { @@ -700,6 +783,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ mrone->mo_epoch); extra_arg = (uint32_t *)mrone->mo_epoch; } + D_ASSERT(res != NULL); retry: rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls, NULL, flags, extra_arg, csum_iov_fetch); @@ -713,21 +797,40 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ /* If pool map does not change, then let's retry for timeout, instead of * fail out. */ - DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), DP_UOID(mrone->mo_oid)); - if (rc == -DER_NOMEM) { - /* sleep 10 seconds before retry, give other layers a chance to - * release resources. - */ - dss_sleep(10 * 1000); - if (waited != 0 && waited % 3600 == 0) { - DL_ERROR(rc, DF_RB ": waited memory for %d hour(s)", - DP_RB_MRO(mrone), waited / 3600); - } + if (rc != -DER_NOMEM) { + DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), + DP_UOID(mrone->mo_oid)); + dss_sleep(1); + D_GOTO(retry, rc); + } + + now = daos_gettime_coarse(); + if (wait == MEM_NO_WAIT) { + wait = MEM_WAIT; + res->res_data.mem_waiting++; + res->res_data.mem_err++; + then = now; + } + /* sleep a few seconds before retry, give other layers a chance to + * release resources. + */ + dss_sleep((10 + rand() % 20) * 1000); + if (wait != MEM_LONG_WAIT && now - then >= 600) { + wait = MEM_LONG_WAIT; /* flagged as long waiter */ + res->res_data.mem_ser_err++; /* counted as serious error */ + DL_ERROR(rc, + DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," + " total waiters: %lu, total revived: %lu\n", + DP_RB_MRO(mrone), res->res_data.mem_ser_err, res->res_data.mem_err, + res->res_data.mem_waiting, res->res_data.mem_revived); } - waited += 10; D_GOTO(retry, rc); } - + if (wait != MEM_NO_WAIT) { + D_ASSERT(res->res_data.mem_waiting > 0); + res->res_data.mem_revived++; + res->res_data.mem_waiting--; + } return rc; } @@ -1882,100 +1985,195 @@ migrate_one_destroy(struct migrate_one *mrone) D_FREE(mrone); } +static inline bool +migr_res_is_private(struct migr_resource *res) +{ + return res->res_rmg->rmg_bkt_type == MIGR_BUCKET_PRIV; +} + +static inline bool +migr_res_is_hulk(struct migr_resource *res, long units) +{ + return res->res_rmg->rmg_res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; +} + static bool -migr_res_is_hulk(int res_type, long units) +migr_res_has_starveling(struct migr_resource *res, uint64_t now) { - return res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; + static unsigned int starving_threshold = 30; /* 30 seconds */ + struct migr_res_waiter *waiter; + + if (res->res_waiters == 0) + return false; + + D_ASSERT(!d_list_empty(&res->res_waitq)); + waiter = d_list_entry(res->res_waitq.next, struct migr_res_waiter, rw_link); + + if (now - waiter->rw_wait_since < starving_threshold) + return false; + + /* If someone has already waited for more than 30 seconds, we should prioritize it and + * queue the current ULT. Log this waiter every 10 minutes so we can find out if it's + * blocked permentally. + */ + if (now - res->res_log_since > 600) { + D_DEBUG(DB_REBUILD, + DF_RB + " starving: res=%s, since=%lu, ask:limit=%lu:%lu, holders:waiters=%d:%d\n", + DP_RB_MPT(waiter->rw_tls), res->res_rmg->rmg_name, waiter->rw_wait_since, + waiter->rw_units, res->res_limit, res->res_holders, res->res_waiters); + res->res_log_since = now; + } + return true; } -static int -migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *yielded) +static void +migrate_res_wakeup(struct migr_resource *res, uint64_t units) +{ + struct migr_res_waiter *waiter, *tmp; + + d_list_for_each_entry_safe(waiter, tmp, &res->res_waitq, rw_link) { + if (waiter->rw_units > units) + break; + + if (units != -1ULL) + units -= waiter->rw_units; + + /* NB: for -1ULL case we should just wakeup who have the same uuid of the pool, + * however, ABT_cond can't support removing waiter from the queue, so we just + * wakeup everyone. + */ + res->res_waiters--; + d_list_del_init(&waiter->rw_link); + ABT_cond_signal(res->res_cond); + } +} + +static struct migr_resource * +migr_type2res(struct migrate_pool_tls *tls, int res_type) { struct dss_module_info *dmi = dss_get_module_info(); struct migr_res_manager *rmg; struct migr_resource *res; - bool is_hulk; - bool waited = false; - int rc = 0; + int idx; D_ASSERT(dmi->dmi_xs_id != 0); - rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; - if (tls->mpt_rmg == NULL) { - tls->mpt_rmg = rmg; - } else { - D_ASSERTF(tls->mpt_rmg == rmg, "target=%d, rmg_off=%d\n", dmi->dmi_tgt_id, - (int)(tls->mpt_rmg - &migr_eng_res.er_rmgs[0])); + rmg = &migr_res_managers[res_type]; + if (rmg->rmg_bkt_type == MIGR_BUCKET_PRIV) { + D_ASSERT(dmi->dmi_tgt_id < rmg->rmg_bkt_nr); + + res = &rmg->rmg_res_buckets[dmi->dmi_tgt_id]; + if (res_type == MIGR_DATA) { + if (tls->mpt_data_res == NULL) + tls->mpt_data_res = res; + else + D_ASSERT(tls->mpt_data_res == res); + } + return res; } + D_ASSERT(res_type != MIGR_DATA); - res = &rmg->rmg_resources[res_type]; - is_hulk = migr_res_is_hulk(res_type, units); + if (rmg->rmg_bkt_type == MIGR_BUCKET_MAP) { + idx = dmi->dmi_tgt_id / rmg->rmg_bkt_size; + D_ASSERT(idx < rmg->rmg_bkt_nr); + } else /* rotation */ { + idx = atomic_fetch_add(&rmg->rmg_bkt_selector, 1); + idx %= rmg->rmg_bkt_nr; + } + return &rmg->rmg_res_buckets[idx]; +} + +static int +migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *yielded) +{ + struct migr_resource *res; + struct migr_res_waiter waiter; + uint64_t now; + bool is_hulk; + bool locked = false; + int rc = 0; + + res = migr_type2res(tls, res_type); + if (!migr_res_is_private(res)) { + ABT_mutex_lock(res->res_mutex); + locked = true; + } + waiter.rw_tls = tls; + waiter.rw_units = units; + waiter.rw_wait_since = 0; + + is_hulk = migr_res_is_hulk(res, units); while (1) { if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); D_GOTO(out, rc); } - if (is_hulk && res->res_hulk == 0 && res->res_units < MIGR_INF_DATA_LWM) { + now = daos_gettime_coarse(); + if (is_hulk && res->res_data.mem_hulk == 0 && res->res_used < MIGR_INF_DATA_LWM) { /* skip the limit check and allow (only) one hulk transfer at a time */ - res->res_units += units; - res->res_hulk = 1; + res->res_used += units; + res->res_data.mem_hulk = 1; break; - } else if (!is_hulk && res->res_units + units <= res->res_limit) { - res->res_units += units; + } else if (!is_hulk && !migr_res_has_starveling(res, now) && + res->res_used + units <= res->res_limit) { + res->res_used += units; break; } - ABT_mutex_lock(rmg->rmg_mutex); - res->res_waiters++; - if (res->res_waiters >= 100 && res->res_waiters % 100 == 0) { - D_DEBUG(DB_REBUILD, - "%d waiters are waiting on res=%s (target=%d, unit=%lu)\n", - res->res_waiters, res->res_name, dmi->dmi_tgt_id, units); + if (migr_res_is_private(res)) { + ABT_mutex_lock(res->res_mutex); + locked = true; } - ABT_cond_wait(res->res_cond, rmg->rmg_mutex); - res->res_waiters--; - ABT_mutex_unlock(rmg->rmg_mutex); - waited = true; + res->res_waiters++; + waiter.rw_wait_since = now; + d_list_add_tail(&waiter.rw_link, &res->res_waitq); + + ABT_cond_wait(res->res_cond, res->res_mutex); + + D_ASSERT(d_list_empty(&waiter.rw_link)); + if (migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); } - if (yielded) - *yielded = waited; + res->res_holders++; /* per-pool counters for rebuild status tracking */ if (res_type == MIGR_OBJ) tls->mpt_tgt_obj_ult_cnt++; else if (res_type == MIGR_KEY) tls->mpt_tgt_dkey_ult_cnt++; - else + else if (res_type == MIGR_DATA) tls->mpt_inflight_size += units; D_DEBUG(DB_REBUILD, - "res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, waited, DP_RB_MPT(tls), + DF_RB " " + "res=%s units:total:limit=%lu:%lu:%lu waiters:holders=%d:%d, waited=%s, " + " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", + DP_RB_MPT(tls), res->res_rmg->rmg_name, units, res->res_used, res->res_limit, + res->res_waiters, res->res_holders, !!waiter.rw_wait_since ? "yes" : "no", tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); + + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); out: + if (yielded && !*yielded) + *yielded = locked; return rc; } static void migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) { - struct migr_res_manager *rmg; - struct migr_resource *res; - - rmg = tls->mpt_rmg; - D_ASSERT(rmg != NULL); + struct migr_resource *res; - res = &rmg->rmg_resources[res_type]; - - D_DEBUG(DB_REBUILD, - "%s: release=%lu, used=%lu, limit=%lu\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, DP_RB_MPT(tls), - tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); + res = migr_type2res(tls, res_type); + if (!migr_res_is_private(res)) + ABT_mutex_lock(res->res_mutex); if (res_type == MIGR_OBJ) { D_ASSERT(tls->mpt_tgt_obj_ult_cnt > 0); @@ -1983,32 +2181,35 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) } else if (res_type == MIGR_KEY) { D_ASSERT(tls->mpt_tgt_dkey_ult_cnt > 0); tls->mpt_tgt_dkey_ult_cnt--; - } else { + } else if (res_type == MIGR_DATA) { D_ASSERT(tls->mpt_inflight_size >= units); tls->mpt_inflight_size -= units; } - D_ASSERT(res->res_units >= units); - res->res_units -= units; + D_ASSERT(res->res_used >= units); + res->res_used -= units; + D_ASSERT(res->res_holders > 0); + res->res_holders--; - if (migr_res_is_hulk(res_type, units)) { - D_ASSERT(res->res_hulk == 1); - res->res_hulk = 0; + if (migr_res_is_hulk(res, units)) { + D_ASSERT(res->res_data.mem_hulk == 1); + res->res_data.mem_hulk = 0; } - if (res->res_waiters > 0) { - ABT_mutex_lock(rmg->rmg_mutex); - ABT_cond_signal(res->res_cond); - ABT_mutex_unlock(rmg->rmg_mutex); - } + if (res->res_waiters > 0) + migrate_res_wakeup(res, res->res_limit - res->res_used); + + if (!migr_res_is_private(res)) + ABT_mutex_unlock(res->res_mutex); } static void migrate_one_ult(void *arg) { struct migrate_one *mrone = arg; + struct iter_obj_arg *ioa = mrone->mo_obj_arg; struct migrate_pool_tls *tls; - daos_size_t data_size; + daos_size_t data_size; int rc = 0; while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG)) @@ -2021,21 +2222,20 @@ migrate_one_ult(void *arg) } data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); - data_size += daos_iods_len(mrone->mo_iods_from_parity, - mrone->mo_iods_num_from_parity); + data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity); D_ASSERT(data_size != (daos_size_t)-1); - rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL); + rc = migrate_res_hold(tls, MIGR_DATA, data_size * ioa->ioa_fanout, NULL); if (rc) D_GOTO(out, rc); rc = migrate_dkey(tls, mrone, data_size); - migrate_res_release(tls, MIGR_DATA, data_size); + migrate_res_release(tls, MIGR_DATA, data_size * ioa->ioa_fanout); D_DEBUG(DB_REBUILD, DF_RB ": " DF_UOID " layout %u migrate dkey " DF_KEY " inflight_size " DF_U64 @@ -2059,7 +2259,7 @@ migrate_one_ult(void *arg) tls->mpt_fini = 1; } out: - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, ioa->ioa_fanout); migrate_one_destroy(mrone); } @@ -2852,7 +3052,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) continue; } - rc = migrate_res_hold(tls, MIGR_KEY, 1, NULL); + rc = migrate_res_hold(tls, MIGR_KEY, arg->ioa_fanout, NULL); if (rc) break; d_list_del_init(&mrone->mo_list); @@ -2866,7 +3066,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) { - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, arg->ioa_fanout); migrate_one_destroy(mrone); break; } @@ -2882,8 +3082,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) * Iterate akeys/dkeys of the object */ static int -migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, - struct iter_obj_arg *arg) +migrate_obj_epoch(struct migrate_pool_tls *tls, struct iter_obj_arg *arg, daos_epoch_range_t *epr) { daos_anchor_t anchor; daos_anchor_t dkey_anchor; @@ -3135,30 +3334,47 @@ struct migrate_stop_arg { static int migrate_fini_one_ult(void *data) { + struct dss_module_info *dmi = dss_get_module_info(); struct migrate_stop_arg *arg = data; struct migrate_pool_tls *tls; + struct migr_res_manager *rmg; + struct migr_resource *res; + bool last_one; + int i; + int j; int rc; + D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); if (tls == NULL) return 0; - D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); tls->mpt_fini = 1; ABT_mutex_lock(arg->stop_lock); arg->stop_count++; + last_one = (arg->stop_count == dss_tgt_nr); ABT_mutex_unlock(arg->stop_lock); - if (tls->mpt_rmg) { - struct migr_res_manager *rmg = tls->mpt_rmg; - int i; + for (i = 0; i < MIGR_MAX; i++) { + rmg = &migr_res_managers[i]; - /* NB: no big deal but ULTs of all pools will be waken up */ - ABT_mutex_lock(rmg->rmg_mutex); - for (i = 0; i < MIGR_MAX; i++) - ABT_cond_broadcast(rmg->rmg_resources[i].res_cond); - ABT_mutex_unlock(rmg->rmg_mutex); + /* private resource has to be processed by ULT on the owner xstream */ + if (rmg->rmg_bkt_type == MIGR_BUCKET_PRIV) { + /* locking is not required by private resource */ + migrate_res_wakeup(&rmg->rmg_res_buckets[dmi->dmi_tgt_id], -1ULL); + + } else if (last_one) { /* shared resource */ + /* on behalf of all target xstreams, wakeup ULTs for shared resources. */ + for (j = 0; j < rmg->rmg_bkt_nr; j++) { + res = &rmg->rmg_res_buckets[j]; + + /* locking is required by shared resource */ + ABT_mutex_lock(res->res_mutex); + migrate_res_wakeup(res, -1ULL); + ABT_mutex_unlock(res->res_mutex); + } + } } migrate_pool_tls_put(tls); /* lookup */ @@ -3171,9 +3387,8 @@ migrate_fini_one_ult(void *data) rc = 0; } - migrate_pool_tls_put(tls); /* destroy */ - D_INFO("migrate fini one ult "DF_UUID"\n", DP_UUID(arg->pool_uuid)); + migrate_pool_tls_put(tls); /* destroy */ return rc; } @@ -3182,7 +3397,7 @@ void ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generation) { struct migrate_stop_arg arg; - int rc; + int rc; uuid_copy(arg.pool_uuid, pool->sp_uuid); arg.version = version; @@ -3195,7 +3410,7 @@ ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generat } rc = ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); - if (rc) + if (rc != 0) D_ERROR(DF_UUID" migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); D_ASSERT(atomic_load(&pool->sp_rebuilding) >= arg.stop_count); @@ -3232,8 +3447,7 @@ migrate_obj_ult(void *data) struct migrate_pool_tls *tls = NULL; daos_epoch_range_t epr; daos_epoch_t stable_epoch = 0; - daos_handle_t coh = DAOS_HDL_INVAL; - struct cont_props props; + daos_handle_t coh = arg->ioa_coh; int i; int rc = 0; @@ -3244,27 +3458,6 @@ migrate_obj_ult(void *data) D_GOTO(free_notls, rc); } - /* Only reintegrating targets/pool needs to discard the object, - * if sp_need_discard is 0, either the target does not need to - * discard, or discard has been done. spc_discard_done means - * discarding has been done in the current VOS target. - */ - if (tls->mpt_pool->spc_pool->sp_need_discard) { - while(!tls->mpt_pool->spc_discard_done) { - D_DEBUG(DB_REBUILD, DF_RB ": wait for discard to finish.\n", - DP_RB_MPT(tls)); - dss_sleep(2 * 1000); - if (tls->mpt_fini) - D_GOTO(free_notls, rc); - } - if (tls->mpt_pool->spc_pool->sp_discard_status) { - rc = tls->mpt_pool->spc_pool->sp_discard_status; - D_DEBUG(DB_REBUILD, DF_RB ": discard failure: " DF_RC "\n", DP_RB_MPT(tls), - DP_RC(rc)); - D_GOTO(out, rc); - } - } - if (tls->mpt_reintegrating) { struct ds_cont_child *cont_child = NULL; @@ -3289,33 +3482,12 @@ migrate_obj_ult(void *data) ds_cont_child_put(cont_child); } - rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL, - tls->mpt_pool->spc_pool->sp_map, &tls->mpt_svc_list, &tls->mpt_pool_hdl); - if (rc) { - DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - - rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh); - if (rc) { - DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &arg->ioa_oh); if (rc) { DL_ERROR(rc, DF_RB ": dsc_obj_open failed", DP_RB_MPT(tls)); D_GOTO(out, rc); } - dsc_cont_get_props(coh, &props); - rc = dsc_obj_id2oc_attr(arg->oid.id_pub, &props, &arg->ioa_oca); - if (rc) { - DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), - daos_obj_id2class(arg->oid.id_pub)); - D_GOTO(out, rc); - } - for (i = 0; i < arg->snap_cnt; i++) { daos_epoch_t lower_epoch = 0; @@ -3335,7 +3507,7 @@ migrate_obj_ult(void *data) epr.epr_hi = arg->snaps[i]; D_DEBUG(DB_REBUILD, DF_RB ": rebuild_snap %d " DF_X64 "-" DF_X64 "\n", DP_RB_MPT(tls), i, epr.epr_lo, epr.epr_hi); - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); if (rc) D_GOTO(free, rc); } @@ -3351,7 +3523,7 @@ migrate_obj_ult(void *data) D_ASSERT(tls->mpt_max_eph != 0); epr.epr_hi = tls->mpt_max_eph; if (arg->epoch > 0) { - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); } else { /* The obj has been punched for this range */ D_DEBUG(DB_REBUILD, @@ -3390,7 +3562,7 @@ migrate_obj_ult(void *data) DP_RB_MPT(tls), DP_UOID(arg->oid), arg->shard, tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_obj_count, DP_RC(rc)); free_notls: - migrate_res_release(tls, MIGR_OBJ, 1); + migrate_res_release(tls, MIGR_OBJ, arg->ioa_fanout); migrate_obj_put(arg); } @@ -3403,10 +3575,9 @@ struct migrate_obj_val { /* This is still running on the main migration ULT */ static int -migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, - unsigned int shard, unsigned int tgt_idx, void *data) +migrate_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, unsigned int shard, + unsigned int tgt_idx, struct iter_cont_arg *cont_arg, bool *yielded) { - struct iter_cont_arg *cont_arg = data; struct iter_obj_arg *obj_arg; struct migrate_pool_tls *tls = cont_arg->pool_tls; daos_handle_t toh = tls->mpt_migrated_root_hdl; @@ -3433,6 +3604,7 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e uuid_copy(obj_arg->cont_uuid, cont_arg->cont_uuid); obj_arg->version = cont_arg->pool_tls->mpt_version; obj_arg->generation = cont_arg->pool_tls->mpt_generation; + obj_arg->ioa_coh = cont_arg->cont_hdl; if (cont_arg->snaps) { D_ALLOC(obj_arg->snaps, sizeof(*cont_arg->snaps) * cont_arg->snap_cnt); @@ -3444,6 +3616,25 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e sizeof(*obj_arg->snaps) * cont_arg->snap_cnt); } + rc = dsc_obj_id2oc_attr(oid.id_pub, &cont_arg->cont_props, &obj_arg->ioa_oca); + if (rc) { + DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), + daos_obj_id2class(oid.id_pub)); + D_GOTO(free, rc); + } + + if (daos_oclass_is_ec(&obj_arg->ioa_oca)) /* RPC fanout has to be considered */ + obj_arg->ioa_fanout = MIN(16, obj_ec_data_tgt_nr(&obj_arg->ioa_oca)); + else + obj_arg->ioa_fanout = 1; + + *yielded = false; + rc = migrate_res_hold(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout, yielded); + if (rc != 0) { + DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(cont_arg->cont_uuid)); + goto free; + } + D_ASSERT(tgt_idx == dss_get_module_info()->dmi_tgt_id); rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) @@ -3488,18 +3679,10 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * DF_RB ": obj migrate " DF_UUID "/" DF_UOID " %" PRIx64 " eph " DF_U64 " start\n", DP_RB_MPT(arg->pool_tls), DP_UUID(arg->cont_uuid), DP_UOID(*oid), ih.cookie, epoch); - rc = migrate_res_hold(arg->pool_tls, MIGR_OBJ, 1, &yielded); - if (rc) { - DL_ERROR(rc, DF_RB ": " DF_UUID " enter migrate failed.", DP_RB_MPT(arg->pool_tls), - DP_UUID(arg->cont_uuid)); - return rc; - } - - rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg); + rc = migrate_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg, &yielded); if (rc != 0) { DL_ERROR(rc, DF_RB ": obj " DF_UOID " migration failed", DP_RB_MPT(arg->pool_tls), DP_UOID(*oid)); - migrate_res_release(arg->pool_tls, MIGR_OBJ, 1); return rc; } @@ -3628,6 +3811,14 @@ migrate_cont_iter_cb(daos_handle_t ih, d_iov_t *key_iov, arg.snap_cnt = fetch_arg.snap_cnt; arg.pool_tls = tls; uuid_copy(arg.cont_uuid, cont_uuid); + + rc = migrate_cont_open(tls, cont_uuid, 0, &arg.cont_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); + D_GOTO(free, rc); + } + + dsc_cont_get_props(arg.cont_hdl, &arg.cont_props); while (!dbtree_is_empty(root->tcr_root_hdl)) { if (tls->mpt_fini) break; @@ -3689,20 +3880,51 @@ static void migrate_ult(void *arg) { struct migrate_pool_tls *pool_tls = arg; - int rc; + struct ds_pool_child *pool; + int rc = 0; D_ASSERT(pool_tls != NULL); + pool = pool_tls->mpt_pool; + /* Only reintegrating targets/pool needs to discard the object, if sp_need_discard is 0, + * either the target does not need to discard, or discard has been done. + * spc_discard_done means discarding has been done in the current VOS target. + */ + if (pool->spc_pool->sp_need_discard) { + while (!pool->spc_discard_done) { + D_DEBUG(DB_REBUILD, DF_RB " wait for discard to finish.\n", + DP_RB_MPT(pool_tls)); + dss_sleep(2 * 1000); + if (pool_tls->mpt_fini) + D_GOTO(out, rc); + } + if (pool->spc_pool->sp_discard_status) { + rc = pool->spc_pool->sp_discard_status; + D_DEBUG(DB_REBUILD, DF_RB " discard failure: " DF_RC, DP_RB_MPT(pool_tls), + DP_RC(rc)); + D_GOTO(out, rc); + } + } + + rc = + dsc_pool_open(pool_tls->mpt_pool_uuid, pool_tls->mpt_poh_uuid, 0, NULL, + pool->spc_pool->sp_map, &pool_tls->mpt_svc_list, &pool_tls->mpt_pool_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(pool_tls)); + D_GOTO(out, rc); + } + while (!dbtree_is_empty(pool_tls->mpt_root_hdl) && !pool_tls->mpt_fini) { rc = dbtree_iterate(pool_tls->mpt_root_hdl, DAOS_INTENT_PURGE, false, migrate_cont_iter_cb, pool_tls); if (rc < 0) { DL_ERROR(rc, DF_RB ": dbtree iterate failed", DP_RB_MPT(pool_tls)); - if (pool_tls->mpt_status == 0) - pool_tls->mpt_status = rc; - break; + goto out; } } +out: + if (pool_tls->mpt_status == 0) + pool_tls->mpt_status = rc; pool_tls->mpt_ult_running = 0; migrate_pool_tls_put(pool_tls); @@ -4526,14 +4748,17 @@ ds_object_migrate_send(struct ds_pool *pool, uuid_t pool_hdl_uuid, uuid_t cont_h } static int -migr_res_init(struct migr_resource *res, const char *name, long limit) +migr_res_init(struct migr_resource *res, struct migr_res_manager *rmg, long limit) { int rc; memset(res, 0, sizeof(*res)); - res->res_name = name; + D_INIT_LIST_HEAD(&res->res_waitq); + res->res_rmg = rmg; res->res_limit = limit; rc = ABT_cond_create(&res->res_cond); + if (rc == 0) + rc = ABT_mutex_create(&res->res_mutex); return (rc != ABT_SUCCESS) ? dss_abterr2der(rc) : 0; } @@ -4541,53 +4766,119 @@ migr_res_init(struct migr_resource *res, const char *name, long limit) static void migr_res_fini(struct migr_resource *res) { + D_ASSERT(!res->res_rmg || d_list_empty(&res->res_waitq)); /* unset or drained */ if (res->res_cond) ABT_cond_free(&res->res_cond); + if (res->res_mutex) + ABT_mutex_free(&res->res_mutex); } -int -obj_migrate_init(void) +static int +migr_rmg_init(int type, int concur_max, uint64_t units) { - unsigned int ults = MIGR_TGT_ULTS_DEF; - int i; - int rc = 0; + struct migr_res_manager *rmg; + char *name; + int bkt_nr; + int bkt_size; + int bkt_type; + int i; + int rc; + + if (type == MIGR_OBJ) { + name = "OBJ"; + } else if (type == MIGR_KEY) { + name = "KEY"; + } else if (type == MIGR_DATA) { + name = "DATA"; + } else { + name = "UNKNOWN"; + D_GOTO(failed, rc = -DER_INVAL); + } + rmg = &migr_res_managers[type]; - D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM); - D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); + if (concur_max >= dss_tgt_nr) { /* concurrency can't be higher than target number */ + bkt_nr = dss_tgt_nr; + bkt_type = MIGR_BUCKET_PRIV; + bkt_size = 1; + } else { + bkt_nr = concur_max; + bkt_size = dss_tgt_nr / concur_max; + bkt_type = (dss_tgt_nr % concur_max == 0) ? MIGR_BUCKET_MAP : MIGR_BUCKET_ROTATE; + } - d_getenv_uint(ENV_MIGRATE_ULT_CNT, &ults); - if (ults < MIGR_TGT_ULTS_MIN) - ults = MIGR_TGT_ULTS_MIN; - if (ults > MIGR_TGT_ULTS_MAX) - ults = MIGR_TGT_ULTS_MAX; + D_ALLOC(rmg->rmg_res_buckets, bkt_nr * sizeof(*rmg->rmg_res_buckets)); + if (!rmg->rmg_res_buckets) + D_GOTO(failed, rc = -DER_NOMEM); - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); - migr_eng_res.er_max_ults = ults; + rmg->rmg_name = name; + rmg->rmg_res_type = type; + rmg->rmg_bkt_nr = bkt_nr; + rmg->rmg_bkt_type = bkt_type; + rmg->rmg_bkt_size = bkt_size; - D_ASSERT(dss_tgt_nr > 0); - D_ALLOC(migr_eng_res.er_rmgs, sizeof(struct migr_res_manager) * dss_tgt_nr); - if (!migr_eng_res.er_rmgs) - return -DER_NOMEM; + for (i = 0; i < bkt_nr; i++) { + rc = migr_res_init(&rmg->rmg_res_buckets[i], rmg, units / bkt_nr); + if (rc) + D_GOTO(failed, rc); + } + return 0; +failed: + D_ERROR("Failed to initialize resource %s, error=%d\n", name, rc); + return rc; +} - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; +static void +migr_rmg_fini(int type) +{ + struct migr_res_manager *rmg; + int i; - rc = ABT_mutex_create(&rmg->rmg_mutex); - if (rc != ABT_SUCCESS) - D_GOTO(out, rc = dss_abterr2der(rc)); + rmg = &migr_res_managers[type]; + if (!rmg->rmg_res_buckets) + return; - rc = migr_res_init(&rmg->rmg_resources[MIGR_OBJ], "OBJ", MIGR_TGT_OBJ_ULTS(ults)); - if (rc) - D_GOTO(out, rc); + for (i = 0; i < rmg->rmg_bkt_nr; i++) + migr_res_fini(&rmg->rmg_res_buckets[i]); + memset(rmg, 0, sizeof(*rmg)); +} - rc = migr_res_init(&rmg->rmg_resources[MIGR_KEY], "KEY", MIGR_TGT_KEY_ULTS(ults)); - if (rc) - D_GOTO(out, rc); +int +obj_migrate_init(void) +{ + unsigned obj_units = MIGR_OBJ_CONCUR_DEF; + unsigned key_units = MIGR_KEY_CONCUR_DEF; + int rc; - rc = migr_res_init(&rmg->rmg_resources[MIGR_DATA], "DATA", MIGR_TGT_INF_DATA); - if (rc) - D_GOTO(out, rc); + D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM); + D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); + D_ASSERT(dss_tgt_nr > 0); + + rc = d_getenv_uint(ENV_MIGRATE_OBJ_CONCUR, &obj_units); + if (rc == 0) { + if (obj_units < MIGR_OBJ_CONCUR_MIN) + obj_units = MIGR_OBJ_CONCUR_MIN; + if (obj_units > MIGR_OBJ_CONCUR_MAX) + obj_units = MIGR_OBJ_CONCUR_MAX; } + rc = migr_rmg_init(MIGR_OBJ, MIGR_RES_BUCKETS, obj_units); + if (rc) + D_GOTO(out, rc); + + rc = d_getenv_uint(ENV_MIGRATE_KEY_CONCUR, &key_units); + if (rc == 0) { + if (key_units < MIGR_KEY_CONCUR_MIN) + key_units = MIGR_KEY_CONCUR_MIN; + if (key_units > MIGR_KEY_CONCUR_MAX) + key_units = MIGR_KEY_CONCUR_MAX; + } + rc = migr_rmg_init(MIGR_KEY, MIGR_RES_BUCKETS, key_units); + if (rc) + D_GOTO(out, rc); + + rc = migr_rmg_init(MIGR_DATA, dss_tgt_nr, MIGR_TGT_INF_DATA * dss_tgt_nr); + if (rc) + D_GOTO(out, rc); + return 0; out: obj_migrate_fini(); @@ -4598,18 +4889,7 @@ void obj_migrate_fini(void) { int i; - int j; - if (migr_eng_res.er_rmgs) { - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; - - for (j = 0; j < MIGR_MAX; j++) - migr_res_fini(&rmg->rmg_resources[j]); - if (rmg->rmg_mutex) - ABT_mutex_free(&rmg->rmg_mutex); - } - D_FREE(migr_eng_res.er_rmgs); - } - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); + for (i = 0; i < MIGR_MAX; i++) + migr_rmg_fini(i); }