From 117a1d52a7de4147255c5229967e231c916c0e39 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 31 Jul 2024 21:46:53 -0500 Subject: [PATCH] Remote SST streaming (#4280) Remote SST streaming --- source/adios2/toolkit/sst/cp/cp_common.c | 124 ++++++++++++++++++++- source/adios2/toolkit/sst/cp/cp_internal.h | 2 + source/adios2/toolkit/sst/cp/cp_reader.c | 2 +- source/adios2/toolkit/sst/cp/cp_writer.c | 9 +- source/adios2/toolkit/sst/sst_data.h | 1 + source/adios2/toolkit/sst/win_interface.h | 6 + 6 files changed, 138 insertions(+), 6 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index c7a779472f..5dba568f44 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -9,8 +9,14 @@ #include #include #ifndef _MSC_VER +#include #include +#include #include + +#include +#include +#include #include #else #include "../win_interface.h" @@ -865,7 +871,11 @@ void **CP_consolidateDataToAll(SstStream Stream, void *LocalInfo, FFSTypeHandle atom_t CM_TRANSPORT_ATOM = 0; static atom_t IP_INTERFACE_ATOM = 0; +atom_t IP_PORT_ATOM = 0; +atom_t IP_ADDR_ATOM = 0; +atom_t IP_HOST_ATOM = 0; static atom_t CM_ENET_CONN_TIMEOUT = -1; +static atom_t SST_GROUP_ID_ATOM = -1; static void initAtomList() { @@ -874,7 +884,12 @@ static void initAtomList() CM_TRANSPORT_ATOM = attr_atom_from_string("CM_TRANSPORT"); IP_INTERFACE_ATOM = attr_atom_from_string("IP_INTERFACE"); + IP_PORT_ATOM = attr_atom_from_string("IP_PORT"); + IP_ADDR_ATOM = attr_atom_from_string("IP_ADDR"); + IP_HOST_ATOM = attr_atom_from_string("IP_HOST"); + SST_GROUP_ID_ATOM = attr_atom_from_string("SST_GROUP_ID"); CM_ENET_CONN_TIMEOUT = attr_atom_from_string("CM_ENET_CONN_TIMEOUT"); + SST_GROUP_ID_ATOM = attr_atom_from_string("SST_GROUP_ID"); } static void AddCustomStruct(CP_StructList *List, FMStructDescList Struct) @@ -1300,6 +1315,10 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs) { set_int_attr(ContactList, CM_ENET_CONN_TIMEOUT, 60000); /* 60 seconds */ } + if (Stream->ConfigParams->RemoteGroup) + { + set_string_attr(ContactList, SST_GROUP_ID_ATOM, strdup(Stream->ConfigParams->RemoteGroup)); + } if (DPAttrs) { attr_merge_lists(ContactList, DPAttrs); @@ -1628,12 +1647,115 @@ static SMPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; } extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, void *client_data); extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data); +CMConnection Tunneling_get_conn(CManager cm, attr_list attrs) +{ + char *group_id = NULL; + if (get_string_attr(attrs, SST_GROUP_ID_ATOM, &group_id)) + { + attr_list conn_attrs; + // do the tunneling + char request[1024] = {'\0'}; + const char *format_string = "/connect_port?group=%s&service=%s&dhost=%s&dport=%d"; + int DestinationIP; + int DestinationPort; + char *DestinationHostStr; + if (!get_int_attr(attrs, IP_PORT_ATOM, &DestinationPort)) + { + fprintf(stderr, "No IP_PORT atom\n"); + return NULL; + } + if (!get_int_attr(attrs, IP_ADDR_ATOM, &DestinationIP)) + { + DestinationIP = 0; + } + if (!get_string_attr(attrs, IP_HOST_ATOM, &DestinationHostStr)) + { + fprintf(stderr, "No IP_HOST"); + DestinationHostStr = NULL; + } + char IPstr[INET_ADDRSTRLEN]; + if ((DestinationHostStr == NULL) && (DestinationIP == 0)) + { + fprintf(stderr, "NO IP or Hostname for tunnel\n"); + return NULL; + } + else if (DestinationHostStr == NULL) + { + inet_ntop(AF_INET, &DestinationIP, IPstr, INET_ADDRSTRLEN); + DestinationHostStr = &IPstr[0]; + } + int request_len = snprintf(request, sizeof(request), format_string, group_id, "service_5", + DestinationHostStr, DestinationPort); + union + { + struct sockaddr s; + struct sockaddr_in s_I4; + } sock_addr; + + sock_addr.s_I4.sin_family = AF_INET; + sock_addr.s_I4.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + sock_addr.s_I4.sin_port = htons(30000); + + int sock = socket(AF_INET, SOCK_STREAM, 0); + /* Actually connect. */ + if (connect(sock, (struct sockaddr *)&sock_addr.s_I4, sizeof(sock_addr)) == -1) + printf("Error\n"); + + /* Send request. */ + printf("Request is \"%s\"\n", request); + int nbytes_total = 0; + while (nbytes_total < request_len) + { + int nbytes_last = write(sock, request + nbytes_total, request_len - nbytes_total); + if (nbytes_last == -1) + printf("Error\n"); + nbytes_total += nbytes_last; + } + + /* Read the response. */ + size_t bytes_recd = 0; + char recv_buffer[2048]; + while (1) + { + size_t remaining = sizeof(recv_buffer) - bytes_recd; + int read_len = remaining; + nbytes_total = read(sock, recv_buffer + bytes_recd, read_len); + if (nbytes_total == 0) + { + break; + } + if (nbytes_total == -1) + { + printf("Error\n"); + } + bytes_recd += nbytes_total; + } + + close(sock); + printf("We got %s from the tunnel server\n", recv_buffer); + char msg[256]; + int forward_port; + sscanf(recv_buffer, "port:%d,msg:%s", &forward_port, &msg[0]); + printf("Forward port is %d\n", forward_port); + conn_attrs = create_attr_list(); + add_attr(conn_attrs, IP_PORT_ATOM, Attr_Int4, (attr_value)(intptr_t)forward_port); + + add_attr(conn_attrs, IP_ADDR_ATOM, Attr_Int4, (attr_value)INADDR_LOOPBACK); + + return CMget_conn(cm, conn_attrs); + } + else + { + return CMget_conn(cm, attrs); + } +} + static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CMFormat Format, void *Data) { CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort; if (Peers[Rank].CMconn == NULL) { - Peers[Rank].CMconn = CMget_conn(s->CPInfo->SharedCM->cm, Peers[Rank].ContactList); + Peers[Rank].CMconn = Tunneling_get_conn(s->CPInfo->SharedCM->cm, Peers[Rank].ContactList); if (!Peers[Rank].CMconn) { CP_error(s, "Connection failed in CP_sendToPeer! Contact list was:\n"); diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index 625166b3fb..5ee0a89c54 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -577,6 +577,7 @@ extern void FFSFreeMarshalData(SstStream Stream); extern void getPeerArrays(int MySize, int MyRank, int PeerSize, int **forwardArray, int **reverseArray); extern void AddToLastCallFreeList(void *Block); +extern CMConnection Tunneling_get_conn(CManager cm, attr_list attrs); enum VerbosityLevel { @@ -602,3 +603,4 @@ extern CPNetworkInfoFunc globalNetinfoCallback; extern void SSTSetNetworkCallback(CPNetworkInfoFunc callback); extern void DoStreamSummary(SstStream Stream); #define SSIZE_T_MAX (9223372036854775807) +extern int IP_PORT_ATOM; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 1f0daef7fa..1002025f6a 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -397,7 +397,7 @@ attr_list ContactWriter(SstStream Stream, char *Filename, SstParams Params, SMPI (globalNetinfoCallback)(2, CMContactString, NULL); } WriterRank0Contact = attr_list_from_string(CMContactString); - conn = CMget_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact); + conn = Tunneling_get_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact); free_attr_list(WriterRank0Contact); } if (conn) diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 5c60d10e40..99166f8d47 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -211,6 +211,7 @@ static void writeContactInfoScreen(const char *Name, SstStream Stream, attr_list "available to the reader.\n", Name); fprintf(stdout, "\t%s\n", Contact); + fflush(stdout); free(Contact); } @@ -584,8 +585,8 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo if (!reader->Connections[peer].CMconn) { reader->Connections[peer].CMconn = - CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm, - reader->Connections[peer].ContactList); + Tunneling_get_conn(reader->ParentStream->CPInfo->SharedCM->cm, + reader->Connections[peer].ContactList); } if (!reader->Connections[peer].CMconn) @@ -633,7 +634,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo } if (reader->ParentStream->ConnectionUsleepMultiplier != 0) usleep(WriterRank * reader->ParentStream->ConnectionUsleepMultiplier); - reader->Connections[peer].CMconn = CMget_conn( + reader->Connections[peer].CMconn = Tunneling_get_conn( reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[peer].ContactList); if (!reader->Connections[peer].CMconn) @@ -663,7 +664,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo { if (!reader->Connections[0].CMconn) { - reader->Connections[0].CMconn = CMget_conn( + reader->Connections[0].CMconn = Tunneling_get_conn( reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[0].ContactList); } if (!reader->Connections[0].CMconn) diff --git a/source/adios2/toolkit/sst/sst_data.h b/source/adios2/toolkit/sst/sst_data.h index c7031d5378..cdac13428f 100644 --- a/source/adios2/toolkit/sst/sst_data.h +++ b/source/adios2/toolkit/sst/sst_data.h @@ -79,6 +79,7 @@ typedef struct _SstStats MACRO(ReaderShortCircuitReads, Bool, int, 0) \ MACRO(StatsLevel, Int, int, 0) \ MACRO(UseOneTimeAttributes, Bool, int, 0) \ + MACRO(RemoteGroup, String, char *, NULL) \ MACRO(ControlModule, String, char *, NULL) typedef enum diff --git a/source/adios2/toolkit/sst/win_interface.h b/source/adios2/toolkit/sst/win_interface.h index b609a02f5a..d0782dbf55 100644 --- a/source/adios2/toolkit/sst/win_interface.h +++ b/source/adios2/toolkit/sst/win_interface.h @@ -6,6 +6,12 @@ #include #include #include +#define getpid() _getpid() +#define read(fd, buf, len) recv(fd, (buf), (len), 0) +#define write(fd, buf, len) send(fd, buf, (len), 0) +#define close(x) closesocket(x) +#define INST_ADDRSTRLEN 50 + #define pthread_mutex_t SRWLOCK #define pthread_thread_t HANDLE #define pthread_thread_id DWORD