Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/daos_srv/rsvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void ds_rsvc_set_state(struct ds_rsvc *svc, enum ds_rsvc_state state);
void ds_rsvc_begin_stepping_up(struct ds_rsvc *svc);
int ds_rsvc_end_stepping_up(struct ds_rsvc *svc, int rc_in, enum ds_rsvc_state state);
int ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size,
uint32_t vos_df_version);
uint32_t vos_df_version, bool (*abort)(void *arg), void *abort_arg);
int ds_rsvc_add_replicas(enum ds_rsvc_class_id class, d_iov_t *id, d_rank_list_t *ranks,
size_t size, uint32_t vos_df_version, struct rsvc_hint *hint);
int ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, bool destroy);
Expand Down
11 changes: 10 additions & 1 deletion src/pool/srv_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -6969,6 +6969,14 @@ struct pool_svc_reconf_arg {
bool sca_sync_remove;
};

static bool
pool_svc_reconf_abort(void *arg)
{
struct pool_svc_sched *reconf = arg;

return reconf->psc_canceled;
}

/* Must be used with pool_svc.ps_reconf_sched (see container_of below). */
static void
pool_svc_reconf_ult(void *varg)
Expand Down Expand Up @@ -7051,7 +7059,8 @@ pool_svc_reconf_ult(void *varg)
vos_df_version = ds_pool_get_vos_df_version(svc->ps_global_version);
D_ASSERTF(vos_df_version != 0, DF_UUID ": vos_df_version=0 global_version=%u\n",
DP_UUID(svc->ps_uuid), svc->ps_global_version);
ds_rsvc_add_replicas_s(&svc->ps_rsvc, to_add, rdb_nbytes, vos_df_version);
ds_rsvc_add_replicas_s(&svc->ps_rsvc, to_add, rdb_nbytes, vos_df_version,
pool_svc_reconf_abort, reconf);
if (reconf->psc_canceled) {
rc = -DER_OP_CANCELED;
goto out_to_add_remove;
Expand Down
52 changes: 37 additions & 15 deletions src/rsvc/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ ds_rsvc_stop_leader(enum ds_rsvc_class_id class, d_iov_t *id,

int
ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size,
uint32_t vos_df_version)
uint32_t vos_df_version, bool (*abort)(void *arg), void *abort_arg)
{
int i;
int rc = 0;
Expand All @@ -1212,6 +1212,7 @@ ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size,
rdb_replica_id_t id;
int ids_len = 1;
struct ds_rsvc_create_params create_params;
struct d_backoff_seq backoff_seq;

rl.rl_ranks = &r;
rl.rl_nr = 1;
Expand All @@ -1232,8 +1233,27 @@ ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size,
create_params.scp_replicas = &id;
create_params.scp_replicas_len = 1;

rc = ds_rsvc_dist_start(svc->s_class, &svc->s_id, svc->s_db_uuid, &rl, svc->s_term,
DS_RSVC_CREATE, &create_params);
/*
* Create and start the replica. If abort is not NULL, retry
* upon retryable errors.
*/
rc = d_backoff_seq_init(&backoff_seq, 0 /* nzeros */, 2 /* factor */, 1 /* next */,
8 /* max */);
D_ASSERTF(rc == 0, "d_backoff_seq_init: " DF_RC "\n", DP_RC(rc));
for (;;) {
rc = ds_rsvc_dist_start(svc->s_class, &svc->s_id, svc->s_db_uuid, &rl,
svc->s_term, DS_RSVC_CREATE, &create_params);
if (!daos_rpc_retryable_rc(rc) || abort == NULL)
break;
if (abort(abort_arg)) {
rc = -DER_OP_CANCELED;
break;
}
DL_INFO(rc, "%s: retrying to create replica " RDB_F_RID, svc->s_name,
RDB_P_RID(id));
dss_sleep(d_backoff_seq_next(&backoff_seq) * 1000);
}
d_backoff_seq_fini(&backoff_seq);
if (rc != 0)
break;

Expand Down Expand Up @@ -1277,7 +1297,8 @@ ds_rsvc_add_replicas(enum ds_rsvc_class_id class, d_iov_t *id, d_rank_list_t *ra
rc = ds_rsvc_lookup_leader(class, id, &svc, hint);
if (rc != 0)
return rc;
rc = ds_rsvc_add_replicas_s(svc, ranks, size, vos_df_version);
rc = ds_rsvc_add_replicas_s(svc, ranks, size, vos_df_version, NULL /* abort */,
NULL /* abort_arg */);
ds_rsvc_set_hint(svc, hint);
ds_rsvc_put_leader(svc);
return rc;
Expand Down Expand Up @@ -1383,25 +1404,26 @@ enum rdb_stop_flag {
* filter_ranks only.
*/
static int
bcast_create(crt_opcode_t opc, bool filter_invert, d_rank_list_t *filter_ranks,
crt_rpc_t **rpc)
bcast_create(crt_opcode_t opc, bool filter_invert, d_rank_list_t *filter_ranks, crt_rpc_t **rpc)
{
struct dss_module_info *info = dss_get_module_info();
crt_opcode_t opc_full;
crt_opcode_t opc_full;
uint8_t rsvc_ver;
uint32_t flags = CRT_RPC_FLAG_CO_FAILOUT;
int rc;

rc = ds_rsvc_rpc_protocol(&rsvc_ver);
if (rc)
if (rc != 0)
return rc;

D_ASSERT(!filter_invert || filter_ranks != NULL);
opc_full = DAOS_RPC_OPCODE(opc, DAOS_RSVC_MODULE, rsvc_ver);
return crt_corpc_req_create(info->dmi_ctx, NULL /* grp */,
filter_ranks, opc_full,
NULL /* co_bulk_hdl */, NULL /* priv */,
filter_invert ?
CRT_RPC_FLAG_FILTER_INVERT : 0,

if (filter_invert) {
D_ASSERT(filter_ranks != NULL);
flags |= CRT_RPC_FLAG_FILTER_INVERT;
}

return crt_corpc_req_create(info->dmi_ctx, NULL /* grp */, filter_ranks, opc_full,
NULL /* co_bulk_hdl */, NULL /* priv */, flags,
crt_tree_topo(CRT_TREE_KNOMIAL, 2), rpc);
}

Expand Down
Loading