diff --git a/include/ofi_enosys.h b/include/ofi_enosys.h
index 322ae0cc935..2b811e47625 100644
--- a/include/ofi_enosys.h
+++ b/include/ofi_enosys.h
@@ -493,7 +493,8 @@ static struct fi_ops_av_set X = {
.straddr = X,
};
*/
-
+int fi_no_av_set(struct fid_av *av_fid, struct fi_av_set_attr *attr,
+ struct fid_av_set **av_set_fid, void *context);
int fi_no_av_set_union(struct fid_av_set *dst, const struct fid_av_set *src);
int fi_no_av_set_intersect(struct fid_av_set *dst, const struct fid_av_set *src);
int fi_no_av_set_diff(struct fid_av_set *dst, const struct fid_av_set *src);
diff --git a/include/ofi_util.h b/include/ofi_util.h
index 28fe61ed7d1..e90b09ea058 100644
--- a/include/ofi_util.h
+++ b/include/ofi_util.h
@@ -1011,8 +1011,15 @@ int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
fi_addr_t *fi_addr, uint64_t flags, void *context);
int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
size_t count, uint64_t flags);
+bool ofi_ip_av_is_valid(struct fid_av *av_fid, fi_addr_t fi_addr);
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
void *addr, size_t *addrlen);
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service, size_t svccnt,
+ fi_addr_t *fi_addr, uint64_t flags, void *context);
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context);
const char *
ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len);
diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index 48209c6a344..1bc35fb93b5 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -701,6 +701,7 @@
+    <ClCompile Include="prov\tcp\src\xnet_av.c" />
diff --git a/libfabric.vcxproj.filters b/libfabric.vcxproj.filters
index adebb5a85ad..a0833325f6a 100644
--- a/libfabric.vcxproj.filters
+++ b/libfabric.vcxproj.filters
@@ -462,6 +462,9 @@
Source Files\prov\tcp\src
+    <ClCompile Include="prov\tcp\src\xnet_av.c">
+      <Filter>Source Files\prov\tcp\src</Filter>
+    </ClCompile>
Source Files\prov\tcp\src
diff --git a/prov/tcp/Makefile.include b/prov/tcp/Makefile.include
index d310f51d27f..34c109e92e2 100644
--- a/prov/tcp/Makefile.include
+++ b/prov/tcp/Makefile.include
@@ -5,6 +5,7 @@ _xnet_files = \
prov/tcp/src/xnet_cm.c \
prov/tcp/src/xnet_rdm_cm.c \
prov/tcp/src/xnet_domain.c \
+ prov/tcp/src/xnet_av.c \
prov/tcp/src/xnet_rma.c \
prov/tcp/src/xnet_msg.c \
prov/tcp/src/xnet_ep.c \
diff --git a/prov/tcp/src/xnet.h b/prov/tcp/src/xnet.h
index ee16d7e3a98..486af642d47 100644
--- a/prov/tcp/src/xnet.h
+++ b/prov/tcp/src/xnet.h
@@ -467,10 +467,34 @@ struct xnet_xfer_entry {
char msg_data[];
};
+struct xnet_mplex_av {
+ struct util_av util_av;
+ struct dlist_entry subav_list;
+ struct ofi_genlock lock;
+};
+
struct xnet_domain {
struct util_domain util_domain;
struct xnet_progress progress;
enum fi_ep_type ep_type;
+
+ /* When an application requests FI_THREAD_COMPLETION
+ * the assumption is that the domain will be used
+ * across multiple threads.
+ *
+ * The xnet progress engine is optimized for single
+ * threaded performance, so instead of reworking the
+ * progress engine, likely losing single threaded
+ * performance, multiplex the domain into multiple
+ * subdomains for each ep. This way, each ep, with
+ * the assumption that the application wants to
+ * progress an ep per thread, can have its own
+ * progress engine and avoid having a single
+ * synchronization point among all eps.
+ */
+ struct fi_info *subdomain_info;
+ struct ofi_genlock subdomain_list_lock;
+ struct dlist_entry subdomain_list;
};
static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
@@ -544,10 +568,6 @@ int xnet_passive_ep(struct fid_fabric *fabric, struct fi_info *info,
int xnet_set_port_range(void);
-int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
- struct fid_domain **domain, void *context);
-
-
int xnet_setup_socket(SOCKET sock, struct fi_info *info);
void xnet_set_zerocopy(SOCKET sock);
@@ -567,6 +587,13 @@ static inline struct xnet_cq *xnet_ep_tx_cq(struct xnet_ep *ep)
}
+int xnet_mplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context);
+int xnet_domain_multiplexed(struct fid_domain *domain_fid);
+int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
+ struct fid_domain **domain, void *context);
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context);
int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context);
void xnet_report_success(struct xnet_xfer_entry *xfer_entry);
diff --git a/prov/tcp/src/xnet_av.c b/prov/tcp/src/xnet_av.c
new file mode 100644
index 00000000000..7cf77604a58
--- /dev/null
+++ b/prov/tcp/src/xnet_av.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdlib.h>
+
+#include "xnet.h"
+
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context)
+{
+ return rxm_util_av_open(domain_fid, attr, fid_av, context,
+ sizeof(struct xnet_conn), NULL);
+}
+
+static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
+ size_t count, uint64_t flags)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid);
+
+ ofi_genlock_lock(&av->lock);
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_remove(subav_fid, fi_addr, count, flags);
+ if (ret)
+ goto out;
+ }
+ ret = ofi_ip_av_remove(&av->util_av.av_fid, fi_addr, count, flags);
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
+ fi_addr_t *fi_addr, uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ fi_addr_t sub_fi_addr;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ ret = ofi_ip_av_insert(&av->util_av.av_fid, addr, count, fi_addr, flags, context);
+ if (ret < count)
+ goto out;
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insert(subav_fid, addr, count, &sub_fi_addr, flags, context);
+ if (ret < count)
+ break;
+ assert(*fi_addr == sub_fi_addr);
+ }
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service,
+ size_t svccnt, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ fi_addr_t sub_fi_addr;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ ret = ofi_ip_av_insertsym(&av->util_av.av_fid, node, nodecnt,
+ service, svccnt, fi_addr, flags, context);
+ if (ret)
+ goto out;
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insertsym(subav_fid, node, nodecnt, service, svccnt,
+ &sub_fi_addr, flags, context);
+ if (ret)
+ break;
+ assert(*fi_addr == sub_fi_addr);
+ }
+out:
+ ofi_genlock_unlock(&av->lock);
+
+ return ret;
+}
+
+static int xnet_mplex_av_insertsvc(struct fid_av *av_fid, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ fi_addr_t sub_fi_addr;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ ret = ofi_ip_av_insertsvc(&av->util_av.av_fid, node, service,
+ fi_addr, flags, context);
+ if (ret)
+ goto out;
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insertsvc(subav_fid, node, service, &sub_fi_addr, flags,
+ context);
+ if (ret)
+ break;
+ assert(*fi_addr == sub_fi_addr);
+ }
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
+ void *addr, size_t *addrlen)
+{
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+ return ofi_ip_av_lookup(&av->util_av.av_fid, fi_addr, addr, addrlen);
+}
+
+static int xnet_mplex_av_close(struct fid *av_fid)
+{
+ struct xnet_mplex_av *av;
+ struct fid_list_entry *item;
+ int ret = 0;
+
+ av = container_of(av_fid, struct xnet_mplex_av, util_av.av_fid.fid);
+ while (!dlist_empty(&av->subav_list)) {
+ dlist_pop_front(&av->subav_list, struct fid_list_entry, item, entry);
+ (void)fi_close(item->fid);
+ free(item);
+ }
+ ret = ofi_av_close(&av->util_av);
+ ofi_genlock_destroy(&av->lock);
+ free(av);
+ return ret;
+}
+
+static struct fi_ops xnet_mplex_av_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_av_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+};
+
+static struct fi_ops_av xnet_mplex_av_ops = {
+ .size = sizeof(struct fi_ops_av),
+ .insert = xnet_mplex_av_insert,
+ .insertsvc = xnet_mplex_av_insertsvc,
+ .insertsym = xnet_mplex_av_insertsym,
+ .remove = xnet_mplex_av_remove,
+ .lookup = xnet_mplex_av_lookup,
+ .straddr = ofi_ip_av_straddr,
+ .av_set = fi_no_av_set,
+};
+
+int xnet_mplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context)
+{
+ struct xnet_mplex_av *av;
+ struct util_domain *domain;
+ struct util_av_attr util_attr = {0};
+ int ret;
+
+ av = calloc(1, sizeof(*av));
+ if (!av)
+ return -FI_ENOMEM;
+
+ ret = ofi_genlock_init(&av->lock, OFI_LOCK_MUTEX);
+ if (ret)
+ goto free;
+
+ domain = container_of(domain_fid, struct util_domain, domain_fid);
+
+ util_attr.context_len = sizeof(struct util_peer_addr *);
+ util_attr.addrlen = ofi_sizeof_addr_format(domain->addr_format);
+ if (attr->type == FI_AV_UNSPEC)
+ attr->type = FI_AV_TABLE;
+
+ ret = ofi_av_init(domain, attr, &util_attr, &av->util_av, context);
+ if (ret)
+ goto free_lock;
+ dlist_init(&av->subav_list);
+ av->util_av.av_fid.fid.ops = &xnet_mplex_av_fi_ops;
+ av->util_av.av_fid.ops = &xnet_mplex_av_ops;
+ *fid_av = &av->util_av.av_fid;
+ return FI_SUCCESS;
+
+free_lock:
+ ofi_genlock_destroy(&av->lock);
+free:
+ free(av);
+ return ret;
+}
diff --git a/prov/tcp/src/xnet_domain.c b/prov/tcp/src/xnet_domain.c
index e2a7fae66e9..d178748ed5d 100644
--- a/prov/tcp/src/xnet_domain.c
+++ b/prov/tcp/src/xnet_domain.c
@@ -52,6 +52,41 @@ static int xnet_mr_close(struct fid *fid)
return ret;
}
+static void xnet_subdomains_mr_close(struct xnet_domain *domain, uint64_t mr_key)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct xnet_domain *subdomain;
+
+ assert(ofi_genlock_held(&domain->subdomain_list_lock));
+ dlist_foreach_container(&domain->subdomain_list,
+ struct fid_list_entry, item, entry) {
+ subdomain = container_of(item->fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+ ofi_genlock_lock(&subdomain->util_domain.lock);
+ ret = ofi_mr_map_remove(&subdomain->util_domain.mr_map, mr_key);
+ ofi_genlock_unlock(&subdomain->util_domain.lock);
+
+ if (!ret)
+ ofi_atomic_dec32(&subdomain->util_domain.ref);
+ }
+}
+
+static int xnet_mplex_mr_close(struct fid *fid)
+{
+ struct xnet_domain *domain;
+ struct ofi_mr *mr;
+
+ mr = container_of(fid, struct ofi_mr, mr_fid.fid);
+ domain = container_of(&mr->domain->domain_fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+
+ ofi_genlock_lock(&domain->subdomain_list_lock);
+ xnet_subdomains_mr_close(domain, mr->key);
+ ofi_genlock_unlock(&domain->subdomain_list_lock);
+ return ofi_mr_close(fid);
+}
+
static struct fi_ops xnet_mr_fi_ops = {
.size = sizeof(struct fi_ops),
.close = xnet_mr_close,
@@ -60,6 +95,14 @@ static struct fi_ops xnet_mr_fi_ops = {
.ops_open = fi_no_ops_open
};
+static struct fi_ops xnet_mplex_mr_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_mr_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open
+};
+
static int
xnet_mr_reg(struct fid *fid, const void *buf, size_t len,
uint64_t access, uint64_t offset, uint64_t requested_key,
@@ -128,6 +171,79 @@ xnet_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
return ret;
}
+static int
+xnet_mplex_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
+ uint64_t flags, struct fid_mr **mr_fid)
+{
+ struct xnet_domain *domain;
+ struct fid_list_entry *item;
+ struct fid_mr *sub_mr_fid;
+ struct ofi_mr *mr;
+ int ret;
+
+ domain = container_of(fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+ ret = ofi_mr_regattr(fid, attr, flags, mr_fid);
+ if (ret)
+ return ret;
+
+ mr = container_of(*mr_fid, struct ofi_mr, mr_fid.fid);
+ mr->mr_fid.fid.ops = &xnet_mplex_mr_fi_ops;
+
+ ofi_genlock_lock(&domain->subdomain_list_lock);
+ dlist_foreach_container(&domain->subdomain_list,
+ struct fid_list_entry, item, entry) {
+ ret = xnet_mr_regattr(item->fid, attr, flags, &sub_mr_fid);
+ if (ret) {
+ FI_WARN(&xnet_prov, FI_LOG_MR,
+ "Failed to reg mr (%ld) from subdomain (%p)\n",
+ mr->key, item->fid);
+
+ xnet_subdomains_mr_close(domain, mr->key);
+ (void) ofi_mr_close(&(*mr_fid)->fid);
+ break;
+ }
+ }
+
+ ofi_genlock_unlock(&domain->subdomain_list_lock);
+ return ret;
+}
+
+static int
+xnet_mplex_mr_regv(struct fid *fid, const struct iovec *iov,
+ size_t count, uint64_t access,
+ uint64_t offset, uint64_t requested_key,
+ uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+ struct fi_mr_attr attr;
+
+ attr.mr_iov = iov;
+ attr.iov_count = count;
+ attr.access = access;
+ attr.offset = offset;
+ attr.requested_key = requested_key;
+ attr.context = context;
+ attr.iface = FI_HMEM_SYSTEM;
+ attr.device.reserved = 0;
+ attr.hmem_data = NULL;
+
+ return xnet_mplex_mr_regattr(fid, &attr, flags, mr_fid);
+}
+
+static int
+xnet_mplex_mr_reg(struct fid *fid, const void *buf, size_t len,
+ uint64_t access, uint64_t offset, uint64_t requested_key,
+ uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+ struct iovec iov;
+
+ iov.iov_base = (void *) buf;
+ iov.iov_len = len;
+
+ return xnet_mplex_mr_regv(fid, &iov, 1, access, offset, requested_key,
+ flags, mr_fid, context);
+}
+
static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
struct fid_ep **ep_fid, void *context)
{
@@ -147,13 +263,6 @@ static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
return -FI_EINVAL;
}
-static int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
- struct fid_av **fid_av, void *context)
-{
- return rxm_util_av_open(domain_fid, attr, fid_av, context,
- sizeof(struct xnet_conn), NULL);
-}
-
static int
xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
enum fi_op op, struct fi_atomic_attr *attr, uint64_t flags)
@@ -167,9 +276,48 @@ xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
return -FI_EOPNOTSUPP;
}
-static struct fi_ops_domain xnet_domain_ops = {
+static int xnet_domain_close(fid_t fid)
+{
+ struct xnet_domain *domain;
+ int ret;
+
+ domain = container_of(fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+
+ xnet_del_domain_progress(domain);
+ ret = ofi_domain_close(&domain->util_domain);
+ if (ret)
+ return ret;
+
+ xnet_close_progress(&domain->progress);
+ free(domain);
+ return FI_SUCCESS;
+}
+
+static int xnet_mplex_domain_close(fid_t fid)
+{
+ struct xnet_domain *domain;
+ struct fid_list_entry *item;
+
+ domain = container_of(fid, struct xnet_domain, util_domain.domain_fid.fid);
+ ofi_genlock_lock(&domain->subdomain_list_lock);
+ while (!dlist_empty(&domain->subdomain_list)) {
+ dlist_pop_front(&domain->subdomain_list, struct fid_list_entry,
+ item, entry);
+ (void)fi_close(item->fid);
+ free(item);
+ }
+ ofi_genlock_unlock(&domain->subdomain_list_lock);
+
+ ofi_genlock_destroy(&domain->subdomain_list_lock);
+ ofi_domain_close(&domain->util_domain);
+ free(domain);
+ return FI_SUCCESS;
+}
+
+static struct fi_ops_domain xnet_mplex_domain_ops = {
.size = sizeof(struct fi_ops_domain),
- .av_open = xnet_av_open,
+ .av_open = xnet_mplex_av_open,
.cq_open = xnet_cq_open,
.endpoint = xnet_open_ep,
.scalable_ep = fi_no_scalable_ep,
@@ -181,24 +329,86 @@ static struct fi_ops_domain xnet_domain_ops = {
.query_collective = fi_no_query_collective,
};
-static int xnet_domain_close(fid_t fid)
+static struct fi_ops xnet_mplex_domain_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_domain_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+ .tostr = fi_no_tostr,
+ .ops_set = fi_no_ops_set,
+};
+
+static struct fi_ops_mr xnet_mplex_domain_fi_ops_mr = {
+ .size = sizeof(struct fi_ops_mr),
+ .reg = xnet_mplex_mr_reg,
+ .regv = xnet_mplex_mr_regv,
+ .regattr = xnet_mplex_mr_regattr,
+};
+
+int xnet_domain_multiplexed(struct fid_domain *domain_fid)
+{
+ return domain_fid->ops == &xnet_mplex_domain_ops;
+}
+
+static int xnet_domain_mplex_open(struct fid_fabric *fabric_fid, struct fi_info *info,
+ struct fid_domain **domain_fid, void *context)
{
struct xnet_domain *domain;
int ret;
- domain = container_of(fid, struct xnet_domain,
- util_domain.domain_fid.fid);
+ domain = calloc(1, sizeof(*domain));
+ if (!domain)
+ return -FI_ENOMEM;
- xnet_del_domain_progress(domain);
- ret = ofi_domain_close(&domain->util_domain);
+ ret = ofi_domain_init(fabric_fid, info, &domain->util_domain, context,
+ OFI_LOCK_MUTEX);
if (ret)
- return ret;
+ goto free;
- xnet_close_progress(&domain->progress);
- free(domain);
+ ret = ofi_genlock_init(&domain->subdomain_list_lock, OFI_LOCK_MUTEX);
+ if (ret)
+ goto close;
+
+ domain->subdomain_info = fi_dupinfo(info);
+ if (!domain->subdomain_info) {
+ ret = -FI_ENOMEM;
+ goto free_lock;
+ }
+
+ domain->subdomain_info->domain_attr->threading = FI_THREAD_DOMAIN;
+
+ dlist_init(&domain->subdomain_list);
+ domain->ep_type = info->ep_attr->type;
+ domain->util_domain.domain_fid.ops = &xnet_mplex_domain_ops;
+ domain->util_domain.domain_fid.fid.ops = &xnet_mplex_domain_fi_ops;
+ domain->util_domain.domain_fid.mr = &xnet_mplex_domain_fi_ops_mr;
+ *domain_fid = &domain->util_domain.domain_fid;
return FI_SUCCESS;
+
+free_lock:
+ ofi_genlock_destroy(&domain->subdomain_list_lock);
+close:
+ ofi_domain_close(&domain->util_domain);
+free:
+ free(domain);
+ return ret;
}
+static struct fi_ops_domain xnet_domain_ops = {
+ .size = sizeof(struct fi_ops_domain),
+ .av_open = xnet_av_open,
+ .cq_open = xnet_cq_open,
+ .endpoint = xnet_open_ep,
+ .scalable_ep = fi_no_scalable_ep,
+ .cntr_open = xnet_cntr_open,
+ .poll_open = fi_poll_create,
+ .stx_ctx = fi_no_stx_context,
+ .srx_ctx = xnet_srx_context,
+ .query_atomic = xnet_query_atomic,
+ .query_collective = fi_no_query_collective,
+};
+
static struct fi_ops xnet_domain_fi_ops = {
.size = sizeof(struct fi_ops),
.close = xnet_domain_close,
@@ -226,6 +436,10 @@ int xnet_domain_open(struct fid_fabric *fabric_fid, struct fi_info *info,
if (ret)
return ret;
+ if (info->ep_attr->type == FI_EP_RDM &&
+ info->domain_attr->threading == FI_THREAD_COMPLETION)
+ return xnet_domain_mplex_open(fabric_fid, info, domain_fid, context);
+
domain = calloc(1, sizeof(*domain));
if (!domain)
return -FI_ENOMEM;
diff --git a/prov/tcp/src/xnet_rdm.c b/prov/tcp/src/xnet_rdm.c
index 0d85faaa449..420f606f658 100644
--- a/prov/tcp/src/xnet_rdm.c
+++ b/prov/tcp/src/xnet_rdm.c
@@ -695,6 +695,210 @@ static struct fi_ops_ep xnet_rdm_ep_ops = {
.tx_size_left = fi_no_tx_size_left,
};
+static int xnet_mplex_av_dup(struct util_ep *ep, struct xnet_mplex_av *mplex_av,
+ struct xnet_domain *subdomain, struct fid_av **av_fid)
+{
+ int ret, i;
+ struct util_av *subav;
+ size_t addr_size;
+ char addr[sizeof(struct sockaddr_in6)];
+ struct fi_av_attr av_attr = {
+ .type = ep->domain->av_type,
+ .count = mplex_av->util_av.av_entry_pool->entry_cnt,
+ .flags = 0,
+ };
+
+ assert(ofi_genlock_held(&mplex_av->lock));
+ ret = fi_av_open(&subdomain->util_domain.domain_fid, &av_attr, av_fid, NULL);
+ if (ret)
+ return ret;
+
+ subav = container_of(*av_fid, struct util_av, av_fid);
+ for (i = 0; i < mplex_av->util_av.av_entry_pool->entry_cnt; i++) {
+ if (!ofi_ip_av_is_valid(&mplex_av->util_av.av_fid, i))
+ continue;
+
+ ret = ofi_ip_av_lookup(&mplex_av->util_av.av_fid, i, addr, &addr_size);
+ if (ret)
+ continue;
+
+ ofi_mutex_lock(&subav->lock);
+ ret = ofi_av_insert_addr_at(subav, addr, i);
+ ofi_mutex_unlock(&subav->lock);
+ if (ret)
+ return ret;
+ }
+ fid_list_insert(&mplex_av->subav_list, NULL, &subav->av_fid.fid);
+ return FI_SUCCESS;
+};
+
+static int xnet_set_subav(struct util_ep *ep, struct xnet_domain *subdomain)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct util_av *subav;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *xnet_av;
+
+ xnet_av = container_of(&ep->av->av_fid, struct xnet_mplex_av, util_av.av_fid);
+
+ ofi_genlock_lock(&xnet_av->lock);
+ dlist_foreach_container(&xnet_av->subav_list, struct fid_list_entry, item, entry) {
+ subav = container_of(item->fid, struct util_av, av_fid.fid);
+ if (subav->domain == &subdomain->util_domain)
+ goto move_av;
+ }
+ ret = xnet_mplex_av_dup(ep, xnet_av, subdomain, &subav_fid);
+ if (ret)
+ goto out;
+ subav = container_of(subav_fid, struct util_av, av_fid.fid);
+move_av:
+ ofi_genlock_lock(&ep->av->ep_list_lock);
+ dlist_remove(&ep->av_entry);
+ ofi_genlock_unlock(&ep->av->ep_list_lock);
+ ofi_atomic_dec32(&ep->av->ref);
+ ep->av = NULL;
+ ret = ofi_ep_bind_av(ep, subav);
+out:
+ ofi_genlock_unlock(&xnet_av->lock);
+ return ret;
+}
+
+static int xnet_reg_subdomain_mr(struct ofi_rbmap *map, struct ofi_rbnode *node, void *context)
+{
+ int ret;
+ struct fi_mr_attr *attr = (struct fi_mr_attr *)node->data;
+ struct xnet_domain *subdomain = context;
+
+ ret = ofi_mr_map_insert(&subdomain->util_domain.mr_map, attr,
+ &attr->requested_key, attr->context,
+ ((struct ofi_mr*)attr->context)->flags);
+ if (ret) {
+ XNET_WARN_ERR(FI_LOG_MR, "ofi_mr_map_insert", ret);
+ return ret;
+ }
+
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ return FI_SUCCESS;
+}
+
+static struct xnet_domain *xnet_find_subdomain(struct xnet_rdm *rdm)
+{
+ int i;
+ struct util_cntr *cntr;
+
+ if (!xnet_domain_multiplexed(&rdm->util_ep.rx_cq->domain->domain_fid)) {
+ return container_of(&rdm->util_ep.rx_cq->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+
+ if (!xnet_domain_multiplexed(&rdm->util_ep.tx_cq->domain->domain_fid)) {
+ return container_of(&rdm->util_ep.tx_cq->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+
+ for (i = 0; i < CNTR_CNT; i++) {
+ cntr = rdm->util_ep.cntrs[i];
+ if (!cntr)
+ continue;
+
+ if (!xnet_domain_multiplexed(&cntr->domain->domain_fid)) {
+ return container_of(&cntr->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+ }
+
+ return NULL;
+}
+
+static void xnet_set_subdomain(struct xnet_rdm *rdm, struct xnet_domain *domain,
+ struct xnet_domain *subdomain)
+{
+ int i;
+ struct util_cntr *cntr;
+
+ assert(ofi_genlock_held(&domain->util_domain.lock));
+
+ if (rdm->util_ep.rx_cq->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&rdm->util_ep.rx_cq->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.rx_cq->domain = &subdomain->util_domain;
+ }
+ assert(rdm->util_ep.rx_cq->domain == &subdomain->util_domain);
+
+ if (rdm->util_ep.tx_cq->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&rdm->util_ep.tx_cq->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.tx_cq->domain = &subdomain->util_domain;
+ }
+ assert(rdm->util_ep.tx_cq->domain == &subdomain->util_domain);
+
+ for (i = 0; i < CNTR_CNT; i++) {
+ cntr = rdm->util_ep.cntrs[i];
+ if (!cntr)
+ continue;
+
+ if (cntr->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&cntr->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ cntr->domain = &subdomain->util_domain;
+ }
+ assert(cntr->domain == &subdomain->util_domain);
+ }
+}
+
+int xnet_rdm_resolve_domains(struct xnet_rdm *rdm)
+{
+ int ret;
+ struct fid_domain *subdomain_fid;
+ struct xnet_domain *subdomain;
+ struct xnet_domain *domain;
+
+ domain = container_of(rdm->util_ep.domain, struct xnet_domain, util_domain);
+ ofi_genlock_lock(&domain->util_domain.lock);
+ subdomain = xnet_find_subdomain(rdm);
+ if (!subdomain) {
+ ret = fi_domain(&domain->util_domain.fabric->fabric_fid,
+ domain->subdomain_info,
+ &subdomain_fid, NULL);
+ if (ret)
+ goto out;
+
+ subdomain = container_of(subdomain_fid, struct xnet_domain,
+ util_domain.domain_fid);
+ ret = fid_list_insert2(&domain->subdomain_list,
+ &domain->subdomain_list_lock,
+ &subdomain_fid->fid);
+ if (ret) {
+ fi_close(&subdomain_fid->fid);
+ goto out;
+ }
+
+ ret = ofi_rbmap_foreach(domain->util_domain.mr_map.rbtree,
+ domain->util_domain.mr_map.rbtree->root,
+ xnet_reg_subdomain_mr, subdomain);
+ if (ret)
+ goto out;
+ }
+
+ xnet_set_subdomain(rdm, domain, subdomain);
+ ret = xnet_set_subav(&rdm->util_ep, subdomain);
+ if (ret)
+ goto out;
+
+ ofi_atomic_dec32(&rdm->util_ep.domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.domain = &subdomain->util_domain;
+
+ ofi_atomic_dec32(&rdm->srx->domain->util_domain.ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->srx->domain = subdomain;
+
+out:
+ ofi_genlock_unlock(&domain->util_domain.lock);
+ return ret;
+}
+
static int xnet_enable_rdm(struct xnet_rdm *rdm)
{
struct xnet_progress *progress;
@@ -702,6 +906,12 @@ static int xnet_enable_rdm(struct xnet_rdm *rdm)
size_t len;
int ret;
+ if (xnet_domain_multiplexed(&rdm->util_ep.domain->domain_fid)) {
+ ret = xnet_rdm_resolve_domains(rdm);
+ if (ret)
+ return ret;
+ }
+
(void) fi_ep_bind(&rdm->srx->rx_fid, &rdm->util_ep.rx_cq->cq_fid.fid,
FI_RECV);
if (rdm->util_ep.cntrs[CNTR_RX]) {
diff --git a/prov/util/src/util_av.c b/prov/util/src/util_av.c
index 3f2abf0f5fb..16ebb595ce0 100644
--- a/prov/util/src/util_av.c
+++ b/prov/util/src/util_av.c
@@ -658,9 +658,9 @@ int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
count, fi_addr, flags, context);
}
-static int ip_av_insertsvc(struct fid_av *av, const char *node,
- const char *service, fi_addr_t *fi_addr,
- uint64_t flags, void *context)
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
{
return fi_av_insertsym(av, node, 1, service, 1, fi_addr, flags, context);
}
@@ -838,9 +838,9 @@ int ofi_ip_av_sym_getaddr(struct util_av *av, const char *node,
svccnt, addr, addrlen);
}
-static int ip_av_insertsym(struct fid_av *av_fid, const char *node,
- size_t nodecnt, const char *service, size_t svccnt,
- fi_addr_t *fi_addr, uint64_t flags, void *context)
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service, size_t svccnt,
+ fi_addr_t *fi_addr, uint64_t flags, void *context)
{
struct util_av *av;
void *addr;
@@ -895,6 +895,14 @@ int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
return 0;
}
+bool ofi_ip_av_is_valid(struct fid_av *av_fid, fi_addr_t fi_addr)
+{
+ struct util_av *av =
+ container_of(av_fid, struct util_av, av_fid);
+
+ return ofi_bufpool_ibuf_is_valid(av->av_entry_pool, fi_addr);
+}
+
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
void *addr, size_t *addrlen)
{
@@ -918,8 +926,8 @@ ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len)
static struct fi_ops_av ip_av_ops = {
.size = sizeof(struct fi_ops_av),
.insert = ofi_ip_av_insert,
- .insertsvc = ip_av_insertsvc,
- .insertsym = ip_av_insertsym,
+ .insertsvc = ofi_ip_av_insertsvc,
+ .insertsym = ofi_ip_av_insertsym,
.remove = ofi_ip_av_remove,
.lookup = ofi_ip_av_lookup,
.straddr = ofi_ip_av_straddr,
diff --git a/src/enosys.c b/src/enosys.c
index d0e402a5786..96c69bfcc6d 100644
--- a/src/enosys.c
+++ b/src/enosys.c
@@ -660,6 +660,12 @@ const char *fi_no_av_straddr(struct fid_av *av, const void *addr, char *buf,
return "unknown";
}
+int fi_no_av_set(struct fid_av *av_fid, struct fi_av_set_attr *attr,
+ struct fid_av_set **av_set_fid, void *context)
+{
+ return -FI_ENOSYS;
+}
+
int fi_no_av_set_union(struct fid_av_set *dst, const struct fid_av_set *src)
{
return -FI_ENOSYS;