Skip to content

Commit

Permalink
Remote SST streaming (#4280)
Browse files Browse the repository at this point in the history
Remote SST streaming
  • Loading branch information
eisenhauer authored Aug 1, 2024
1 parent e3abec2 commit 117a1d5
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 6 deletions.
124 changes: 123 additions & 1 deletion source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
#include <atl.h>
#include <evpath.h>
#ifndef _MSC_VER
#include <netinet/in.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/time.h>

#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <unistd.h>
#else
#include "../win_interface.h"
Expand Down Expand Up @@ -865,7 +871,11 @@ void **CP_consolidateDataToAll(SstStream Stream, void *LocalInfo, FFSTypeHandle

/*
 * Attribute atoms used as keys into attr_list contact information.
 * The values below are placeholders; initAtomList() assigns the real
 * atoms via attr_atom_from_string().
 *
 * The non-static atoms have external linkage (IP_PORT_ATOM is declared
 * extern in cp_internal.h); the static ones are private to this file.
 */
atom_t CM_TRANSPORT_ATOM = 0;
static atom_t IP_INTERFACE_ATOM = 0;
atom_t IP_PORT_ATOM = 0;
atom_t IP_ADDR_ATOM = 0;
atom_t IP_HOST_ATOM = 0;
static atom_t CM_ENET_CONN_TIMEOUT = -1;
/* Key under which CP_GetContactString() publishes the RemoteGroup parameter;
 * its presence in a contact list makes Tunneling_get_conn() use the tunnel. */
static atom_t SST_GROUP_ID_ATOM = -1;

static void initAtomList()
{
Expand All @@ -874,7 +884,12 @@ static void initAtomList()

CM_TRANSPORT_ATOM = attr_atom_from_string("CM_TRANSPORT");
IP_INTERFACE_ATOM = attr_atom_from_string("IP_INTERFACE");
IP_PORT_ATOM = attr_atom_from_string("IP_PORT");
IP_ADDR_ATOM = attr_atom_from_string("IP_ADDR");
IP_HOST_ATOM = attr_atom_from_string("IP_HOST");
SST_GROUP_ID_ATOM = attr_atom_from_string("SST_GROUP_ID");
CM_ENET_CONN_TIMEOUT = attr_atom_from_string("CM_ENET_CONN_TIMEOUT");
SST_GROUP_ID_ATOM = attr_atom_from_string("SST_GROUP_ID");
}

static void AddCustomStruct(CP_StructList *List, FMStructDescList Struct)
Expand Down Expand Up @@ -1300,6 +1315,10 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs)
{
set_int_attr(ContactList, CM_ENET_CONN_TIMEOUT, 60000); /* 60 seconds */
}
if (Stream->ConfigParams->RemoteGroup)
{
set_string_attr(ContactList, SST_GROUP_ID_ATOM, strdup(Stream->ConfigParams->RemoteGroup));
}
if (DPAttrs)
{
attr_merge_lists(ContactList, DPAttrs);
Expand Down Expand Up @@ -1628,12 +1647,115 @@ static SMPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; }

extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, void *client_data);
extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data);
/* TCP port on the loopback interface where the local tunnel service listens. */
enum
{
    TUNNEL_SERVER_PORT = 30000
};

/* Service name passed to the tunnel service in the connect_port request. */
static const char TUNNEL_SERVICE_NAME[] = "service_5";

/*
 * Send `request` (`request_len` bytes) to the tunnel service on
 * 127.0.0.1:TUNNEL_SERVER_PORT, read its reply until EOF (or until the local
 * buffer is full) and extract the forwarding port from a reply of the form
 * "port:<number>,...".
 *
 * Returns the forwarding port (1..65535) on success, or -1 if the socket
 * could not be created/connected, the request could not be written, the
 * reply could not be read, or no valid port could be parsed.  The socket is
 * closed on every path.
 */
static int TunnelRequestForwardPort(const char *request, int request_len)
{
    struct sockaddr_in tunnel_addr = {0};
    char recv_buffer[2048];
    size_t bytes_recd = 0;
    int nbytes_sent = 0;
    int forward_port = -1;

    tunnel_addr.sin_family = AF_INET;
    tunnel_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    tunnel_addr.sin_port = htons(TUNNEL_SERVER_PORT);

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        fprintf(stderr, "Failed to create socket for tunnel request\n");
        return -1;
    }

    /* Actually connect. */
    if (connect(sock, (struct sockaddr *)&tunnel_addr, sizeof(tunnel_addr)) == -1)
    {
        fprintf(stderr, "Failed to connect to tunnel server on localhost port %d\n",
                (int)TUNNEL_SERVER_PORT);
        close(sock);
        return -1;
    }

    /* Send request. */
    printf("Request is \"%s\"\n", request);
    while (nbytes_sent < request_len)
    {
        int nbytes_last = write(sock, request + nbytes_sent, request_len - nbytes_sent);
        if (nbytes_last <= 0)
        {
            /* Bail out instead of subtracting from the running total. */
            fprintf(stderr, "Failed to send request to tunnel server\n");
            close(sock);
            return -1;
        }
        nbytes_sent += nbytes_last;
    }

    /* Read the response, always leaving room for the terminating NUL. */
    while (bytes_recd < sizeof(recv_buffer) - 1)
    {
        int read_len = (int)(sizeof(recv_buffer) - 1 - bytes_recd);
        int nbytes_read = read(sock, recv_buffer + bytes_recd, read_len);
        if (nbytes_read == 0)
        {
            break; /* peer closed the connection: reply complete */
        }
        if (nbytes_read < 0)
        {
            fprintf(stderr, "Failed to read response from tunnel server\n");
            close(sock);
            return -1;
        }
        bytes_recd += (size_t)nbytes_read;
    }
    close(sock);
    recv_buffer[bytes_recd] = '\0';

    printf("We got %s from the tunnel server\n", recv_buffer);
    /* Only the port is needed; the trailing ",msg:..." part is ignored. */
    if (sscanf(recv_buffer, "port:%d", &forward_port) != 1 || forward_port <= 0 ||
        forward_port > 65535)
    {
        fprintf(stderr, "Could not parse forwarding port from tunnel server response\n");
        return -1;
    }
    printf("Forward port is %d\n", forward_port);
    return forward_port;
}

/*
 * Obtain a CM connection to the peer described by `attrs`.
 *
 * If `attrs` carries no SST_GROUP_ID attribute this is a plain
 * CMget_conn(cm, attrs).  Otherwise the destination host/port found in
 * `attrs` is handed to the local tunnel service, which answers with a
 * forwarding port on the loopback interface; the connection is then made to
 * that local port instead.
 *
 * Returns the connection, or NULL if the destination is incompletely
 * specified, the tunnel request fails, or CMget_conn() itself fails.
 * `attrs` is not modified and remains owned by the caller.
 */
CMConnection Tunneling_get_conn(CManager cm, attr_list attrs)
{
    char *group_id = NULL;
    if (!get_string_attr(attrs, SST_GROUP_ID_ATOM, &group_id))
    {
        /* No remote group requested: connect directly. */
        return CMget_conn(cm, attrs);
    }

    // do the tunneling
    char request[1024] = {'\0'};
    const char *format_string = "/connect_port?group=%s&service=%s&dhost=%s&dport=%d";
    int DestinationIP = 0;
    int DestinationPort = 0;
    char *DestinationHostStr = NULL;
    char IPstr[INET_ADDRSTRLEN];

    if (!get_int_attr(attrs, IP_PORT_ATOM, &DestinationPort))
    {
        fprintf(stderr, "No IP_PORT atom\n");
        return NULL;
    }
    if (!get_int_attr(attrs, IP_ADDR_ATOM, &DestinationIP))
    {
        DestinationIP = 0;
    }
    if (!get_string_attr(attrs, IP_HOST_ATOM, &DestinationHostStr))
    {
        fprintf(stderr, "No IP_HOST\n");
        DestinationHostStr = NULL;
    }

    if ((DestinationHostStr == NULL) && (DestinationIP == 0))
    {
        fprintf(stderr, "NO IP or Hostname for tunnel\n");
        return NULL;
    }
    if (DestinationHostStr == NULL)
    {
        /*
         * inet_ntop() expects the address in network byte order.  IP_ADDR is
         * treated as a host-byte-order value in this function (INADDR_LOOPBACK
         * is stored unconverted below), so convert before formatting.
         */
        struct in_addr addr;
        addr.s_addr = htonl((uint32_t)DestinationIP);
        if (inet_ntop(AF_INET, &addr, IPstr, sizeof(IPstr)) == NULL)
        {
            fprintf(stderr, "Could not format IP address for tunnel\n");
            return NULL;
        }
        DestinationHostStr = &IPstr[0];
    }

    int request_len = snprintf(request, sizeof(request), format_string, group_id,
                               TUNNEL_SERVICE_NAME, DestinationHostStr, DestinationPort);
    if (request_len < 0 || (size_t)request_len >= sizeof(request))
    {
        /* A truncated request must not be sent: its reported length exceeds the buffer. */
        fprintf(stderr, "Tunnel request too long\n");
        return NULL;
    }

    int forward_port = TunnelRequestForwardPort(request, request_len);
    if (forward_port < 0)
    {
        return NULL;
    }

    attr_list conn_attrs = create_attr_list();
    add_attr(conn_attrs, IP_PORT_ATOM, Attr_Int4, (attr_value)(intptr_t)forward_port);
    add_attr(conn_attrs, IP_ADDR_ATOM, Attr_Int4, (attr_value)(intptr_t)INADDR_LOOPBACK);

    CMConnection conn = CMget_conn(cm, conn_attrs);
    /* The list was only needed to describe the target; drop our reference. */
    free_attr_list(conn_attrs);
    return conn;
}

static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CMFormat Format, void *Data)
{
CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort;
if (Peers[Rank].CMconn == NULL)
{
Peers[Rank].CMconn = CMget_conn(s->CPInfo->SharedCM->cm, Peers[Rank].ContactList);
Peers[Rank].CMconn = Tunneling_get_conn(s->CPInfo->SharedCM->cm, Peers[Rank].ContactList);
if (!Peers[Rank].CMconn)
{
CP_error(s, "Connection failed in CP_sendToPeer! Contact list was:\n");
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ extern void FFSFreeMarshalData(SstStream Stream);
extern void getPeerArrays(int MySize, int MyRank, int PeerSize, int **forwardArray,
int **reverseArray);
extern void AddToLastCallFreeList(void *Block);
extern CMConnection Tunneling_get_conn(CManager cm, attr_list attrs);

enum VerbosityLevel
{
Expand All @@ -602,3 +603,4 @@ extern CPNetworkInfoFunc globalNetinfoCallback;
extern void SSTSetNetworkCallback(CPNetworkInfoFunc callback);
extern void DoStreamSummary(SstStream Stream);
#define SSIZE_T_MAX (9223372036854775807)
extern int IP_PORT_ATOM;
2 changes: 1 addition & 1 deletion source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ attr_list ContactWriter(SstStream Stream, char *Filename, SstParams Params, SMPI
(globalNetinfoCallback)(2, CMContactString, NULL);
}
WriterRank0Contact = attr_list_from_string(CMContactString);
conn = CMget_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact);
conn = Tunneling_get_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact);
free_attr_list(WriterRank0Contact);
}
if (conn)
Expand Down
9 changes: 5 additions & 4 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static void writeContactInfoScreen(const char *Name, SstStream Stream, attr_list
"available to the reader.\n",
Name);
fprintf(stdout, "\t%s\n", Contact);
fflush(stdout);
free(Contact);
}

Expand Down Expand Up @@ -584,8 +585,8 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo
if (!reader->Connections[peer].CMconn)
{
reader->Connections[peer].CMconn =
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);
Tunneling_get_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);
}

if (!reader->Connections[peer].CMconn)
Expand Down Expand Up @@ -633,7 +634,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo
}
if (reader->ParentStream->ConnectionUsleepMultiplier != 0)
usleep(WriterRank * reader->ParentStream->ConnectionUsleepMultiplier);
reader->Connections[peer].CMconn = CMget_conn(
reader->Connections[peer].CMconn = Tunneling_get_conn(
reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[peer].ContactList);

if (!reader->Connections[peer].CMconn)
Expand Down Expand Up @@ -663,7 +664,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CP_ReaderInitInfo
{
if (!reader->Connections[0].CMconn)
{
reader->Connections[0].CMconn = CMget_conn(
reader->Connections[0].CMconn = Tunneling_get_conn(
reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[0].ContactList);
}
if (!reader->Connections[0].CMconn)
Expand Down
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/sst_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ typedef struct _SstStats
MACRO(ReaderShortCircuitReads, Bool, int, 0) \
MACRO(StatsLevel, Int, int, 0) \
MACRO(UseOneTimeAttributes, Bool, int, 0) \
MACRO(RemoteGroup, String, char *, NULL) \
MACRO(ControlModule, String, char *, NULL)

typedef enum
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/toolkit/sst/win_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
#include <string.h>
#include <sys/timeb.h>
#include <time.h>
/* Map POSIX-style names onto their Windows counterparts. */
#define getpid() _getpid()
/*
 * read/write/close are redirected to the Winsock calls recv/send/closesocket.
 * NOTE(review): these macros rewrite every read/write/close call in files
 * that include this header, so such calls must operate on sockets — confirm
 * no includer uses them on ordinary file descriptors.
 */
#define read(fd, buf, len) recv(fd, (buf), (len), 0)
#define write(fd, buf, len) send(fd, buf, (len), 0)
#define close(x) closesocket(x)
/* NOTE(review): the name differs from the standard INET_ADDRSTRLEN used by
 * Tunneling_get_conn() in cp_common.c — confirm which spelling was intended. */
#define INST_ADDRSTRLEN 50

#define pthread_mutex_t SRWLOCK
#define pthread_thread_t HANDLE
#define pthread_thread_id DWORD
Expand Down

0 comments on commit 117a1d5

Please sign in to comment.