Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
ooststep committed Mar 5, 2024
1 parent a063376 commit 99c6e48
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 225 deletions.
30 changes: 5 additions & 25 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,21 +482,6 @@ struct xnet_domain {
size_t rx_size;
};

static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
{
return ep->progress;
}

static inline struct xnet_progress *xnet_rdm2_progress(struct xnet_rdm *rdm)
{
return &rdm->srx->progress;
}

static inline struct xnet_progress *xnet_srx2_progress(struct xnet_srx *srx)
{
return &srx->progress;
}

struct xnet_cq {
struct util_cq util_cq;
};
Expand All @@ -516,11 +501,6 @@ struct xnet_eq {
struct dlist_entry fabric_entry;
};

static inline struct xnet_progress *xnet_eq2_progress(struct xnet_eq *eq)
{
return &eq->progress;
}

int xnet_eq_write(struct util_eq *eq, uint32_t event,
const void *buf, size_t len, uint64_t flags);

Expand Down Expand Up @@ -666,8 +646,8 @@ xnet_alloc_rx(struct xnet_ep *ep)
{
struct xnet_xfer_entry *xfer;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
xfer = xnet_alloc_xfer(xnet_ep2_progress(ep));
assert(xnet_progress_locked(ep->progress));
xfer = xnet_alloc_xfer(ep->progress);
if (xfer) {
xfer->cntr = ep->util_ep.cntrs[CNTR_RX];
xfer->cq = xnet_ep_rx_cq(ep);
Expand All @@ -681,8 +661,8 @@ xnet_alloc_tx(struct xnet_ep *ep)
{
struct xnet_xfer_entry *xfer;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
xfer = xnet_alloc_xfer(xnet_ep2_progress(ep));
assert(xnet_progress_locked(ep->progress));
xfer = xnet_alloc_xfer(ep->progress);
if (xfer) {
xfer->hdr.base_hdr.version = XNET_HDR_VERSION;
xfer->hdr.base_hdr.op_data = 0;
Expand Down Expand Up @@ -714,7 +694,7 @@ xnet_alloc_xfer_buf(struct xnet_xfer_entry *xfer, size_t len)
*/
static inline bool xnet_has_unexp(struct xnet_ep *ep)
{
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));
return ep->cur_rx.handler && !ep->cur_rx.entry;
}

Expand Down
8 changes: 4 additions & 4 deletions prov/tcp/src/xnet_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void xnet_req_done(struct xnet_ep *ep)
ssize_t ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "connect request done\n");
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));

ret = xnet_recv_cm_msg(ep->bsock.sock, ep->cm_msg);
if (ret == 0)
Expand Down Expand Up @@ -216,7 +216,7 @@ void xnet_uring_req_done(struct xnet_ep *ep, int res)
ssize_t ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "connect request done\n");
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));

len = sizeof(ep->cm_msg->hdr);
if (res < 0)
Expand All @@ -238,7 +238,7 @@ void xnet_uring_req_done(struct xnet_ep *ep, int res)
}

ep->pollflags = POLLIN;
ret = xnet_uring_pollin_add(xnet_ep2_progress(ep), ep->bsock.sock,
ret = xnet_uring_pollin_add(ep->progress, ep->bsock.sock,
false, &ep->bsock.pollin_sockctx);
if (ret)
goto disable;
Expand Down Expand Up @@ -330,7 +330,7 @@ void xnet_connect_done(struct xnet_ep *ep)
int status, ret;

FI_DBG(&xnet_prov, FI_LOG_EP_CTRL, "socket connected, sending req\n");
progress = xnet_ep2_progress(ep);
progress = ep->progress;
assert(xnet_progress_locked(progress));

len = sizeof(status);
Expand Down
26 changes: 13 additions & 13 deletions prov/tcp/src/xnet_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ static int xnet_ep_connect(struct fid_ep *ep_fid, const void *addr,
}
}

progress = xnet_ep2_progress(ep);
progress = ep->progress;
ofi_genlock_lock(&progress->ep_lock);
ep->pollflags = POLLOUT;
ret = xnet_monitor_ep(progress, ep);
Expand Down Expand Up @@ -303,7 +303,7 @@ xnet_ep_accept(struct fid_ep *ep_fid, const void *param, size_t paramlen)
ep->state = XNET_CONNECTED;
assert(!ofi_bsock_readable(&ep->bsock) && !ep->cur_rx.handler);

progress = xnet_ep2_progress(ep);
progress = ep->progress;
ofi_genlock_lock(&progress->ep_lock);
ep->pollflags = POLLIN;
ret = xnet_monitor_ep(progress, ep);
Expand Down Expand Up @@ -369,7 +369,7 @@ static void xnet_ep_flush_all_queues(struct xnet_ep *ep)
struct xnet_progress *progress;
int ret;

progress = xnet_ep2_progress(ep);
progress = ep->progress;
assert(xnet_progress_locked(progress));

ret = xnet_uring_cancel(progress, &progress->tx_uring,
Expand Down Expand Up @@ -398,7 +398,7 @@ static void xnet_ep_flush_all_queues(struct xnet_ep *ep)
ep->cur_tx.entry->hdr.base_hdr.op_data);
}
xnet_report_error(ep->cur_tx.entry, FI_ECANCELED);
xnet_free_xfer(xnet_ep2_progress(ep), ep->cur_tx.entry);
xnet_free_xfer(ep->progress, ep->cur_tx.entry);
ep->cur_tx.entry = NULL;
}

Expand All @@ -414,7 +414,7 @@ static void xnet_ep_flush_all_queues(struct xnet_ep *ep)
if (ep->cur_rx.entry &&
!(ep->cur_rx.entry->ctrl_flags & XNET_SAVED_XFER)) {
xnet_report_error(ep->cur_rx.entry, FI_ECANCELED);
xnet_free_xfer(xnet_ep2_progress(ep), ep->cur_rx.entry);
xnet_free_xfer(ep->progress, ep->cur_rx.entry);
}
xnet_reset_rx(ep);
xnet_flush_xfer_queue(progress, &ep->rx_queue, NULL);
Expand All @@ -429,7 +429,7 @@ void xnet_ep_disable(struct xnet_ep *ep, int cm_err, void* err_data,
struct fi_eq_err_entry err_entry = {0};
int ret;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));
switch (ep->state) {
case XNET_CONNECTING:
case XNET_REQ_SENT:
Expand All @@ -442,7 +442,7 @@ void xnet_ep_disable(struct xnet_ep *ep, int cm_err, void* err_data,
ep->state = XNET_DISCONNECTED;
dlist_remove_init(&ep->unexp_entry);
if (!xnet_io_uring)
xnet_halt_sock(xnet_ep2_progress(ep), ep->bsock.sock);
xnet_halt_sock(ep->progress, ep->bsock.sock);

ret = ofi_shutdown(ep->bsock.sock, SHUT_RDWR);
if (ret && ofi_sockerr() != ENOTCONN)
Expand Down Expand Up @@ -475,10 +475,10 @@ static int xnet_ep_shutdown(struct fid_ep *ep_fid, uint64_t flags)

ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);

ofi_genlock_lock(&xnet_ep2_progress(ep)->ep_lock);
ofi_genlock_lock(&ep->progress->ep_lock);
(void) ofi_bsock_flush_sync(&ep->bsock);
xnet_ep_disable(ep, 0, NULL, 0);
ofi_genlock_unlock(&xnet_ep2_progress(ep)->ep_lock);
ofi_genlock_unlock(&ep->progress->ep_lock);

return FI_SUCCESS;
}
Expand Down Expand Up @@ -539,7 +539,7 @@ static void xnet_ep_cancel_rx(struct xnet_ep *ep, void *context)
struct slist_entry *cur, *prev;
struct xnet_xfer_entry *xfer_entry;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(xnet_progress_locked(ep->progress));

/* To cancel an active receive, we would need to flush the socket of
* all data associated with that message. Since some of that data
Expand All @@ -562,7 +562,7 @@ static void xnet_ep_cancel_rx(struct xnet_ep *ep, void *context)
slist_remove(&ep->rx_queue, cur, prev);
ep->rx_avail++;
xnet_report_error(xfer_entry, FI_ECANCELED);
xnet_free_xfer(xnet_ep2_progress(ep), xfer_entry);
xnet_free_xfer(ep->progress, xfer_entry);
}

/* We currently only support canceling receives, which is the common case.
Expand All @@ -575,9 +575,9 @@ static ssize_t xnet_ep_cancel(fid_t fid, void *context)

ep = container_of(fid, struct xnet_ep, util_ep.ep_fid.fid);

ofi_genlock_lock(&xnet_ep2_progress(ep)->ep_lock);
ofi_genlock_lock(&ep->progress->ep_lock);
xnet_ep_cancel_rx(ep, context);
ofi_genlock_unlock(&xnet_ep2_progress(ep)->ep_lock);
ofi_genlock_unlock(&ep->progress->ep_lock);

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions prov/tcp/src/xnet_eq.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int xnet_eq_write(struct util_eq *eq, uint32_t event,
}

assert(rdm->util_ep.ep_fid.fid.fclass == FI_CLASS_EP);
assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
assert(xnet_progress_locked(&rdm->srx->progress));
entry = malloc(sizeof(*entry) + len);
if (!entry)
return -FI_ENOMEM;
Expand All @@ -67,7 +67,7 @@ int xnet_eq_write(struct util_eq *eq, uint32_t event,
entry->event = event;
memcpy(&entry->cm_entry, buf, len);
slist_insert_tail(&entry->list_entry,
&xnet_rdm2_progress(rdm)->event_list);
&rdm->srx->progress.event_list);
return 0;
}

Expand Down
Loading

0 comments on commit 99c6e48

Please sign in to comment.