diff --git a/include/ofi_net.h b/include/ofi_net.h index 53b3ad2e681..df36b9ee1bd 100644 --- a/include/ofi_net.h +++ b/include/ofi_net.h @@ -401,6 +401,21 @@ static inline int ofi_translate_addr_format(int family) } } +static inline size_t ofi_sizeof_addr_format(int format) +{ + switch (format) { + case FI_SOCKADDR_IN: + return sizeof(struct sockaddr_in); + case FI_SOCKADDR_IN6: + return sizeof(struct sockaddr_in6); + case FI_SOCKADDR_IB: + return sizeof(struct ofi_sockaddr_ib); + default: + FI_WARN(&core_prov, FI_LOG_CORE, "Unsupported address format\n"); + return 0; + } +} + uint16_t ofi_get_sa_family(const struct fi_info *info); static inline bool ofi_sin_is_any_addr(const struct sockaddr *sa) diff --git a/include/ofi_util.h b/include/ofi_util.h index 598d51dae8c..6f1add82052 100644 --- a/include/ofi_util.h +++ b/include/ofi_util.h @@ -766,6 +766,8 @@ int ofi_ip_av_create_flags(struct fid_domain *domain_fid, struct fi_av_attr *att void *ofi_av_get_addr(struct util_av *av, fi_addr_t fi_addr); #define ofi_ip_av_get_addr ofi_av_get_addr +void *ofi_av_addr_context(struct util_av *av, fi_addr_t fi_addr); + fi_addr_t ofi_ip_av_get_fi_addr(struct util_av *av, const void *addr); int ofi_get_addr(uint32_t *addr_format, uint64_t flags, diff --git a/prov/rxm/src/rxm.h b/prov/rxm/src/rxm.h index 7bf589be8ac..12fdc632c9a 100644 --- a/prov/rxm/src/rxm.h +++ b/prov/rxm/src/rxm.h @@ -64,6 +64,37 @@ #define RXM_OP_VERSION 3 #define RXM_CTRL_VERSION 4 +enum { + RXM_REJECT_UNSPEC, + RXM_REJECT_ECONNREFUSED, + RXM_REJECT_EALREADY, +}; + +union rxm_cm_data { + struct _connect { + uint8_t version; + uint8_t endianness; + uint8_t ctrl_version; + uint8_t op_version; + uint16_t port; + uint8_t padding[2]; + uint32_t eager_limit; + uint32_t rx_size; /* used? */ + uint64_t client_conn_id; + } connect; + + struct _accept { + uint64_t server_conn_id; + uint32_t rx_size; /* used? */ + } accept; + + struct _reject { + uint8_t version; + uint8_t reason; + } reject; +}; + + extern size_t rxm_buffer_size; extern size_t rxm_packet_size; @@ -139,141 +170,58 @@ extern int rxm_use_write_rndv; extern enum fi_wait_obj def_wait_obj, def_tcp_wait_obj; struct rxm_ep; +struct rxm_av; -/* - * Connection Map - */ - -#define RXM_CMAP_IDX_BITS OFI_IDX_INDEX_BITS - -enum rxm_cmap_signal { - RXM_CMAP_UNSPEC, - RXM_CMAP_FREE, - RXM_CMAP_EXIT, -}; - -#define RXM_CM_STATES(FUNC) \ - FUNC(RXM_CMAP_IDLE), \ - FUNC(RXM_CMAP_CONNREQ_SENT), \ - FUNC(RXM_CMAP_CONNREQ_RECV), \ - FUNC(RXM_CMAP_CONNECTED), \ - FUNC(RXM_CMAP_SHUTDOWN), \ - -enum rxm_cmap_state { - RXM_CM_STATES(OFI_ENUM_VAL) +enum rxm_cm_state { + RXM_CM_IDLE, + RXM_CM_CONNECTING, + RXM_CM_ACCEPTING, + RXM_CM_CONNECTED, }; -extern char *rxm_cm_state_str[]; -#define RXM_CM_UPDATE_STATE(handle, new_state) \ - do { \ - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "[CM] handle: " \ - "%p %s -> %s\n", handle, \ - rxm_cm_state_str[handle->state], \ - rxm_cm_state_str[new_state]); \ - handle->state = new_state; \ - } while (0) - -struct rxm_cmap_handle { - struct rxm_cmap *cmap; - enum rxm_cmap_state state; - /* Unique identifier for a connection. Can be exchanged with a peer - * during connection setup and can later be used in a message header - * to identify the source of the message (Used for FI_SOURCE, RNDV - * protocol, etc.) */ - uint64_t key; - uint64_t remote_key; +/* There will be at most 1 peer address per AV entry. There + * may be addresses that have not been inserted into the local + * AV, and have no matching entry. This can occur if we are + * only receiving data from the remote rxm ep. + */ +struct rxm_peer_addr { + struct rxm_av *av; fi_addr_t fi_addr; - struct rxm_cmap_peer *peer; -}; - -struct rxm_cmap_peer { - struct rxm_cmap_handle *handle; - struct dlist_entry entry; - uint8_t addr[]; -}; - -struct rxm_cmap_attr { - void *name; + struct ofi_rbnode *node; + int index; + int refcnt; + union ofi_sock_ip addr; }; -struct rxm_cmap { - struct rxm_ep *ep; - struct util_av *av; - - /* cmap handles that correspond to addresses in AV */ - struct rxm_cmap_handle **handles_av; - size_t num_allocated; +struct rxm_peer_addr *rxm_get_peer(struct rxm_av *av, const void *addr); +void rxm_put_peer(struct rxm_peer_addr *peer); - /* Store all cmap handles (inclusive of handles_av) in an indexer. - * This allows reverse lookup of the handle using the index. */ - struct indexer handles_idx; - - struct ofi_key_idx key_idx; - - struct dlist_entry peer_list; - struct rxm_cmap_attr attr; - pthread_t cm_thread; - ofi_fastlock_acquire_t acquire; - ofi_fastlock_release_t release; - fastlock_t lock; -}; - -enum rxm_cmap_reject_reason { - RXM_CMAP_REJECT_UNSPEC, - RXM_CMAP_REJECT_GENUINE, - RXM_CMAP_REJECT_SIMULT_CONN, -}; - -union rxm_cm_data { - struct _connect { - uint8_t version; - uint8_t endianness; - uint8_t ctrl_version; - uint8_t op_version; - uint16_t port; - uint8_t padding[2]; - uint32_t eager_limit; - uint32_t rx_size; - uint64_t client_conn_id; - } connect; +/* Each local rxm ep will have at most 1 connection to a single + * remote rxm ep. A local rxm ep may not be connected to all + * remote rxm ep's. + */ +struct rxm_conn { + enum rxm_cm_state state; + struct rxm_peer_addr *peer; + struct fid_ep *msg_ep; + struct rxm_ep *ep; - struct _accept { - uint64_t server_conn_id; - uint32_t rx_size; - } accept; + /* Prior versions of libfabric did not guarantee that all connections + * from the same peer would have the same conn_id. For compatibility + * we need to store the remote_index per connection, rather than with + * the peer_addr. + */ + int remote_index; - struct _reject { - uint8_t version; - uint8_t reason; - } reject; + struct dlist_entry deferred_entry; + struct dlist_entry deferred_tx_queue; + struct dlist_entry deferred_sar_msgs; + struct dlist_entry deferred_sar_segments; }; -int rxm_cmap_alloc_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr, - enum rxm_cmap_state state, - struct rxm_cmap_handle **handle); -struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key); -int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr); - -void rxm_cmap_process_reject(struct rxm_cmap *cmap, - struct rxm_cmap_handle *handle, - enum rxm_cmap_reject_reason cm_reject_reason); -void rxm_cmap_process_shutdown(struct rxm_cmap *cmap, - struct rxm_cmap_handle *handle); -int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr, - struct rxm_cmap_handle *handle); -void rxm_cmap_free(struct rxm_cmap *cmap); -int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr); -int rxm_cmap_remove(struct rxm_cmap *cmap, int index); -int rxm_msg_eq_progress(struct rxm_ep *rxm_ep); - -static inline struct rxm_cmap_handle * -rxm_cmap_acquire_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr) -{ - assert(fi_addr < cmap->num_allocated); - return cmap->handles_av[fi_addr]; -} +void rxm_freeall_conns(struct rxm_ep *ep); struct rxm_fabric { struct util_fabric util_fabric; @@ -292,8 +240,29 @@ struct rxm_domain { fastlock_t amo_bufpool_lock; }; +/* All peer addresses, whether they've been inserted into the AV + * or an endpoint has an active connection to it, are stored in + * the addr_map. Peers are allocated from a buffer pool and + * assigned a local index using the pool. All rxm endpoints + * maintain a connection array which is aligned with the peer_pool. + * + * We technically only need to store the index of each peer in + * the AV itself. The 'util_av' could basically be replaced by + * an ofi_index_map. However, too much of the existing code + * relies on the util_av existing and storing the AV addresses. + * + * A future cleanup would be to remove using the util_av and have the + * rxm_av implementation be independent. + */ + struct rxm_av { + struct util_av util_av; + struct ofi_rbmap addr_map; + struct ofi_bufpool *peer_pool; + struct ofi_bufpool *conn_pool; +}; + int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, - struct fid_av **av, void *context); + struct fid_av **fid_av, void *context); struct rxm_mr { struct fid_mr mr_fid; @@ -622,21 +591,6 @@ struct rxm_recv_queue { dlist_func_t *match_unexp; }; -struct rxm_msg_eq_entry { - ssize_t rd; - uint32_t event; - /* Used for connection refusal */ - void *context; - struct fi_eq_err_entry err_entry; - /* must stay at the bottom */ - struct fi_eq_cm_entry cm_entry; -}; - -#define RXM_MSG_EQ_ENTRY_SZ (sizeof(struct rxm_msg_eq_entry) + \ - sizeof(union rxm_cm_data)) -#define RXM_CM_ENTRY_SZ (sizeof(struct fi_eq_cm_entry) + \ - sizeof(union rxm_cm_data)) - ssize_t rxm_get_dyn_rbuf(struct ofi_cq_rbuf_entry *entry, struct iovec *iov, size_t *count); @@ -663,12 +617,17 @@ struct rxm_ep { struct util_ep util_ep; struct fi_info *rxm_info; struct fi_info *msg_info; - struct rxm_cmap *cmap; + + struct index_map conn_idx_map; + union ofi_sock_ip addr; + + pthread_t cm_thread; struct fid_pep *msg_pep; struct fid_eq *msg_eq; + struct fid_ep *srx_ctx; + struct fid_cq *msg_cq; uint64_t msg_cq_last_poll; - struct fid_ep *srx_ctx; size_t comp_per_progress; int cq_eq_fairness; @@ -690,7 +649,7 @@ struct rxm_ep { struct ofi_bufpool *tx_pool; struct rxm_pkt *inject_pkt; - struct dlist_entry deferred_tx_conn_queue; + struct dlist_entry deferred_queue; struct dlist_entry rndv_wait_list; struct rxm_recv_queue recv_queue; @@ -701,17 +660,10 @@ struct rxm_ep { struct rxm_rndv_ops *rndv_ops; }; -struct rxm_conn { - /* This should stay at the top */ - struct rxm_cmap_handle handle; - - struct fid_ep *msg_ep; +int rxm_start_listen(struct rxm_ep *ep); +void rxm_stop_listen(struct rxm_ep *ep); +void rxm_conn_progress(struct rxm_ep *ep); - struct dlist_entry deferred_conn_entry; - struct dlist_entry deferred_tx_queue; - struct dlist_entry sar_rx_msg_list; - struct dlist_entry sar_deferred_rx_msg_list; -}; extern struct fi_provider rxm_prov; extern struct fi_fabric_attr rxm_fabric_attr; @@ -736,7 +688,6 @@ ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf); int rxm_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context); -int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep); void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr, void *op_context, int err); void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err); @@ -789,17 +740,6 @@ rxm_atomic_send_respmsg(struct rxm_ep *rxm_ep, struct rxm_conn *conn, return fi_sendmsg(conn->msg_ep, &msg, FI_COMPLETION); } -static inline int rxm_needs_atomic_progress(const struct fi_info *info) -{ - return (info->caps & FI_ATOMIC) && info->domain_attr && - info->domain_attr->data_progress == FI_PROGRESS_AUTO; -} - -static inline struct rxm_conn *rxm_key2conn(struct rxm_ep *rxm_ep, uint64_t key) -{ - return (struct rxm_conn *)rxm_cmap_key2handle(rxm_ep->cmap, key); -} - void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn); @@ -808,29 +748,32 @@ rxm_ep_alloc_deferred_tx_entry(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, enum rxm_deferred_tx_entry_type type); static inline void -rxm_ep_enqueue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry) +rxm_queue_deferred_tx(struct rxm_deferred_tx_entry *tx_entry, + enum ofi_list_end list_end) { - if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue)) - dlist_insert_tail(&tx_entry->rxm_conn->deferred_conn_entry, - &tx_entry->rxm_ep->deferred_tx_conn_queue); - dlist_insert_tail(&tx_entry->entry, &tx_entry->rxm_conn->deferred_tx_queue); + struct rxm_conn *conn = tx_entry->rxm_conn; + + if (dlist_empty(&conn->deferred_tx_queue)) + dlist_insert_tail(&conn->deferred_entry, + &conn->ep->deferred_queue); + if (list_end == OFI_LIST_HEAD) { + dlist_insert_head(&tx_entry->entry, + &conn->deferred_tx_queue); + } else { + dlist_insert_tail(&tx_entry->entry, + &conn->deferred_tx_queue); + } } static inline void -rxm_ep_enqueue_deferred_tx_queue_priority(struct rxm_deferred_tx_entry *tx_entry) +rxm_dequeue_deferred_tx(struct rxm_deferred_tx_entry *tx_entry) { - if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue)) - dlist_insert_head(&tx_entry->rxm_conn->deferred_conn_entry, - &tx_entry->rxm_ep->deferred_tx_conn_queue); - dlist_insert_head(&tx_entry->entry, &tx_entry->rxm_conn->deferred_tx_queue); -} + struct rxm_conn *conn = tx_entry->rxm_conn; -static inline void -rxm_ep_dequeue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry) -{ - dlist_remove_init(&tx_entry->entry); - if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue)) - dlist_remove(&tx_entry->rxm_conn->deferred_conn_entry); + assert(!dlist_empty(&conn->deferred_tx_queue)); + dlist_remove(&tx_entry->entry); + if (dlist_empty(&conn->deferred_tx_queue)) + dlist_remove_init(&conn->deferred_entry); } int rxm_conn_process_eq_events(struct rxm_ep *rxm_ep); @@ -891,7 +834,7 @@ rxm_ep_format_tx_buf_pkt(struct rxm_conn *rxm_conn, size_t len, uint8_t op, uint64_t data, uint64_t tag, uint64_t flags, struct rxm_pkt *pkt) { - pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key; + pkt->ctrl_hdr.conn_id = rxm_conn->remote_index; pkt->hdr.size = len; pkt->hdr.op = op; pkt->hdr.tag = tag; @@ -934,7 +877,7 @@ rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context, uint64_t flags, rxm_cq_write_src(rx_buf->ep->util_ep.rx_cq, context, flags, len, buf, rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag, - rx_buf->conn->handle.fi_addr); + rx_buf->conn->peer->fi_addr); else rxm_cq_write(rx_buf->ep->util_ep.rx_cq, context, flags, len, buf, rx_buf->pkt.hdr.data, diff --git a/prov/rxm/src/rxm_av.c b/prov/rxm/src/rxm_av.c index b278dcb0551..bfb71ed6ece 100644 --- a/prov/rxm/src/rxm_av.c +++ b/prov/rxm/src/rxm_av.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018 Intel Corporation. All rights reserved. + * Copyright (c) 2018-2021 Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -34,142 +34,215 @@ #include "rxm.h" -static int rxm_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr, - size_t count, uint64_t flags) + +static int rxm_addr_compare(struct ofi_rbmap *map, void *key, void *data) { - struct util_av *av = container_of(av_fid, struct util_av, av_fid); - struct rxm_ep *rxm_ep; - int i, ret = 0; - - fastlock_acquire(&av->ep_list_lock); - /* This should be before ofi_ip_av_remove as we need to know - * fi_addr -> addr mapping when moving handle to peer list. */ - dlist_foreach_container(&av->ep_list, struct rxm_ep, - rxm_ep, util_ep.av_entry) { - ofi_ep_lock_acquire(&rxm_ep->util_ep); - for (i = 0; i < count; i++) { - ret = rxm_cmap_remove(rxm_ep->cmap, *fi_addr + i); - if (ret) - FI_WARN(&rxm_prov, FI_LOG_AV, - "cmap remove failed for fi_addr: %" - PRIu64 "\n", *fi_addr + i); - } - ofi_ep_lock_release(&rxm_ep->util_ep); + return memcmp(data, key, sizeof(union ofi_sock_ip)); +} + +static struct rxm_peer_addr * +rxm_alloc_peer(struct rxm_av *av, const void *addr) +{ + struct rxm_peer_addr *peer; + + peer = ofi_ibuf_alloc(av->peer_pool); + if (!peer) + return NULL; + + peer->av = av; + peer->index = (int) ofi_buf_index(peer); + peer->fi_addr = FI_ADDR_NOTAVAIL; + peer->refcnt = 1; + memcpy(&peer->addr, addr, av->util_av.addrlen); + + if (ofi_rbmap_insert(&av->addr_map, peer, &peer->addr, &peer->node)) { + ofi_ibuf_free(peer); + peer = NULL; } - fastlock_release(&av->ep_list_lock); - return ofi_ip_av_remove(av_fid, fi_addr, count, flags); + return peer; +} + +static void rxm_free_peer(struct rxm_peer_addr *peer) +{ + assert(!peer->refcnt); + ofi_rbmap_delete(&peer->av->addr_map, peer->node); + ofi_ibuf_free(peer); +} + +struct rxm_peer_addr * +rxm_get_peer(struct rxm_av *av, const void *addr) +{ + struct rxm_peer_addr *peer; + struct ofi_rbnode *node; + + fastlock_acquire(&av->util_av.lock); + node = ofi_rbmap_find(&av->addr_map, (void *) addr); + if (node) { + peer = node->data; + peer->refcnt++; + } else { + peer = rxm_alloc_peer(av, addr); + } + + fastlock_release(&av->util_av.lock); + return peer; +} + +void rxm_put_peer(struct rxm_peer_addr *peer) +{ + struct rxm_av *av; + + av = peer->av; + fastlock_acquire(&av->util_av.lock); + if (--peer->refcnt == 0) + rxm_free_peer(peer); + fastlock_release(&av->util_av.lock); +} + +static void +rxm_set_av_context(struct rxm_av *av, fi_addr_t fi_addr, + struct rxm_peer_addr *peer) +{ + struct rxm_peer_addr **peer_ctx; + + peer_ctx = ofi_av_addr_context(&av->util_av, fi_addr); + *peer_ctx = peer; +} + +static void +rxm_put_peer_addr(struct rxm_av *av, fi_addr_t fi_addr) +{ + struct rxm_peer_addr **peer; + + fastlock_acquire(&av->util_av.lock); + peer = ofi_av_addr_context(&av->util_av, fi_addr); + if (--(*peer)->refcnt == 0) + rxm_free_peer(*peer); + + rxm_set_av_context(av, fi_addr, NULL); + fastlock_release(&av->util_av.lock); } -/* TODO: Determine if it's cleaner to insert an address into the cmap only - * when we need to send to that address, rather than inserting the address - * into the cmap when adding it to the AV. - */ static int -rxm_av_insert_cmap(struct fid_av *av_fid, const void *addr, size_t count, - fi_addr_t *fi_addr, uint64_t flags) +rxm_av_add_peers(struct rxm_av *av, const void *addr, size_t count, + fi_addr_t *fi_addr) { - struct util_av *av = container_of(av_fid, struct util_av, av_fid); - struct rxm_ep *rxm_ep; - fi_addr_t fi_addr_tmp; - size_t i; - int ret = 0; + struct rxm_peer_addr *peer; const void *cur_addr; + fi_addr_t cur_fi_addr; + size_t i; + + for (i = 0; i < count; i++) { + cur_addr = ((char *) addr + i * av->util_av.addrlen); + peer = rxm_get_peer(av, cur_addr); + if (!peer) + goto err; - fastlock_acquire(&av->ep_list_lock); - dlist_foreach_container(&av->ep_list, struct rxm_ep, - rxm_ep, util_ep.av_entry) { - ofi_ep_lock_acquire(&rxm_ep->util_ep); - for (i = 0; i < count; i++) { - if (!rxm_ep->cmap) - break; - - cur_addr = (const void *) ((char *) addr + i * av->addrlen); - fi_addr_tmp = (fi_addr ? fi_addr[i] : - ofi_av_lookup_fi_addr_unsafe(av, cur_addr)); - if (fi_addr_tmp == FI_ADDR_NOTAVAIL) - continue; - - ret = rxm_cmap_update(rxm_ep->cmap, cur_addr, fi_addr_tmp); - if (OFI_UNLIKELY(ret)) { - FI_WARN(&rxm_prov, FI_LOG_AV, - "cmap update failed for fi_addr: %" - PRIu64 "\n", fi_addr_tmp); - break; - } + peer->fi_addr = fi_addr ? fi_addr[i] : + ofi_av_lookup_fi_addr(&av->util_av, cur_addr); + + /* lookup can fail if prior AV insertion failed */ + if (peer->fi_addr != FI_ADDR_NOTAVAIL) + rxm_set_av_context(av, peer->fi_addr, peer); + } + return 0; + +err: + while (i--) { + if (fi_addr) { + cur_fi_addr = fi_addr[i]; + } else { + cur_addr = ((char *) addr + i * av->util_av.addrlen); + cur_fi_addr = ofi_av_lookup_fi_addr(&av->util_av, + cur_addr); } - ofi_ep_lock_release(&rxm_ep->util_ep); + if (cur_fi_addr != FI_ADDR_NOTAVAIL) + rxm_put_peer_addr(av, cur_fi_addr); } - fastlock_release(&av->ep_list_lock); - return ret; + return -FI_ENOMEM; +} + +static int rxm_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr, + size_t count, uint64_t flags) +{ + struct rxm_av *av; + size_t i; + + av = container_of(av_fid, struct rxm_av, util_av.av_fid); + for (i = 0; i < count; i++) + rxm_put_peer_addr(av, fi_addr[i]); + + return ofi_ip_av_remove(av_fid, fi_addr, count, flags); } static int rxm_av_insert(struct fid_av *av_fid, const void *addr, size_t count, fi_addr_t *fi_addr, uint64_t flags, void *context) { - struct util_av *av = container_of(av_fid, struct util_av, av_fid); - int ret, retv; + struct rxm_av *av; + int ret; + av = container_of(av_fid, struct rxm_av, util_av.av_fid.fid); ret = ofi_ip_av_insert(av_fid, addr, count, fi_addr, flags, context); if (ret < 0) return ret; - if (!av->eq && !ret) - return ret; + if (!av->util_av.eq) + count = ret; - retv = rxm_av_insert_cmap(av_fid, addr, count, fi_addr, flags); - if (retv) { - ret = rxm_av_remove(av_fid, fi_addr, count, flags); - if (ret) - FI_WARN(&rxm_prov, FI_LOG_AV, "Failed to remove addr " - "from AV during error handling\n"); - return retv; + ret = rxm_av_add_peers(av, addr, count, fi_addr); + if (ret) { + /* If insert was async, ofi_ip_av_insert() will have written + * an event to the EQ with the number of insertions. For + * correctness we need to delay writing the event to the EQ + * until all processing has completed. This should be done + * when separating the rxm av from the util av. For now, + * assume synchronous operation (most common case) and fail + * the insert. This could leave a bogus entry on the EQ. + * But the app should detect that insert failed and is likely + * to abort. + */ + rxm_av_remove(av_fid, fi_addr, count, flags); + return ret; } - return ret; + + return av->util_av.eq ? 0 : count; } static int rxm_av_insertsym(struct fid_av *av_fid, const char *node, size_t nodecnt, const char *service, size_t svccnt, fi_addr_t *fi_addr, uint64_t flags, void *context) { - struct util_av *av = container_of(av_fid, struct util_av, av_fid); + struct rxm_av *av; void *addr; - size_t addrlen, count = nodecnt * svccnt; - int ret, retv; + size_t addrlen, count; + int ret; - ret = ofi_verify_av_insert(av, flags, context); + av = container_of(av_fid, struct rxm_av, util_av.av_fid.fid); + ret = ofi_verify_av_insert(&av->util_av, flags, context); if (ret) return ret; - ret = ofi_ip_av_sym_getaddr(av, node, nodecnt, service, + ret = ofi_ip_av_sym_getaddr(&av->util_av, node, nodecnt, service, svccnt, &addr, &addrlen); if (ret <= 0) return ret; - assert(ret == count); - - ret = ofi_ip_av_insertv(av, addr, addrlen, count, fi_addr, flags, + count = ret; + ret = ofi_ip_av_insertv(&av->util_av, addr, addrlen, count, fi_addr, flags, context); - if (!av->eq && ret < count) { + if (ret > 0 && ret < count) count = ret; - } - /* If the AV is bound to an EQ, we can't determine which entries were - * added successfully to the AV until we process the insertion events - * later when reading the EQ. Add all addresses to the cmap - * optimistically. - */ - retv = rxm_av_insert_cmap(av_fid, addr, count, fi_addr, flags); - if (retv) { - ret = rxm_av_remove(av_fid, fi_addr, count, flags); - if (ret) - FI_WARN(&rxm_prov, FI_LOG_AV, "Failed to remove addr " - "from AV during error handling\n"); - ret = retv; + ret = rxm_av_add_peers(av, addr, count, fi_addr); + if (ret) { + /* See comment in rxm_av_insert. */ + rxm_av_remove(av_fid, fi_addr, count, flags); + return ret; } free(addr); - return ret; + return av->util_av.eq ? 0 : count; } int rxm_av_insertsvc(struct fid_av *av, const char *node, const char *service, @@ -190,6 +263,30 @@ int rxm_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr, return ofi_ip_av_lookup(av_fid, fi_addr, addr, addrlen); } +static int rxm_av_close(struct fid *av_fid) +{ + struct rxm_av *av; + int ret; + + av = container_of(av_fid, struct rxm_av, util_av.av_fid.fid); + ret = ofi_av_close(&av->util_av); + if (ret) + return ret; + + ofi_rbmap_cleanup(&av->addr_map); + ofi_bufpool_destroy(av->conn_pool); + ofi_bufpool_destroy(av->peer_pool); + free(av); + return 0; +} + +static struct fi_ops rxm_av_fi_ops = { + .size = sizeof(struct fi_ops), + .close = rxm_av_close, + .bind = ofi_av_bind, + .control = fi_no_control, + .ops_open = fi_no_ops_open, +}; static struct fi_ops_av rxm_av_ops = { .size = sizeof(struct fi_ops_av), @@ -203,15 +300,54 @@ static struct fi_ops_av rxm_av_ops = { }; int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, - struct fid_av **av, void *context) + struct fid_av **fid_av, void *context) { + struct rxm_domain *domain; + struct util_av_attr util_attr; + struct rxm_av *av; int ret; - ret = ofi_ip_av_create(domain_fid, attr, av, context); + av = calloc(1, sizeof(*av)); + if (!av) + return -FI_ENOMEM; + + ret = ofi_bufpool_create(&av->peer_pool, sizeof(struct rxm_peer_addr), + 0, 0, 0, OFI_BUFPOOL_INDEXED | + OFI_BUFPOOL_NO_TRACK); if (ret) - return ret; + goto free; + + ret = ofi_bufpool_create(&av->conn_pool, sizeof(struct rxm_conn), + 0, 0, 0, 0); + if (ret) + goto destroy1; + + ofi_rbmap_init(&av->addr_map, rxm_addr_compare); + domain = container_of(domain_fid, struct rxm_domain, + util_domain.domain_fid); + + util_attr.context_len = sizeof(struct rxm_peer_addr *); + util_attr.flags = 0; + util_attr.addrlen = ofi_sizeof_addr_format(domain->util_domain. + addr_format); + if (attr->type == FI_AV_UNSPEC) + attr->type = FI_AV_TABLE; - (*av)->ops = &rxm_av_ops; + ret = ofi_av_init(&domain->util_domain, attr, &util_attr, + &av->util_av, context); + if (ret) + goto destroy2; + + av->util_av.av_fid.fid.ops = &rxm_av_fi_ops; + av->util_av.av_fid.ops = &rxm_av_ops; + *fid_av = &av->util_av.av_fid; return 0; -} +destroy2: + ofi_bufpool_destroy(av->conn_pool); +destroy1: + ofi_bufpool_destroy(av->peer_pool); +free: + free(av); + return ret; +} diff --git a/prov/rxm/src/rxm_conn.c b/prov/rxm/src/rxm_conn.c index 14f0bf1cf6f..de4d3cee897 100644 --- a/prov/rxm/src/rxm_conn.c +++ b/prov/rxm/src/rxm_conn.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation, Inc. All rights reserved. + * Copyright (c) 2016-2021 Intel Corporation, Inc. All rights reserved. * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved. * * This software is available to you under a choice of one of two @@ -39,1244 +39,688 @@ #include #include "rxm.h" -static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap); -static int rxm_conn_connect(struct rxm_ep *ep, - struct rxm_cmap_handle *handle, const void *addr); -static int rxm_conn_signal(struct rxm_ep *ep, void *context, - enum rxm_cmap_signal signal); -static void rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle); -static void *rxm_conn_progress(void *arg); -static void *rxm_conn_atomic_progress(void *arg); -static int rxm_conn_handle_event(struct rxm_ep *rxm_ep, - struct rxm_msg_eq_entry *entry); +static void *rxm_cm_progress(void *arg); +static void *rxm_cm_atomic_progress(void *arg); +static void rxm_flush_msg_cq(struct rxm_ep *rxm_ep); -/* - * Connection map - */ -char *rxm_cm_state_str[] = { - RXM_CM_STATES(OFI_STR) +/* castable to fi_eq_cm_entry - we can't use fi_eq_cm_entry directly + * here because of a compiler error with a 0-sized array + */ +struct rxm_eq_cm_entry { + fid_t fid; + struct fi_info *info; + union rxm_cm_data data; }; -static inline ssize_t rxm_eq_readerr(struct rxm_ep *rxm_ep, - struct rxm_msg_eq_entry *entry) + +static void rxm_close_conn(struct rxm_conn *conn) { - ssize_t ret; + struct rxm_deferred_tx_entry *tx_entry; + struct rxm_recv_entry *rx_entry; + struct rxm_rx_buf *buf; - /* reset previous err data info */ - entry->err_entry.err_data_size = 0; + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing conn %p\n", conn); - ret = fi_eq_readerr(rxm_ep->msg_eq, &entry->err_entry, 0); - if (ret != sizeof(entry->err_entry)) { - if (ret != -FI_EAGAIN) - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to fi_eq_readerr: %zd\n", ret); - return ret < 0 ? ret : -FI_EINVAL; + assert(ofi_ep_lock_held(&conn->ep->util_ep)); + /* All deferred transfers are internally generated */ + while (!dlist_empty(&conn->deferred_tx_queue)) { + tx_entry = container_of(conn->deferred_tx_queue.next, + struct rxm_deferred_tx_entry, entry); + rxm_dequeue_deferred_tx(tx_entry); + free(tx_entry); } - if (entry->err_entry.err == ECONNREFUSED) { - entry->context = entry->err_entry.fid->context; - return -FI_ECONNREFUSED; + while (!dlist_empty(&conn->deferred_sar_segments)) { + buf = container_of(conn->deferred_sar_segments.next, + struct rxm_rx_buf, unexp_msg.entry); + dlist_remove(&buf->unexp_msg.entry); + rxm_rx_buf_free(buf); } - OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, - rxm_ep->msg_eq, &entry->err_entry); - return -entry->err_entry.err; -} - -static ssize_t rxm_eq_read(struct rxm_ep *ep, size_t len, - struct rxm_msg_eq_entry *entry) -{ - ssize_t ret; - - ret = fi_eq_read(ep->msg_eq, &entry->event, &entry->cm_entry, len, 0); - if (ret == -FI_EAVAIL) - ret = rxm_eq_readerr(ep, entry); - - return ret; -} - -static void rxm_cmap_set_key(struct rxm_cmap_handle *handle) -{ - handle->key = ofi_idx2key(&handle->cmap->key_idx, - ofi_idx_insert(&handle->cmap->handles_idx, handle)); -} - -static void rxm_cmap_clear_key(struct rxm_cmap_handle *handle) -{ - int index = ofi_key2idx(&handle->cmap->key_idx, handle->key); - - if (!ofi_idx_is_valid(&handle->cmap->handles_idx, index)) - FI_WARN(handle->cmap->av->prov, FI_LOG_AV, "Invalid key!\n"); - else - ofi_idx_remove(&handle->cmap->handles_idx, index); -} - -struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key) -{ - struct rxm_cmap_handle *handle; - - if (!(handle = ofi_idx_lookup(&cmap->handles_idx, - ofi_key2idx(&cmap->key_idx, key)))) { - FI_WARN(cmap->av->prov, FI_LOG_AV, "Invalid key!\n"); - } else { - if (handle->key != key) { - FI_WARN(cmap->av->prov, FI_LOG_AV, - "handle->key not matching given key\n"); - handle = NULL; - } + while (!dlist_empty(&conn->deferred_sar_msgs)) { + rx_entry = container_of(conn->deferred_sar_msgs.next, + struct rxm_recv_entry, sar.entry); + dlist_remove(&rx_entry->entry); + rxm_recv_entry_release(rx_entry); } - return handle; + fi_close(&conn->msg_ep->fid); + rxm_flush_msg_cq(conn->ep); + conn->msg_ep = NULL; + conn->state = RXM_CM_IDLE; } -static void rxm_cmap_init_handle(struct rxm_cmap_handle *handle, - struct rxm_cmap *cmap, - enum rxm_cmap_state state, - fi_addr_t fi_addr, - struct rxm_cmap_peer *peer) +static int rxm_open_conn(struct rxm_conn *conn, struct fi_info *msg_info) { - handle->cmap = cmap; - RXM_CM_UPDATE_STATE(handle, state); - rxm_cmap_set_key(handle); - handle->fi_addr = fi_addr; - handle->peer = peer; -} - -static int rxm_cmap_match_peer(struct dlist_entry *entry, const void *addr) -{ - struct rxm_cmap_peer *peer; - - peer = container_of(entry, struct rxm_cmap_peer, entry); - return !memcmp(peer->addr, addr, peer->handle->cmap->av->addrlen); -} - -static int rxm_cmap_del_handle(struct rxm_cmap_handle *handle) -{ - struct rxm_cmap *cmap = handle->cmap; + struct rxm_domain *domain; + struct rxm_ep *ep; + struct fid_ep *msg_ep; int ret; - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "marking connection handle: %p for deletion\n", handle); - rxm_cmap_clear_key(handle); - - RXM_CM_UPDATE_STATE(handle, RXM_CMAP_SHUTDOWN); + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "open msg ep %p\n", conn); - /* Signal CM thread to delete the handle. This is required - * so that the CM thread handles any pending events for this - * ep correctly. Handle would be freed finally after processing the - * events */ - ret = rxm_conn_signal(cmap->ep, handle, RXM_CMAP_FREE); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); + ep = conn->ep; + domain = container_of(ep->util_ep.domain, struct rxm_domain, + util_domain); + ret = fi_endpoint(domain->msg_domain, msg_info, &msg_ep, conn); if (ret) { - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, - "Unable to signal CM thread\n"); + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to create msg_ep: %d\n", ret); return ret; } - return 0; -} - -ssize_t rxm_get_conn(struct rxm_ep *rxm_ep, fi_addr_t addr, - struct rxm_conn **rxm_conn) -{ - struct rxm_cmap_handle *handle; - ssize_t ret; - - assert(rxm_ep->util_ep.tx_cq); - handle = rxm_cmap_acquire_handle(rxm_ep->cmap, addr); - if (!handle) { - ret = rxm_cmap_alloc_handle(rxm_ep->cmap, addr, - RXM_CMAP_IDLE, &handle); - if (ret) - return ret; - } - - *rxm_conn = container_of(handle, struct rxm_conn, handle); - if (handle->state != RXM_CMAP_CONNECTED) { - ret = rxm_cmap_connect(rxm_ep, addr, handle); - if (ret) - return ret; - } - - if (!dlist_empty(&(*rxm_conn)->deferred_tx_queue)) { - rxm_ep_do_progress(&rxm_ep->util_ep); - if (!dlist_empty(&(*rxm_conn)->deferred_tx_queue)) - return -FI_EAGAIN; + ret = fi_ep_bind(msg_ep, &ep->msg_eq->fid, 0); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to bind msg EP to EQ: %d\n", ret); + goto err; } - return 0; -} - -static inline int -rxm_cmap_check_and_realloc_handles_table(struct rxm_cmap *cmap, - fi_addr_t fi_addr) -{ - void *new_handles; - size_t grow_size; - - if (OFI_LIKELY(fi_addr < cmap->num_allocated)) - return 0; - grow_size = MAX(ofi_av_size(cmap->av), fi_addr - cmap->num_allocated + 1); - - new_handles = realloc(cmap->handles_av, - (grow_size + cmap->num_allocated) * - sizeof(*cmap->handles_av)); - if (OFI_LIKELY(!new_handles)) - return -FI_ENOMEM; - - cmap->handles_av = new_handles; - memset(&cmap->handles_av[cmap->num_allocated], 0, - sizeof(*cmap->handles_av) * grow_size); - cmap->num_allocated += grow_size; - return 0; -} - -static void rxm_conn_close(struct rxm_cmap_handle *handle) -{ - struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle); - struct rxm_conn *rxm_conn_tmp; - struct rxm_deferred_tx_entry *def_tx_entry; - struct dlist_entry *conn_entry_tmp; - - dlist_foreach_container_safe(&handle->cmap->ep->deferred_tx_conn_queue, - struct rxm_conn, rxm_conn_tmp, - deferred_conn_entry, conn_entry_tmp) - { - if (rxm_conn_tmp->handle.key != handle->key) - continue; - - while (!dlist_empty(&rxm_conn_tmp->deferred_tx_queue)) { - def_tx_entry = - container_of(rxm_conn_tmp->deferred_tx_queue.next, - struct rxm_deferred_tx_entry, entry); - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "cancelled deferred message\n"); - rxm_ep_dequeue_deferred_tx_queue(def_tx_entry); - free(def_tx_entry); + if (ep->srx_ctx) { + ret = fi_ep_bind(msg_ep, &ep->srx_ctx->fid, 0); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to bind msg " + "EP to shared RX ctx: %d\n", ret); + goto err; } } - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing msg ep\n"); - if (!rxm_conn->msg_ep) - return; - - if (fi_close(&rxm_conn->msg_ep->fid)) - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to close msg_ep\n"); - - rxm_conn->msg_ep = NULL; -} - -static void rxm_conn_free(struct rxm_cmap_handle *handle) -{ - struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle); - - rxm_conn_close(handle); - free(rxm_conn); -} - -int rxm_cmap_alloc_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr, - enum rxm_cmap_state state, - struct rxm_cmap_handle **handle) -{ - int ret; - - *handle = rxm_conn_alloc(cmap); - if (!*handle) - return -FI_ENOMEM; - - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Allocated handle: %p for fi_addr: %" PRIu64 "\n", - *handle, fi_addr); - - ret = rxm_cmap_check_and_realloc_handles_table(cmap, fi_addr); + ret = fi_ep_bind(msg_ep, &ep->msg_cq->fid, FI_TRANSMIT | FI_RECV); if (ret) { - rxm_conn_free(*handle); - return ret; + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to bind msg_ep to msg_cq: %d\n", ret); + goto err; } - rxm_cmap_init_handle(*handle, cmap, state, fi_addr, NULL); - cmap->handles_av[fi_addr] = *handle; - return 0; -} - -static int rxm_cmap_alloc_handle_peer(struct rxm_cmap *cmap, void *addr, - enum rxm_cmap_state state, - struct rxm_cmap_handle **handle) -{ - struct rxm_cmap_peer *peer; - - peer = calloc(1, sizeof(*peer) + cmap->av->addrlen); - if (!peer) - return -FI_ENOMEM; + ret = fi_enable(msg_ep); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to enable msg_ep: %d\n", ret); + goto err; + } - *handle = rxm_conn_alloc(cmap); - if (!*handle) { - free(peer); - return -FI_ENOMEM; + ret = domain->flow_ctrl_ops->enable(msg_ep); + if (!ret) { + domain->flow_ctrl_ops->set_threshold(msg_ep, + ep->msg_info->rx_attr->size / 2); } - ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV, - "Allocated handle for addr", addr); - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "handle: %p\n", *handle); + if (!ep->srx_ctx) { + ret = rxm_prepost_recv(ep, msg_ep); + if (ret) + goto err; + } - rxm_cmap_init_handle(*handle, cmap, state, FI_ADDR_NOTAVAIL, peer); - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Adding handle to peer list\n"); - peer->handle = *handle; - memcpy(peer->addr, addr, cmap->av->addrlen); - dlist_insert_tail(&peer->entry, &cmap->peer_list); + conn->msg_ep = msg_ep; return 0; +err: + fi_close(&msg_ep->fid); + return ret; } -static struct rxm_cmap_handle * -rxm_cmap_get_handle_peer(struct rxm_cmap *cmap, const void *addr) +/* We send passive endpoint's port to the server as connection request + * would be from a different one. + */ +static int rxm_init_connect_data(struct rxm_conn *conn, + union rxm_cm_data *cm_data) { - struct rxm_cmap_peer *peer; - struct dlist_entry *entry; - - entry = dlist_find_first_match(&cmap->peer_list, rxm_cmap_match_peer, - addr); - if (!entry) - return NULL; - - ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV, - "handle found in peer list for addr", addr); - peer = container_of(entry, struct rxm_cmap_peer, entry); - return peer->handle; -} + size_t cm_data_size = 0; + size_t opt_size = sizeof(cm_data_size); + int ret; -int rxm_cmap_remove(struct rxm_cmap *cmap, int index) -{ - struct rxm_cmap_handle *handle; - int ret = -FI_ENOENT; + memset(cm_data, 0, sizeof(*cm_data)); + cm_data->connect.version = RXM_CM_DATA_VERSION; + cm_data->connect.ctrl_version = RXM_CTRL_VERSION; + cm_data->connect.op_version = RXM_OP_VERSION; + cm_data->connect.endianness = ofi_detect_endianness(); + cm_data->connect.eager_limit = conn->ep->eager_limit; + cm_data->connect.rx_size = conn->ep->msg_info->rx_attr->size; - handle = cmap->handles_av[index]; - if (!handle) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cmap entry not found\n"); + ret = fi_getopt(&conn->ep->msg_pep->fid, FI_OPT_ENDPOINT, + FI_OPT_CM_DATA_SIZE, &cm_data_size, &opt_size); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "fi_getopt failed\n"); return ret; } - handle->peer = calloc(1, sizeof(*handle->peer) + cmap->av->addrlen); - if (!handle->peer) { - ret = -FI_ENOMEM; - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to allocate memory " - "for moving handle to peer list, deleting it instead\n"); - rxm_cmap_del_handle(handle); - return ret; + if (cm_data_size < sizeof(*cm_data)) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data too small\n"); + return -FI_EOTHER; } - handle->fi_addr = FI_ADDR_NOTAVAIL; - cmap->handles_av[index] = NULL; - handle->peer->handle = handle; - memcpy(handle->peer->addr, ofi_av_get_addr(cmap->av, index), - cmap->av->addrlen); - dlist_insert_tail(&handle->peer->entry, &cmap->peer_list); - return 0; -} - -static int rxm_cmap_move_handle(struct rxm_cmap_handle *handle, - fi_addr_t fi_addr) -{ - int ret; - dlist_remove(&handle->peer->entry); - free(handle->peer); - handle->peer = NULL; - handle->fi_addr = fi_addr; - ret = rxm_cmap_check_and_realloc_handles_table(handle->cmap, fi_addr); - if (OFI_UNLIKELY(ret)) - return ret; - handle->cmap->handles_av[fi_addr] = handle; + cm_data->connect.port = ofi_addr_get_port(&conn->ep->addr.sa); + cm_data->connect.client_conn_id = conn->peer->index; return 0; } -int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr) +static int rxm_send_connect(struct rxm_conn *conn) { - struct rxm_cmap_handle *handle; + union rxm_cm_data cm_data; + struct fi_info *info; int ret; - /* Check whether we have already allocated a handle for this `fi_addr`. */ - /* We rely on the fact that `ofi_ip_av_insert`/`ofi_av_insert_addr` returns - * the same `fi_addr` for the equal addresses */ - if (fi_addr < cmap->num_allocated) { - handle = rxm_cmap_acquire_handle(cmap, fi_addr); - if (handle) - return 0; - } - - handle = rxm_cmap_get_handle_peer(cmap, addr); - if (!handle) { - ret = rxm_cmap_alloc_handle(cmap, fi_addr, - RXM_CMAP_IDLE, &handle); - return ret; - } - ret = rxm_cmap_move_handle(handle, fi_addr); - if (ret) - return ret; - - rxm_conn_av_updated_handler(handle); - return 0; -} + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connecting %p\n", conn); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); -void rxm_cmap_process_shutdown(struct rxm_cmap *cmap, - struct rxm_cmap_handle *handle) -{ - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Processing shutdown for handle: %p\n", handle); - if (handle->state > RXM_CMAP_SHUTDOWN) { - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, - "Invalid handle on shutdown event\n"); - } else if (handle->state != RXM_CMAP_SHUTDOWN) { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got remote shutdown\n"); - rxm_cmap_del_handle(handle); - } else { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got local shutdown\n"); - } -} - -void rxm_cmap_process_connect(struct rxm_cmap *cmap, - struct rxm_cmap_handle *handle, - union rxm_cm_data *cm_data) -{ - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "processing FI_CONNECTED event for handle: %p\n", handle); - if (cm_data) { - assert(handle->state == RXM_CMAP_CONNREQ_SENT); - handle->remote_key = cm_data->accept.server_conn_id; - } else { - assert(handle->state == RXM_CMAP_CONNREQ_RECV); - } - RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNECTED); -} + info = conn->ep->msg_info; + info->dest_addrlen = conn->ep->msg_info->src_addrlen; -void rxm_cmap_process_reject(struct rxm_cmap *cmap, - struct rxm_cmap_handle *handle, - enum rxm_cmap_reject_reason reject_reason) -{ - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Processing reject for handle: %p\n", handle); - switch (handle->state) { - case RXM_CMAP_CONNREQ_RECV: - case RXM_CMAP_CONNECTED: - /* Handle is being re-used for incoming connection request */ - break; - case RXM_CMAP_CONNREQ_SENT: - if (reject_reason == RXM_CMAP_REJECT_GENUINE) { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Deleting connection handle\n"); - rxm_cmap_del_handle(handle); - } else { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Connection handle is being re-used. Close the connection\n"); - rxm_conn_close(handle); - } - break; - case RXM_CMAP_SHUTDOWN: - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Connection handle already being deleted\n"); - break; - default: - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "Invalid cmap state: " - "%d when receiving connection reject\n", handle->state); - assert(0); - } -} + free(info->dest_addr); + info->dest_addr = mem_dup(&conn->peer->addr, info->dest_addrlen); + if (!info->dest_addr) + return -FI_ENOMEM; -int rxm_cmap_process_connreq(struct rxm_cmap *cmap, void *addr, - struct rxm_cmap_handle **handle_ret, - uint8_t *reject_reason) -{ - struct rxm_cmap_handle *handle; - int ret = 0, cmp; - fi_addr_t fi_addr = ofi_ip_av_get_fi_addr(cmap->av, addr); - - ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, - "Processing connreq from remote pep", addr); - - if (fi_addr == FI_ADDR_NOTAVAIL) { - handle = rxm_cmap_get_handle_peer(cmap, addr); - if (!handle) - ret = rxm_cmap_alloc_handle_peer(cmap, addr, - RXM_CMAP_CONNREQ_RECV, - &handle); - } else { - handle = rxm_cmap_acquire_handle(cmap, fi_addr); - if (!handle) - ret = rxm_cmap_alloc_handle(cmap, fi_addr, - RXM_CMAP_CONNREQ_RECV, - &handle); - } + ret = rxm_open_conn(conn, info); if (ret) return ret; - switch (handle->state) { - case RXM_CMAP_CONNECTED: - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Connection already present.\n"); - ret = -FI_EALREADY; - break; - case RXM_CMAP_CONNREQ_SENT: - ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "local_name", - cmap->attr.name); - ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "remote_name", - addr); - - cmp = ofi_addr_cmp(cmap->av->prov, addr, cmap->attr.name); + ret = rxm_init_connect_data(conn, &cm_data); + if (ret) + goto err; - if (cmp < 0) { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Remote name lower than local name.\n"); - *reject_reason = RXM_CMAP_REJECT_SIMULT_CONN; - ret = -FI_EALREADY; - break; - } else if (cmp > 0) { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Re-using handle: %p to accept remote " - "connection\n", handle); - *reject_reason = RXM_CMAP_REJECT_GENUINE; - rxm_conn_close(handle); - } else { - FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, - "Endpoint connects to itself\n"); - ret = rxm_cmap_alloc_handle_peer(cmap, addr, - RXM_CMAP_CONNREQ_RECV, - &handle); - if (ret) - return ret; - - assert(fi_addr != FI_ADDR_NOTAVAIL); - handle->fi_addr = fi_addr; - } - /* Fall through */ - case RXM_CMAP_IDLE: - RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_RECV); - /* Fall through */ - case RXM_CMAP_CONNREQ_RECV: - *handle_ret = handle; - break; - case RXM_CMAP_SHUTDOWN: - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "handle :%p marked for " - "deletion / shutdown, reject connection\n", handle); - *reject_reason = RXM_CMAP_REJECT_GENUINE; - ret = -FI_EOPBADSTATE; - break; - default: - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, - "invalid handle state: %d\n", handle->state); - assert(0); - ret = -FI_EOPBADSTATE; + ret = fi_connect(conn->msg_ep, info->dest_addr, &cm_data, + sizeof(cm_data)); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to connect msg_ep\n"); + goto err; } + conn->state = RXM_CM_CONNECTING; + return 0; +err: + fi_close(&conn->msg_ep->fid); + conn->msg_ep = NULL; return ret; } -int rxm_msg_eq_progress(struct rxm_ep *rxm_ep) +static int rxm_connect(struct rxm_conn *conn) { - struct rxm_msg_eq_entry *entry; int ret; - entry = alloca(RXM_MSG_EQ_ENTRY_SZ); - if (!entry) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to allocate memory!\n"); - return -FI_ENOMEM; - } + assert(ofi_ep_lock_held(&conn->ep->util_ep)); - while (1) { - entry->rd = rxm_eq_read(rxm_ep, RXM_MSG_EQ_ENTRY_SZ, entry); - if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED) { - ret = (int) entry->rd; - break; - } - ret = rxm_conn_handle_event(rxm_ep, entry); - if (ret) { - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "invalid connection handle event: %d\n", ret); - break; - } - } - return ret; -} - -int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr, - struct rxm_cmap_handle *handle) -{ - int ret = FI_SUCCESS; - - switch (handle->state) { - case RXM_CMAP_IDLE: - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "initiating MSG_EP connect " - "for fi_addr: %" PRIu64 "\n", fi_addr); - ret = rxm_conn_connect(rxm_ep, handle, - ofi_av_get_addr(rxm_ep->cmap->av, fi_addr)); - if (ret) { - if (ret == -FI_ECONNREFUSED) - return -FI_EAGAIN; - - rxm_cmap_del_handle(handle); - } else { - RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_SENT); - ret = -FI_EAGAIN; - } + switch (conn->state) { + case RXM_CM_IDLE: + ret = rxm_send_connect(conn); + if (ret) + return ret; break; - case RXM_CMAP_CONNREQ_SENT: - case RXM_CMAP_CONNREQ_RECV: - case RXM_CMAP_SHUTDOWN: - ret = -FI_EAGAIN; + case RXM_CM_CONNECTING: + case RXM_CM_ACCEPTING: break; + case RXM_CM_CONNECTED: + return 0; default: - FI_WARN(rxm_ep->cmap->av->prov, FI_LOG_EP_CTRL, - "Invalid cmap handle state\n"); assert(0); - ret = -FI_EOPBADSTATE; - } - if (ret == -FI_EAGAIN) - rxm_msg_eq_progress(rxm_ep); - - return ret; -} - -static int rxm_cmap_cm_thread_close(struct rxm_cmap *cmap) -{ - int ret; - - FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "stopping CM thread\n"); - if (!cmap->cm_thread) - return 0; - - ofi_ep_lock_acquire(&cmap->ep->util_ep); - cmap->ep->do_progress = false; - ofi_ep_lock_release(&cmap->ep->util_ep); - ret = rxm_conn_signal(cmap->ep, NULL, RXM_CMAP_EXIT); - if (ret) { - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, - "Unable to signal CM thread\n"); - return ret; - } - ret = pthread_join(cmap->cm_thread, NULL); - if (ret) { - FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, - "Unable to join CM thread\n"); - return ret; - } - return 0; -} - -void rxm_cmap_free(struct rxm_cmap *cmap) -{ - struct rxm_cmap_peer *peer; - struct dlist_entry *entry; - size_t i; - - FI_INFO(cmap->av->prov, FI_LOG_EP_CTRL, "Closing cmap\n"); - rxm_cmap_cm_thread_close(cmap); - - for (i = 0; i < cmap->num_allocated; i++) { - if (cmap->handles_av[i]) { - rxm_cmap_clear_key(cmap->handles_av[i]); - rxm_conn_free(cmap->handles_av[i]); - } - } - - while (!dlist_empty(&cmap->peer_list)) { - entry = cmap->peer_list.next; - peer = container_of(entry, struct rxm_cmap_peer, entry); - dlist_remove(&peer->entry); - rxm_cmap_clear_key(peer->handle); - rxm_conn_free(peer->handle); - free(peer); + conn->state = RXM_CM_IDLE; + break; } - free(cmap->handles_av); - free(cmap->attr.name); - ofi_idx_reset(&cmap->handles_idx); - free(cmap); + return -FI_EAGAIN; } -static int -rxm_cmap_update_addr(struct util_av *av, void *addr, - fi_addr_t fi_addr, void *arg) +static void rxm_free_conn(struct rxm_conn *conn) { - return rxm_cmap_update((struct rxm_cmap *)arg, addr, fi_addr); -} - -int rxm_cmap_bind_to_av(struct rxm_cmap *cmap, struct util_av *av) -{ - cmap->av = av; - return ofi_av_elements_iter(av, rxm_cmap_update_addr, (void *)cmap); -} - -int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr) -{ - struct rxm_cmap *cmap; - struct util_ep *ep = &rxm_ep->util_ep; - int ret; - - cmap = calloc(1, sizeof *cmap); - if (!cmap) - return -FI_ENOMEM; - - cmap->ep = rxm_ep; - cmap->av = ep->av; - - cmap->handles_av = calloc(ofi_av_size(ep->av), sizeof(*cmap->handles_av)); - if (!cmap->handles_av) { - ret = -FI_ENOMEM; - goto err1; - } - cmap->num_allocated = ofi_av_size(ep->av); - - cmap->attr = *attr; - cmap->attr.name = mem_dup(attr->name, ep->av->addrlen); - if (!cmap->attr.name) { - ret = -FI_ENOMEM; - goto err2; - } - - memset(&cmap->handles_idx, 0, sizeof(cmap->handles_idx)); - ofi_key_idx_init(&cmap->key_idx, RXM_CMAP_IDX_BITS); - - dlist_init(&cmap->peer_list); - - rxm_ep->cmap = cmap; - - if (ep->domain->data_progress == FI_PROGRESS_AUTO || force_auto_progress) { - - assert(ep->domain->threading == FI_THREAD_SAFE); - rxm_ep->do_progress = true; - if (pthread_create(&cmap->cm_thread, 0, - rxm_ep->rxm_info->caps & FI_ATOMIC ? - rxm_conn_atomic_progress : - rxm_conn_progress, ep)) { - FI_WARN(ep->av->prov, FI_LOG_EP_CTRL, - "unable to create cmap thread\n"); - ret = -ofi_syserr(); - goto err3; - } - } - - assert(ep->av); - ret = rxm_cmap_bind_to_av(cmap, ep->av); - if (ret) - goto err4; + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "free conn %p\n", conn); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); - return FI_SUCCESS; -err4: - rxm_cmap_cm_thread_close(cmap); -err3: - rxm_ep->cmap = NULL; - free(cmap->attr.name); -err2: - free(cmap->handles_av); -err1: - free(cmap); - return ret; + conn->peer->refcnt--; + ofi_idm_clear(&conn->ep->conn_idx_map, conn->peer->index); + ofi_buf_free(conn); } -static int rxm_msg_ep_open(struct rxm_ep *rxm_ep, struct fi_info *msg_info, - struct rxm_conn *rxm_conn, void *context) +void rxm_freeall_conns(struct rxm_ep *ep) { - struct rxm_domain *rxm_domain; - struct fid_ep *msg_ep; - int ret; - - rxm_domain = container_of(rxm_ep->util_ep.domain, struct rxm_domain, - util_domain); - - ret = fi_endpoint(rxm_domain->msg_domain, msg_info, &msg_ep, context); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to create msg_ep: %d\n", ret); - return ret; - } - - ret = fi_ep_bind(msg_ep, &rxm_ep->msg_eq->fid, 0); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to bind msg EP to EQ: %d\n", ret); - goto err; - } - - if (rxm_ep->srx_ctx) { - ret = fi_ep_bind(msg_ep, &rxm_ep->srx_ctx->fid, 0); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to bind msg " - "EP to shared RX ctx: %d\n", ret); - goto err; - } - } - - // TODO add other completion flags - ret = fi_ep_bind(msg_ep, &rxm_ep->msg_cq->fid, FI_TRANSMIT | FI_RECV); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to bind msg_ep to msg_cq: %d\n", ret); - goto err; - } + struct rxm_conn *conn; + struct rxm_av *av; + int i; - ret = fi_enable(msg_ep); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to enable msg_ep: %d\n", ret); - goto err; - } + av = container_of(ep->util_ep.av, struct rxm_av, util_av); + ofi_ep_lock_acquire(&ep->util_ep); - ret = rxm_domain->flow_ctrl_ops->enable(msg_ep); - if (!ret) { - rxm_domain->flow_ctrl_ops->set_threshold( - msg_ep, rxm_ep->msg_info->rx_attr->size / 2); - } + /* We can't have more connections than known peers */ + for (i = 0; i < av->peer_pool->entry_cnt; i++) { + conn = ofi_idm_lookup(&ep->conn_idx_map, i); + if (!conn) + continue; - if (!rxm_ep->srx_ctx) { - ret = rxm_prepost_recv(rxm_ep, msg_ep); - if (ret) - goto err; + if (conn->state != RXM_CM_IDLE) + rxm_close_conn(conn); + rxm_free_conn(conn); } - rxm_conn->msg_ep = msg_ep; - return 0; -err: - fi_close(&msg_ep->fid); - return ret; + ofi_ep_lock_release(&ep->util_ep); } -static int rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue *recv_queue) +static struct rxm_conn * +rxm_add_conn(struct rxm_ep *ep, struct rxm_peer_addr *peer) { - struct rxm_rx_buf *rx_buf; - struct dlist_entry *entry, *tmp_entry; - struct rxm_recv_match_attr match_attr; - struct fi_cq_err_entry err_entry = {0}; - int ret, count = 0; - - dlist_foreach_container_safe(&recv_queue->unexp_msg_list, - struct rxm_rx_buf, rx_buf, - unexp_msg.entry, tmp_entry) { - if (rx_buf->unexp_msg.addr == rx_buf->conn->handle.fi_addr) - continue; + struct rxm_conn *conn; + struct rxm_av *av; - assert(rx_buf->unexp_msg.addr == FI_ADDR_NOTAVAIL); + assert(ofi_ep_lock_held(&ep->util_ep)); + conn = ofi_idm_lookup(&ep->conn_idx_map, peer->index); + if (conn) + return conn; - rx_buf->unexp_msg.addr = rx_buf->conn->handle.fi_addr; - match_attr.addr = rx_buf->unexp_msg.addr; - match_attr.tag = rx_buf->unexp_msg.tag; + av = container_of(ep->util_ep.av, struct rxm_av, util_av); + conn = ofi_buf_alloc(av->conn_pool); + if (!conn) + return NULL; - entry = dlist_remove_first_match(&recv_queue->recv_list, - recv_queue->match_recv, - &match_attr); - if (!entry) - continue; + if (ofi_idm_set(&ep->conn_idx_map, peer->index, conn) < 0) { + ofi_buf_free(conn); + return NULL; + } - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry, - entry); + conn->ep = ep; + conn->state = RXM_CM_IDLE; + conn->remote_index = -1; + dlist_init(&conn->deferred_entry); + dlist_init(&conn->deferred_tx_queue); + dlist_init(&conn->deferred_sar_msgs); + dlist_init(&conn->deferred_sar_segments); - ret = rxm_handle_rx_buf(rx_buf); - if (ret) { - err_entry.op_context = rx_buf; - err_entry.flags = rx_buf->recv_entry->comp_flags; - err_entry.len = rx_buf->pkt.hdr.size; - err_entry.data = rx_buf->pkt.hdr.data; - err_entry.tag = rx_buf->pkt.hdr.tag; - err_entry.err = ret; - err_entry.prov_errno = ret; - ofi_cq_write_error(recv_queue->rxm_ep->util_ep.rx_cq, - &err_entry); - if (rx_buf->ep->util_ep.flags & OFI_CNTR_ENABLED) - rxm_cntr_incerr(rx_buf->ep->util_ep.rx_cntr); - - rxm_rx_buf_free(rx_buf); - - if (!(rx_buf->recv_entry->flags & FI_MULTI_RECV)) - rxm_recv_entry_release(rx_buf->recv_entry); - } - count++; - } - return count; + conn->peer = peer; + peer->refcnt++; + + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "allocated conn %p\n", conn); + return conn; } -static void -rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle) +/* The returned conn is only valid if the function returns success. */ +ssize_t rxm_get_conn(struct rxm_ep *ep, fi_addr_t addr, struct rxm_conn **conn) { - struct rxm_ep *ep = handle->cmap->ep; - int count = 0; + struct rxm_peer_addr **peer; + ssize_t ret; - if (ep->rxm_info->caps & FI_DIRECTED_RECV) { - count += rxm_conn_reprocess_directed_recvs(&ep->recv_queue); - count += rxm_conn_reprocess_directed_recvs(&ep->trecv_queue); + assert(ofi_ep_lock_held(&ep->util_ep)); + peer = ofi_av_addr_context(ep->util_ep.av, addr); + *conn = rxm_add_conn(ep, *peer); + if (!*conn) + return -FI_ENOMEM; - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "Reprocessed directed recvs - %d\n", count); + if ((*conn)->state == RXM_CM_CONNECTED) { + if (!dlist_empty(&(*conn)->deferred_tx_queue)) { + rxm_ep_do_progress(&ep->util_ep); + if (!dlist_empty(&(*conn)->deferred_tx_queue)) + return -FI_EAGAIN; + } + return 0; } + + ret = rxm_connect(*conn); + + /* If the progress function encounters an error trying to establish + * the connection, it may free the connection object. This resets + * the connection process to restart from the beginning. + */ + if (ret == -FI_EAGAIN) + rxm_conn_progress(ep); + return ret; } -static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap) +void rxm_process_connect(struct rxm_eq_cm_entry *cm_entry) { - struct rxm_conn *rxm_conn; + struct rxm_conn *conn; - rxm_conn = calloc(1, sizeof(*rxm_conn)); - if (!rxm_conn) - return NULL; + conn = cm_entry->fid->context; + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, + "processing connected for handle: %p\n", conn); - dlist_init(&rxm_conn->deferred_conn_entry); - dlist_init(&rxm_conn->deferred_tx_queue); - dlist_init(&rxm_conn->sar_rx_msg_list); - dlist_init(&rxm_conn->sar_deferred_rx_msg_list); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); + if (conn->state == RXM_CM_CONNECTING) + conn->remote_index = cm_entry->data.accept.server_conn_id; - return &rxm_conn->handle; + conn->state = RXM_CM_CONNECTED; } -static inline int -rxm_conn_verify_cm_data(union rxm_cm_data *remote_cm_data, - union rxm_cm_data *local_cm_data) +/* For simultaneous connection requests, if the peer won the coin + * flip (reject EALREADY), our connection request is discarded. + */ +static void +rxm_process_reject(struct rxm_conn *conn, struct fi_eq_err_entry *entry) { - /* This should stay at top as it helps to avoid endian conversion - * for other fields in rxm_cm_data */ - if (remote_cm_data->connect.version != local_cm_data->connect.version) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data version mismatch " - "(local: %" PRIu8 ", remote: %" PRIu8 ")\n", - local_cm_data->connect.version, - remote_cm_data->connect.version); - goto err; - } - if (remote_cm_data->connect.endianness != local_cm_data->connect.endianness) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data endianness mismatch " - "(local: %" PRIu8 ", remote: %" PRIu8 ")\n", - local_cm_data->connect.endianness, - remote_cm_data->connect.endianness); - goto err; - } - if (remote_cm_data->connect.ctrl_version != local_cm_data->connect.ctrl_version) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data ctrl_version mismatch " - "(local: %" PRIu8 ", remote: %" PRIu8 ")\n", - local_cm_data->connect.ctrl_version, - remote_cm_data->connect.ctrl_version); - goto err; - } - if (remote_cm_data->connect.op_version != local_cm_data->connect.op_version) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data op_version mismatch " - "(local: %" PRIu8 ", remote: %" PRIu8 ")\n", - local_cm_data->connect.op_version, - remote_cm_data->connect.op_version); - goto err; - } - if (remote_cm_data->connect.eager_limit != - local_cm_data->connect.eager_limit) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data eager_limit mismatch " - "(local: %" PRIu32 ", remote: %" PRIu32 ")\n", - local_cm_data->connect.eager_limit, - remote_cm_data->connect.eager_limit); - goto err; + union rxm_cm_data *cm_data; + uint8_t reason; + + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, + "Processing reject for handle: %p\n", conn); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); + + if (entry->err_data_size >= sizeof(cm_data->reject)) { + cm_data = entry->err_data; + if (cm_data->reject.version != RXM_CM_DATA_VERSION) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "invalid reject version\n"); + reason = RXM_REJECT_ECONNREFUSED; + } else { + reason = cm_data->reject.reason; + } + } else { + reason = RXM_REJECT_ECONNREFUSED; } - return FI_SUCCESS; -err: - return -FI_EINVAL; -} -static size_t rxm_conn_get_rx_size(struct rxm_ep *rxm_ep, - struct fi_info *msg_info) -{ - if (msg_info->ep_attr->rx_ctx_cnt == FI_SHARED_CONTEXT) - return MAX(MIN(16, msg_info->rx_attr->size), - (msg_info->rx_attr->size / - ofi_av_size(rxm_ep->util_ep.av))); - else - return msg_info->rx_attr->size; + switch (conn->state) { + case RXM_CM_IDLE: + /* Unlikely, but can occur if our request was rejected, and + * there was a failure trying to accept the peer's. + */ + break; + case RXM_CM_CONNECTING: + rxm_close_conn(conn); + if (reason != RXM_REJECT_EALREADY) + rxm_free_conn(conn); + break; + case RXM_CM_ACCEPTING: + case RXM_CM_CONNECTED: + /* Our request was rejected, but we accepted the peer's. */ + break; + default: + assert(0); + break; + } } static int -rxm_msg_process_connreq(struct rxm_ep *rxm_ep, struct fi_info *msg_info, - union rxm_cm_data *remote_cm_data) +rxm_verify_connreq(struct rxm_ep *ep, union rxm_cm_data *cm_data) { - struct rxm_conn *rxm_conn; - union rxm_cm_data cm_data = { - .connect = { - .version = RXM_CM_DATA_VERSION, - .endianness = ofi_detect_endianness(), - .ctrl_version = RXM_CTRL_VERSION, - .op_version = RXM_OP_VERSION, - .eager_limit = rxm_ep->eager_limit, - }, - }; - union rxm_cm_data reject_cm_data = { - .reject = { - .version = RXM_CM_DATA_VERSION, - .reason = RXM_CMAP_REJECT_GENUINE, - } - }; - struct rxm_cmap_handle *handle; - struct sockaddr_storage remote_pep_addr; - int ret; - - assert(sizeof(uint32_t) == sizeof(cm_data.accept.rx_size)); - assert(msg_info->rx_attr->size <= (uint32_t)-1); - - if (rxm_conn_verify_cm_data(remote_cm_data, &cm_data)) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "CM data mismatch was detected\n"); - ret = -FI_EINVAL; - goto err1; + if (cm_data->connect.version != RXM_CM_DATA_VERSION) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm version mismatch"); + return -FI_EINVAL; } - memcpy(&remote_pep_addr, msg_info->dest_addr, msg_info->dest_addrlen); - ofi_addr_set_port((struct sockaddr *)&remote_pep_addr, - remote_cm_data->connect.port); + if (cm_data->connect.endianness != ofi_detect_endianness()) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "endianness mismatch"); + return -FI_EINVAL; + } - ret = rxm_cmap_process_connreq(rxm_ep->cmap, &remote_pep_addr, - &handle, &reject_cm_data.reject.reason); - if (ret) - goto err1; + if (cm_data->connect.ctrl_version != RXM_CTRL_VERSION) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm ctrl_version mismatch"); + return -FI_EINVAL; + } - rxm_conn = container_of(handle, struct rxm_conn, handle); + if (cm_data->connect.op_version != RXM_OP_VERSION) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm op_version mismatch"); + return -FI_EINVAL; + } - rxm_conn->handle.remote_key = remote_cm_data->connect.client_conn_id; + if (cm_data->connect.eager_limit != ep->eager_limit) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "eager_limit mismatch"); + return -FI_EINVAL; + } - ret = rxm_msg_ep_open(rxm_ep, msg_info, rxm_conn, handle); - if (ret) - goto err2; + return FI_SUCCESS; +} - cm_data.accept.server_conn_id = rxm_conn->handle.key; - cm_data.accept.rx_size = rxm_conn_get_rx_size(rxm_ep, msg_info); +static void +rxm_reject_connreq(struct rxm_ep *ep, struct rxm_eq_cm_entry *cm_entry, + uint8_t reason) +{ + union rxm_cm_data cm_data; - ret = fi_accept(rxm_conn->msg_ep, &cm_data.accept.server_conn_id, - sizeof(cm_data.accept)); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Unable to accept incoming connection\n"); - goto err2; - } + cm_data.reject.version = RXM_CM_DATA_VERSION; + cm_data.reject.reason = reason; - return ret; -err2: - rxm_cmap_del_handle(&rxm_conn->handle); -err1: - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "rejecting incoming connection request (reject reason: %d)\n", - (enum rxm_cmap_reject_reason)reject_cm_data.reject.reason); - fi_reject(rxm_ep->msg_pep, msg_info->handle, - &reject_cm_data.reject, sizeof(reject_cm_data.reject)); - return ret; + fi_reject(ep->msg_pep, cm_entry->info->handle, + &cm_data.reject, sizeof(cm_data.reject)); } -static void rxm_flush_msg_cq(struct rxm_ep *rxm_ep) +static int +rxm_accept_connreq(struct rxm_conn *conn, struct rxm_eq_cm_entry *cm_entry) { - struct fi_cq_data_entry comp; - int ret; - do { - ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1); - if (ret > 0) { - ret = rxm_handle_comp(rxm_ep, &comp); - if (OFI_UNLIKELY(ret)) { - rxm_cq_write_error_all(rxm_ep, ret); - } else { - ret = 1; - } - } else if (ret == -FI_EAVAIL) { - rxm_handle_comp_error(rxm_ep); - ret = 1; - } else if (ret < 0 && ret != -FI_EAGAIN) { - rxm_cq_write_error_all(rxm_ep, ret); - } - } while (ret > 0); + union rxm_cm_data cm_data; + + cm_data.accept.server_conn_id = conn->peer->index; + cm_data.accept.rx_size = cm_entry->info->rx_attr->size; + + return fi_accept(conn->msg_ep, &cm_data.accept, sizeof(cm_data.accept)); } -static int rxm_conn_handle_notify(struct fi_eq_entry *eq_entry) +static void +rxm_process_connreq(struct rxm_ep *ep, struct rxm_eq_cm_entry *cm_entry) { - struct rxm_cmap *cmap; - struct rxm_cmap_handle *handle; - - FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "notify event %" PRIu64 "\n", - eq_entry->data); + union ofi_sock_ip peer_addr; + struct rxm_peer_addr *peer; + struct rxm_conn *conn; + struct rxm_av *av; + ssize_t ret; + int cmp; - if ((enum rxm_cmap_signal) eq_entry->data != RXM_CMAP_FREE) - return -FI_EOTHER; + assert(ofi_ep_lock_held(&ep->util_ep)); + if (rxm_verify_connreq(ep, &cm_entry->data)) + goto reject; - handle = eq_entry->context; - assert(handle->state == RXM_CMAP_SHUTDOWN); - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "freeing handle: %p\n", handle); - cmap = handle->cmap; + memcpy(&peer_addr, cm_entry->info->dest_addr, + cm_entry->info->dest_addrlen); + ofi_addr_set_port(&peer_addr.sa, cm_entry->data.connect.port); - rxm_conn_close(handle); + av = container_of(ep->util_ep.av, struct rxm_av, util_av); + peer = rxm_get_peer(av, &peer_addr); + if (!peer) + goto reject; - // after closing the connection, we need to flush any dangling references to the - // handle from msg_cq entries that have not been cleaned up yet, otherwise we - // could run into problems during CQ cleanup. these entries will be errored so - // keep reading through EAVAIL. - rxm_flush_msg_cq(cmap->ep); + conn = rxm_add_conn(ep, peer); + if (!conn) + goto remove; - if (handle->peer) { - dlist_remove(&handle->peer->entry); - free(handle->peer); - handle->peer = NULL; + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connreq for %p\n", conn); + switch (conn->state) { + case RXM_CM_IDLE: + break; + case RXM_CM_CONNECTING: + /* simultaneous connections */ + cmp = ofi_addr_cmp(&rxm_prov, &peer_addr.sa, &peer->addr.sa); + if (cmp < 0) { + /* let our request finish */ + rxm_reject_connreq(ep, cm_entry, + RXM_REJECT_ECONNREFUSED); + goto put; + } else if (cmp > 0) { + /* accept peer's request */ + rxm_close_conn(conn); + } else { + /* FIXME: handle connection from self */ + break; + } + break; + case RXM_CM_ACCEPTING: + case RXM_CM_CONNECTED: + /* peer reset and lost previous connection state */ + rxm_close_conn(conn); + break; + default: + assert(0); + break; } - if (handle->fi_addr != FI_ADDR_NOTAVAIL) { - cmap->handles_av[handle->fi_addr] = NULL; - handle->fi_addr = FI_ADDR_NOTAVAIL; - } + conn->remote_index = cm_entry->data.connect.client_conn_id; + ret = rxm_open_conn(conn, cm_entry->info); + if (ret) + goto free; - rxm_conn_free(handle); - return 0; + ret = rxm_accept_connreq(conn, cm_entry); + if (ret) + goto close; + + conn->state = RXM_CM_ACCEPTING; +put: + rxm_put_peer(peer); + fi_freeinfo(cm_entry->info); + return; + +close: + rxm_close_conn(conn); +free: + rxm_free_conn(conn); +remove: + rxm_put_peer(peer); +reject: + rxm_reject_connreq(ep, cm_entry, RXM_REJECT_ECONNREFUSED); + fi_freeinfo(cm_entry->info); } -static void rxm_conn_wake_up_wait_obj(struct rxm_ep *rxm_ep) +void rxm_process_shutdown(struct rxm_conn *conn) { - if (rxm_ep->util_ep.tx_cq && rxm_ep->util_ep.tx_cq->wait) - util_cq_signal(rxm_ep->util_ep.tx_cq); - if (rxm_ep->util_ep.tx_cntr && rxm_ep->util_ep.tx_cntr->wait) - util_cntr_signal(rxm_ep->util_ep.tx_cntr); + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "shutdown conn %p\n", conn); + assert(ofi_ep_lock_held(&conn->ep->util_ep)); + + switch (conn->state) { + case RXM_CM_IDLE: + break; + case RXM_CM_CONNECTING: + case RXM_CM_ACCEPTING: + case RXM_CM_CONNECTED: + rxm_close_conn(conn); + rxm_free_conn(conn); + break; + default: + break; + } } -static int -rxm_conn_handle_reject(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry) +static void rxm_handle_error(struct rxm_ep *ep) { - union rxm_cm_data *cm_data = entry->err_entry.err_data; - - if (!cm_data || entry->err_entry.err_data_size != sizeof(cm_data->reject)) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "no reject error data (cm_data) was found " - "(data length expected: %zu found: %zu)\n", - sizeof(cm_data->reject), - entry->err_entry.err_data_size); - return -FI_EOTHER; - } + struct fi_eq_err_entry entry; + ssize_t ret; - if (cm_data->reject.version != RXM_CM_DATA_VERSION) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "cm data version mismatch (local: %" PRIu8 - ", remote: %" PRIu8 ")\n", - (uint8_t) RXM_CM_DATA_VERSION, - cm_data->reject.version); - return -FI_EOTHER; + assert(ofi_ep_lock_held(&ep->util_ep)); + ret = fi_eq_readerr(ep->msg_eq, &entry, 0); + if (ret != sizeof(entry)) { + if (ret != -FI_EAGAIN) + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to fi_eq_readerr: %zd\n", ret); + return; } - if (cm_data->reject.reason == RXM_CMAP_REJECT_GENUINE) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "remote peer didn't accept the connection\n"); - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "(reason: RXM_CMAP_REJECT_GENUINE)\n"); - OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, - rxm_ep->msg_eq, &entry->err_entry); - } else if (cm_data->reject.reason == RXM_CMAP_REJECT_SIMULT_CONN) { - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "(reason: RXM_CMAP_REJECT_SIMULT_CONN)\n"); + if (entry.err == ECONNREFUSED) { + rxm_process_reject(entry.fid->context, &entry); } else { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: " - "received unknown reject reason: %d\n", - cm_data->reject.reason); + OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, + ep->msg_eq, &entry); } - rxm_cmap_process_reject(rxm_ep->cmap, entry->context, - cm_data->reject.reason); - return 0; } -static int -rxm_conn_handle_event(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry) +static void +rxm_handle_event(struct rxm_ep *ep, uint32_t event, + struct rxm_eq_cm_entry *cm_entry, size_t len) { - if (entry->rd == -FI_ECONNREFUSED) - return rxm_conn_handle_reject(rxm_ep, entry); - - switch (entry->event) { + assert(ofi_ep_lock_held(&ep->util_ep)); + switch (event) { case FI_NOTIFY: - return rxm_conn_handle_notify((struct fi_eq_entry *) - &entry->cm_entry); + break; case FI_CONNREQ: - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "Got new connection\n"); - if ((size_t)entry->rd != RXM_CM_ENTRY_SZ) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Received a connection request with no CM data. " - "Is sender running FI_PROTO_RXM?\n"); - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Received CM entry " - "size (%zd) not matching expected (%zu)\n", - entry->rd, RXM_CM_ENTRY_SZ); - return -FI_EOTHER; - } - rxm_msg_process_connreq(rxm_ep, entry->cm_entry.info, - (union rxm_cm_data *) entry->cm_entry.data); - fi_freeinfo(entry->cm_entry.info); + rxm_process_connreq(ep, cm_entry); break; case FI_CONNECTED: - assert(entry->cm_entry.fid->context); - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "connection successful\n"); - rxm_cmap_process_connect(rxm_ep->cmap, - entry->cm_entry.fid->context, - entry->rd - sizeof(entry->cm_entry) > 0 ? - (union rxm_cm_data *) entry->cm_entry.data : NULL); - rxm_conn_wake_up_wait_obj(rxm_ep); + rxm_process_connect(cm_entry); break; case FI_SHUTDOWN: - FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, - "Received connection shutdown\n"); - rxm_cmap_process_shutdown(rxm_ep->cmap, - entry->cm_entry.fid->context); + rxm_process_shutdown(cm_entry->fid->context); break; default: FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Unknown event: %u\n", entry->event); - return -FI_EOTHER; + "Unknown event: %u\n", event); + break; } - return 0; } -static ssize_t rxm_eq_sread(struct rxm_ep *rxm_ep, size_t len, - struct rxm_msg_eq_entry *entry) +void rxm_conn_progress(struct rxm_ep *ep) { - ssize_t rd; + struct rxm_eq_cm_entry cm_entry; + uint32_t event; + int ret; - /* TODO convert this to poll + fi_eq_read so that we can grab - * rxm_ep lock before reading the EQ. This is needed to avoid - * processing events / error entries from closed MSG EPs. This - * can be done only for non-Windows OSes as Windows doesn't - * have poll for a generic file descriptor. - */ - rd = fi_eq_sread(rxm_ep->msg_eq, &entry->event, &entry->cm_entry, - len, -1, 0); - if (rd >= 0) - return rd; + assert(ofi_ep_lock_held(&ep->util_ep)); + do { + ret = fi_eq_read(ep->msg_eq, &event, &cm_entry, + sizeof(cm_entry), 0); + if (ret > 0) { + rxm_handle_event(ep, event, &cm_entry, ret); + } else if (ret == -FI_EAVAIL) { + rxm_handle_error(ep); + ret = 1; + } + } while (ret > 0); +} - if (rd != -FI_EAVAIL) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to fi_eq_sread: %s (%zd)\n", - fi_strerror(-rd), -rd); - return rd; +void rxm_stop_listen(struct rxm_ep *ep) +{ + struct fi_eq_entry entry = {0}; + int ret; + + FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "stopping CM thread\n"); + if (!ep->cm_thread) + return; + + ofi_ep_lock_acquire(&ep->util_ep); + ep->do_progress = false; + ofi_ep_lock_release(&ep->util_ep); + + ret = fi_eq_write(ep->msg_eq, FI_NOTIFY, &entry, sizeof(entry), 0); + if (ret != sizeof(entry)) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to signal\n"); + return; } - ofi_ep_lock_acquire(&rxm_ep->util_ep); - rd = rxm_eq_readerr(rxm_ep, entry); - ofi_ep_lock_release(&rxm_ep->util_ep); - return rd; + ret = pthread_join(ep->cm_thread, NULL); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Unable to join CM thread\n"); + } } -static inline int rxm_conn_eq_event(struct rxm_ep *rxm_ep, - struct rxm_msg_eq_entry *entry) +static void rxm_flush_msg_cq(struct rxm_ep *ep) { + struct fi_cq_data_entry comp; int ret; - ofi_ep_lock_acquire(&rxm_ep->util_ep); - ret = rxm_conn_handle_event(rxm_ep, entry) ? -1 : 0; - ofi_ep_lock_release(&rxm_ep->util_ep); - - return ret; + assert(ofi_ep_lock_held(&ep->util_ep)); + do { + ret = fi_cq_read(ep->msg_cq, &comp, 1); + if (ret > 0) { + ret = rxm_handle_comp(ep, &comp); + if (ret) { + rxm_cq_write_error_all(ep, ret); + } else { + ret = 1; + } + } else if (ret == -FI_EAVAIL) { + rxm_handle_comp_error(ep); + ret = 1; + } else if (ret < 0 && ret != -FI_EAGAIN) { + rxm_cq_write_error_all(ep, ret); + } + } while (ret > 0); } -static void *rxm_conn_progress(void *arg) +static void *rxm_cm_progress(void *arg) { struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep); - struct rxm_msg_eq_entry *entry; - - entry = alloca(RXM_MSG_EQ_ENTRY_SZ); - if (!entry) - return NULL; + struct rxm_eq_cm_entry cm_entry; + uint32_t event; + ssize_t ret; FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Starting auto-progress thread\n"); ofi_ep_lock_acquire(&ep->util_ep); while (ep->do_progress) { ofi_ep_lock_release(&ep->util_ep); - memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ); - entry->rd = rxm_eq_sread(ep, RXM_CM_ENTRY_SZ, entry); - if (entry->rd >= 0 || entry->rd == -FI_ECONNREFUSED) - rxm_conn_eq_event(ep, entry); + + ret = fi_eq_sread(ep->msg_eq, &event, &cm_entry, + sizeof(cm_entry), -1, 0); ofi_ep_lock_acquire(&ep->util_ep); + if (ret > 0) { + rxm_handle_event(ep, event, &cm_entry, ret); + } else if (ret == -FI_EAVAIL) { + rxm_handle_error(ep); + } else { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Fatal error reading from msg eq"); + break; + } } ofi_ep_lock_release(&ep->util_ep); @@ -1284,27 +728,9 @@ static void *rxm_conn_progress(void *arg) return NULL; } -static inline int -rxm_conn_auto_progress_eq(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry) -{ - memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ); - - ofi_ep_lock_acquire(&rxm_ep->util_ep); - entry->rd = rxm_eq_read(rxm_ep, RXM_CM_ENTRY_SZ, entry); - ofi_ep_lock_release(&rxm_ep->util_ep); - - if (!entry->rd || entry->rd == -FI_EAGAIN) - return FI_SUCCESS; - if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED) - return entry->rd; - - return rxm_conn_eq_event(rxm_ep, entry); -} - -static void *rxm_conn_atomic_progress(void *arg) +static void *rxm_cm_atomic_progress(void *arg) { struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep); - struct rxm_msg_eq_entry *entry; struct rxm_fabric *fabric; struct fid *fids[2] = { &ep->msg_eq->fid, @@ -1316,13 +742,8 @@ static void *rxm_conn_atomic_progress(void *arg) }; int ret; - entry = alloca(RXM_MSG_EQ_ENTRY_SZ); - if (!entry) - return NULL; - fabric = container_of(ep->util_ep.domain->fabric, struct rxm_fabric, util_fabric); - ret = fi_control(&ep->msg_eq->fid, FI_GETWAIT, &fds[0].fd); if (ret) { FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, @@ -1344,18 +765,15 @@ static void *rxm_conn_atomic_progress(void *arg) ret = fi_trywait(fabric->msg_fabric, fids, 2); if (!ret) { - fds[0].revents = 0; - fds[1].revents = 0; - ret = poll(fds, 2, -1); if (ret == -1) { FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Select error %s\n", strerror(errno)); } } - rxm_conn_auto_progress_eq(ep, entry); ep->util_ep.progress(&ep->util_ep); ofi_ep_lock_acquire(&ep->util_ep); + rxm_conn_progress(ep); } ofi_ep_lock_release(&ep->util_ep); @@ -1363,141 +781,39 @@ static void *rxm_conn_atomic_progress(void *arg) return NULL; } -static int rxm_prepare_cm_data(struct fid_pep *pep, struct rxm_cmap_handle *handle, - union rxm_cm_data *cm_data) +int rxm_start_listen(struct rxm_ep *ep) { - struct sockaddr_storage name; - size_t cm_data_size = 0; - size_t name_size = sizeof(name); - size_t opt_size = sizeof(cm_data_size); + size_t addr_len; int ret; - ret = fi_getopt(&pep->fid, FI_OPT_ENDPOINT, FI_OPT_CM_DATA_SIZE, - &cm_data_size, &opt_size); + ret = fi_listen(ep->msg_pep); if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "fi_getopt failed\n"); + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to set msg PEP to listen state\n"); return ret; } - if (cm_data_size < sizeof(*cm_data)) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "MSG EP CM data size too small\n"); - return -FI_EOTHER; - } - - ret = fi_getname(&pep->fid, &name, &name_size); + addr_len = sizeof(ep->addr); + ret = fi_getname(&ep->msg_pep->fid, &ep->addr, &addr_len); if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to get msg pep name\n"); - return ret; - } - - cm_data->connect.port = ofi_addr_get_port((struct sockaddr *)&name); - cm_data->connect.client_conn_id = handle->key; - return 0; -} - -static int -rxm_conn_connect(struct rxm_ep *ep, struct rxm_cmap_handle *handle, - const void *addr) -{ - int ret; - struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle); - union rxm_cm_data cm_data = { - .connect = { - .version = RXM_CM_DATA_VERSION, - .ctrl_version = RXM_CTRL_VERSION, - .op_version = RXM_OP_VERSION, - .endianness = ofi_detect_endianness(), - .eager_limit = ep->eager_limit, - }, - }; - - assert(sizeof(uint32_t) == sizeof(cm_data.connect.eager_limit)); - assert(sizeof(uint32_t) == sizeof(cm_data.connect.rx_size)); - assert(ep->msg_info->rx_attr->size <= (uint32_t) -1); - - free(ep->msg_info->dest_addr); - ep->msg_info->dest_addrlen = ep->msg_info->src_addrlen; - - ep->msg_info->dest_addr = mem_dup(addr, ep->msg_info->dest_addrlen); - if (!ep->msg_info->dest_addr) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "mem_dup failed, len %zu\n", - ep->msg_info->dest_addrlen); - return -FI_ENOMEM; - } - - ret = rxm_msg_ep_open(ep, ep->msg_info, rxm_conn, &rxm_conn->handle); - if (ret) + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Unable to get msg pep name\n"); return ret; - - /* We have to send passive endpoint's address to the server since the - * address from which connection request would be sent would have a - * different port. */ - ret = rxm_prepare_cm_data(ep->msg_pep, &rxm_conn->handle, &cm_data); - if (ret) - goto err; - - cm_data.connect.rx_size = rxm_conn_get_rx_size(ep, ep->msg_info); - - ret = fi_connect(rxm_conn->msg_ep, ep->msg_info->dest_addr, - &cm_data, sizeof(cm_data)); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to connect msg_ep\n"); - goto err; } - return 0; - -err: - fi_close(&rxm_conn->msg_ep->fid); - rxm_conn->msg_ep = NULL; - return ret; -} - -static int rxm_conn_signal(struct rxm_ep *ep, void *context, - enum rxm_cmap_signal signal) -{ - struct fi_eq_entry entry = {0}; - ssize_t rd; - entry.context = context; - entry.data = (uint64_t) signal; + if (ep->util_ep.domain->data_progress == FI_PROGRESS_AUTO || + force_auto_progress) { - rd = fi_eq_write(ep->msg_eq, FI_NOTIFY, &entry, sizeof(entry), 0); - if (rd != sizeof(entry)) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to signal\n"); - return (int)rd; + assert(ep->util_ep.domain->threading == FI_THREAD_SAFE); + ep->do_progress = true; + if (pthread_create(&ep->cm_thread, 0, + ep->rxm_info->caps & FI_ATOMIC ? + rxm_cm_atomic_progress : + rxm_cm_progress, ep)) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "unable to create cm thread\n"); + return -ofi_syserr(); + } } return 0; } - -int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep) -{ - struct rxm_cmap_attr attr; - int ret; - size_t len = rxm_ep->util_ep.av->addrlen; - void *name = calloc(1, len); - if (!name) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Unable to allocate memory for EP name\n"); - return -FI_ENOMEM; - } - - /* Passive endpoint should already have fi_setname or fi_listen - * called on it for this to work */ - ret = fi_getname(&rxm_ep->msg_pep->fid, name, &len); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Unable to fi_getname on msg_ep\n"); - goto fn; - } - ofi_straddr_dbg(&rxm_prov, FI_LOG_EP_CTRL, "local_name", name); - - attr.name = name; - - ret = rxm_cmap_alloc(rxm_ep, &attr); - if (ret) - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "Unable to allocate CMAP\n"); -fn: - free(name); - return ret; -} diff --git a/prov/rxm/src/rxm_cq.c b/prov/rxm/src/rxm_cq.c index d2e43c7e918..579c9552304 100644 --- a/prov/rxm/src/rxm_cq.c +++ b/prov/rxm/src/rxm_cq.c @@ -77,10 +77,8 @@ rxm_rx_buf_alloc(struct rxm_ep *rxm_ep, struct fid_ep *rx_ep) rx_buf->rx_ep = rx_ep; rx_buf->repost = true; - if (!rxm_ep->srx_ctx) { - rx_buf->conn = container_of(rx_ep->fid.context, - struct rxm_conn, handle); - } + if (!rxm_ep->srx_ctx) + rx_buf->conn = rx_ep->fid.context; return rx_buf; } @@ -112,7 +110,7 @@ static void rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf) if ((rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) && rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) != RXM_SAR_SEG_FIRST) { dlist_insert_tail(&rx_buf->unexp_msg.entry, - &rx_buf->conn->sar_deferred_rx_msg_list); + &rx_buf->conn->deferred_sar_segments); rxm_replace_rx_buf(rx_buf); } @@ -411,15 +409,15 @@ static void rxm_process_seg_data(struct rxm_rx_buf *rx_buf, int *done) } else { if (rx_buf->recv_entry->sar.msg_id == RXM_SAR_RX_INIT) { if (!rx_buf->conn) { - rx_buf->conn = rxm_key2conn(rx_buf->ep, - rx_buf->pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); } rx_buf->recv_entry->sar.conn = rx_buf->conn; rx_buf->recv_entry->sar.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; dlist_insert_tail(&rx_buf->recv_entry->sar.entry, - &rx_buf->conn->sar_rx_msg_list); + &rx_buf->conn->deferred_sar_msgs); } /* The RX buffer can be reposted for further re-use */ @@ -446,7 +444,7 @@ static void rxm_handle_seg_data(struct rxm_rx_buf *rx_buf) conn = rx_buf->conn; msg_id = rx_buf->pkt.ctrl_hdr.msg_id; - dlist_foreach_container_safe(&conn->sar_deferred_rx_msg_list, + dlist_foreach_container_safe(&conn->deferred_sar_segments, struct rxm_rx_buf, rx_buf, unexp_msg.entry, entry) { if (!rxm_rx_buf_match_msg_id(&rx_buf->unexp_msg.entry, &msg_id)) @@ -495,7 +493,7 @@ static ssize_t rxm_rndv_xfer(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, if (ret) break; - rxm_ep_enqueue_deferred_tx_queue(def_tx_entry); + rxm_queue_deferred_tx(def_tx_entry, OFI_LIST_TAIL); continue; } break; @@ -579,8 +577,8 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf) if (!rx_buf->conn) { assert(rx_buf->ep->srx_ctx); - rx_buf->conn = rxm_key2conn(rx_buf->ep, - rx_buf->pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; } @@ -770,11 +768,11 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) if (rx_buf->ep->rxm_info->caps & (FI_SOURCE | FI_DIRECTED_RECV)) { if (rx_buf->ep->srx_ctx) - rx_buf->conn = rxm_key2conn(rx_buf->ep, rx_buf-> - pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; - match_attr.addr = rx_buf->conn->handle.fi_addr; + match_attr.addr = rx_buf->conn->peer->fi_addr; } if (rx_buf->ep->rxm_info->mode & FI_BUFFERED_RECV) { @@ -812,15 +810,15 @@ static ssize_t rxm_sar_handle_segment(struct rxm_rx_buf *rx_buf) { struct dlist_entry *sar_entry; - rx_buf->conn = rxm_key2conn(rx_buf->ep, - rx_buf->pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; FI_DBG(&rxm_prov, FI_LOG_CQ, "Got incoming recv with msg_id: 0x%" PRIx64 " for conn - %p\n", rx_buf->pkt.ctrl_hdr.msg_id, rx_buf->conn); - sar_entry = dlist_find_first_match(&rx_buf->conn->sar_rx_msg_list, + sar_entry = dlist_find_first_match(&rx_buf->conn->deferred_sar_msgs, rxm_sar_match_msg_id, &rx_buf->pkt.ctrl_hdr.msg_id); if (!sar_entry) @@ -849,7 +847,7 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) rx_buf->recv_entry->rndv.tx_buf = buf; buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_rd_done; - buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->handle.remote_key; + buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; ret = fi_send(rx_buf->conn->msg_ep, &buf->pkt, sizeof(buf->pkt), @@ -862,7 +860,7 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) if (def_entry) { def_entry->rndv_ack.rx_buf = rx_buf; def_entry->rndv_ack.pkt_size = sizeof(rx_buf->pkt); - rxm_ep_enqueue_deferred_tx_queue(def_entry); + rxm_queue_deferred_tx(def_entry, OFI_LIST_TAIL); return; } } @@ -915,7 +913,7 @@ rxm_rndv_send_wr_done(struct rxm_ep *rxm_ep, struct rxm_tx_buf *tx_buf) RXM_DEFERRED_TX_RNDV_DONE); if (def_entry) { def_entry->rndv_done.tx_buf = tx_buf; - rxm_ep_enqueue_deferred_tx_queue(def_entry); + rxm_queue_deferred_tx(def_entry, OFI_LIST_TAIL); return; } } @@ -956,7 +954,7 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) rx_buf->recv_entry->rndv.tx_buf = buf; buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_wr_data; - buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->handle.remote_key; + buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; rxm_rndv_hdr_init(rx_buf->ep, buf->pkt.data, rx_buf->recv_entry->rxm_iov.iov, @@ -974,7 +972,7 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) def_entry->rndv_ack.pkt_size = sizeof(buf->pkt) + sizeof(struct rxm_rndv_hdr); - rxm_ep_enqueue_deferred_tx_queue(def_entry); + rxm_queue_deferred_tx(def_entry, OFI_LIST_TAIL); return 0; } } @@ -1040,7 +1038,7 @@ static ssize_t rxm_atomic_send_resp(struct rxm_ep *rxm_ep, rx_buf->pkt.hdr.op, rx_buf->pkt.hdr.atomic.datatype, rx_buf->pkt.hdr.atomic.op); - resp_buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->handle.remote_key; + resp_buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; resp_buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; atomic_hdr = (struct rxm_atomic_resp_hdr *) resp_buf->pkt.data; atomic_hdr->status = htonl(status); @@ -1071,7 +1069,7 @@ static ssize_t rxm_atomic_send_resp(struct rxm_ep *rxm_ep, def_tx_entry->atomic_resp.tx_buf = resp_buf; def_tx_entry->atomic_resp.len = tot_len; - rxm_ep_enqueue_deferred_tx_queue(def_tx_entry); + rxm_queue_deferred_tx(def_tx_entry, OFI_LIST_TAIL); ret = 0; } } @@ -1172,8 +1170,8 @@ static ssize_t rxm_handle_atomic_req(struct rxm_ep *rxm_ep, op == ofi_op_atomic_compare); if (rx_buf->ep->srx_ctx) - rx_buf->conn = rxm_key2conn(rx_buf->ep, - rx_buf->pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; @@ -1491,14 +1489,14 @@ static void rxm_get_recv_entry(struct rxm_rx_buf *rx_buf, struct ofi_cq_rbuf_entry *cq_entry) { struct rxm_recv_match_attr match_attr; - struct rxm_cmap_handle *cm_handle; + struct rxm_conn *conn; struct rxm_recv_queue *recv_queue; struct dlist_entry *entry; assert(!rx_buf->recv_entry); if (rx_buf->ep->rxm_info->caps & (FI_SOURCE | FI_DIRECTED_RECV)) { - cm_handle = cq_entry->ep_context; - match_attr.addr = cm_handle->fi_addr; + conn = cq_entry->ep_context; + match_attr.addr = conn->peer->fi_addr; } else { match_attr.addr = FI_ADDR_UNSPEC; } @@ -1533,14 +1531,14 @@ static void rxm_get_recv_entry(struct rxm_rx_buf *rx_buf, static void rxm_fake_rx_hdr(struct rxm_rx_buf *rx_buf, struct ofi_cq_rbuf_entry *entry) { - struct rxm_cmap_handle *cm_handle; + struct rxm_conn *conn; - cm_handle = entry->ep_context; + conn = entry->ep_context; OFI_DBG_SET(rx_buf->pkt.hdr.version, OFI_OP_VERSION); OFI_DBG_SET(rx_buf->pkt.ctrl_hdr.version, RXM_CTRL_VERSION); rx_buf->pkt.ctrl_hdr.type = rxm_ctrl_eager; - rx_buf->pkt.ctrl_hdr.conn_id = cm_handle->remote_key; + rx_buf->pkt.ctrl_hdr.conn_id = conn->remote_index; rx_buf->pkt.hdr.op = ofi_op_tagged; rx_buf->pkt.hdr.tag = entry->tag; rx_buf->pkt.hdr.size = entry->len; @@ -1821,7 +1819,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) int rxm_post_recv(struct rxm_rx_buf *rx_buf) { struct rxm_domain *domain; - int ret, level; + int ret; if (rx_buf->ep->srx_ctx) rx_buf->conn = NULL; @@ -1837,9 +1835,7 @@ int rxm_post_recv(struct rxm_rx_buf *rx_buf) return 0; if (ret != -FI_EAGAIN) { - level = (rx_buf->conn->handle.state == RXM_CMAP_SHUTDOWN) ? - FI_LOG_DEBUG : FI_LOG_WARN; - FI_LOG(&rxm_prov, level, FI_LOG_EP_CTRL, + FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "unable to post recv buf: %d\n", ret); } return ret; @@ -1899,15 +1895,15 @@ void rxm_ep_do_progress(struct util_ep *util_ep) if (timestamp - rxm_ep->msg_cq_last_poll > rxm_cm_progress_interval) { rxm_ep->msg_cq_last_poll = timestamp; - rxm_msg_eq_progress(rxm_ep); + rxm_conn_progress(rxm_ep); } } } while ((ret > 0) && (++comp_read < rxm_ep->comp_per_progress)); - if (!dlist_empty(&rxm_ep->deferred_tx_conn_queue)) { - dlist_foreach_container_safe(&rxm_ep->deferred_tx_conn_queue, + if (!dlist_empty(&rxm_ep->deferred_queue)) { + dlist_foreach_container_safe(&rxm_ep->deferred_queue, struct rxm_conn, rxm_conn, - deferred_conn_entry, conn_entry_tmp) { + deferred_entry, conn_entry_tmp) { rxm_ep_progress_deferred_queue(rxm_ep, rxm_conn); } } diff --git a/prov/rxm/src/rxm_domain.c b/prov/rxm/src/rxm_domain.c index bab0b4ccbe1..cc1b86d0608 100644 --- a/prov/rxm/src/rxm_domain.c +++ b/prov/rxm/src/rxm_domain.c @@ -380,9 +380,8 @@ static struct fi_ops_mr rxm_domain_mr_ops = { static ssize_t rxm_send_credits(struct fid_ep *ep, size_t credits) { - struct rxm_conn *rxm_conn = - container_of(ep->fid.context, struct rxm_conn, handle); - struct rxm_ep *rxm_ep = rxm_conn->handle.cmap->ep; + struct rxm_conn *rxm_conn = ep->fid.context; + struct rxm_ep *rxm_ep = rxm_conn->ep; struct rxm_deferred_tx_entry *def_tx_entry; struct rxm_tx_buf *tx_buf; struct iovec iov; @@ -403,7 +402,7 @@ static ssize_t rxm_send_credits(struct fid_ep *ep, size_t credits) tx_buf->pkt.ctrl_hdr.msg_id = ofi_buf_index(tx_buf); tx_buf->pkt.ctrl_hdr.ctrl_data = credits; - if (rxm_conn->handle.state != RXM_CMAP_CONNECTED) + if (rxm_conn->state != RXM_CM_CONNECTED) goto defer; iov.iov_base = &tx_buf->pkt; @@ -428,7 +427,7 @@ static ssize_t rxm_send_credits(struct fid_ep *ep, size_t credits) } def_tx_entry->credit_msg.tx_buf = tx_buf; - rxm_ep_enqueue_deferred_tx_queue_priority(def_tx_entry); + rxm_queue_deferred_tx(def_tx_entry, OFI_LIST_HEAD); return FI_SUCCESS; } diff --git a/prov/rxm/src/rxm_ep.c b/prov/rxm/src/rxm_ep.c index 3d616c916a4..beac193b3f7 100644 --- a/prov/rxm/src/rxm_ep.c +++ b/prov/rxm/src/rxm_ep.c @@ -236,6 +236,7 @@ static void rxm_recv_queue_close(struct rxm_recv_queue *recv_queue) /* It indicates that the recv_queue were allocated */ if (recv_queue->fs) { rxm_recv_fs_free(recv_queue->fs); + recv_queue->fs = NULL; } // TODO cleanup recv_list and unexp msg list } @@ -325,18 +326,23 @@ static int rxm_ep_rx_queue_init(struct rxm_ep *rxm_ep) /* It is safe to call this function, even if `rxm_ep_txrx_res_open` * has not yet been called */ -static void rxm_ep_txrx_res_close(struct rxm_ep *rxm_ep) +static void rxm_ep_txrx_res_close(struct rxm_ep *ep) { - rxm_recv_queue_close(&rxm_ep->trecv_queue); - rxm_recv_queue_close(&rxm_ep->recv_queue); + rxm_recv_queue_close(&ep->trecv_queue); + rxm_recv_queue_close(&ep->recv_queue); - if (rxm_ep->multi_recv_pool) - ofi_bufpool_destroy(rxm_ep->multi_recv_pool); - - if (rxm_ep->rx_pool) - ofi_bufpool_destroy(rxm_ep->rx_pool); - if (rxm_ep->tx_pool) - ofi_bufpool_destroy(rxm_ep->tx_pool); + if (ep->multi_recv_pool) { + ofi_bufpool_destroy(ep->multi_recv_pool); + ep->multi_recv_pool = NULL; + } + if (ep->rx_pool) { + ofi_bufpool_destroy(ep->rx_pool); + ep->rx_pool = NULL; + } + if (ep->tx_pool) { + ofi_bufpool_destroy(ep->tx_pool); + ep->tx_pool = NULL; + } } static int rxm_setname(fid_t fid, void *addr, size_t addrlen) @@ -594,8 +600,8 @@ static int rxm_handle_unexp_sar(struct rxm_recv_queue *recv_queue, continue; if (!rx_buf->conn) { - rx_buf->conn = rxm_key2conn(rx_buf->ep, - rx_buf->pkt.ctrl_hdr.conn_id); + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); } if (recv_entry->sar.conn != rx_buf->conn) continue; @@ -1253,7 +1259,7 @@ rxm_send_sar(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, def_tx->sar_seg.msg_id = msg_id; def_tx->sar_seg.iface = iface; def_tx->sar_seg.device = device; - rxm_ep_enqueue_deferred_tx_queue(def_tx); + rxm_queue_deferred_tx(def_tx, OFI_LIST_TAIL); return 0; } @@ -1317,7 +1323,7 @@ rxm_ep_inject_send(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, assert(len <= rxm_ep->rxm_info->tx_attr->inject_size); - inject_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key; + inject_pkt->ctrl_hdr.conn_id = rxm_conn->remote_index; if (pkt_size <= rxm_ep->inject_limit && !rxm_ep->util_ep.tx_cntr) { if (rxm_use_msg_tinject(rxm_ep, inject_pkt->hdr.op)) { return rxm_msg_tinject(rxm_conn->msg_ep, buf, len, @@ -1636,7 +1642,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct fi_msg msg; ssize_t ret = 0; - if (rxm_conn->handle.state != RXM_CMAP_CONNECTED) + if (rxm_conn->state != RXM_CM_CONNECTED) return; while (!dlist_empty(&rxm_conn->deferred_tx_queue) && !ret) { @@ -1761,7 +1767,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, break; } - rxm_ep_dequeue_deferred_tx_queue(def_tx_entry); + rxm_dequeue_deferred_tx(def_tx_entry); free(def_tx_entry); } } @@ -2260,76 +2266,76 @@ static struct fi_ops_collective rxm_ops_collective_none = { .msg = fi_coll_no_msg, }; -static int rxm_ep_msg_res_close(struct rxm_ep *rxm_ep) -{ - int ret = 0; - - if (rxm_ep->srx_ctx) { - ret = fi_close(&rxm_ep->srx_ctx->fid); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, \ - "Unable to close msg shared ctx\n"); - } - } - fi_freeinfo(rxm_ep->msg_info); - return ret; -} - -static int rxm_listener_close(struct rxm_ep *rxm_ep) +static int rxm_listener_close(struct rxm_ep *ep) { - int ret, retv = 0; + int ret; - if (rxm_ep->msg_pep) { - ret = fi_close(&rxm_ep->msg_pep->fid); + if (ep->msg_pep) { + ret = fi_close(&ep->msg_pep->fid); if (ret) { FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to close msg pep\n"); - retv = ret; + return ret; } + ep->msg_pep = NULL; } - if (rxm_ep->msg_eq) { - ret = fi_close(&rxm_ep->msg_eq->fid); + + if (ep->msg_eq) { + ret = fi_close(&ep->msg_eq->fid); if (ret) { FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to close msg EQ\n"); - retv = ret; + return ret; } + ep->msg_eq = NULL; } - return retv; + return 0; } static int rxm_ep_close(struct fid *fid) { - int ret, retv = 0; - struct rxm_ep *rxm_ep; + struct rxm_ep *ep; + int ret; - rxm_ep = container_of(fid, struct rxm_ep, util_ep.ep_fid.fid); - if (rxm_ep->cmap) - rxm_cmap_free(rxm_ep->cmap); + ep = container_of(fid, struct rxm_ep, util_ep.ep_fid.fid); - ret = rxm_listener_close(rxm_ep); + /* Stop listener thread to halt event processing before closing all + * connections. + */ + rxm_stop_listen(ep); + rxm_freeall_conns(ep); + ret = rxm_listener_close(ep); if (ret) - retv = ret; + return ret; - rxm_ep_txrx_res_close(rxm_ep); - ret = rxm_ep_msg_res_close(rxm_ep); - if (ret) - retv = ret; + rxm_ep_txrx_res_close(ep); + if (ep->srx_ctx) { + ret = fi_close(&ep->srx_ctx->fid); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, \ + "Unable to close msg shared ctx\n"); + return ret; + } + ep->srx_ctx = NULL; + } - if (rxm_ep->msg_cq) { - ret = fi_close(&rxm_ep->msg_cq->fid); + if (ep->msg_cq) { + ret = fi_close(&ep->msg_cq->fid); if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to close msg CQ\n"); - retv = ret; + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Unable to close msg CQ\n"); + return ret; } + ep->msg_cq = NULL; } - free(rxm_ep->inject_pkt); - ofi_endpoint_close(&rxm_ep->util_ep); - fi_freeinfo(rxm_ep->rxm_info); - free(rxm_ep); - return retv; + free(ep->inject_pkt); + ofi_endpoint_close(&ep->util_ep); + fi_freeinfo(ep->msg_info); + fi_freeinfo(ep->rxm_info); + free(ep); + return 0; } static int rxm_ep_trywait_cq(void *arg) @@ -2375,6 +2381,12 @@ static int rxm_ep_wait_fd_add(struct rxm_ep *rxm_ep, struct util_wait *wait) rxm_ep_trywait_eq); } +static bool rxm_needs_atomic_progress(const struct fi_info *info) +{ + return (info->caps & FI_ATOMIC) && info->domain_attr && + info->domain_attr->data_progress == FI_PROGRESS_AUTO; +} + static int rxm_msg_cq_fd_needed(struct rxm_ep *rxm_ep) { return (rxm_needs_atomic_progress(rxm_ep->rxm_info) || @@ -2568,7 +2580,7 @@ static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep) if (ret) return ret; - dlist_init(&rxm_ep->deferred_tx_conn_queue); + dlist_init(&rxm_ep->deferred_queue); ret = rxm_ep_rx_queue_init(rxm_ep); if (ret) @@ -2628,23 +2640,13 @@ static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) * and then progressing both MSG EQ and MSG CQ once the latter * is opened) */ assert(!(rxm_ep->rxm_info->caps & FI_ATOMIC) || - !rxm_ep->cmap || !rxm_ep->cmap->cm_thread); + !rxm_ep->cm_thread); ret = rxm_ep_msg_cq_open(rxm_ep); if (ret) return ret; - /* fi_listen should be called before cmap alloc as cmap alloc - * calls fi_getname on pep which would succeed only if fi_listen - * was called first */ - ret = fi_listen(rxm_ep->msg_pep); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to set msg PEP to listen state\n"); - return ret; - } - - ret = rxm_conn_cmap_alloc(rxm_ep); + ret = rxm_start_listen(rxm_ep); if (ret) return ret; @@ -2658,19 +2660,17 @@ static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) if (rxm_ep->srx_ctx) { ret = rxm_prepost_recv(rxm_ep, rxm_ep->srx_ctx); - if (ret) { - rxm_cmap_free(rxm_ep->cmap); - FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, - "unable to prepost recv bufs\n"); + if (ret) goto err; - } } break; default: return -FI_ENOSYS; } return 0; + err: + /* TODO: cleanup all allocated resources on error */ rxm_ep_txrx_res_close(rxm_ep); return ret; } @@ -2802,7 +2802,7 @@ rxm_prepare_deferred_rndv_write(struct rxm_deferred_tx_entry **def_tx_entry, { uint8_t i; struct rxm_tx_buf *tx_buf = buf; - struct rxm_ep *rxm_ep = tx_buf->write_rndv.conn->handle.cmap->ep; + struct rxm_ep *rxm_ep = tx_buf->write_rndv.conn->ep; *def_tx_entry = rxm_ep_alloc_deferred_tx_entry(rxm_ep, tx_buf->write_rndv.conn, RXM_DEFERRED_TX_RNDV_WRITE); diff --git a/prov/util/src/util_av.c b/prov/util/src/util_av.c index 4777ae8274c..9401189dbd5 100644 --- a/prov/util/src/util_av.c +++ b/prov/util/src/util_av.c @@ -247,6 +247,14 @@ void *ofi_av_get_addr(struct util_av *av, fi_addr_t fi_addr) return entry->data; } +void *ofi_av_addr_context(struct util_av *av, fi_addr_t fi_addr) +{ + void *addr; + + addr = ofi_av_get_addr(av, fi_addr); + return (char *) addr + av->context_offset; +} + int ofi_verify_av_insert(struct util_av *av, uint64_t flags, void *context) { if (av->flags & FI_EVENT) {