From b3acd52a14c5035d83b7f70a44ddcda1f3cf35bb Mon Sep 17 00:00:00 2001
From: Stephen Oost
Date: Tue, 2 Jul 2024 10:50:49 -0700
Subject: [PATCH] prov/tcp: introduce sub-domains to support
 FI_THREAD_COMPLETION

By default, the tcp provider is optimized for single-threaded
applications that make use of FI_THREAD_DOMAIN, but it is inefficient
for multithreaded applications that intend to follow
FI_THREAD_COMPLETION semantics. The primary limiting factor is that
the progress engine is tied to the domain, and all endpoints under a
single domain are synchronized by this lone progress engine.

In an effort to increase the efficiency and performance of these
multithreaded applications, multiplex the application's domain
reference into a subdomain per ep, each with its own progress engine.

Signed-off-by: Stephen Oost
---
 include/ofi_util.h           |   8 +-
 libfabric.vcxproj            |   1 +
 libfabric.vcxproj.filters    |   3 +
 prov/tcp/Makefile.include    |   1 +
 prov/tcp/src/xnet.h          |  56 +++++-
 prov/tcp/src/xnet_av.c       | 234 +++++++++++++++++++++++++++
 prov/tcp/src/xnet_cq.c       |   1 -
 prov/tcp/src/xnet_domain.c   | 305 ++++++++++++++++++++++++++++++++---
 prov/tcp/src/xnet_progress.c |   6 +-
 prov/tcp/src/xnet_rdm.c      | 171 +++++++++++++++++++
 prov/util/src/util_av.c      |  16 +-
 prov/util/src/util_ep.c      |   2 +
 prov/util/src/util_mr_map.c  |   5 +-
 13 files changed, 771 insertions(+), 38 deletions(-)
 create mode 100644 prov/tcp/src/xnet_av.c

diff --git a/include/ofi_util.h b/include/ofi_util.h
index 76c62d4fa42..bb4e8aacf0c 100644
--- a/include/ofi_util.h
+++ b/include/ofi_util.h
@@ -232,7 +232,7 @@ static const uint64_t ofi_rx_mr_flags[] = {
 	[ofi_op_atomic_fetch] = FI_REMOTE_WRITE | FI_REMOTE_READ,
 	[ofi_op_atomic_compare] = FI_REMOTE_WRITE | FI_REMOTE_READ,
 };
-
+struct fi_mr_attr *dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags);
 static inline uint64_t ofi_rx_mr_reg_flags(uint32_t op, uint16_t atomic_op)
 {
 	if (atomic_op == FI_ATOMIC_READ)
@@ -1017,6 +1017,12 @@ int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
 		     size_t count, uint64_t flags);
 int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
 		     void *addr, size_t *addrlen);
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+			size_t nodecnt, const char *service, size_t svccnt,
+			fi_addr_t *fi_addr, uint64_t flags, void *context);
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+			const char *service, fi_addr_t *fi_addr,
+			uint64_t flags, void *context);
 const char *
 ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf,
 		  size_t *len);
diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index d297285781b..8daefa16e6c 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -701,6 +701,7 @@
+    <ClCompile Include="prov\tcp\src\xnet_av.c" />
diff --git a/libfabric.vcxproj.filters b/libfabric.vcxproj.filters
index adebb5a85ad..a0833325f6a 100644
--- a/libfabric.vcxproj.filters
+++ b/libfabric.vcxproj.filters
@@ -462,6 +462,9 @@
       <Filter>Source Files\prov\tcp\src</Filter>
+    <ClCompile Include="prov\tcp\src\xnet_av.c">
+      <Filter>Source Files\prov\tcp\src</Filter>
+    </ClCompile>
       <Filter>Source Files\prov\tcp\src</Filter>
diff --git a/prov/tcp/Makefile.include b/prov/tcp/Makefile.include
index d310f51d27f..34c109e92e2 100644
--- a/prov/tcp/Makefile.include
+++ b/prov/tcp/Makefile.include
@@ -5,6 +5,7 @@ _xnet_files = \
 	prov/tcp/src/xnet_cm.c \
 	prov/tcp/src/xnet_rdm_cm.c \
 	prov/tcp/src/xnet_domain.c \
+	prov/tcp/src/xnet_av.c \
 	prov/tcp/src/xnet_rma.c \
 	prov/tcp/src/xnet_msg.c \
 	prov/tcp/src/xnet_ep.c \
diff --git a/prov/tcp/src/xnet.h b/prov/tcp/src/xnet.h
index ee16d7e3a98..66243fafdda 100644
--- a/prov/tcp/src/xnet.h
+++ b/prov/tcp/src/xnet.h
@@ -467,12 +467,61 @@ struct xnet_xfer_entry {
 	char		msg_data[];
 };
 
+struct xnet_mplex_av {
+	struct util_av util_av;
+	struct dlist_entry subav_list;
+	struct ofi_genlock lock;
+};
+
 struct xnet_domain {
 	struct util_domain util_domain;
 	struct xnet_progress progress;
 	enum fi_ep_type ep_type;
+
+	// When an application requests FI_THREAD_COMPLETION,
+	// the assumption is that the domain will be used
+	// across multiple threads.
+	//
+	// The xnet progress engine is optimized for single
+	// threaded performance, so instead of reworking the
+	// progress engine, likely losing single threaded
+	// performance, multiplex the domain into multiple
+	// subdomains for each ep. This way, each ep, with
+	// the assumption that the application wants to
+	// progress an ep per thread, can have its own
+	// progress engine and avoid having a single
+	// synchronization point among all eps.
+	struct fi_info *subdomain_info;
+	struct ofi_genlock subdomain_list_lock;
+	struct dlist_entry subdomain_list;
 };
 
+int xnet_multiplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+			   struct fid_av **fid_av, void *context);
+
+int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
+		     struct fid_domain **domain, void *context);
+
+static inline
+int xnet_create_subdomain(struct xnet_domain *domain, struct xnet_domain **subdomain)
+{
+	int ret;
+	struct fid_domain *subdomain_fid;
+
+	ret = xnet_domain_open(&domain->util_domain.fabric->fabric_fid,
+			       domain->subdomain_info,
+			       &subdomain_fid, NULL);
+	if (ret)
+		return ret;
+
+	*subdomain = container_of(subdomain_fid, struct xnet_domain,
+				  util_domain.domain_fid);
+
+	return FI_SUCCESS;
+}
+
+int xnet_domain_multiplexed(struct fid_domain *domain_fid);
+
 static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
 {
 	struct xnet_domain *domain;
@@ -544,10 +593,6 @@ int xnet_passive_ep(struct fid_fabric *fabric, struct fi_info *info,
 
 int xnet_set_port_range(void);
 
-int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
-		     struct fid_domain **domain, void *context);
-
-
 int xnet_setup_socket(SOCKET sock, struct fi_info *info);
 
 void xnet_set_zerocopy(SOCKET sock);
@@ -566,7 +611,8 @@ static inline struct xnet_cq *xnet_ep_tx_cq(struct xnet_ep *ep)
 	return container_of(ep->util_ep.tx_cq, struct xnet_cq, util_cq);
 }
 
-
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+		 struct fid_av **fid_av, void *context);
 int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
 		 struct fid_cq **cq_fid, void *context);
 void xnet_report_success(struct xnet_xfer_entry *xfer_entry);
diff --git a/prov/tcp/src/xnet_av.c b/prov/tcp/src/xnet_av.c
new file mode 100644
index 00000000000..9a04fd6af0c
--- /dev/null
+++ b/prov/tcp/src/xnet_av.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2017-2022 Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "xnet.h"
+
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+		 struct fid_av **fid_av, void *context)
+{
+	return rxm_util_av_open(domain_fid, attr, fid_av, context,
+				sizeof(struct xnet_conn), NULL);
+}
+
+static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
+				size_t count, uint64_t flags)
+{
+	int ret;
+	struct fid_list_entry *item;
+	struct fid_av *subav_fid;
+	struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+						util_av.av_fid);
+
+	ofi_genlock_lock(&av->lock);
+	dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+		subav_fid = container_of(item->fid, struct fid_av, fid);
+		ret = fi_av_remove(subav_fid, fi_addr, count, flags);
+		if (ret)
+			goto out;
+	}
+	ret = ofi_ip_av_remove(&av->util_av.av_fid, fi_addr, count, flags);
+out:
+	ofi_genlock_unlock(&av->lock);
+	return ret;
+}
+
+static int xnet_mplex_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
+				fi_addr_t *fi_addr, uint64_t flags, void *context)
+{
+	int ret;
+	struct fid_list_entry *item;
+	struct fid_av *subav_fid;
+	struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+						util_av.av_fid.fid);
+
+	ofi_genlock_lock(&av->lock);
+	dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+		subav_fid = container_of(item->fid, struct fid_av, fid);
+		ret = fi_av_insert(subav_fid, addr, count, fi_addr, flags, context);
+		if (ret < count)
+			goto out;
+	}
+	ret = ofi_ip_av_insert(&av->util_av.av_fid, addr, count, fi_addr, flags, context);
+out:
+	ofi_genlock_unlock(&av->lock);
+	return ret;
+}
+
+static int xnet_mplex_av_insertsym(struct fid_av *av_fid, const char *node,
+				   size_t nodecnt, const char *service, size_t svccnt,
+				   fi_addr_t *fi_addr, uint64_t flags, void *context)
+{
+	int ret;
+	struct fid_list_entry *item;
+	struct fid_av *subav_fid;
+	struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+						util_av.av_fid.fid);
+
+	ofi_genlock_lock(&av->lock);
+	dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+		subav_fid = container_of(item->fid, struct fid_av, fid);
+		ret = fi_av_insertsym(subav_fid, node, nodecnt, service, svccnt,
+				      fi_addr, flags, context);
+		if (ret)
+			goto out;
+	}
+	ret = ofi_ip_av_insertsym(&av->util_av.av_fid, node, nodecnt,
+				  service, svccnt, fi_addr, flags, context);
+out:
+	ofi_genlock_unlock(&av->lock);
+
+	return ret;
+}
+
+static int xnet_mplex_av_insertsvc(struct fid_av *av_fid, const char *node,
+				   const char *service, fi_addr_t *fi_addr,
+				   uint64_t flags, void *context)
+{
+	int ret;
+	struct fid_list_entry *item;
+	struct fid_av *subav_fid;
+	struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+						util_av.av_fid.fid);
+
+	ofi_genlock_lock(&av->lock);
+	dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+		subav_fid = container_of(item->fid, struct fid_av, fid);
+		ret = fi_av_insertsvc(subav_fid, node, service, fi_addr, flags,
+				      context);
+		if (ret)
+			goto out;
+	}
+	ret = ofi_ip_av_insertsvc(&av->util_av.av_fid, node, service,
+				  fi_addr, flags, context);
+out:
+	ofi_genlock_unlock(&av->lock);
+	return ret;
+}
+
+static int xnet_mplex_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
+				void *addr, size_t *addrlen)
+{
+	struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+						util_av.av_fid.fid);
+
+	return ofi_ip_av_lookup(&av->util_av.av_fid, fi_addr, addr, addrlen);
+}
+
+static const char *
+xnet_mplex_av_straddr(struct fid_av *av_fid, const void *addr, char *buf, size_t *len)
+{
+	return ofi_ip_av_straddr(av_fid, addr, buf, len);
+}
+
+static int xnet_mplex_av_set(struct fid_av *av_fid, struct fi_av_set_attr *attr,
+			     struct fid_av_set **av_set_fid, void *context)
+{
+	return -FI_ENOSYS;
+}
+
+static int xnet_mplex_av_close(struct fid *av_fid)
+{
+	struct xnet_mplex_av *av;
+	struct fid_list_entry *item;
+	int ret = 0;
+
+	av = container_of(av_fid, struct xnet_mplex_av, util_av.av_fid.fid);
+	while (!dlist_empty(&av->subav_list)) {
+		dlist_pop_front(&av->subav_list, struct fid_list_entry, item, entry);
+		(void) fi_close(item->fid);
+		free(item);
+	}
+	ret = ofi_av_close(&av->util_av);
+	ofi_genlock_destroy(&av->lock);
+	return ret;
+}
+
+static struct fi_ops xnet_mplex_av_fi_ops = {
+	.size = sizeof(struct fi_ops),
+	.close = xnet_mplex_av_close,
+	.bind = ofi_av_bind,
+	.control = fi_no_control,
+	.ops_open = fi_no_ops_open,
+};
+
+static struct fi_ops_av xnet_mplex_av_ops = {
+	.size = sizeof(struct fi_ops_av),
+	.insert = xnet_mplex_av_insert,
+	.insertsvc = xnet_mplex_av_insertsvc,
+	.insertsym = xnet_mplex_av_insertsym,
+	.remove = xnet_mplex_av_remove,
+	.lookup = xnet_mplex_av_lookup,
+	.straddr = xnet_mplex_av_straddr,
+	.av_set = xnet_mplex_av_set,
+};
+
+int xnet_multiplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+			   struct fid_av **fid_av, void *context)
+{
+	struct xnet_mplex_av *av;
+	struct util_domain *domain;
+	struct util_av_attr util_attr;
+	int ret;
+
+	av = calloc(1, sizeof(*av));
+	if (!av)
+		return -FI_ENOMEM;
+
+	ret = ofi_genlock_init(&av->lock, OFI_LOCK_MUTEX);
+	if (ret)
+		goto free;
+
+	domain = container_of(domain_fid, struct util_domain, domain_fid);
+
+	util_attr.context_len = sizeof(struct util_peer_addr *);
+	util_attr.flags = 0;
+	util_attr.addrlen = ofi_sizeof_addr_format(domain->addr_format);
+	if (attr->type == FI_AV_UNSPEC)
+		attr->type = FI_AV_TABLE;
+
+	ret = ofi_av_init(domain, attr, &util_attr, &av->util_av, context);
+	if (ret)
+		goto free_lock;
+	dlist_init(&av->subav_list);
+	av->util_av.av_fid.fid.ops = &xnet_mplex_av_fi_ops;
+	av->util_av.av_fid.ops = &xnet_mplex_av_ops;
+	*fid_av = &av->util_av.av_fid;
+	return FI_SUCCESS;
+
+free_lock:
+	ofi_genlock_destroy(&av->lock);
+free:
+	free(av);
+	return ret;
+}
diff --git a/prov/tcp/src/xnet_cq.c b/prov/tcp/src/xnet_cq.c
index 03ea975371d..6c1e6e959f2 100644
--- a/prov/tcp/src/xnet_cq.c
+++ b/prov/tcp/src/xnet_cq.c
@@ -315,7 +315,6 @@ int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
 	return ret;
 }
 
-
 static void xnet_cntr_progress(struct util_cntr *cntr)
 {
 	xnet_progress(xnet_cntr2_progress(cntr), false);
diff --git a/prov/tcp/src/xnet_domain.c b/prov/tcp/src/xnet_domain.c
index e2a7fae66e9..a467fd51a9f 100644
--- a/prov/tcp/src/xnet_domain.c
+++ b/prov/tcp/src/xnet_domain.c
@@ -52,6 +52,39 @@ static int xnet_mr_close(struct fid *fid)
 	return ret;
 }
 
+static void xnet_subdomains_mr_close(struct xnet_domain *domain, uint64_t mr_key)
+{
+	int ret;
+	struct fid_list_entry *item;
+	struct xnet_domain *subdomain;
+
+	dlist_foreach_container(&domain->subdomain_list,
+				struct fid_list_entry, item, entry) {
+		subdomain = container_of(item->fid, struct xnet_domain,
+					 util_domain.domain_fid.fid);
+		ofi_genlock_lock(&subdomain->util_domain.lock);
+		ret = ofi_mr_map_remove(&subdomain->util_domain.mr_map, mr_key);
+		ofi_genlock_unlock(&subdomain->util_domain.lock);
+
+		if (!ret)
+			ofi_atomic_dec32(&subdomain->util_domain.ref);
+	}
+}
+
+static int xnet_mplex_mr_close(struct fid *fid)
+{
+	struct xnet_domain *domain;
+	struct ofi_mr *mr;
+
+	mr = container_of(fid, struct ofi_mr, mr_fid.fid);
+	domain = container_of(&mr->domain->domain_fid, struct xnet_domain,
+			      util_domain.domain_fid.fid);
+
+	ofi_genlock_lock(&domain->subdomain_list_lock);
+	xnet_subdomains_mr_close(domain, mr->key);
+	ofi_genlock_unlock(&domain->subdomain_list_lock);
+	return ofi_mr_close(fid);
+}
+
 static struct fi_ops xnet_mr_fi_ops = {
 	.size = sizeof(struct fi_ops),
 	.close = xnet_mr_close,
@@ -60,6 +93,14 @@ static struct fi_ops xnet_mr_fi_ops = {
 	.ops_open = fi_no_ops_open
 };
 
+static struct fi_ops xnet_mplex_mr_fi_ops = {
+	.size = sizeof(struct fi_ops),
+	.close = xnet_mplex_mr_close,
+	.bind = fi_no_bind,
+	.control = fi_no_control,
+	.ops_open = fi_no_ops_open
+};
+
 static int
 xnet_mr_reg(struct fid *fid, const void *buf, size_t len,
 	    uint64_t access, uint64_t offset, uint64_t requested_key,
@@ -128,6 +169,137 @@ xnet_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
 	return ret;
 }
 
+static int
+xnet_mplex_mr_reg(struct fid *fid, const void *buf, size_t len,
+		  uint64_t access, uint64_t offset, uint64_t requested_key,
+		  uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+	struct xnet_domain *domain;
+	struct fid_domain *subdomain;
+	struct fid_list_entry *item;
+	struct fid_mr *sub_mr_fid;
+	struct ofi_mr *mr;
+	int ret;
+
+	domain = container_of(fid, struct xnet_domain,
+			      util_domain.domain_fid.fid);
+	ret = ofi_mr_reg(fid, buf, len, access, offset, requested_key, flags,
+			 mr_fid, context);
+	if (ret)
+		goto out;
+
+	mr = container_of(*mr_fid, struct ofi_mr, mr_fid.fid);
+	mr->mr_fid.fid.ops = &xnet_mplex_mr_fi_ops;
+
+	ofi_genlock_lock(&domain->subdomain_list_lock);
+	dlist_foreach_container(&domain->subdomain_list,
+				struct fid_list_entry, item, entry) {
+		subdomain = container_of(item->fid, struct fid_domain, fid);
+		ret = fi_mr_reg(subdomain, buf, len, access, offset,
+				requested_key, flags, &sub_mr_fid,
+				context);
+		if (ret) {
+			FI_WARN(&xnet_prov, FI_LOG_MR,
+				"Failed to reg mr (%ld) from subdomain (%p)\n",
+				mr->key, subdomain);
+
+			xnet_subdomains_mr_close(domain, mr->key);
+			(void) ofi_mr_close(&(*mr_fid)->fid);
+			goto unlock;
+		}
+	}
+unlock:
+	ofi_genlock_unlock(&domain->subdomain_list_lock);
+out:
+	return ret;
+}
+
+static int
+xnet_mplex_mr_regv(struct fid *fid, const struct iovec *iov,
+		   size_t count, uint64_t access,
+		   uint64_t offset, uint64_t requested_key,
+		   uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+	struct xnet_domain *domain;
+	struct fid_domain *subdomain;
+	struct fid_list_entry *item;
+	struct fid_mr *sub_mr_fid;
+	struct ofi_mr *mr;
+	int ret;
+
+	domain = container_of(fid, struct xnet_domain,
+			      util_domain.domain_fid.fid);
+	ret = ofi_mr_regv(fid, iov, count, access, offset, requested_key, flags,
+			  mr_fid, context);
+	if (ret)
+		goto out;
+
+	mr = container_of(*mr_fid, struct ofi_mr, mr_fid.fid);
+	mr->mr_fid.fid.ops = &xnet_mplex_mr_fi_ops;
+
+	ofi_genlock_lock(&domain->subdomain_list_lock);
+	dlist_foreach_container(&domain->subdomain_list,
+				struct fid_list_entry, item, entry) {
+		subdomain = container_of(item->fid, struct fid_domain, fid);
+		ret = fi_mr_regv(subdomain, iov, count, access, offset,
+				 requested_key, flags, &sub_mr_fid, context);
+		if (ret) {
+			FI_WARN(&xnet_prov, FI_LOG_MR,
+				"Failed to reg mr (%ld) from subdomain (%p)\n",
+				mr->key, subdomain);
+
+			xnet_subdomains_mr_close(domain, mr->key);
+			(void) ofi_mr_close(&(*mr_fid)->fid);
+			goto unlock;
+		}
+	}
+unlock:
+	ofi_genlock_unlock(&domain->subdomain_list_lock);
+out:
+	return ret;
+}
+
+static int
+xnet_mplex_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
+		      uint64_t flags, struct fid_mr **mr_fid)
+{
+	struct xnet_domain *domain;
+	struct fid_domain *subdomain;
+	struct fid_list_entry *item;
+	struct fid_mr *sub_mr_fid;
+	struct ofi_mr *mr;
+	int ret;
+
+	domain = container_of(fid, struct xnet_domain,
+			      util_domain.domain_fid.fid);
+	ret = ofi_mr_regattr(fid, attr, flags, mr_fid);
+	if (ret)
+		goto out;
+
+	mr = container_of(*mr_fid, struct ofi_mr, mr_fid.fid);
+	mr->mr_fid.fid.ops = &xnet_mplex_mr_fi_ops;
+
+	ofi_genlock_lock(&domain->subdomain_list_lock);
+	dlist_foreach_container(&domain->subdomain_list,
+				struct fid_list_entry, item, entry) {
+		subdomain = container_of(item->fid, struct fid_domain, fid);
+		ret = fi_mr_regattr(subdomain, attr, flags, &sub_mr_fid);
+		if (ret) {
+			FI_WARN(&xnet_prov, FI_LOG_MR,
+				"Failed to reg mr (%ld) from subdomain (%p)\n",
+				mr->key, subdomain);
+
+			xnet_subdomains_mr_close(domain, mr->key);
+			(void) ofi_mr_close(&(*mr_fid)->fid);
+			goto unlock;
+		}
+	}
+unlock:
+	ofi_genlock_unlock(&domain->subdomain_list_lock);
+out:
+	return ret;
+}
+
 static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
 			struct fid_ep **ep_fid, void *context)
 {
@@ -147,13 +319,6 @@ static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
 	return -FI_EINVAL;
 }
 
-static int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
-			struct fid_av **fid_av, void *context)
-{
-	return rxm_util_av_open(domain_fid, attr, fid_av, context,
-				sizeof(struct xnet_conn), NULL);
-}
-
 static int
 xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
 		  enum fi_op op, struct fi_atomic_attr *attr,
@@ -167,9 +332,45 @@ xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
 	return -FI_EOPNOTSUPP;
 }
 
-static struct fi_ops_domain xnet_domain_ops = {
+static int xnet_domain_close(fid_t fid)
+{
+	struct xnet_domain *domain;
+	int ret;
+
+	domain = container_of(fid, struct xnet_domain,
+			      util_domain.domain_fid.fid);
+
+	xnet_del_domain_progress(domain);
+	ret = ofi_domain_close(&domain->util_domain);
+	if (ret)
+		return ret;
+
+	xnet_close_progress(&domain->progress);
+	free(domain);
+	return FI_SUCCESS;
+}
+
+static int xnet_mplex_domain_close(fid_t fid)
+{
+	struct xnet_domain *domain;
+	struct fid_list_entry *item;
+
+	domain = container_of(fid, struct xnet_domain, util_domain.domain_fid.fid);
+	while (!dlist_empty(&domain->subdomain_list)) {
+		dlist_pop_front(&domain->subdomain_list, struct fid_list_entry, item, entry);
+		(void) fi_close(item->fid);
+		free(item);
+	}
+
+	ofi_genlock_destroy(&domain->subdomain_list_lock);
+	ofi_domain_close(&domain->util_domain);
+	free(domain);
+	return FI_SUCCESS;
+}
+
+static struct fi_ops_domain xnet_mplex_domain_ops = {
 	.size = sizeof(struct fi_ops_domain),
-	.av_open = xnet_av_open,
+	.av_open = xnet_multiplex_av_open,
 	.cq_open = xnet_cq_open,
 	.endpoint = xnet_open_ep,
 	.scalable_ep = fi_no_scalable_ep,
@@ -181,24 +382,86 @@
 	.query_collective = fi_no_query_collective,
 };
 
-static int xnet_domain_close(fid_t fid)
+static struct fi_ops xnet_mplex_domain_fi_ops = {
+	.size = sizeof(struct fi_ops),
+	.close = xnet_mplex_domain_close,
+	.bind = fi_no_bind,
+	.control = fi_no_control,
+	.ops_open = fi_no_ops_open,
+	.tostr = fi_no_tostr,
+	.ops_set = fi_no_ops_set,
+};
+
+static struct fi_ops_mr xnet_mplex_domain_fi_ops_mr = {
+	.size = sizeof(struct fi_ops_mr),
+	.reg = xnet_mplex_mr_reg,
+	.regv = xnet_mplex_mr_regv,
+	.regattr = xnet_mplex_mr_regattr,
+};
+
+int xnet_domain_multiplexed(struct fid_domain *domain_fid)
+{
+	return domain_fid->ops == &xnet_mplex_domain_ops;
+}
+
+int xnet_domain_multiplex_open(struct fid_fabric *fabric_fid, struct fi_info *info,
+			       struct fid_domain **domain_fid, void *context)
 {
 	struct xnet_domain *domain;
 	int ret;
 
-	domain = container_of(fid, struct xnet_domain,
-			      util_domain.domain_fid.fid);
+	domain = calloc(1, sizeof(*domain));
+	if (!domain)
+		return -FI_ENOMEM;
 
-	xnet_del_domain_progress(domain);
-	ret = ofi_domain_close(&domain->util_domain);
+	ret = ofi_domain_init(fabric_fid, info, &domain->util_domain, context,
+			      OFI_LOCK_MUTEX);
 	if (ret)
-		return ret;
+		goto free;
 
-	xnet_close_progress(&domain->progress);
-	free(domain);
+	ret = ofi_genlock_init(&domain->subdomain_list_lock, OFI_LOCK_MUTEX);
+	if (ret)
+		goto close;
+
+	domain->subdomain_info = fi_dupinfo(info);
+	if (!domain->subdomain_info) {
+		ret = -FI_ENOMEM;
+		goto free_lock;
+	}
+
+	domain->subdomain_info->domain_attr->threading = FI_THREAD_DOMAIN;
+
+	dlist_init(&domain->subdomain_list);
+	domain->ep_type = info->ep_attr->type;
+	domain->util_domain.domain_fid.ops = &xnet_mplex_domain_ops;
+	domain->util_domain.domain_fid.fid.ops = &xnet_mplex_domain_fi_ops;
+	domain->util_domain.domain_fid.mr = &xnet_mplex_domain_fi_ops_mr;
+	*domain_fid = &domain->util_domain.domain_fid;
 	return FI_SUCCESS;
+
+free_lock:
+	ofi_genlock_destroy(&domain->subdomain_list_lock);
+close:
+	ofi_domain_close(&domain->util_domain);
+free:
+	free(domain);
+	return ret;
 }
 
+static struct fi_ops_domain xnet_domain_ops = {
+	.size = sizeof(struct fi_ops_domain),
+	.av_open = xnet_av_open,
+	.cq_open = xnet_cq_open,
+	.endpoint = xnet_open_ep,
+	.scalable_ep = fi_no_scalable_ep,
+	.cntr_open = xnet_cntr_open,
+	.poll_open = fi_poll_create,
+	.stx_ctx = fi_no_stx_context,
+	.srx_ctx = xnet_srx_context,
+	.query_atomic = xnet_query_atomic,
+	.query_collective = fi_no_query_collective,
+};
+
 static struct fi_ops xnet_domain_fi_ops = {
 	.size = sizeof(struct fi_ops),
 	.close = xnet_domain_close,
@@ -226,12 +489,18 @@ int xnet_domain_open(struct fid_fabric *fabric_fid, struct fi_info *info,
 	if (ret)
 		return ret;
 
+	if (info->ep_attr->type == FI_EP_RDM &&
+	    info->domain_attr->threading == FI_THREAD_COMPLETION)
+		return xnet_domain_multiplex_open(fabric_fid, info, domain_fid,
+						  context);
+
 	domain = calloc(1, sizeof(*domain));
 	if (!domain)
 		return -FI_ENOMEM;
 
 	ret = ofi_domain_init(fabric_fid, info, &domain->util_domain, context,
-			      OFI_LOCK_NONE);
+			      info->domain_attr->threading == FI_THREAD_SAFE ?
+			      OFI_LOCK_MUTEX : OFI_LOCK_NONE);
 	if (ret)
 		goto free;
 
diff --git a/prov/tcp/src/xnet_progress.c b/prov/tcp/src/xnet_progress.c
index aa76968e175..87558622e9e 100644
--- a/prov/tcp/src/xnet_progress.c
+++ b/prov/tcp/src/xnet_progress.c
@@ -885,6 +885,7 @@ static int xnet_handle_read_req(struct xnet_ep *ep)
 {
 	struct xnet_xfer_entry *resp;
 	struct ofi_rma_iov *rma_iov;
+	struct xnet_domain *domain;
 	ssize_t i;
 	int ret;
 
@@ -908,9 +909,10 @@ static int xnet_handle_read_req(struct xnet_ep *ep)
 	resp->iov_cnt = 1 + resp->hdr.base_hdr.rma_iov_cnt;
 	resp->hdr.base_hdr.size = resp->iov[0].iov_len;
 
+	domain = container_of(ep->util_ep.domain, struct xnet_domain, util_domain);
 	for (i = 0; i < resp->hdr.base_hdr.rma_iov_cnt; i++) {
-		ret = ofi_mr_verify(&ep->util_ep.domain->mr_map, rma_iov[i].len,
-				    (uintptr_t *) &rma_iov[i].addr,
+		ret = ofi_mr_verify(&domain->util_domain.mr_map,
+				    rma_iov[i].len, (uintptr_t *) &rma_iov[i].addr,
 				    rma_iov[i].key, FI_REMOTE_READ);
 		if (ret) {
 			FI_WARN(&xnet_prov, FI_LOG_EP_DATA,
diff --git a/prov/tcp/src/xnet_rdm.c b/prov/tcp/src/xnet_rdm.c
index 0d85faaa449..5cd1251b970 100644
--- a/prov/tcp/src/xnet_rdm.c
+++ b/prov/tcp/src/xnet_rdm.c
@@ -695,6 +695,171 @@ static struct fi_ops_ep xnet_rdm_ep_ops = {
 	.tx_size_left = fi_no_tx_size_left,
 };
 
+static int xnet_mplex_av_dup(struct util_ep *ep, struct xnet_domain *domain,
+			     struct xnet_domain *subdomain)
+{
+	int ret, i;
+	struct xnet_mplex_av *xnet_av;
+	struct util_av *util_av;
+	struct fid_av *av_fid;
+	struct fi_av_attr av_attr = {
+		.type = ep->domain->av_type,
+		.count = ep->av->av_entry_pool->entry_cnt,
+		.flags = 0,
+	};
+	size_t addr_size = sizeof(struct sockaddr_in6);
+	char addr[sizeof(struct sockaddr_in6)];
+
+	xnet_av = container_of(&ep->av->av_fid, struct xnet_mplex_av, util_av.av_fid);
+	ofi_genlock_lock(&xnet_av->lock);
+	ret = fi_av_open(&subdomain->util_domain.domain_fid, &av_attr, &av_fid, NULL);
+	if (ret)
+		goto out;
+	util_av = container_of(av_fid, struct util_av, av_fid);
+	for (i = 0; i < ep->av->av_entry_pool->entry_cnt; i++) {
+		ret = fi_av_lookup(&ep->av->av_fid, i, addr, &addr_size);
+		if (ret)
+			continue;
+		ret = ofi_av_insert_addr_at(util_av, addr, i);
+		if (ret)
+			goto out;
+	}
+	fid_list_insert(&xnet_av->subav_list, NULL, &av_fid->fid);
+
+	ofi_genlock_lock(&ep->av->ep_list_lock);
+	dlist_remove(&ep->av_entry);
+	ofi_genlock_unlock(&ep->av->ep_list_lock);
+	ofi_atomic_dec32(&ep->av->ref);
+
+	ep->av = NULL;
+	ofi_ep_bind_av(ep, util_av);
+	ret = FI_SUCCESS;
+out:
+	ofi_genlock_unlock(&xnet_av->lock);
+	return ret;
+}
+
+static void xnet_reg_subdomain_mr(struct ofi_rbmap *map, struct ofi_rbnode *node,
+				  void *context)
+{
+	int ret;
+	struct fi_mr_attr *attr = (struct fi_mr_attr *) node->data;
+	struct xnet_domain *subdomain = context;
+
+	ret = ofi_mr_map_insert(&subdomain->util_domain.mr_map, attr,
+				&attr->requested_key, attr->context,
+				((struct ofi_mr *) attr->context)->flags);
+	if (ret)
+		XNET_WARN_ERR(FI_LOG_MR, "ofi_mr_map_insert", ret);
+	else
+		ofi_atomic_inc32(&subdomain->util_domain.ref);
+}
+
+struct xnet_domain *find_subdomain(struct xnet_rdm *rdm)
+{
+	int i;
+	struct util_cntr *cntr;
+
+	if (!xnet_domain_multiplexed(&rdm->util_ep.rx_cq->domain->domain_fid)) {
+		return container_of(&rdm->util_ep.rx_cq->domain->domain_fid,
+				    struct xnet_domain, util_domain.domain_fid);
+	}
+
+	if (!xnet_domain_multiplexed(&rdm->util_ep.tx_cq->domain->domain_fid)) {
+		return container_of(&rdm->util_ep.tx_cq->domain->domain_fid,
+				    struct xnet_domain, util_domain.domain_fid);
+	}
+
+	for (i = 0; i < CNTR_CNT; i++) {
+		cntr = rdm->util_ep.cntrs[i];
+		if (!cntr)
+			continue;
+
+		if (!xnet_domain_multiplexed(&cntr->domain->domain_fid)) {
+			return container_of(&cntr->domain->domain_fid,
+					    struct xnet_domain,
+					    util_domain.domain_fid);
+		}
+	}
+
+	return NULL;
+}
+
+void set_subdomain(struct xnet_rdm *rdm, struct xnet_domain *domain,
+		   struct xnet_domain *subdomain)
+{
+	int i;
+	struct util_cntr *cntr;
+
+	if (rdm->util_ep.rx_cq->domain == &domain->util_domain) {
+		ofi_atomic_dec32(&rdm->util_ep.rx_cq->domain->ref);
+		ofi_atomic_inc32(&subdomain->util_domain.ref);
+		rdm->util_ep.rx_cq->domain = &subdomain->util_domain;
+	}
+
+	if (rdm->util_ep.tx_cq->domain == &domain->util_domain) {
+		ofi_atomic_dec32(&rdm->util_ep.tx_cq->domain->ref);
+		ofi_atomic_inc32(&subdomain->util_domain.ref);
+		rdm->util_ep.tx_cq->domain = &subdomain->util_domain;
+	}
+
+	for (i = 0; i < CNTR_CNT; i++) {
+		cntr = rdm->util_ep.cntrs[i];
+		if (!cntr)
+			continue;
+
+		if (cntr->domain == &domain->util_domain) {
+			ofi_atomic_dec32(&cntr->domain->ref);
+			ofi_atomic_inc32(&subdomain->util_domain.ref);
+			cntr->domain = &subdomain->util_domain;
+		}
+	}
+}
+
+int xnet_rdm_resolve_domains(struct xnet_rdm *rdm)
+{
+	int ret;
+	struct fid_domain *subdomain_fid;
+	struct xnet_domain *subdomain;
+	struct xnet_domain *domain = container_of(rdm->util_ep.domain,
+						  struct xnet_domain,
+						  util_domain);
+
+	subdomain = find_subdomain(rdm);
+	if (!subdomain) {
+		ret = fi_domain(&domain->util_domain.fabric->fabric_fid,
+				domain->subdomain_info,
+				&subdomain_fid, NULL);
+		if (ret)
+			return ret;
+
+		subdomain = container_of(subdomain_fid, struct xnet_domain,
+					 util_domain.domain_fid);
+		ret = fid_list_insert2(&domain->subdomain_list,
+				       &domain->subdomain_list_lock,
+				       &subdomain_fid->fid);
+		if (ret) {
+			fi_close(&subdomain_fid->fid);
+			return ret;
+		}
+
+		set_subdomain(rdm, domain, subdomain);
+
+		ofi_rbmap_foreach(domain->util_domain.mr_map.rbtree,
+				  domain->util_domain.mr_map.rbtree->root,
+				  xnet_reg_subdomain_mr, subdomain);
+	}
+
+	ret = xnet_mplex_av_dup(&rdm->util_ep, domain, subdomain);
+	if (ret)
+		return ret;
+
+	ofi_atomic_dec32(&rdm->util_ep.domain->ref);
+	ofi_atomic_inc32(&subdomain->util_domain.ref);
+	rdm->util_ep.domain = &subdomain->util_domain;
+
+	ofi_atomic_dec32(&rdm->srx->domain->util_domain.ref);
+	ofi_atomic_inc32(&subdomain->util_domain.ref);
+	rdm->srx->domain = subdomain;
+	return ret;
+}
+
 static int xnet_enable_rdm(struct xnet_rdm *rdm)
 {
 	struct xnet_progress *progress;
@@ -702,6 +867,12 @@ static int xnet_enable_rdm(struct xnet_rdm *rdm)
 	size_t len;
 	int ret;
 
+	if (xnet_domain_multiplexed(&rdm->util_ep.domain->domain_fid)) {
+		ret = xnet_rdm_resolve_domains(rdm);
+		if (ret)
+			return ret;
+	}
+
 	(void) fi_ep_bind(&rdm->srx->rx_fid, &rdm->util_ep.rx_cq->cq_fid.fid,
 			  FI_RECV);
 	if (rdm->util_ep.cntrs[CNTR_RX]) {
diff --git a/prov/util/src/util_av.c b/prov/util/src/util_av.c
index 657a28b5d64..7ec4fc407b9 100644
--- a/prov/util/src/util_av.c
+++ b/prov/util/src/util_av.c
@@ -738,9 +738,9 @@ int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
 			       count, fi_addr, flags, context);
 }
 
-static int ip_av_insertsvc(struct fid_av *av, const char *node,
-			   const char *service, fi_addr_t *fi_addr,
-			   uint64_t flags, void *context)
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+			const char *service, fi_addr_t *fi_addr,
+			uint64_t flags, void *context)
 {
 	return fi_av_insertsym(av, node, 1, service, 1, fi_addr, flags, context);
 }
@@ -918,9 +918,9 @@ int ofi_ip_av_sym_getaddr(struct util_av *av, const char *node,
 				    svccnt, addr, addrlen);
 }
 
-static int ip_av_insertsym(struct fid_av *av_fid, const char *node,
-			   size_t nodecnt, const char *service, size_t svccnt,
-			   fi_addr_t *fi_addr, uint64_t flags, void *context)
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+			size_t nodecnt, const char *service, size_t svccnt,
+			fi_addr_t *fi_addr, uint64_t flags, void *context)
 {
 	struct util_av *av;
 	void *addr;
@@ -998,8 +998,8 @@ ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len)
 static struct fi_ops_av ip_av_ops = {
 	.size = sizeof(struct fi_ops_av),
 	.insert = ofi_ip_av_insert,
-	.insertsvc = ip_av_insertsvc,
-	.insertsym = ip_av_insertsym,
+	.insertsvc = ofi_ip_av_insertsvc,
+	.insertsym = ofi_ip_av_insertsym,
 	.remove = ofi_ip_av_remove,
 	.lookup = ofi_ip_av_lookup,
 	.straddr = ofi_ip_av_straddr,
diff --git a/prov/util/src/util_ep.c b/prov/util/src/util_ep.c
index eade45d46db..52949849943 100644
--- a/prov/util/src/util_ep.c
+++ b/prov/util/src/util_ep.c
@@ -41,6 +41,8 @@ int ofi_ep_bind_cq(struct util_ep *ep, struct util_cq *cq, uint64_t flags)
 {
 	int ret;
 
+	assert(ep->domain == cq->domain);
+
 	ret = ofi_check_bind_cq_flags(ep, cq, flags);
 	if (ret)
 		return ret;
diff --git a/prov/util/src/util_mr_map.c b/prov/util/src/util_mr_map.c
index 535189ea71b..181626c69dc 100644
--- a/prov/util/src/util_mr_map.c
+++ b/prov/util/src/util_mr_map.c
@@ -38,8 +38,8 @@
 
 #include <ofi_mr.h>
 
-static struct fi_mr_attr *
-dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
+
+struct fi_mr_attr *dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
 {
 	struct fi_mr_attr *dup_attr;
 
@@ -52,7 +52,6 @@ dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
 	dup_attr->mr_iov = (struct iovec *) (dup_attr + 1);
 
 	/*
-	 * dup_mr_attr is only used insided ofi_mr_map_insert.
 	 * dmabuf must be converted to iov before the attr
 	 * is inserted to the mr_map
 	 */
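
For context, here is a minimal sketch of the application-side setup that exercises the
new path: an FI_EP_RDM endpoint requested with FI_THREAD_COMPLETION threading. It is
illustrative only; error handling is trimmed, and the API version and provider-name
filter are assumptions, not part of this patch.

#include <string.h>
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>

/* Request FI_THREAD_COMPLETION on an FI_EP_RDM endpoint so the tcp
 * provider opens a multiplexed domain. Illustrative sketch only. */
int open_completion_threaded_ep(struct fid_fabric **fabric,
				struct fid_domain **domain,
				struct fid_ep **ep)
{
	struct fi_info *hints, *info;
	int ret;

	hints = fi_allocinfo();
	if (!hints)
		return -FI_ENOMEM;

	hints->ep_attr->type = FI_EP_RDM;
	hints->domain_attr->threading = FI_THREAD_COMPLETION;
	hints->fabric_attr->prov_name = strdup("tcp");

	/* API version is an assumption; use whatever the application targets. */
	ret = fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, &info);
	fi_freeinfo(hints);
	if (ret)
		return ret;

	ret = fi_fabric(info->fabric_attr, fabric, NULL);
	if (!ret)
		ret = fi_domain(*fabric, info, domain, NULL);
	if (!ret)	/* open one ep per thread; each resolves to a subdomain */
		ret = fi_endpoint(*domain, info, ep, NULL);

	fi_freeinfo(info);
	return ret;
}

With this patch applied, the fi_domain() call above returns the multiplexed domain,
and each endpoint is moved onto its own subdomain, with its own progress engine, when
it is enabled (see xnet_enable_rdm() and xnet_rdm_resolve_domains()).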