Skip to content

Commit

Permalink
prov/efa: Remove fi_sendmsg/recvmsg but still use efa_ep's qp and av
Browse files Browse the repository at this point in the history
For SHM EP, use fi_sendv and fi_recvv
For EFA EP
* Allocate ibv_send_wr and ibv_recv_wr in rxr_pkt_entry
* Still use efa_ep->qp, efa_ep->av and efa_ep->xmit_more_wr*
* Directly call efa_post_flush or ibv_post_recv in rxr_pkt_entry.c

Signed-off-by: Sai Sunku <sunkusa@amazon.com>
  • Loading branch information
sunkuamzn committed Jan 4, 2023
1 parent fddfb73 commit 2b3a7de
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 96 deletions.
12 changes: 1 addition & 11 deletions prov/efa/src/efa.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,6 @@ struct efa_ep {
bool util_ep_initialized;
};

struct efa_send_wr {
struct ibv_send_wr wr;
struct ibv_sge sge[];
};

struct efa_recv_wr {
struct ibv_recv_wr wr;
struct ibv_sge sge[];
};

struct efa_av {
struct fid_av *shm_rdm_av;
fi_addr_t shm_rdm_addr_map[EFA_SHM_MAX_AV_COUNT];
Expand Down Expand Up @@ -333,7 +323,7 @@ int efa_prov_initialize(void);

void efa_prov_finalize(void);

ssize_t efa_post_flush(struct efa_ep *ep, struct ibv_send_wr **bad_wr);
ssize_t efa_post_flush(struct efa_ep *ep, struct ibv_send_wr **bad_wr, bool free);

ssize_t efa_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr);

Expand Down
13 changes: 7 additions & 6 deletions prov/efa/src/efa_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -671,17 +671,18 @@ int efa_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,

ep->util_ep_initialized = true;

/* struct efa_send_wr allocates memory for 2 IOV
* So check with an assert statement that iov_limit is 2 or less
*/
assert(user_info->tx_attr->iov_limit <= 2);

ret = ofi_bufpool_create(&ep->send_wr_pool,
sizeof(struct efa_send_wr) +
user_info->tx_attr->iov_limit * sizeof(struct ibv_sge),
16, 0, 1024, 0);
sizeof(struct efa_send_wr), 16, 0, 1024, 0);
if (ret)
goto err_ep_destroy;

ret = ofi_bufpool_create(&ep->recv_wr_pool,
sizeof(struct efa_recv_wr) +
user_info->rx_attr->iov_limit * sizeof(struct ibv_sge),
16, 0, 1024, 0);
sizeof(struct efa_recv_wr), 16, 0, 1024, 0);
if (ret)
goto err_send_wr_destroy;

Expand Down
9 changes: 6 additions & 3 deletions prov/efa/src/efa_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ static void efa_post_send_sgl(struct efa_ep *ep, const struct fi_msg *msg,
}
}

ssize_t efa_post_flush(struct efa_ep *ep, struct ibv_send_wr **bad_wr)
ssize_t efa_post_flush(struct efa_ep *ep, struct ibv_send_wr **bad_wr, bool free)
{
ssize_t ret;

Expand All @@ -332,7 +332,10 @@ ssize_t efa_post_flush(struct efa_ep *ep, struct ibv_send_wr **bad_wr)
#endif

ret = ibv_post_send(ep->qp->ibv_qp, ep->xmit_more_wr_head.next, bad_wr);
free_send_wr_list(ep->xmit_more_wr_head.next);
if (free)
free_send_wr_list(ep->xmit_more_wr_head.next);
else
ep->xmit_more_wr_head.next = NULL;
ep->xmit_more_wr_tail = &ep->xmit_more_wr_head;
return ret;
}
Expand Down Expand Up @@ -391,7 +394,7 @@ static ssize_t efa_post_send(struct efa_ep *ep, const struct fi_msg *msg, uint64
if (flags & FI_MORE)
return 0;

ret = efa_post_flush(ep, &bad_wr);
ret = efa_post_flush(ep, &bad_wr, true /* free ibv_send_wr */);

return ret;

Expand Down
11 changes: 11 additions & 0 deletions prov/efa/src/rxr/rxr_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,17 @@ void rxr_cq_queue_rnr_pkt(struct rxr_ep *ep,
#endif
dlist_insert_tail(&pkt_entry->entry, list);

/*
* When the EFA RDM provider wants to send multiple packets,
* it connects the packets in a linked list through ibv_send_wr.next
* and calls ibv_post_send once on the head of the linked list.
*
* When a packet encounters RNR, we want to queue only the packet that
* encountered RNR. So we remove the other packets in the linked list
* by setting ibv_send_wr.next to NULL
*/
pkt_entry->send_wr.wr.next = NULL;

peer = rxr_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
if (!(pkt_entry->flags & RXR_PKT_ENTRY_RNR_RETRANSMIT)) {
Expand Down
25 changes: 4 additions & 21 deletions prov/efa/src/rxr/rxr_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ int rxr_ep_post_user_recv_buf(struct rxr_ep *ep, struct rxr_op_entry *rx_entry,
{
struct rxr_pkt_entry *pkt_entry;
struct efa_mr *mr;
struct iovec msg_iov;
struct fi_msg msg = {0};
int err;

assert(rx_entry->iov_count == 1);
Expand Down Expand Up @@ -198,18 +196,7 @@ int rxr_ep_post_user_recv_buf(struct rxr_ep *ep, struct rxr_op_entry *rx_entry,
pkt_entry->x_entry = rx_entry;
rx_entry->state = RXR_RX_MATCHED;

msg_iov.iov_base = pkt_entry->wiredata;
msg_iov.iov_len = pkt_entry->pkt_size;
assert(msg_iov.iov_len <= ep->mtu_size);

msg.iov_count = 1;
msg.msg_iov = &msg_iov;
msg.desc = rx_entry->desc;
msg.addr = FI_ADDR_UNSPEC;
msg.context = pkt_entry;
msg.data = 0;

err = fi_recvmsg(ep->rdm_ep, &msg, flags);
err = rxr_pkt_entry_recv(ep, pkt_entry, rx_entry->desc, flags);
if (OFI_UNLIKELY(err)) {
rxr_pkt_entry_release_rx(ep, pkt_entry);
FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
Expand All @@ -235,7 +222,6 @@ int rxr_ep_post_user_recv_buf(struct rxr_ep *ep, struct rxr_op_entry *rx_entry,
*/
int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lower_ep_type lower_ep_type)
{
struct fi_msg msg = {0};
struct iovec msg_iov;
void *desc;
struct rxr_pkt_entry *rx_pkt_entry = NULL;
Expand Down Expand Up @@ -266,7 +252,6 @@ int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lowe

msg_iov.iov_base = (void *)rxr_pkt_start(rx_pkt_entry);
msg_iov.iov_len = ep->mtu_size;
rxr_msg_construct(&msg, &msg_iov, NULL, 1, FI_ADDR_UNSPEC, rx_pkt_entry, 0);

switch (lower_ep_type) {
case SHM_EP:
Expand All @@ -276,8 +261,7 @@ int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lowe
&ep->rx_posted_buf_shm_list);
#endif
desc = NULL;
msg.desc = &desc;
ret = fi_recvmsg(ep->shm_ep, &msg, flags);
ret = fi_recvv(ep->shm_ep, &msg_iov, &desc, 1, FI_ADDR_UNSPEC, rx_pkt_entry);
if (OFI_UNLIKELY(ret)) {
rxr_pkt_entry_release_rx(ep, rx_pkt_entry);
FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
Expand All @@ -293,8 +277,7 @@ int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lowe
&ep->rx_posted_buf_list);
#endif
desc = fi_mr_desc(rx_pkt_entry->mr);
msg.desc = &desc;
ret = fi_recvmsg(ep->rdm_ep, &msg, flags);
ret = rxr_pkt_entry_recv(ep, rx_pkt_entry, &desc, flags);
if (OFI_UNLIKELY(ret)) {
rxr_pkt_entry_release_rx(ep, rx_pkt_entry);
FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
Expand Down Expand Up @@ -2169,7 +2152,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)
out:
efa_ep = container_of(ep->rdm_ep, struct efa_ep, util_ep.ep_fid);
if (efa_ep->xmit_more_wr_tail != &efa_ep->xmit_more_wr_head) {
ret = efa_post_flush(efa_ep, &bad_wr);
ret = efa_post_flush(efa_ep, &bad_wr, false);
if (OFI_UNLIKELY(ret))
efa_eq_write_error(&ep->util_ep, -ret, FI_EFA_ERR_WR_POST_SEND);
}
Expand Down
164 changes: 109 additions & 55 deletions prov/efa/src/rxr/rxr_pkt_entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ struct rxr_pkt_entry *rxr_pkt_entry_alloc(struct rxr_ep *ep, struct rxr_pkt_pool
dlist_init(&pkt_entry->dbg_entry);
#endif

/* Initialize necessary fields in pkt_entry.
 * The memory region allocated by ofi_buf_alloc_ex is not initialized.
*/
pkt_entry->mr = mr;
pkt_entry->alloc_type = alloc_type;
pkt_entry->flags = RXR_PKT_ENTRY_IN_USE;
pkt_entry->next = NULL;
pkt_entry->x_entry = NULL;
pkt_entry->send.iov_count = 0; /* rxr_pkt_init methods expect iov_count = 0 */

pkt_entry->send_wr.wr.next = NULL;
pkt_entry->send_wr.wr.send_flags = 0;

return pkt_entry;
}

Expand Down Expand Up @@ -345,98 +351,146 @@ void rxr_pkt_entry_append(struct rxr_pkt_entry *dst,
}

/**
* @brief send a packet using lower provider
* @brief Populate pkt_entry->ibv_send_wr with the information stored in pkt_entry,
* and send it out
*
* @param ep[in] rxr end point
* @param pkt_entry[in] packet entry to be sent
 * @param msg[in] information regarding the send operation, such as
* memory buffer, remote EP address and local descriptor.
 * If the shm provider is to be used, the remote EP address
 * and local descriptor must be prepared for shm usage.
* @param flags[in] flags to be passed on to lower provider's send.
* @param[in] ep rxr endpoint
* @param[in] pkt_entry packet entry to be sent
* @param[in] flags flags to be applied to the send operation
* @return 0 on success
* On error, a negative value corresponding to fabric errno
*/
static inline
ssize_t rxr_pkt_entry_sendmsg(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
const struct fi_msg *msg, uint64_t flags)
ssize_t rxr_pkt_entry_send(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
uint64_t flags)
{
assert(pkt_entry->pkt_size);
struct rdm_peer *peer;
ssize_t ret;
struct rxr_pkt_sendv *send = &pkt_entry->send;
struct ibv_send_wr *bad_wr, *send_wr = &pkt_entry->send_wr.wr;
struct ibv_sge *sge;
int ret, total_len;
struct efa_ep *efa_ep;
struct efa_conn *conn;

if (pkt_entry->alloc_type == RXR_PKT_FROM_EFA_TX_POOL &&
ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
return -FI_EAGAIN;
assert(send->iov_count <= 2); // EFA device supports a maximum of 2 iov/SGE

peer = rxr_ep_get_peer(ep, pkt_entry->addr);
assert(peer);

if (peer->flags & RXR_PEER_IN_BACKOFF)
return -FI_EAGAIN;

efa_ep = container_of(ep->rdm_ep, struct efa_ep, util_ep.ep_fid);
conn = efa_av_addr_to_conn(efa_ep->av, pkt_entry->addr);
assert(conn && conn->ep_addr);

if (send->iov_count == 0) {
send->iov_count = 1;
send->iov[0].iov_base = pkt_entry->wiredata;
send->iov[0].iov_len = pkt_entry->pkt_size;
send->desc[0] = (pkt_entry->alloc_type == RXR_PKT_FROM_SHM_TX_POOL) ? NULL : pkt_entry->mr;
}

if (pkt_entry->alloc_type == RXR_PKT_FROM_SHM_TX_POOL) {
// rxr_convert_desc_for_shm(send->iov_count, NULL);
return fi_sendv(ep->shm_ep, send->iov, NULL, send->iov_count, peer->shm_fiaddr, pkt_entry);
}

#if ENABLE_DEBUG
dlist_insert_tail(&pkt_entry->dbg_entry, &ep->tx_pkt_list);
#ifdef ENABLE_RXR_PKT_DUMP
rxr_pkt_print("Sent", ep, (struct rxr_base_hdr *)pkt_entry->wiredata);
#endif
#endif
if (pkt_entry->alloc_type == RXR_PKT_FROM_SHM_TX_POOL) {
assert(peer->is_local && ep->use_shm_for_tx);
ret = fi_sendmsg(ep->shm_ep, msg, flags);
} else {
ret = fi_sendmsg(ep->rdm_ep, msg, flags);

send_wr->num_sge = send->iov_count;
send_wr->sg_list = pkt_entry->send_wr.sge;

total_len = 0;
for (int i = 0; i < send->iov_count; i++) {
sge = &send_wr->sg_list[i];
sge->addr = (uintptr_t)send->iov[i].iov_base;
sge->length = send->iov[i].iov_len;
sge->lkey = ((struct efa_mr *)send->desc[i])->ibv_mr->lkey;
total_len += sge->length;
}

if (OFI_UNLIKELY(ret))
if (total_len <= rxr_ep_domain(ep)->device->efa_attr.inline_buf_size &&
!rxr_pkt_entry_has_hmem_mr(send))
send_wr->send_flags |= IBV_SEND_INLINE;

send_wr->opcode = IBV_WR_SEND;
send_wr->wr_id = (uintptr_t)pkt_entry;
send_wr->wr.ud.ah = conn->ah->ibv_ah;
send_wr->wr.ud.remote_qpn = conn->ep_addr->qpn;
send_wr->wr.ud.remote_qkey = conn->ep_addr->qkey;

efa_ep->xmit_more_wr_tail->next = send_wr;
efa_ep->xmit_more_wr_tail = send_wr;

if (flags & FI_MORE) {
rxr_ep_record_tx_op_submitted(ep, pkt_entry);
return 0;
}

ret = efa_post_flush(efa_ep, &bad_wr, false /* don't free ibv_send_wr */);
if (OFI_UNLIKELY(ret)) {
return ret;
}

rxr_ep_record_tx_op_submitted(ep, pkt_entry);
return 0;
}

/**
* @brief Construct a fi_msg object with the information stored in pkt_entry,
* and send it out
* @brief Post a pkt_entry to receive message from EFA device
*
* @param[in] ep rxr endpoint
* @param[in] pkt_entry packet entry used to construct the fi_msg object
* @param[in] flags flags to be applied to lower provider's send operation
* @param[in] pkt_entry packet entry to be posted
* @param[in] desc Memory registration key
* @param[in] flags flags to be applied to the receive operation
* @return 0 on success
* On error, a negative value corresponding to fabric errno
*
*/
ssize_t rxr_pkt_entry_send(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
uint64_t flags)
ssize_t rxr_pkt_entry_recv(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
void **desc, uint64_t flags)
{
struct iovec iov;
void *desc;
struct fi_msg msg;
struct rdm_peer *peer;

peer = rxr_ep_get_peer(ep, pkt_entry->addr);
assert(peer);

if (pkt_entry->send.iov_count > 0) {
msg.msg_iov = pkt_entry->send.iov;
msg.iov_count = pkt_entry->send.iov_count;
msg.desc = pkt_entry->send.desc;
} else {
iov.iov_base = rxr_pkt_start(pkt_entry);
iov.iov_len = pkt_entry->pkt_size;
desc = (pkt_entry->alloc_type == RXR_PKT_FROM_SHM_TX_POOL) ? NULL : fi_mr_desc(pkt_entry->mr);
msg.msg_iov = &iov;
msg.iov_count = 1;
msg.desc = &desc;
struct ibv_recv_wr *bad_wr, *recv_wr = &pkt_entry->recv_wr.wr;
struct efa_ep *efa_ep;
int err;

recv_wr->wr_id = (uintptr_t)pkt_entry;
recv_wr->num_sge = 1; // Always post one iov/SGE
recv_wr->sg_list = pkt_entry->recv_wr.sge;

recv_wr->sg_list[0].length = ep->mtu_size;
recv_wr->sg_list[0].lkey = ((struct efa_mr *) desc[0])->ibv_mr->lkey;
recv_wr->sg_list[0].addr = (uintptr_t)pkt_entry->wiredata;

efa_ep = container_of(ep->rdm_ep, struct efa_ep, util_ep.ep_fid);
efa_ep->recv_more_wr_tail->next = recv_wr;
efa_ep->recv_more_wr_tail = recv_wr;

if (flags & FI_MORE)
return 0;

#if HAVE_LTTNG
struct ibv_recv_wr *head = efa_ep->recv_more_wr_head.next;
while (head) {
efa_tracing(post_recv, (void *) head->wr_id);
head = head->next;
}
#endif

msg.addr = pkt_entry->addr;
msg.context = pkt_entry;
msg.data = 0;

if (pkt_entry->alloc_type == RXR_PKT_FROM_SHM_TX_POOL) {
msg.addr = peer->shm_fiaddr;
rxr_convert_desc_for_shm(msg.iov_count, msg.desc);
err = ibv_post_recv(efa_ep->qp->ibv_qp, efa_ep->recv_more_wr_head.next, &bad_wr);
if (OFI_UNLIKELY(err)) {
err = (err == ENOMEM) ? -FI_EAGAIN : -err;
}

return rxr_pkt_entry_sendmsg(ep, pkt_entry, &msg, flags);
efa_ep->recv_more_wr_head.next = NULL;
efa_ep->recv_more_wr_tail = &efa_ep->recv_more_wr_head;

return err;
}

ssize_t rxr_pkt_entry_inject(struct rxr_ep *ep,
Expand Down
Loading

0 comments on commit 2b3a7de

Please sign in to comment.