Skip to content

Commit

Permalink
Merge pull request #2395 from JasonRuonanWang/ssc-varying-step
Browse files Browse the repository at this point in the history
SSC: avoid repeated MPI_Get calls for blocks that have already been transferred
  • Loading branch information
JasonRuonanWang authored Jul 28, 2020
2 parents 0ef03a0 + 8dfb30b commit da37d50
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 45 deletions.
1 change: 1 addition & 0 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
m_ReceivedRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <mpi.h>
#include <queue>
#include <unordered_set>

namespace adios2
{
Expand Down Expand Up @@ -53,6 +54,7 @@ class SscReader : public Engine
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
std::unordered_set<int> m_ReceivedRanks;

int m_StreamRank;
int m_StreamSize;
Expand Down
114 changes: 69 additions & 45 deletions source/adios2/engine/ssc/SscReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,77 @@ void SscReader::GetDeferredCommon(Variable<std::string> &variable,
{
TAU_SCOPED_TIMER_FUNC();
variable.SetData(data);
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
m_LocalReadPattern.emplace_back();
auto &b = m_LocalReadPattern.back();
b.name = variable.m_Name;
b.count = variable.m_Count;
b.start = variable.m_Start;
b.shape = variable.m_Shape;
b.type = DataType::String;
if (m_CurrentStep == 0)
{
m_LocalReadPattern.emplace_back();
auto &b = m_LocalReadPattern.back();
b.name = variable.m_Name;
b.count = variable.m_Count;
b.start = variable.m_Start;
b.shape = variable.m_Shape;
b.type = DataType::String;

m_LocalReadPatternJson["Variables"].emplace_back();
auto &jref = m_LocalReadPatternJson["Variables"].back();
jref["Name"] = b.name;
jref["Type"] = ToString(b.type);
jref["ShapeID"] = variable.m_ShapeID;
jref["Start"] = b.start;
jref["Count"] = b.count;
jref["Shape"] = b.shape;
jref["BufferStart"] = 0;
jref["BufferCount"] = 0;
m_LocalReadPatternJson["Variables"].emplace_back();
auto &jref = m_LocalReadPatternJson["Variables"].back();
jref["Name"] = b.name;
jref["Type"] = ToString(b.type);
jref["ShapeID"] = variable.m_ShapeID;
jref["Start"] = b.start;
jref["Count"] = b.count;
jref["Shape"] = b.shape;
jref["BufferStart"] = 0;
jref["BufferCount"] = 0;

ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
ssc::JsonToBlockVecVec(m_GlobalWritePatternJson,
m_GlobalWritePattern);
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first, i.second.second,
MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR,
m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
}
}
m_Buffer.resize(totalDataSize);

for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
if (b.name == variable.m_Name)
{
std::vector<char> str(b.bufferCount);
std::memcpy(str.data(), m_Buffer.data() + b.bufferStart,
b.bufferCount);
*data = std::string(str.begin(), str.end());
}
}
}
}

for (const auto &i : m_AllReceivingWriterRanks)
else
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
for (const auto &i : m_AllReceivingWriterRanks)
{
if (b.name == variable.m_Name)
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
std::vector<char> str(b.bufferCount);
std::memcpy(str.data(), m_Buffer.data() + b.bufferStart,
b.bufferCount);
*data = std::string(str.begin(), str.end());
if (b.name == variable.m_Name)
{
*data = std::string(b.value.begin(), b.value.end());
}
}
}
}
Expand Down Expand Up @@ -147,10 +166,15 @@ void SscReader::GetDeferredCommon(Variable<T> &variable, T *data)
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
if (m_ReceivedRanks.find(i.first) == m_ReceivedRanks.end())
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first, i.second.second,
MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR,
m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
m_ReceivedRanks.insert(i.first);
}
}
}

Expand Down

0 comments on commit da37d50

Please sign in to comment.