diff --git a/include/ofi_util.h b/include/ofi_util.h
index 76c62d4fa42..bb4e8aacf0c 100644
--- a/include/ofi_util.h
+++ b/include/ofi_util.h
@@ -232,7 +232,7 @@ static const uint64_t ofi_rx_mr_flags[] = {
[ofi_op_atomic_fetch] = FI_REMOTE_WRITE | FI_REMOTE_READ,
[ofi_op_atomic_compare] = FI_REMOTE_WRITE | FI_REMOTE_READ,
};
-
+struct fi_mr_attr *dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags);
static inline uint64_t ofi_rx_mr_reg_flags(uint32_t op, uint16_t atomic_op)
{
if (atomic_op == FI_ATOMIC_READ)
@@ -1017,6 +1017,12 @@ int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
size_t count, uint64_t flags);
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
void *addr, size_t *addrlen);
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service, size_t svccnt,
+ fi_addr_t *fi_addr, uint64_t flags, void *context);
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context);
const char *
ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len);
diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index d297285781b..8daefa16e6c 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -701,6 +701,7 @@
+    <ClCompile Include="prov\tcp\src\xnet_av.c" />
diff --git a/libfabric.vcxproj.filters b/libfabric.vcxproj.filters
index adebb5a85ad..a0833325f6a 100644
--- a/libfabric.vcxproj.filters
+++ b/libfabric.vcxproj.filters
@@ -462,6 +462,9 @@
Source Files\prov\tcp\src
+    <ClCompile Include="prov\tcp\src\xnet_av.c">
+      <Filter>Source Files\prov\tcp\src</Filter>
+    </ClCompile>
Source Files\prov\tcp\src
diff --git a/prov/tcp/Makefile.include b/prov/tcp/Makefile.include
index d310f51d27f..34c109e92e2 100644
--- a/prov/tcp/Makefile.include
+++ b/prov/tcp/Makefile.include
@@ -5,6 +5,7 @@ _xnet_files = \
prov/tcp/src/xnet_cm.c \
prov/tcp/src/xnet_rdm_cm.c \
prov/tcp/src/xnet_domain.c \
+ prov/tcp/src/xnet_av.c \
prov/tcp/src/xnet_rma.c \
prov/tcp/src/xnet_msg.c \
prov/tcp/src/xnet_ep.c \
diff --git a/prov/tcp/src/xnet.h b/prov/tcp/src/xnet.h
index ee16d7e3a98..65fe911edbf 100644
--- a/prov/tcp/src/xnet.h
+++ b/prov/tcp/src/xnet.h
@@ -467,12 +467,43 @@ struct xnet_xfer_entry {
char msg_data[];
};
+struct xnet_mplex_av {
+ struct util_av util_av;
+ struct dlist_entry subav_list;
+ struct ofi_genlock lock;
+};
+
struct xnet_domain {
struct util_domain util_domain;
struct xnet_progress progress;
enum fi_ep_type ep_type;
+
+ // When an application requests FI_THREAD_COMPLETION
+ // the assumption is that the domain will be used
+ // across multiple threads.
+ //
+ // The xnet progress engine is optimized for single
+ // threaded performance, so instead of reworking the
+ // progress engine, likely losing single threaded
+ // performance, multiplex the domain into multiple
+ // subdomains for each ep. This way, each ep, with
+ // the assumption that the application wants to
+ // progress an ep per thread, can have its own
+ // progress engine and avoid having a single
+ // synchronization point among all eps.
+ struct fi_info *subdomain_info;
+ struct ofi_genlock subdomain_list_lock;
+ struct dlist_entry subdomain_list;
};
+int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
+ struct fid_domain **domain, void *context);
+
+int xnet_multiplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context);
+
+int xnet_domain_multiplexed(struct fid_domain *domain_fid);
+
static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
{
struct xnet_domain *domain;
@@ -544,10 +575,6 @@ int xnet_passive_ep(struct fid_fabric *fabric, struct fi_info *info,
int xnet_set_port_range(void);
-int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
- struct fid_domain **domain, void *context);
-
-
int xnet_setup_socket(SOCKET sock, struct fi_info *info);
void xnet_set_zerocopy(SOCKET sock);
@@ -566,7 +593,8 @@ static inline struct xnet_cq *xnet_ep_tx_cq(struct xnet_ep *ep)
return container_of(ep->util_ep.tx_cq, struct xnet_cq, util_cq);
}
-
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context);
int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context);
void xnet_report_success(struct xnet_xfer_entry *xfer_entry);
diff --git a/prov/tcp/src/xnet_av.c b/prov/tcp/src/xnet_av.c
new file mode 100644
index 00000000000..b22a4195f5c
--- /dev/null
+++ b/prov/tcp/src/xnet_av.c
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdlib.h>
+
+#include "xnet.h"
+
+int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context)
+{
+ return rxm_util_av_open(domain_fid, attr, fid_av, context,
+ sizeof(struct xnet_conn), NULL);
+}
+
+static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
+ size_t count, uint64_t flags)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid);
+
+ ofi_genlock_lock(&av->lock);
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_remove(subav_fid, fi_addr, count, flags);
+ if (ret)
+ goto out;
+ }
+ ret = ofi_ip_av_remove(&av->util_av.av_fid, fi_addr, count, flags);
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
+ fi_addr_t *fi_addr, uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insert(subav_fid, addr, count, fi_addr, flags, context);
+ if (ret < 0 || (size_t) ret < count)
+ goto out;
+ }
+ ret = ofi_ip_av_insert(&av->util_av.av_fid, addr, count, fi_addr, flags, context);
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service,
+ size_t svccnt, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insertsym(subav_fid, node, nodecnt, service, svccnt,
+ fi_addr, flags, context);
+ if (ret < 0 || (size_t) ret < nodecnt * svccnt)
+ goto out;
+ }
+ ret = ofi_ip_av_insertsym(&av->util_av.av_fid, node, nodecnt,
+ service, svccnt, fi_addr, flags, context);
+out:
+ ofi_genlock_unlock(&av->lock);
+
+ return ret;
+}
+
+static int xnet_mplex_av_insertsvc(struct fid_av *av_fid, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct fid_av *subav_fid;
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+
+ ofi_genlock_lock(&av->lock);
+ dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
+ subav_fid = container_of(item->fid, struct fid_av, fid);
+ ret = fi_av_insertsvc(subav_fid, node, service, fi_addr, flags,
+ context);
+ if (ret < 1)
+ goto out;
+ }
+ ret = ofi_ip_av_insertsvc(&av->util_av.av_fid, node, service,
+ fi_addr, flags, context);
+out:
+ ofi_genlock_unlock(&av->lock);
+ return ret;
+}
+
+static int xnet_mplex_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
+ void *addr, size_t *addrlen)
+{
+ struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
+ util_av.av_fid.fid);
+ return ofi_ip_av_lookup(&av->util_av.av_fid, fi_addr, addr, addrlen);
+}
+
+static const char *
+xnet_mplex_av_straddr(struct fid_av *av_fid, const void *addr, char *buf, size_t *len)
+{
+ return ofi_ip_av_straddr(av_fid, addr, buf, len);
+}
+
+static int xnet_mplex_av_set(struct fid_av *av_fid, struct fi_av_set_attr *attr,
+ struct fid_av_set **av_set_fid, void *context)
+{
+ return -FI_ENOSYS;
+}
+
+static int xnet_mplex_av_close(struct fid *av_fid)
+{
+ struct xnet_mplex_av *av;
+ struct fid_list_entry *item;
+ int ret = 0;
+
+ av = container_of(av_fid, struct xnet_mplex_av, util_av.av_fid.fid);
+ while (!dlist_empty(&av->subav_list)) {
+ dlist_pop_front(&av->subav_list, struct fid_list_entry, item, entry);
+ (void)fi_close(item->fid);
+ free(item);
+ }
+ ret = ofi_av_close(&av->util_av);
+ ofi_genlock_destroy(&av->lock);
+ free(av);
+ return ret;
+}
+
+static struct fi_ops xnet_mplex_av_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_av_close,
+ .bind = ofi_av_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+};
+
+static struct fi_ops_av xnet_mplex_av_ops = {
+ .size = sizeof(struct fi_ops_av),
+ .insert = xnet_mplex_av_insert,
+ .insertsvc = xnet_mplex_av_insertsvc,
+ .insertsym = xnet_mplex_av_insertsym,
+ .remove = xnet_mplex_av_remove,
+ .lookup = xnet_mplex_av_lookup,
+ .straddr = xnet_mplex_av_straddr,
+ .av_set = xnet_mplex_av_set,
+};
+
+int xnet_multiplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
+ struct fid_av **fid_av, void *context)
+{
+ struct xnet_mplex_av *av;
+ struct util_domain *domain;
+ struct util_av_attr util_attr;
+ int ret;
+
+ av = calloc(1, sizeof(*av));
+ if (!av)
+ return -FI_ENOMEM;
+
+ ret = ofi_genlock_init(&av->lock, OFI_LOCK_MUTEX);
+ if (ret)
+ goto free;
+
+ domain = container_of(domain_fid, struct util_domain, domain_fid);
+
+ util_attr.context_len = sizeof(struct util_peer_addr *);
+ util_attr.flags = 0;
+ util_attr.addrlen = ofi_sizeof_addr_format(domain->addr_format);
+ if (attr->type == FI_AV_UNSPEC)
+ attr->type = FI_AV_TABLE;
+
+ ret = ofi_av_init(domain, attr, &util_attr, &av->util_av, context);
+ if (ret)
+ goto free_lock;
+ dlist_init(&av->subav_list);
+ av->util_av.av_fid.fid.ops = &xnet_mplex_av_fi_ops;
+ av->util_av.av_fid.ops = &xnet_mplex_av_ops;
+ *fid_av = &av->util_av.av_fid;
+ return FI_SUCCESS;
+
+free_lock:
+ ofi_genlock_destroy(&av->lock);
+free:
+ free(av);
+ return ret;
+}
diff --git a/prov/tcp/src/xnet_cq.c b/prov/tcp/src/xnet_cq.c
index 03ea975371d..6c1e6e959f2 100644
--- a/prov/tcp/src/xnet_cq.c
+++ b/prov/tcp/src/xnet_cq.c
@@ -315,7 +315,6 @@ int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
return ret;
}
-
static void xnet_cntr_progress(struct util_cntr *cntr)
{
xnet_progress(xnet_cntr2_progress(cntr), false);
diff --git a/prov/tcp/src/xnet_domain.c b/prov/tcp/src/xnet_domain.c
index e2a7fae66e9..a46e293056d 100644
--- a/prov/tcp/src/xnet_domain.c
+++ b/prov/tcp/src/xnet_domain.c
@@ -52,6 +52,41 @@ static int xnet_mr_close(struct fid *fid)
return ret;
}
+static void xnet_subdomains_mr_close(struct xnet_domain *domain, uint64_t mr_key)
+{
+ int ret;
+ struct fid_list_entry *item;
+ struct xnet_domain *subdomain;
+
+ assert(ofi_genlock_held(&domain->subdomain_list_lock));
+ dlist_foreach_container(&domain->subdomain_list,
+ struct fid_list_entry, item, entry) {
+ subdomain = container_of(item->fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+ ofi_genlock_lock(&subdomain->util_domain.lock);
+ ret = ofi_mr_map_remove(&subdomain->util_domain.mr_map, mr_key);
+ ofi_genlock_unlock(&subdomain->util_domain.lock);
+
+ if (!ret)
+ ofi_atomic_dec32(&subdomain->util_domain.ref);
+ }
+}
+
+static int xnet_mplex_mr_close(struct fid *fid)
+{
+ struct xnet_domain *domain;
+ struct ofi_mr *mr;
+
+ mr = container_of(fid, struct ofi_mr, mr_fid.fid);
+ domain = container_of(&mr->domain->domain_fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+
+ ofi_genlock_lock(&domain->subdomain_list_lock);
+ xnet_subdomains_mr_close(domain, mr->key);
+ ofi_genlock_unlock(&domain->subdomain_list_lock);
+ return ofi_mr_close(fid);
+}
+
static struct fi_ops xnet_mr_fi_ops = {
.size = sizeof(struct fi_ops),
.close = xnet_mr_close,
@@ -60,6 +95,14 @@ static struct fi_ops xnet_mr_fi_ops = {
.ops_open = fi_no_ops_open
};
+static struct fi_ops xnet_mplex_mr_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_mr_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open
+};
+
static int
xnet_mr_reg(struct fid *fid, const void *buf, size_t len,
uint64_t access, uint64_t offset, uint64_t requested_key,
@@ -128,6 +171,81 @@ xnet_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
return ret;
}
+static int
+xnet_mplex_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
+ uint64_t flags, struct fid_mr **mr_fid)
+{
+ struct xnet_domain *domain;
+ struct fid_domain *subdomain;
+ struct fid_list_entry *item;
+ struct fid_mr *sub_mr_fid;
+ struct ofi_mr *mr;
+ int ret;
+
+ domain = container_of(fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+ ret = ofi_mr_regattr(fid, attr, flags, mr_fid);
+ if (ret)
+ goto out;
+
+ mr = container_of(*mr_fid, struct ofi_mr, mr_fid.fid);
+ mr->mr_fid.fid.ops = &xnet_mplex_mr_fi_ops;
+
+ ofi_genlock_lock(&domain->subdomain_list_lock);
+ dlist_foreach_container(&domain->subdomain_list,
+ struct fid_list_entry, item, entry) {
+ subdomain = container_of(item->fid, struct fid_domain, fid);
+ ret = fi_mr_regattr(subdomain, attr, flags, &sub_mr_fid);
+ if (ret) {
+ FI_WARN(&xnet_prov, FI_LOG_MR, "Failed to reg mr (%lu) from subdomain (%p)\n",
+ (unsigned long) mr->key, (void *) subdomain);
+
+ xnet_subdomains_mr_close(domain, mr->key);
+ (void) ofi_mr_close(&(*mr_fid)->fid);
+ goto unlock;
+ }
+ }
+unlock:
+ ofi_genlock_unlock(&domain->subdomain_list_lock);
+out:
+ return ret;
+}
+
+static int
+xnet_mplex_mr_regv(struct fid *fid, const struct iovec *iov,
+ size_t count, uint64_t access,
+ uint64_t offset, uint64_t requested_key,
+ uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+ struct fi_mr_attr attr;
+
+ attr.mr_iov = iov;
+ attr.iov_count = count;
+ attr.access = access;
+ attr.offset = offset;
+ attr.requested_key = requested_key;
+ attr.context = context;
+ attr.iface = FI_HMEM_SYSTEM;
+ attr.device.reserved = 0;
+ attr.hmem_data = NULL;
+
+ return xnet_mplex_mr_regattr(fid, &attr, flags, mr_fid);
+}
+
+static int
+xnet_mplex_mr_reg(struct fid *fid, const void *buf, size_t len,
+ uint64_t access, uint64_t offset, uint64_t requested_key,
+ uint64_t flags, struct fid_mr **mr_fid, void *context)
+{
+ struct iovec iov;
+
+ iov.iov_base = (void *) buf;
+ iov.iov_len = len;
+
+ return xnet_mplex_mr_regv(fid, &iov, 1, access, offset, requested_key,
+ flags, mr_fid, context);
+}
+
static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
struct fid_ep **ep_fid, void *context)
{
@@ -147,13 +265,6 @@ static int xnet_open_ep(struct fid_domain *domain_fid, struct fi_info *info,
return -FI_EINVAL;
}
-static int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
- struct fid_av **fid_av, void *context)
-{
- return rxm_util_av_open(domain_fid, attr, fid_av, context,
- sizeof(struct xnet_conn), NULL);
-}
-
static int
xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
enum fi_op op, struct fi_atomic_attr *attr, uint64_t flags)
@@ -167,9 +278,45 @@ xnet_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
return -FI_EOPNOTSUPP;
}
-static struct fi_ops_domain xnet_domain_ops = {
+static int xnet_domain_close(fid_t fid)
+{
+ struct xnet_domain *domain;
+ int ret;
+
+ domain = container_of(fid, struct xnet_domain,
+ util_domain.domain_fid.fid);
+
+ xnet_del_domain_progress(domain);
+ ret = ofi_domain_close(&domain->util_domain);
+ if (ret)
+ return ret;
+
+ xnet_close_progress(&domain->progress);
+ free(domain);
+ return FI_SUCCESS;
+}
+
+static int xnet_mplex_domain_close(fid_t fid)
+{
+ struct xnet_domain *domain;
+ struct fid_list_entry *item;
+
+ domain = container_of(fid, struct xnet_domain, util_domain.domain_fid.fid);
+ while (!dlist_empty(&domain->subdomain_list)) {
+ dlist_pop_front(&domain->subdomain_list, struct fid_list_entry, item, entry);
+ (void)fi_close(item->fid);
+ free(item);
+ }
+
+ ofi_genlock_destroy(&domain->subdomain_list_lock);
+ ofi_domain_close(&domain->util_domain);
+ free(domain);
+ return FI_SUCCESS;
+}
+
+static struct fi_ops_domain xnet_mplex_domain_ops = {
.size = sizeof(struct fi_ops_domain),
- .av_open = xnet_av_open,
+ .av_open = xnet_multiplex_av_open,
.cq_open = xnet_cq_open,
.endpoint = xnet_open_ep,
.scalable_ep = fi_no_scalable_ep,
@@ -181,24 +328,86 @@ static struct fi_ops_domain xnet_domain_ops = {
.query_collective = fi_no_query_collective,
};
-static int xnet_domain_close(fid_t fid)
+static struct fi_ops xnet_mplex_domain_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = xnet_mplex_domain_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+ .tostr = fi_no_tostr,
+ .ops_set = fi_no_ops_set,
+};
+
+static struct fi_ops_mr xnet_mplex_domain_fi_ops_mr = {
+ .size = sizeof(struct fi_ops_mr),
+ .reg = xnet_mplex_mr_reg,
+ .regv = xnet_mplex_mr_regv,
+ .regattr = xnet_mplex_mr_regattr,
+};
+
+int xnet_domain_multiplexed(struct fid_domain *domain_fid)
+{
+ return domain_fid->ops == &xnet_mplex_domain_ops;
+}
+
+int xnet_domain_multiplex_open(struct fid_fabric *fabric_fid, struct fi_info *info,
+ struct fid_domain **domain_fid, void *context)
{
struct xnet_domain *domain;
int ret;
- domain = container_of(fid, struct xnet_domain,
- util_domain.domain_fid.fid);
+ domain = calloc(1, sizeof(*domain));
+ if (!domain)
+ return -FI_ENOMEM;
- xnet_del_domain_progress(domain);
- ret = ofi_domain_close(&domain->util_domain);
+ ret = ofi_domain_init(fabric_fid, info, &domain->util_domain, context,
+ OFI_LOCK_MUTEX);
if (ret)
- return ret;
+ goto free;
- xnet_close_progress(&domain->progress);
- free(domain);
+ ret = ofi_genlock_init(&domain->subdomain_list_lock, OFI_LOCK_MUTEX);
+ if (ret)
+ goto close;
+
+ domain->subdomain_info = fi_dupinfo(info);
+ if (!domain->subdomain_info) {
+ ret = -FI_ENOMEM;
+ goto free_lock;
+ }
+
+ domain->subdomain_info->domain_attr->threading = FI_THREAD_DOMAIN;
+
+ dlist_init(&domain->subdomain_list);
+ domain->ep_type = info->ep_attr->type;
+ domain->util_domain.domain_fid.ops = &xnet_mplex_domain_ops;
+ domain->util_domain.domain_fid.fid.ops = &xnet_mplex_domain_fi_ops;
+ domain->util_domain.domain_fid.mr = &xnet_mplex_domain_fi_ops_mr;
+ *domain_fid = &domain->util_domain.domain_fid;
return FI_SUCCESS;
+
+free_lock:
+ ofi_genlock_destroy(&domain->subdomain_list_lock);
+close:
+ ofi_domain_close(&domain->util_domain);
+free:
+ free(domain);
+ return ret;
}
+static struct fi_ops_domain xnet_domain_ops = {
+ .size = sizeof(struct fi_ops_domain),
+ .av_open = xnet_av_open,
+ .cq_open = xnet_cq_open,
+ .endpoint = xnet_open_ep,
+ .scalable_ep = fi_no_scalable_ep,
+ .cntr_open = xnet_cntr_open,
+ .poll_open = fi_poll_create,
+ .stx_ctx = fi_no_stx_context,
+ .srx_ctx = xnet_srx_context,
+ .query_atomic = xnet_query_atomic,
+ .query_collective = fi_no_query_collective,
+};
+
static struct fi_ops xnet_domain_fi_ops = {
.size = sizeof(struct fi_ops),
.close = xnet_domain_close,
@@ -226,6 +435,11 @@ int xnet_domain_open(struct fid_fabric *fabric_fid, struct fi_info *info,
if (ret)
return ret;
+ if (info->ep_attr->type == FI_EP_RDM &&
+ info->domain_attr->threading == FI_THREAD_COMPLETION)
+ return xnet_domain_multiplex_open(fabric_fid, info, domain_fid,
+ context);
+
domain = calloc(1, sizeof(*domain));
if (!domain)
return -FI_ENOMEM;
diff --git a/prov/tcp/src/xnet_rdm.c b/prov/tcp/src/xnet_rdm.c
index 0d85faaa449..6034971a511 100644
--- a/prov/tcp/src/xnet_rdm.c
+++ b/prov/tcp/src/xnet_rdm.c
@@ -695,6 +695,170 @@ static struct fi_ops_ep xnet_rdm_ep_ops = {
.tx_size_left = fi_no_tx_size_left,
};
+static int xnet_mplex_av_dup(struct util_ep *ep, struct xnet_domain *domain,
+ struct xnet_domain *subdomain)
+{
+ int ret, i;
+ struct xnet_mplex_av *xnet_av;
+ struct util_av *util_av;
+ struct fid_av *av_fid;
+ char addr[sizeof(struct sockaddr_in6)];
+ size_t addr_size = sizeof(addr);
+ struct fi_av_attr av_attr = {
+ .type = ep->domain->av_type,
+ .count = ep->av->av_entry_pool->entry_cnt,
+ .flags = 0,
+ };
+
+ xnet_av = container_of(&ep->av->av_fid, struct xnet_mplex_av, util_av.av_fid);
+ ofi_genlock_lock(&xnet_av->lock);
+ ret = fi_av_open(&subdomain->util_domain.domain_fid, &av_attr, &av_fid, NULL);
+ if (ret)
+ goto out;
+ util_av = container_of(av_fid, struct util_av, av_fid);
+ for (i = 0; i < ep->av->av_entry_pool->entry_cnt; i++) {
+ ret = fi_av_lookup(&ep->av->av_fid, i, addr, &addr_size);
+ if (ret)
+ continue;
+ ret = ofi_av_insert_addr_at(util_av, addr, i);
+ if (ret)
+ goto out;
+ }
+ fid_list_insert(&xnet_av->subav_list, NULL, &av_fid->fid);
+
+ ofi_genlock_lock(&ep->av->ep_list_lock);
+ dlist_remove(&ep->av_entry);
+ ofi_genlock_unlock(&ep->av->ep_list_lock);
+ ofi_atomic_dec32(&ep->av->ref);
+
+ ep->av = NULL;
+ ofi_ep_bind_av(ep, util_av);
+ ret = FI_SUCCESS;
+out:
+ ofi_genlock_unlock(&xnet_av->lock);
+ return ret;
+};
+
+static void xnet_reg_subdomain_mr(struct ofi_rbmap *map, struct ofi_rbnode *node, void *context)
+{
+ int ret;
+ struct fi_mr_attr *attr = (struct fi_mr_attr *)node->data;
+ struct xnet_domain *subdomain = context;
+
+ ret = ofi_mr_map_insert(&subdomain->util_domain.mr_map, attr,
+ &attr->requested_key, attr->context,
+ ((struct ofi_mr*)attr->context)->flags);
+ if (ret)
+ XNET_WARN_ERR(FI_LOG_MR, "ofi_mr_map_insert", ret);
+ else
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+}
+
+struct xnet_domain *find_subdomain(struct xnet_rdm *rdm)
+{
+ int i;
+ struct util_cntr *cntr;
+
+ if (!xnet_domain_multiplexed(&rdm->util_ep.rx_cq->domain->domain_fid)) {
+ return container_of(&rdm->util_ep.rx_cq->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+
+ if (!xnet_domain_multiplexed(&rdm->util_ep.tx_cq->domain->domain_fid)) {
+ return container_of(&rdm->util_ep.tx_cq->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+
+ for (i = 0; i < CNTR_CNT; i++) {
+ cntr = rdm->util_ep.cntrs[i];
+ if (!cntr)
+ continue;
+
+ if (!xnet_domain_multiplexed(&cntr->domain->domain_fid)) {
+ return container_of(&cntr->domain->domain_fid,
+ struct xnet_domain, util_domain.domain_fid);
+ }
+ }
+
+ return NULL;
+}
+
+static void set_subdomain(struct xnet_rdm *rdm, struct xnet_domain *domain,
+ struct xnet_domain *subdomain)
+{
+ int i;
+ struct util_cntr *cntr;
+
+ if (rdm->util_ep.rx_cq->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&rdm->util_ep.rx_cq->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.rx_cq->domain = &subdomain->util_domain;
+ }
+
+ if (rdm->util_ep.tx_cq->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&rdm->util_ep.tx_cq->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.tx_cq->domain = &subdomain->util_domain;
+ }
+
+ for (i = 0; i < CNTR_CNT; i++) {
+ cntr = rdm->util_ep.cntrs[i];
+ if (!cntr)
+ continue;
+
+ if (cntr->domain == &domain->util_domain) {
+ ofi_atomic_dec32(&cntr->domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ cntr->domain = &subdomain->util_domain;
+ }
+ }
+}
+
+int xnet_rdm_resolve_domains(struct xnet_rdm *rdm)
+{
+ int ret;
+ struct fid_domain *subdomain_fid;
+ struct xnet_domain *subdomain;
+ struct xnet_domain *domain = container_of(rdm->util_ep.domain,
+ struct xnet_domain,
+ util_domain);
+ subdomain = find_subdomain(rdm);
+ if (!subdomain) {
+ ret = fi_domain(&domain->util_domain.fabric->fabric_fid,
+ domain->subdomain_info,
+ &subdomain_fid, NULL);
+ if (ret)
+ return ret;
+
+ subdomain = container_of(subdomain_fid, struct xnet_domain,
+ util_domain.domain_fid);
+ ret = fid_list_insert2(&domain->subdomain_list,
+ &domain->subdomain_list_lock,
+ &subdomain_fid->fid);
+ if (ret)
+ return (fi_close(&subdomain_fid->fid), ret);
+
+ set_subdomain(rdm, domain, subdomain);
+
+ ofi_rbmap_foreach(domain->util_domain.mr_map.rbtree,
+ domain->util_domain.mr_map.rbtree->root,
+ xnet_reg_subdomain_mr, subdomain);
+ }
+
+ ret = xnet_mplex_av_dup(&rdm->util_ep, domain, subdomain);
+ if (ret)
+ return ret;
+
+ ofi_atomic_dec32(&rdm->util_ep.domain->ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->util_ep.domain = &subdomain->util_domain;
+
+ ofi_atomic_dec32(&rdm->srx->domain->util_domain.ref);
+ ofi_atomic_inc32(&subdomain->util_domain.ref);
+ rdm->srx->domain = subdomain;
+ return ret;
+}
+
static int xnet_enable_rdm(struct xnet_rdm *rdm)
{
struct xnet_progress *progress;
@@ -702,6 +866,12 @@ static int xnet_enable_rdm(struct xnet_rdm *rdm)
size_t len;
int ret;
+ if (xnet_domain_multiplexed(&rdm->util_ep.domain->domain_fid)) {
+ ret = xnet_rdm_resolve_domains(rdm);
+ if (ret)
+ return ret;
+ }
+
(void) fi_ep_bind(&rdm->srx->rx_fid, &rdm->util_ep.rx_cq->cq_fid.fid,
FI_RECV);
if (rdm->util_ep.cntrs[CNTR_RX]) {
diff --git a/prov/util/src/util_av.c b/prov/util/src/util_av.c
index da7b34d3935..9a70aaa6f5c 100644
--- a/prov/util/src/util_av.c
+++ b/prov/util/src/util_av.c
@@ -737,9 +737,9 @@ int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
count, fi_addr, flags, context);
}
-static int ip_av_insertsvc(struct fid_av *av, const char *node,
- const char *service, fi_addr_t *fi_addr,
- uint64_t flags, void *context)
+int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
+ const char *service, fi_addr_t *fi_addr,
+ uint64_t flags, void *context)
{
return fi_av_insertsym(av, node, 1, service, 1, fi_addr, flags, context);
}
@@ -917,9 +917,9 @@ int ofi_ip_av_sym_getaddr(struct util_av *av, const char *node,
svccnt, addr, addrlen);
}
-static int ip_av_insertsym(struct fid_av *av_fid, const char *node,
- size_t nodecnt, const char *service, size_t svccnt,
- fi_addr_t *fi_addr, uint64_t flags, void *context)
+int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
+ size_t nodecnt, const char *service, size_t svccnt,
+ fi_addr_t *fi_addr, uint64_t flags, void *context)
{
struct util_av *av;
void *addr;
@@ -997,8 +997,8 @@ ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len)
static struct fi_ops_av ip_av_ops = {
.size = sizeof(struct fi_ops_av),
.insert = ofi_ip_av_insert,
- .insertsvc = ip_av_insertsvc,
- .insertsym = ip_av_insertsym,
+ .insertsvc = ofi_ip_av_insertsvc,
+ .insertsym = ofi_ip_av_insertsym,
.remove = ofi_ip_av_remove,
.lookup = ofi_ip_av_lookup,
.straddr = ofi_ip_av_straddr,
diff --git a/prov/util/src/util_ep.c b/prov/util/src/util_ep.c
index eade45d46db..43a3bec4be8 100644
--- a/prov/util/src/util_ep.c
+++ b/prov/util/src/util_ep.c
@@ -41,6 +41,8 @@ int ofi_ep_bind_cq(struct util_ep *ep, struct util_cq *cq, uint64_t flags)
{
int ret;
+ assert(ep->domain == cq->domain);
+
ret = ofi_check_bind_cq_flags(ep, cq, flags);
if (ret)
return ret;
@@ -101,6 +103,8 @@ int ofi_ep_bind_av(struct util_ep *util_ep, struct util_av *av)
int ofi_ep_bind_cntr(struct util_ep *ep, struct util_cntr *cntr, uint64_t flags)
{
+ assert(ep->domain == cntr->domain);
+
if (flags & ~(FI_TRANSMIT | FI_RECV | FI_READ | FI_WRITE |
FI_REMOTE_READ | FI_REMOTE_WRITE)) {
FI_WARN(ep->domain->fabric->prov, FI_LOG_EP_CTRL,
diff --git a/prov/util/src/util_mr_map.c b/prov/util/src/util_mr_map.c
index 535189ea71b..181626c69dc 100644
--- a/prov/util/src/util_mr_map.c
+++ b/prov/util/src/util_mr_map.c
@@ -38,8 +38,8 @@
#include <ofi_mr.h>
-static struct fi_mr_attr *
-dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
+
+struct fi_mr_attr *dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
{
struct fi_mr_attr *dup_attr;
@@ -52,7 +52,6 @@ dup_mr_attr(const struct fi_mr_attr *attr, uint64_t flags)
dup_attr->mr_iov = (struct iovec *) (dup_attr + 1);
/*
- * dup_mr_attr is only used insided ofi_mr_map_insert.
* dmabuf must be converted to iov before the attr
* is inserted to the mr_map
*/