Skip to content

Commit

Permalink
Tweaks and test for SST in threads
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jun 12, 2020
1 parent bf02720 commit 976241d
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 17 deletions.
7 changes: 7 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo)
CMregister_handler(CPInfo->ReaderCloseFormat, CP_ReaderCloseHandler, NULL);
}

static pthread_mutex_t StateMutex = PTHREAD_MUTEX_INITIALIZER;
static CP_GlobalInfo CPInfo = NULL;
static int CPInfoRefCount = 0;

Expand Down Expand Up @@ -1139,6 +1140,7 @@ extern void SstStreamDestroy(SstStream Stream)
pthread_mutex_unlock(&Stream->DataLock);
// Stream is free'd in LastCall

pthread_mutex_lock(&StateMutex);
CPInfoRefCount--;
if (CPInfoRefCount == 0)
{
Expand All @@ -1163,6 +1165,7 @@ extern void SstStreamDestroy(SstStream Stream)
free_FMfield_list(CP_SstParamsList);
CP_SstParamsList = NULL;
}
pthread_mutex_unlock(&StateMutex);
CP_verbose(&StackStream, "SstStreamDestroy successful, returning\n");
}

Expand All @@ -1182,6 +1185,7 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs)
strdup(Stream->ConfigParams->NetworkInterface));
}
ContactList = CMget_specific_contact_list(Stream->CPInfo->cm, ListenList);
ContactList = CMderef_and_copy_list(Stream->CPInfo->cm, ContactList);
if (strcmp(Stream->ConfigParams->ControlTransport, "enet") == 0)
{
set_int_attr(ContactList, CM_ENET_CONN_TIMEOUT, 60000); /* 60 seconds */
Expand All @@ -1199,9 +1203,11 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs)
extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule)
{

pthread_mutex_lock(&StateMutex);
if (CPInfo)
{
CPInfoRefCount++;
pthread_mutex_unlock(&StateMutex);
return CPInfo;
}

Expand Down Expand Up @@ -1276,6 +1282,7 @@ extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule)
doFormatRegistration(CPInfo, DPInfo);

CPInfoRefCount++;
pthread_mutex_unlock(&StateMutex);
return CPInfo;
}

Expand Down
14 changes: 0 additions & 14 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,6 @@ static void RemoveQueueEntries(SstStream Stream)
}
}

static void ReleaseAndDiscardRemainingTimesteps(SstStream Stream)
{
CPTimestepList List = Stream->QueuedTimesteps;

while (List)
{
List->Expired = 1;
List->PreciousTimestep = 0;
List->ReferenceCount = 0;
List = List->Next;
}
RemoveQueueEntries(Stream);
}

/*
Queue maintenance: (ASSUME LOCKED)
calculate smallest entry for CurrentTimestep in a reader. Update that
Expand Down
15 changes: 12 additions & 3 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
#include "adios2/toolkit/profiling/taustubs/taustubs.h"
#include "dp_interface.h"

#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define NO_SANITIZE_THREAD __attribute__((no_sanitize("thread")))
#endif
#endif

#ifndef NO_SANITIZE_THREAD
#define NO_SANITIZE_THREAD
#endif

/*
* Some conventions:
* `RS` indicates a reader-side item.
Expand Down Expand Up @@ -1481,15 +1491,14 @@ static int EvpathGetPriority(CP_Services Svcs, void *CP_Stream,
struct _SstParams *Params)
{
// Define any unique attributes here
(void)attr_atom_from_string("EVPATH_DP_Attr");
// (void)attr_atom_from_string("EVPATH_DP_Attr");

/* The evpath DP should be a lower priority than any RDMA dp, so return 1 */
return 1;
}

extern CP_DP_Interface LoadEVpathDP()
extern NO_SANITIZE_THREAD CP_DP_Interface LoadEVpathDP()
{
memset(&evpathDPInterface, 0, sizeof(evpathDPInterface));
evpathDPInterface.ReaderContactFormats = EvpathReaderContactStructs;
evpathDPInterface.WriterContactFormats = EvpathWriterContactStructs;
evpathDPInterface.TimestepInfoFormats = NULL; // EvpathTimestepInfoStructs;
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".InSituMPI" EXTR
if(ADIOS2_HAVE_SST)
gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS")
gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP")
# gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "BP4_stream" EXTRA_ARGS "BP4" "")
endif()

foreach(helper
Expand Down
98 changes: 98 additions & 0 deletions testing/adios2/engine/staging-common/TestThreads.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include <adios2.h>
#include <array>
#include <condition_variable>
#include <future>
#include <iostream>
#include <numeric>
#include <thread>

#include <gtest/gtest.h>

#include "ParseArgs.h"

using dt = long long;

int failure = 0;

void Read()
{
adios2::ADIOS adios;
adios2::IO io = adios.DeclareIO("IO");
io.SetEngine("sst");

io.SetEngine(engine);
io.SetParameters(engineParams);

adios2::Engine Reader = io.Open("communicate", adios2::Mode::Read);

std::array<dt, 100000> ar;

auto status = Reader.BeginStep();
if (status == adios2::StepStatus::EndOfStream)
{
return;
}

adios2::Variable<dt> var = io.InquireVariable<dt>("data");
Reader.Get(var, ar.begin());
Reader.EndStep();
dt expect = 0;
for (auto &val : ar)
{
if (val != expect)
{
failure++;
break;
}
expect++;
}

Reader.Close();
}

void Write()
{
adios2::ADIOS adios;
adios2::IO io = adios.DeclareIO("IO");
io.SetEngine(engine);
io.SetParameters(engineParams);
io.SetEngine("sst");
adios2::Engine Writer = io.Open("communicate", adios2::Mode::Write);

auto var =
io.DefineVariable<dt>("data", adios2::Dims{10000, 10},
adios2::Dims{0, 0}, adios2::Dims{10000, 10});

std::array<dt, 100000> ar;

std::iota(ar.begin(), ar.end(), 0);

Writer.BeginStep();
Writer.Put<dt>(var, ar.begin());
Writer.EndStep();
Writer.Close();
}

class TestThreads : public ::testing::Test
{
public:
TestThreads() = default;
};

TEST_F(TestThreads, Basic)
{
auto read_fut = std::async(std::launch::async, Read);
auto write_fut = std::async(std::launch::async, Write);
read_fut.wait();
write_fut.wait();
}

int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
ParseArgs(argc, argv);

int result;
result = RUN_ALL_TESTS();
return result;
}

0 comments on commit 976241d

Please sign in to comment.