diff --git a/acconfig.h b/acconfig.h index 54c3206e4..f423c0a77 100644 --- a/acconfig.h +++ b/acconfig.h @@ -513,6 +513,7 @@ #undef HAVE_IBV_SRQ #undef HAVE_IBV_TRANSPORT_TYPE #undef HAVE_IBV_CREATE_QP_EX +#undef GASNETC_HAVE_IBV_WR_API #undef GASNETC_IBV_MAX_MEDIUM #undef GASNETC_IBV_ODP #undef GASNETC_IBV_ODP_MLNX diff --git a/configure.in b/configure.in index 586edbfda..5e11c3756 100644 --- a/configure.in +++ b/configure.in @@ -3905,6 +3905,28 @@ if test "$enabled_ibv" = yes; then device.transport_type = IBV_TRANSPORT_IB; ], [ AC_DEFINE(HAVE_IBV_TRANSPORT_TYPE) ] ) + GASNET_IF_DISABLED(ibv-wr-api, + [Disable use of verbs work request API (default is to probe)], + [:], + [GASNET_TRY_CACHE_LINK(for verbs work request API support, ibv_wr_api, [ + INCLUDE_VERBS_H + ], [ + struct ibv_qp_init_attr_ex init_attr_ex; + init_attr_ex.send_ops_flags = IBV_QP_EX_WITH_SEND_WITH_IMM | + IBV_QP_EX_WITH_RDMA_WRITE | + IBV_QP_EX_WITH_RDMA_READ; + init_attr_ex.comp_mask = IBV_QP_INIT_ATTR_SEND_OPS_FLAGS; + struct ibv_qp *qp = NULL; + struct ibv_qp_ex *qpx = ibv_qp_to_qp_ex(qp); + ibv_wr_start(qpx); + ibv_wr_send_imm(qpx, 0); + ibv_wr_rdma_write(qpx, 0, 0); + ibv_wr_rdma_read(qpx, 0, 0); + ibv_wr_atomic_fetch_add(qpx, 0, 0, 0); + ibv_wr_complete(qpx); + ], [ AC_DEFINE(GASNETC_HAVE_IBV_WR_API) ] ) + ]) + AC_CHECK_FUNCS(ibv_wc_status_str) GASNET_TRY_CACHE_LINK(for ibv_create_qp_ex, have_ibv_create_qp_ex, [ diff --git a/ibv-conduit/README b/ibv-conduit/README index bd71346ed..bf52a34fc 100644 --- a/ibv-conduit/README +++ b/ibv-conduit/README @@ -204,6 +204,11 @@ Paul H. Hargrove behavior. For more information, see the `GASNET_RCV_THREAD_POLL_MODE` environment variable documentation, below. + By default, ibv-conduit will use the Verbs Work Request API if support is + detected at configure time. One can configure using `--disable-ibv-wr-api` + to force use of the older `ibv_post_send()` interface. There is no mechanism + to choose between the two implementations at run time. 
+ @ Section: Job Spawning @ If using UPC++, Chapel, etc. the language-specific commands should be used diff --git a/ibv-conduit/gasnet_core.c b/ibv-conduit/gasnet_core.c index e91c97c85..bcd1b531d 100644 --- a/ibv-conduit/gasnet_core.c +++ b/ibv-conduit/gasnet_core.c @@ -66,6 +66,10 @@ GASNETI_IDENT(gasnetc_IdentString_MaxHCAs, "$GASNetIbvMaxHCAs: " _STRINGIFY(GASN GASNETI_IDENT(gasnetc_IdentString_SerializeCqPoll, "$GASNetIbvSerializeCqPoll: 1 $"); #endif +#if GASNETC_HAVE_IBV_WR_API + GASNETI_IDENT(gasnetc_IdentString_WR, "$GASNetIbvWR: 1 $"); +#endif + int gex_System_QueryHiddenAMConcurrencyLevel(void) { #if !GASNETC_USE_RCV_THREAD gasneti_assert(! gasnetc_use_rcv_thread); @@ -4305,9 +4309,6 @@ int gasnetc_am_commit( gasnetc_buffer_t *buf, gasnetc_buffer_t *buf_alloc, GASNETC_DECL_SR_DESC(sr_desc, 2); int numargs_field = have_flow ? GASNETC_MAX_ARGS : numargs; - sr_desc->imm_data = GASNETC_MSG_GENFLAGS(!is_reply, category, numargs_field, handler, - gasneti_mynode); - sr_desc->opcode = IBV_WR_SEND_WITH_IMM; sr_desc->num_sge = 1; sr_desc->sg_list[0].addr = (uintptr_t)buf; sr_desc->sg_list[0].length = head_len + (in_place ? 
nbytes : copy_len); @@ -4338,8 +4339,10 @@ int gasnetc_am_commit( gasnetc_buffer_t *buf, gasnetc_buffer_t *buf_alloc, } #endif + uint32_t imm_data = GASNETC_MSG_GENFLAGS(!is_reply, category, numargs_field, + handler, gasneti_mynode); int reserved = (immediate != 0); // CQ slot was pre-reserved if and only if immediate - gasnetc_snd_post_common(sreq, sr_desc, reserved, !buf_alloc GASNETI_THREAD_PASS); + gasnetc_post_send_imm(sreq, sr_desc, imm_data, reserved, !buf_alloc GASNETI_THREAD_PASS); } return 0; diff --git a/ibv-conduit/gasnet_core_connect.c b/ibv-conduit/gasnet_core_connect.c index 2b8f2b023..5614cdd57 100644 --- a/ibv-conduit/gasnet_core_connect.c +++ b/ibv-conduit/gasnet_core_connect.c @@ -177,12 +177,35 @@ gasnetc_parse_filename(const char *filename) #if HAVE_IBV_CREATE_QP_EX typedef struct ibv_qp_init_attr_ex gasnetc_qp_init_attr_t; - #define gasnetc_create_qp(hca, attr) ibv_create_qp_ex((hca)->handle, attr) #else typedef struct ibv_qp_init_attr gasnetc_qp_init_attr_t; - #define gasnetc_create_qp(hca, attr) ibv_create_qp((hca)->pd, attr) #endif +GASNETI_INLINE(gasnetc_create_qp) +struct ibv_qp *gasnetc_create_qp(gasnetc_hca_t *hca, gasnetc_qp_init_attr_t *init_attr_p) +{ +#if GASNETC_HAVE_IBV_WR_API + uint64_t ops = IBV_QP_EX_WITH_SEND_WITH_IMM | + IBV_QP_EX_WITH_RDMA_WRITE | + IBV_QP_EX_WITH_RDMA_READ; + #if GASNETC_BUILD_IBVRATOMIC + ops |= IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD | + IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP; + #elif GASNETC_HAVE_FENCED_PUTS + if (gasnetc_use_fenced_puts) { + ops |= IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD; + } + #endif + init_attr_p->send_ops_flags = ops; + init_attr_p->comp_mask |= IBV_QP_INIT_ATTR_SEND_OPS_FLAGS; +#endif +#if HAVE_IBV_CREATE_QP_EX + return ibv_create_qp_ex((hca)->handle, init_attr_p); +#else + return ibv_create_qp((hca)->pd, init_attr_p); +#endif +} + /* ------------------------------------------------------------------------------------ */ #if GASNETC_IBV_XRC typedef struct gasnetc_xrc_snd_qp_s { @@ -749,6 +772,9 
@@ gasnetc_qp_create(gasnetc_conn_info_t *conn_info) #endif cep->qp_handle = hndl; + #if GASNETC_HAVE_IBV_WR_API + cep->qp_ex_handle = ibv_qp_to_qp_ex(hndl); + #endif conn_info->local_qpn[qpi] = hndl->qp_num; } diff --git a/ibv-conduit/gasnet_core_fwd.h b/ibv-conduit/gasnet_core_fwd.h index 6bfa07fbe..f7afccafb 100644 --- a/ibv-conduit/gasnet_core_fwd.h +++ b/ibv-conduit/gasnet_core_fwd.h @@ -268,8 +268,11 @@ CNT(C, GET_RATOMICBUF, cnt) \ TIME(C, GET_RATOMICBUF_STALL, stalled time) \ VAL(C, ALLOC_SREQ, sreqs) \ - VAL(C, POST_SR, segments) \ - CNT(C, POST_INLINE_SR, cnt) \ + VAL(C, POST_SEND_IMM, segments) \ + VAL(C, POST_WRITE, segments) \ + VAL(C, POST_READ, segments) \ + CNT(C, POST_FADD, cnt) \ + CNT(C, POST_FCAS, cnt) \ TIME(C, POST_SR_STALL_CQ, stalled time) \ TIME(C, POST_SR_STALL_SQ, stalled time) \ TIME(C, POST_SR_STALL_SQ2, stalled time) \ diff --git a/ibv-conduit/gasnet_core_internal.h b/ibv-conduit/gasnet_core_internal.h index 9d8d90fa8..7348de8af 100644 --- a/ibv-conduit/gasnet_core_internal.h +++ b/ibv-conduit/gasnet_core_internal.h @@ -658,7 +658,12 @@ struct gasnetc_cep_t_ { gasnetc_lifo_head_t *rbuf_freelist; /* Source of rcv buffers for AMs. 
Copy of &hca->rbuf_freelist */ struct ibv_qp *qp_handle; +#if GASNETC_HAVE_IBV_WR_API + struct ibv_qp_ex *qp_ex_handle; + #define _GASNETC_CEP_PTR_0 6*sizeof(void*) +#else #define _GASNETC_CEP_PTR_0 5*sizeof(void*) +#endif #if GASNETC_IBV_SRQ struct ibv_srq *srq; // Copy of hca->repl_srq OR hca->rqst_srq #define _GASNETC_CEP_PTR_1 1*sizeof(void*) @@ -1075,9 +1080,32 @@ extern gasnetc_epid_t gasnetc_epid_select_qpi(gasnetc_cep_t *ceps, gasnetc_epid_ #endif extern int gasnetc_snd_cq_reserve(gasnetc_cep_t * const cep); -extern void gasnetc_snd_post_common( - gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, - int reserved, int is_inline GASNETI_THREAD_FARG); +extern void gasnetc_post_send_imm( + gasnetc_sreq_t *sreq, + struct ibv_send_wr *sr_desc, + uint32_t imm_data, + int reserved, int is_inline + GASNETI_THREAD_FARG); +// Currently write and read are static and/or inline within gasnet_core_sndrcv.c +//extern void gasnetc_post_write( +// gasnetc_sreq_t *sreq, +// struct ibv_send_wr *sr_desc, +// int is_inline +// GASNETI_THREAD_FARG); +//extern void gasnetc_post_read( +// gasnetc_sreq_t *sreq, +// struct ibv_send_wr *sr_desc +// GASNETI_THREAD_FARG); +extern void gasnetc_post_fetch_add( + gasnetc_sreq_t *sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1 + GASNETI_THREAD_FARG); +extern void gasnetc_post_cmp_swp( + gasnetc_sreq_t *sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1, uint64_t op2 + GASNETI_THREAD_FARG); extern void gasnetc_poll_rcv_hca(gasnetc_EP_t ep, gasnetc_hca_t *hca, int limit GASNETI_THREAD_FARG); extern void gasnetc_poll_rcv_all(gasnetc_EP_t ep, int limit GASNETI_THREAD_FARG); diff --git a/ibv-conduit/gasnet_core_sndrcv.c b/ibv-conduit/gasnet_core_sndrcv.c index 51a782199..abf543836 100644 --- a/ibv-conduit/gasnet_core_sndrcv.c +++ b/ibv-conduit/gasnet_core_sndrcv.c @@ -105,7 +105,11 @@ typedef struct { * ------------------------------------------------------------------------------------ */ #if GASNETC_HAVE_FENCED_PUTS -static int 
gasnetc_op_needs_fence_mask; + static int gasnetc_op_needs_fence_mask; + static enum ibv_send_flags gasnetc_signal_flag; +#else + #define gasnetc_op_needs_fence_mask 0 + #define gasnetc_signal_flag (enum ibv_send_flags)0 #endif #if !GASNETC_PIN_SEGMENT @@ -1612,6 +1616,12 @@ void gasnetc_snd_validate(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, int gasneti_assert(count > 0); gasneti_assert(type); + gasneti_assert(sreq->opcode != GASNETC_OP_FREE); + gasneti_assert(sreq->opcode != GASNETC_OP_INVALID); + + /* A valid callback will never take NULL as its data */ + gasneti_assert(sreq->comp.cb == NULL || sreq->comp.data != NULL); + GASNETI_TRACE_PRINTF(D,("%s sreq=%p peer=%d qp=%d hca=%d\n", type, (void *)sreq, gasnetc_epid2node(sreq->cep->epid), gasnetc_epid2qpi(sreq->cep->epid) - 1, @@ -1739,7 +1749,7 @@ static void gasnetc_snd_post_fail(int rc, int is_inline) { } GASNETI_NORETURNP(gasnetc_snd_post_fail) -// Used in the IMMEDIATE case to reserve a CQ slot separate from gasnetc_snd_post*() +// Used in the IMMEDIATE case to reserve a CQ slot separate from gasnetc_*post*() // Returns non-zero on success, zero on failure int gasnetc_snd_cq_reserve(gasnetc_cep_t * const cep) { gasnetc_sema_t *sema = cep->snd_cq_sema_p; @@ -1754,6 +1764,253 @@ int gasnetc_snd_cq_reserve(gasnetc_cep_t * const cep) { #endif } +// Loop until space is available for 1 new entry on the CQ. +// If we hold the last one then threads sending to ANY node will stall. +// So this must be the last resource to acquire. +// Use of a macro avoids THREAD_FARG goop. 
+#define gasnetc_snd_cq_allocate(cep) do { \ + gasnetc_sema_t *sema = (cep)->snd_cq_sema_p; \ + GASNETI_SPIN_UNTIL_TRACE(gasnetc_sema_trydown(sema), C, POST_SR_STALL_CQ, gasnetc_poll_snd()); \ +} while (0) + +#if GASNETC_HAVE_IBV_WR_API // Use ibv_wr_*() APIs + +#if !GASNETC_IBV_XRC_OFED + #define GASNETC_WR_XRC(qpx, cep) ((void)0) +#else + #define GASNETC_WR_XRC(qpx, cep) do { \ + if (gasnetc_use_xrc) ibv_wr_set_xrc_srqn(qpx, cep->xrc_remote_srq_num); \ + } while (0) +#endif + +#define GASNETC_WR_BEFORE(qpx, sreq, flags) do { \ + (qpx)->wr_id = (uintptr_t)(sreq); \ + (qpx)->wr_flags = (flags); \ +} while (0) + +#define GASNETC_WR_AFTER(qpx, cep) do { \ + GASNETC_WR_XRC(qpx, cep); \ +} while (0) + +void gasnetc_post_send_imm( + gasnetc_sreq_t *am_sreq, + struct ibv_send_wr *sr_desc, + uint32_t imm_data, + int reserved, + int is_inline + GASNETI_THREAD_FARG) +{ + #if GASNET_TRACE || GASNET_DEBUG + // Ensure fields needed for validation and postmortem are set correctly + sr_desc->opcode = IBV_WR_SEND_WITH_IMM; + sr_desc->imm_data = imm_data; + sr_desc->send_flags = gasnetc_signal_flag; + #endif + GASNETC_TRACE_EVENT_VAL(POST_SEND_IMM, is_inline ? 
0 : sr_desc->num_sge); + gasnetc_snd_validate(am_sreq, sr_desc, 1, "POST_SEND_IMM"); + + gasnetc_cep_t *cep = am_sreq->cep; + struct ibv_qp_ex *qpx = cep->qp_ex_handle; + if (!reserved) gasnetc_snd_cq_allocate(cep); + ibv_wr_start(qpx); + GASNETC_WR_BEFORE(qpx, am_sreq, gasnetc_signal_flag); + ibv_wr_send_imm(qpx, imm_data); + if (is_inline) { + ibv_wr_set_inline_data(qpx, (void*)sr_desc->sg_list[0].addr, sr_desc->sg_list[0].length); + } else { + ibv_wr_set_sge_list(qpx, sr_desc->num_sge, sr_desc->sg_list); + } + GASNETC_WR_AFTER(qpx, cep); + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, is_inline); +} + +void gasnetc_post_write( + gasnetc_sreq_t *put_sreq, + struct ibv_send_wr *sr_desc, + int is_inline + GASNETI_THREAD_FARG) +{ + #if GASNET_TRACE || GASNET_DEBUG + // Ensure fields needed for validation and postmortem are set correctly + gasneti_assert_uint(sr_desc->opcode ,==, IBV_WR_RDMA_WRITE); + sr_desc->send_flags = gasnetc_signal_flag; + if (is_inline) sr_desc->send_flags |= IBV_SEND_INLINE; + #endif + GASNETC_TRACE_EVENT_VAL(POST_WRITE, is_inline ? 
0 : sr_desc->num_sge); + gasnetc_snd_validate(put_sreq, sr_desc, 1, "POST_WRITE"); + + gasnetc_cep_t *cep = put_sreq->cep; + gasnetc_hca_t *hca = cep->hca; + struct ibv_qp_ex *qpx = cep->qp_ex_handle; + enum ibv_send_flags write_flags = gasnetc_signal_flag; + + #if GASNETC_HAVE_FENCED_PUTS + gasnetc_sreq_t *amo_sreq = NULL; + int split = 0; + + int fence = put_sreq->opcode & gasnetc_op_needs_fence_mask; + if (fence) { + amo_sreq = gasnetc_get_sreq(GASNETC_OP_FENCE GASNETI_THREAD_PASS); + amo_sreq->cep = cep; + split = !gasnetc_sema_trydown(GASNETC_CEP_SQ_SEMA(cep)) && + (gasnetc_snd_reap_hca(hca,1), !gasnetc_sema_trydown(GASNETC_CEP_SQ_SEMA(cep))); + if (split) { + // Since we failed to get a second SQ slot we split the two post operations, + // moving the remote completion callback from the Put to the Atomic + GASNETC_STAT_EVENT(POST_SR_SPLIT); + gasneti_assert(write_flags == IBV_SEND_SIGNALED); + amo_sreq->opcode = GASNETC_OP_ATOMIC; + amo_sreq->comp = put_sreq->comp; + put_sreq->comp.cb = NULL; + } else { + // Will post both operations together, with only the AMO signalling + gasneti_assert(amo_sreq->opcode == GASNETC_OP_FENCE); + amo_sreq->fence_sreq = put_sreq; + write_flags = 0; + } + } + #endif + + gasnetc_snd_cq_allocate(cep); + ibv_wr_start(qpx); + // Inject the PUT + GASNETC_WR_BEFORE(qpx, put_sreq, write_flags); + ibv_wr_rdma_write(qpx, sr_desc->wr.rdma.rkey, sr_desc->wr.rdma.remote_addr); + if (is_inline) { + ibv_wr_set_inline_data(qpx, (void*)sr_desc->sg_list[0].addr, + sr_desc->sg_list[0].length); + } else { + ibv_wr_set_sge_list(qpx, sr_desc->num_sge, sr_desc->sg_list); + } + GASNETC_WR_AFTER(qpx, cep); + + #if GASNETC_HAVE_FENCED_PUTS + if (fence) { + if (split) { + // Complete the PUT + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, is_inline); + + // Start the AMO + GASNETI_SPIN_UNTIL_TRACE(gasnetc_sema_trydown(GASNETC_CEP_SQ_SEMA(cep)), + C, POST_SR_STALL_SQ2, gasnetc_snd_reap_hca(hca,1)); + gasnetc_snd_cq_allocate(cep); + 
ibv_wr_start(qpx); + } + + // Inject the AMO + GASNETC_WR_BEFORE(qpx, amo_sreq, IBV_SEND_SIGNALED); + ibv_wr_atomic_fetch_add(qpx, hca->aux_rkeys[gasnetc_epid2node(cep->epid)], + GASNETC_FENCE_REM_ADDR(cep), 0); + ibv_wr_set_sge(qpx, hca->aux_reg.handle->lkey, + GASNETC_FENCE_LOC_ADDR(cep), + sizeof(uint64_t)); + GASNETC_WR_AFTER(qpx, cep); + } + + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, split ? 0 : is_inline); + #else + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, is_inline); + #endif +} + +void gasnetc_post_read( + gasnetc_sreq_t *get_sreq, + struct ibv_send_wr *sr_desc + GASNETI_THREAD_FARG) +{ + #if GASNET_TRACE || GASNET_DEBUG + // Ensure fields needed for validation and postmortem are set correctly + gasneti_assert_uint(sr_desc->opcode ,==, IBV_WR_RDMA_READ); + sr_desc->send_flags = gasnetc_signal_flag; + #endif + GASNETC_TRACE_EVENT_VAL(POST_READ, sr_desc->num_sge); + gasnetc_snd_validate(get_sreq, sr_desc, 1, "POST_READ"); + + gasnetc_cep_t *cep = get_sreq->cep; + struct ibv_qp_ex *qpx = cep->qp_ex_handle; + gasnetc_snd_cq_allocate(cep); + ibv_wr_start(qpx); + GASNETC_WR_BEFORE(qpx, get_sreq, gasnetc_signal_flag); + ibv_wr_rdma_read(qpx, sr_desc->wr.rdma.rkey, sr_desc->wr.rdma.remote_addr); + ibv_wr_set_sge_list(qpx, sr_desc->num_sge, sr_desc->sg_list); + GASNETC_WR_AFTER(qpx, cep); + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, 0); +} + +void gasnetc_post_fetch_add( + gasnetc_sreq_t *fadd_sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1 + GASNETI_THREAD_FARG) +{ + #if GASNET_TRACE || GASNET_DEBUG + // Ensure fields needed for validation and postmortem are set correctly + sr_desc->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD; + sr_desc->num_sge = 1; + sr_desc->sg_list[0].length = sizeof(uint64_t); + sr_desc->wr.atomic.compare_add = op1; + sr_desc->send_flags = gasnetc_signal_flag; + #endif + GASNETC_TRACE_EVENT(POST_FADD); + gasnetc_snd_validate(fadd_sreq, sr_desc, 1, "POST_FADD"); + 
+ gasnetc_cep_t *cep = fadd_sreq->cep; + struct ibv_qp_ex *qpx = cep->qp_ex_handle; + gasnetc_snd_cq_allocate(cep); + ibv_wr_start(qpx); + GASNETC_WR_BEFORE(qpx, fadd_sreq, gasnetc_signal_flag); + ibv_wr_atomic_fetch_add(qpx, sr_desc->wr.atomic.rkey, + sr_desc->wr.atomic.remote_addr, + op1); + ibv_wr_set_sge(qpx, sr_desc->sg_list[0].lkey, + sr_desc->sg_list[0].addr, + sizeof(uint64_t)); + GASNETC_WR_AFTER(qpx, cep); + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, 0); +} + +void gasnetc_post_cmp_swp( + gasnetc_sreq_t *cswap_sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1, uint64_t op2 + GASNETI_THREAD_FARG) +{ + #if GASNET_TRACE || GASNET_DEBUG + // Ensure fields needed for validation and postmortem are set correctly + sr_desc->opcode = IBV_WR_ATOMIC_CMP_AND_SWP; + sr_desc->num_sge = 1; + sr_desc->sg_list[0].length = sizeof(uint64_t); + sr_desc->wr.atomic.compare_add = op1; + sr_desc->wr.atomic.swap = op2; + sr_desc->send_flags = gasnetc_signal_flag; + #endif + GASNETC_TRACE_EVENT(POST_FCAS); + gasnetc_snd_validate(cswap_sreq, sr_desc, 1, "POST_FCAS"); + + gasnetc_cep_t *cep = cswap_sreq->cep; + struct ibv_qp_ex *qpx = cep->qp_ex_handle; + gasnetc_snd_cq_allocate(cep); + ibv_wr_start(qpx); + GASNETC_WR_BEFORE(qpx, cswap_sreq, gasnetc_signal_flag); + ibv_wr_atomic_cmp_swp(qpx, sr_desc->wr.atomic.rkey, + sr_desc->wr.atomic.remote_addr, + op1, op2); + ibv_wr_set_sge(qpx, sr_desc->sg_list[0].lkey, + sr_desc->sg_list[0].addr, + sizeof(uint64_t)); + GASNETC_WR_AFTER(qpx, cep); + int rc = ibv_wr_complete(qpx); + if_pf (rc) gasnetc_snd_post_fail(rc, 0); +} + +#else // !GASNETC_HAVE_IBV_WR_API So use legacy ibv_post_send() + GASNETI_INLINE(gasnetc_snd_post_inner) void gasnetc_snd_post_inner( gasnetc_cep_t * const cep, @@ -1762,38 +2019,27 @@ void gasnetc_snd_post_inner( int is_inline GASNETI_THREAD_FARG) { - if (! reserved) { - // Loop until space is available for 1 new entry on the CQ. 
- // If we hold the last one then threads sending to ANY node will stall. - // So this is the last resource to acquire - GASNETI_SPIN_UNTIL_TRACE(gasnetc_sema_trydown(cep->snd_cq_sema_p), - C, POST_SR_STALL_CQ, gasnetc_poll_snd()); - } - - // Post the operation + if (! reserved) gasnetc_snd_cq_allocate(cep); struct ibv_send_wr *bad_wr; int rc = ibv_post_send(cep->qp_handle, sr_desc, &bad_wr); if_pf (rc) gasnetc_snd_post_fail(rc, is_inline); } -void gasnetc_snd_post_common(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, int reserved, int is_inline GASNETI_THREAD_FARG) { +// Wrapper around ibv_post_send() +// Handles all opcodes with the necessary metadata in sr_desc +static void gasnetc_snd_post_common( + gasnetc_sreq_t *sreq, + struct ibv_send_wr *sr_desc, + int reserved, + int is_inline + GASNETI_THREAD_FARG) +{ gasnetc_cep_t * const cep = sreq->cep; - /* Must be bound to a qp by now */ - gasneti_assert(cep != NULL ); - - gasneti_assert(sreq->opcode != GASNETC_OP_FREE); - gasneti_assert(sreq->opcode != GASNETC_OP_INVALID); - - /* A valid callback will never take NULL as its data */ - gasneti_assert(sreq->comp.cb == NULL || sreq->comp.data != NULL); - // setup some remaining fields const enum ibv_send_flags inline_flag = is_inline ? IBV_SEND_INLINE : (enum ibv_send_flags)0; - const enum ibv_send_flags signal_flag = GASNETC_USE_SEND_SIGNALLED ? 
IBV_SEND_SIGNALED - : (enum ibv_send_flags)0; - sr_desc->send_flags = inline_flag | signal_flag; + sr_desc->send_flags = inline_flag | gasnetc_signal_flag; sr_desc->wr_id = (uintptr_t)sreq; #if GASNETC_IBV_XRC_OFED sr_desc->qp_type.xrc.remote_srqn = cep->xrc_remote_srq_num; /* Even if unused */ @@ -1802,15 +2048,6 @@ void gasnetc_snd_post_common(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, #endif sr_desc->next = NULL; - /* Trace and debug */ - if (is_inline) { - GASNETC_STAT_EVENT(POST_INLINE_SR); - gasnetc_snd_validate(sreq, sr_desc, 1, "POST_INLINE_SR"); - } else { - GASNETC_STAT_EVENT_VAL(POST_SR, sr_desc->num_sge); - gasnetc_snd_validate(sreq, sr_desc, 1, "POST_SR"); - } - #if GASNETC_HAVE_FENCED_PUTS // When GASNET_USE_FENCED_PUTS is enabled, we must post both the Put and an // Atomic such that the conduit-level remote completion callback for the Put @@ -1845,7 +2082,7 @@ void gasnetc_snd_post_common(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, sr_desc->send_flags = inline_flag; // Strips IBV_SEND_SIGNALED sr_desc->next = amo_sr_desc; - // Try at most twice (w/ a CQ poll between) to obtain a second SQ slot + // Here we try to obtain a second SQ slot, *without* spinning. // Spinning indefinitely while holding one slot could deadlock if // multiple threads in a PAR build are all doing the same. 
// Even in a SEQ or PARSYNC build, there is an advantage to posting the @@ -1857,8 +2094,7 @@ void gasnetc_snd_post_common(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, GASNETC_STAT_EVENT(POST_SR_SPLIT); // Move the remote completion callback from the Put to the Atomic amo_sreq->opcode = GASNETC_OP_ATOMIC; - amo_sreq->comp.cb = sreq->comp.cb; - amo_sreq->comp.data = sreq->comp.data; + amo_sreq->comp = sreq->comp; sreq->comp.cb = NULL; // Post only the Put, releasing a SQ slot for eventual reclamation sr_desc->next = NULL; @@ -1877,8 +2113,77 @@ void gasnetc_snd_post_common(gasnetc_sreq_t *sreq, struct ibv_send_wr *sr_desc, /* Post it */ gasnetc_snd_post_inner(cep, sr_desc, reserved, is_inline GASNETI_THREAD_PASS); } -#define gasnetc_snd_post(x,y) gasnetc_snd_post_common(x,y,0,0 GASNETI_THREAD_PASS) -#define gasnetc_snd_post_inline(x,y) gasnetc_snd_post_common(x,y,0,1 GASNETI_THREAD_PASS) + +void gasnetc_post_send_imm( + gasnetc_sreq_t *am_sreq, + struct ibv_send_wr *sr_desc, + uint32_t imm_data, + int reserved, + int is_inline + GASNETI_THREAD_FARG) +{ + GASNETC_TRACE_EVENT_VAL(POST_SEND_IMM, is_inline ? 0 : sr_desc->num_sge); + sr_desc->opcode = IBV_WR_SEND_WITH_IMM; + sr_desc->imm_data = imm_data; + gasnetc_snd_validate(am_sreq, sr_desc, 1, "POST_SEND_IMM"); + gasnetc_snd_post_common(am_sreq, sr_desc, reserved, is_inline GASNETI_THREAD_PASS); +} + +GASNETI_INLINE(gasnetc_post_write) +void gasnetc_post_write( + gasnetc_sreq_t *put_sreq, + struct ibv_send_wr *sr_desc, + int is_inline + GASNETI_THREAD_FARG) +{ + GASNETC_TRACE_EVENT_VAL(POST_WRITE, is_inline ? 
0 : sr_desc->num_sge); + gasnetc_snd_validate(put_sreq, sr_desc, 1, "POST_WRITE"); + gasnetc_snd_post_common(put_sreq, sr_desc, 0, is_inline GASNETI_THREAD_PASS); +} + +GASNETI_INLINE(gasnetc_post_read) +void gasnetc_post_read( + gasnetc_sreq_t *get_sreq, + struct ibv_send_wr *sr_desc + GASNETI_THREAD_FARG) +{ + GASNETC_TRACE_EVENT_VAL(POST_READ, sr_desc->num_sge); + gasnetc_snd_validate(get_sreq, sr_desc, 1, "POST_READ"); + gasnetc_snd_post_common(get_sreq, sr_desc, 0, 0 GASNETI_THREAD_PASS); +} + +void gasnetc_post_fetch_add( + gasnetc_sreq_t *fadd_sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1 + GASNETI_THREAD_FARG) +{ + GASNETC_TRACE_EVENT(POST_FADD); + sr_desc->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD; + sr_desc->num_sge = 1; + sr_desc->sg_list[0].length = sizeof(uint64_t); + sr_desc->wr.atomic.compare_add = op1; + gasnetc_snd_validate(fadd_sreq, sr_desc, 1, "POST_FADD"); + gasnetc_snd_post_common(fadd_sreq, sr_desc, 0, 0 GASNETI_THREAD_PASS); +} + +void gasnetc_post_cmp_swp( + gasnetc_sreq_t *cswap_sreq, + struct ibv_send_wr *sr_desc, + uint64_t op1, uint64_t op2 + GASNETI_THREAD_FARG) +{ + GASNETC_TRACE_EVENT(POST_FCAS); + sr_desc->opcode = IBV_WR_ATOMIC_CMP_AND_SWP; + sr_desc->num_sge = 1; + sr_desc->sg_list[0].length = sizeof(uint64_t); + sr_desc->wr.atomic.compare_add = op1; + sr_desc->wr.atomic.swap = op2; + gasnetc_snd_validate(cswap_sreq, sr_desc, 1, "POST_FCAS"); + gasnetc_snd_post_common(cswap_sreq, sr_desc, 0, 0 GASNETI_THREAD_PASS); +} + +#endif // !GASNETC_HAVE_IBV_WR_API use legacy ibv_post_send() #if GASNETC_USE_RCV_THREAD static void gasnetc_rcv_thread(struct ibv_wc *comp_p, void *arg) @@ -1955,7 +2260,7 @@ static void gasnetc_snd_thread(struct ibv_wc *comp_p, void *arg) * ############################################################### */ -/* Assemble and post a bounce-buffer PUT or GET */ +// Assemble and all-but-post a bounce-buffer PUT or GET GASNETI_INLINE(gasnetc_bounce_common) void gasnetc_bounce_common( gasnetc_EP_t ep, gasnetc_epid_t 
epid, @@ -1974,9 +2279,6 @@ void gasnetc_bounce_common( gasnetc_cep_t *cep = gasnetc_bind_cep(ep, epid, sreq); sr_desc->wr.rdma.rkey = gasnetc_seg_rkey(cep, rem_epidx); sr_desc->sg_list[0].lkey = GASNETC_SND_LKEY(cep); - - gasnetc_snd_post(sreq, sr_desc); - sr_desc->wr.rdma.remote_addr += len; } // Assemble and all-but-post a zero-copy PUT or GET using either the seg_lkey, @@ -2117,7 +2419,8 @@ void gasnetc_do_put_inline( cep = gasnetc_bind_cep(ep, epid, sreq); sr_desc->wr.rdma.rkey = gasnetc_seg_rkey(cep, rem_epidx); - gasnetc_snd_post_inline(sreq, sr_desc); + gasnetc_post_write(sreq, sr_desc, 1 GASNETI_THREAD_PASS); + sr_desc->wr.rdma.remote_addr += nbytes; sr_desc->sg_list[0].addr += nbytes; } @@ -2154,7 +2457,9 @@ void gasnetc_do_put_bounce( } gasnetc_bounce_common(ep, epid, rem_epidx, sr_desc, count, sreq, IBV_WR_RDMA_WRITE GASNETI_THREAD_PASS); + gasnetc_post_write(sreq, sr_desc, 0 GASNETI_THREAD_PASS); + sr_desc->wr.rdma.remote_addr += count; src += count; nbytes -= count; } while (nbytes); @@ -2202,7 +2507,7 @@ size_t gasnetc_do_put_zerocp( sreq->comp.cb = cb; } - gasnetc_snd_post(sreq, sr_desc); + gasnetc_post_write(sreq, sr_desc, 0 GASNETI_THREAD_PASS); sr_desc->wr.rdma.remote_addr += count; sr_desc->sg_list[0].addr += count; } while (nbytes); @@ -2241,7 +2546,9 @@ void gasnetc_do_get_bounce( sreq->comp.data = remote_cnt; gasnetc_bounce_common(ep, epid, rem_epidx, sr_desc, count, sreq, IBV_WR_RDMA_READ GASNETI_THREAD_PASS); + gasnetc_post_read(sreq, sr_desc GASNETI_THREAD_PASS); + sr_desc->wr.rdma.remote_addr += count; dst += count; } while (nbytes); sr_desc->sg_list[0].addr = dst; @@ -2284,7 +2591,7 @@ void gasnetc_do_get_zerocp( sreq->comp.cb = remote_cb; sreq->comp.data = remote_cnt; - gasnetc_snd_post(sreq, sr_desc); + gasnetc_post_read(sreq, sr_desc GASNETI_THREAD_PASS); sr_desc->wr.rdma.remote_addr += count; sr_desc->sg_list[0].addr += count; } while (nbytes); @@ -2321,7 +2628,7 @@ void gasnetc_fh_put_inline(gasnetc_sreq_t *sreq GASNETI_THREAD_FARG) 
{ cep = gasnetc_bind_cep(sreq->fh_ep, sreq->epid, sreq); sr_desc->wr.rdma.rkey = GASNETC_FH_RKEY(cep, fh_rem); - gasnetc_snd_post_inline(sreq, sr_desc); + gasnetc_post_write(sreq, sr_desc, 1 GASNETI_THREAD_PASS); if_pf (lc_cb) lc_cb(lc); /* locally complete */ } @@ -2363,7 +2670,7 @@ void gasnetc_fh_put_bounce(gasnetc_sreq_t *orig_sreq GASNETI_THREAD_FARG) { /* Send all ops on same qp to get point-to-point ordering for proper fh_release() */ epid = sreq->epid; - gasnetc_snd_post(sreq, sr_desc); + gasnetc_post_write(sreq, sr_desc, 0 GASNETI_THREAD_PASS); src += GASNETC_BUFSZ; dst += GASNETC_BUFSZ; @@ -2387,7 +2694,7 @@ void gasnetc_fh_put_bounce(gasnetc_sreq_t *orig_sreq GASNETI_THREAD_FARG) { sr_desc->wr.rdma.rkey = GASNETC_FH_RKEY(cep, fh_rem); sr_desc->sg_list[0].lkey = GASNETC_SND_LKEY(cep); - gasnetc_snd_post(orig_sreq, sr_desc); + gasnetc_post_write(orig_sreq, sr_desc, 0 GASNETI_THREAD_PASS); } GASNETI_INLINE(gasnetc_fh_post) @@ -2434,7 +2741,11 @@ void gasnetc_fh_post(gasnetc_sreq_t *sreq, enum ibv_wr_opcode op GASNETI_THREAD_ } gasneti_assert(remain == 0); - gasnetc_snd_post(sreq, sr_desc); + if (op == IBV_WR_RDMA_WRITE) { + gasnetc_post_write(sreq, sr_desc, 0 GASNETI_THREAD_PASS); + } else { + gasnetc_post_read(sreq, sr_desc GASNETI_THREAD_PASS); + } } static void gasnetc_fh_do_put(gasnetc_sreq_t *sreq GASNETI_THREAD_FARG) { @@ -3207,8 +3518,9 @@ extern int gasnetc_sndrcv_init(gasnetc_EP_t ep) { #endif #if GASNETC_HAVE_FENCED_PUTS - // Speed critical path checks + // Speed critical paths gasnetc_op_needs_fence_mask = gasnetc_use_fenced_puts ? GASNETC_OP_NEEDS_FENCE : 0; + gasnetc_signal_flag = gasnetc_use_fenced_puts ? 
IBV_SEND_SIGNALED : (enum ibv_send_flags)0; #endif /* Init thread-local data */ diff --git a/ibv-conduit/gasnet_ratomic.c b/ibv-conduit/gasnet_ratomic.c index ab3e94b89..8aa373020 100644 --- a/ibv-conduit/gasnet_ratomic.c +++ b/ibv-conduit/gasnet_ratomic.c @@ -183,8 +183,6 @@ int gasnete_ratomic_inner( sr_desc->wr.atomic.remote_addr = (uintptr_t)tgt_addr; sr_desc->wr.atomic.rkey = rem_auxseg ? cep->hca->aux_rkeys[jobrank] : GASNETC_SEG_RKEY(cep); - sr_desc->num_sge = 1; - sr_desc->sg_list[0].length = sizeof(uint64_t); if (!fetching) { gasneti_assert(! result_p); @@ -200,17 +198,15 @@ int gasnete_ratomic_inner( } sr_desc->sg_list[0].lkey = cep->hca->aux_reg.handle->lkey; - sr_desc->opcode = opcode; - sr_desc->wr.atomic.compare_add = operand1; - if (opcode == IBV_WR_ATOMIC_CMP_AND_SWP) { - sr_desc->wr.atomic.swap = operand2; - } - sreq->comp.cb = completion_cb; sreq->comp.data = initiated_cnt; - (*initiated_cnt) += 1; - gasnetc_snd_post_common(sreq, sr_desc, 0, 0 GASNETI_THREAD_PASS); + + if (opcode == IBV_WR_ATOMIC_CMP_AND_SWP) { + gasnetc_post_cmp_swp(sreq, sr_desc, operand1, operand2 GASNETI_THREAD_PASS); + } else { + gasnetc_post_fetch_add(sreq, sr_desc, operand1 GASNETI_THREAD_PASS); + } return 0; }