Skip to content

Commit

Permalink
Make adios2_remote_server multithreaded and compress responses using … (
Browse files Browse the repository at this point in the history
#4407)

* Make adios2_remote_server multithreaded and compress responses using MGARD or ZFP (preference in this order) when an accuracy is set by user.
- added "-t nThreads" option, default is 8
- Use threads to compress responsesi and to return data to client. Adios Get() is still done in main thread because that is not thread-safe.

updates to Campaign engine:
- Call PerformGets() in Campaign engine
- Pass accuracy between CampaignReader and BP5 variables

update to Python API
- added python bindings.Accuracy struct, and SetAccuracy/GetAccuracy to bindings, and variable.set_accuracy() and variable.get_accuracy to python API.

* set response operator type in remote server for file Read Requests as well.
black python
  • Loading branch information
pnorbert authored Dec 6, 2024
1 parent c03622a commit e74f184
Show file tree
Hide file tree
Showing 19 changed files with 407 additions and 69 deletions.
18 changes: 18 additions & 0 deletions bindings/Python/py11Variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ size_t Variable::SelectionSize() const
return size;
}

void Variable::SetAccuracy(const adios2::Accuracy &a)
{
helper::CheckForNullptr(m_VariableBase, "in call to Variable::SetAccuracy");
m_VariableBase->SetAccuracy(a);
}

adios2::Accuracy Variable::GetAccuracy() const
{
helper::CheckForNullptr(m_VariableBase, "in call to Variable::GetAccuracy");
return m_VariableBase->GetAccuracy();
}

adios2::Accuracy Variable::GetAccuracyRequested() const
{
helper::CheckForNullptr(m_VariableBase, "in call to Variable::GetAccuracyRequested");
return m_VariableBase->GetAccuracyRequested();
}

size_t Variable::AddOperation(const Operator op, const Params &parameters)
{
helper::CheckForNullptr(m_VariableBase, "in call to Variable::AddOperation");
Expand Down
4 changes: 4 additions & 0 deletions bindings/Python/py11Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Variable

size_t SelectionSize() const;

void SetAccuracy(const adios2::Accuracy &a);
adios2::Accuracy GetAccuracy() const;
adios2::Accuracy GetAccuracyRequested() const;

std::string Name() const;

std::string Type() const;
Expand Down
18 changes: 18 additions & 0 deletions bindings/Python/py11glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
m.attr("is_built_with_mpi") = false;
#endif

m.attr("L2_norm") = adios2::L2_norm;
m.attr("Linf_norm") = adios2::Linf_norm;

// enum classes
pybind11::enum_<adios2::Mode>(m, "Mode")
.value("Write", adios2::Mode::Write)
Expand Down Expand Up @@ -131,6 +134,18 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
.value("StoreData", adios2::DerivedVarType::StoreData)
.export_values();

pybind11::class_<adios2::Accuracy>(m, "Accuracy")
.def(pybind11::init<double, double, bool>())
.def_readwrite("error", &adios2::Accuracy::error)
.def_readwrite("norm", &adios2::Accuracy::norm)
.def_readwrite("relative", &adios2::Accuracy::relative)

.def("__repr__", [](const adios2::Accuracy &self) {
std::ostringstream _stream;
_stream << "(" << self.error << ", " << self.norm << ", " << self.relative << ")";
return _stream.str();
});

pybind11::class_<adios2::py11::ADIOS>(m, "ADIOS")
// Python 2
.def("__nonzero__",
Expand Down Expand Up @@ -373,6 +388,9 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
.def("SetBlockSelection", &adios2::py11::Variable::SetBlockSelection)
.def("SetSelection", &adios2::py11::Variable::SetSelection)
.def("SetStepSelection", &adios2::py11::Variable::SetStepSelection)
.def("SetAccuracy", &adios2::py11::Variable::SetAccuracy)
.def("GetAccuracy", &adios2::py11::Variable::GetAccuracy)
.def("GetAccuracyRequested", &adios2::py11::Variable::GetAccuracyRequested)
.def("SelectionSize", &adios2::py11::Variable::SelectionSize)
.def("Name", &adios2::py11::Variable::Name)
.def("Type", &adios2::py11::Variable::Type)
Expand Down
23 changes: 23 additions & 0 deletions python/adios2/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
accompanying file Copyright.txt for details.
"""

from adios2 import bindings


class Variable:
"""High level representation of the Variable class in the adios2.bindings"""
Expand Down Expand Up @@ -88,6 +90,18 @@ def set_step_selection(self, step_selection):
"""
self.impl.SetStepSelection(step_selection)

def set_accuracy(self, error, norm, relative):
"""
Set Accuracy for (remote) reading for this variable
Args:
error: floating point value
norm: floating point value
relative: True or False
"""
acc = bindings.Accuracy(error, norm, relative)
self.impl.SetAccuracy(acc)

def shape(self, step=None):
"""
Get the shape assigned to the given step for this variable.
Expand Down Expand Up @@ -177,6 +191,15 @@ def name(self):
"""
return self.impl.Name()

def get_accuracy(self):
"""
Get the accuracy of the variable (of its last read)
Returns:
adios2.bindings.Accuracy struct (with error, norm and relative fields).
"""
return self.impl.GetAccuracy()

def add_operation_string(self, name, params={}):
"""
Add an operation (operator) as a string
Expand Down
71 changes: 65 additions & 6 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,14 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
{
auto &Req = GetRequests[req_seq];
const DataType varType = m_IO.InquireVariableType(Req.VarName);
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);

std::string keyPrefix = m_Fingerprint + "|" + Req.VarName + std::to_string(Req.RelStep);
if (Req.BlockID != std::numeric_limits<std::size_t>::max())
{
MinVarInfo *minBlocksInfo = nullptr;
if (MinBlocksInfoMap.find(keyPrefix) == MinBlocksInfoMap.end())
{
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);
minBlocksInfo = MinBlocksInfo(*VB, Req.RelStep);
MinBlocksInfoMap[keyPrefix] = minBlocksInfo;
}
Expand Down Expand Up @@ -584,7 +584,7 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
box.StartToVector(start);
box.CountToVector(count);
auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, count, start,
ReqInfo.Data);
VB->m_AccuracyRequested, ReqInfo.Data);
handles.push_back(handle);
remoteRequestsInfo.push_back(ReqInfo);
}
Expand Down Expand Up @@ -656,13 +656,72 @@ void BP5Reader::PerformRemoteGets()
std::vector<Remote::GetHandle> handles;
for (auto &Req : GetRequests)
{
auto handle =
m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);
auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start,
VB->m_AccuracyRequested, Req.Data);
handles.push_back(handle);
}
for (auto &handle : handles)

size_t nHandles = handles.size();
// TP endGenerate = NOW();
// double generateTime = DURATION(startGenerate, endGenerate);

size_t nextHandle = 0;
std::mutex mutexReadRequests;

auto lf_GetNextHandle = [&]() -> size_t {
std::lock_guard<std::mutex> lockGuard(mutexReadRequests);
size_t reqidx = MaxSizeT;
if (nextHandle < nHandles)
{
reqidx = nextHandle;
++nextHandle;
}
return reqidx;
};

auto lf_WaitForGet = [&](const size_t threadID) -> bool {
while (true)
{
const auto reqidx = lf_GetNextHandle();
if (reqidx > nHandles)
{
break;
}
m_Remote->WaitForGet(handles[reqidx]);
// std::cout << "BP5Reader::PerformRemoteGets: thread " << threadID
// << " done with response " << reqidx << std::endl;
}
return true;
};

if (m_Threads > 1 && nHandles > 1)
{
m_Remote->WaitForGet(handle);
size_t nThreads = (m_Threads < nHandles ? m_Threads : nHandles);
std::vector<std::future<bool>> futures(nThreads - 1);

// launch Threads-1 threads to process subsets of handles,
// then main thread process the last subset
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
futures[tid] = std::async(std::launch::async, lf_WaitForGet, tid + 1);
}

// main thread runs last subset of reads
lf_WaitForGet(0);

// wait for all async threads
for (auto &f : futures)
{
f.get();
}
}
else
{
for (auto &handle : handles)
{
m_Remote->WaitForGet(handle);
}
}
}

Expand Down
10 changes: 6 additions & 4 deletions source/adios2/engine/campaign/CampaignReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ void CampaignReader::PerformGets()
{
std::cout << "Campaign Reader " << m_ReaderRank << " PerformGets()\n";
}
for (auto ep : m_Engines)
{
ep->PerformGets();
}
m_NeedPerformGets = false;
}

Expand Down Expand Up @@ -451,10 +455,8 @@ std::string CampaignReader::VariableExprStr(const VariableBase &Var)
void CampaignReader::DoGetDeferred(Variable<T> &variable, T *data) \
{ \
PERFSTUBS_SCOPED_TIMER("CampaignReader::Get"); \
auto it = m_VarInternalInfo.find(variable.m_Name); \
Variable<T> *v = reinterpret_cast<Variable<T> *>(it->second.originalVar); \
Engine *e = m_Engines[it->second.engineIdx]; \
e->Get(*v, data, adios2::Mode::Deferred); \
auto p = TranslateToActualVariable(variable); \
p.second->Get(*p.first, data, adios2::Mode::Deferred); \
} \
\
std::map<size_t, std::vector<typename Variable<T>::BPInfo>> \
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/campaign/CampaignReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ inline Variable<T> CampaignReader::DuplicateVariable(Variable<T> *variable, IO &
v.m_StepsCount = variable->m_StepsCount;
v.m_Start = variable->m_Start;
v.m_Count = variable->m_Count;
v.m_AccuracyRequested = variable->m_AccuracyRequested;
v.m_AccuracyProvided = variable->m_AccuracyProvided;

v.m_Engine = this; // Variable::Shape() uses this member to call engine
vii.originalVar = static_cast<void *>(variable);
Expand Down Expand Up @@ -81,6 +83,7 @@ CampaignReader::TranslateToActualVariable(Variable<T> &variable)
v->m_MemoryStart = variable.m_MemoryStart;
v->m_MemoryCount = variable.m_MemoryCount;
v->m_MemSpace = variable.m_MemSpace;
v->m_AccuracyRequested = variable.m_AccuracyRequested;
return std::make_pair(v, e);
}

Expand Down
1 change: 0 additions & 1 deletion source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ size_t CompressMGARD::Operate(const char *dataIn, const Dims &blockStart, const
mgard_x::compress(mgardDim, mgardType, mgardCount, tolerance, s, errorBoundType, dataIn,
compressedData, sizeOut, config, true);
bufferOutOffset += sizeOut;

return bufferOutOffset;
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/refactor/RefactorMDR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ RefactorMDR::RefactorMDR(const Params &parameters)
// config.block_size = 64;

config.dev_type = mgard_x::device_type::AUTO;
config.prefetch = false;
// config.prefetch = false;
// config.max_memory_footprint = max_memory_footprint;

config.lossless = mgard_x::lossless_type::Huffman_Zstd;
Expand Down
39 changes: 36 additions & 3 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#include "EVPathRemote.h"
#include "Remote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/core/Operator.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"
#include "adios2/helper/adiosSystem.h"
#include "adios2/operator/OperatorFactory.h"

#ifdef _MSC_VER
#define strdup(x) _strdup(x)
#endif
Expand Down Expand Up @@ -60,7 +63,32 @@ void ReadResponseHandler(CManager cm, CMConnection conn, void *vevent, void *cli
{
EVPathRemoteCommon::ReadResponseMsg read_response_msg =
static_cast<EVPathRemoteCommon::ReadResponseMsg>(vevent);
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);

switch (read_response_msg->OperatorType)
{
case adios2::core::Operator::OperatorType::COMPRESS_MGARD: {
auto op = adios2::core::MakeOperator("mgard", {});
op->InverseOperate(read_response_msg->ReadData, read_response_msg->Size,
(char *)read_response_msg->Dest);
break;
}

case adios2::core::Operator::OperatorType::COMPRESS_ZFP: {
auto op = adios2::core::MakeOperator("zfp", {});
op->InverseOperate(read_response_msg->ReadData, read_response_msg->Size,
(char *)read_response_msg->Dest);
break;
}

case adios2::core::Operator::OperatorType::COMPRESS_NULL:
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);
break;
default:
helper::Throw<std::invalid_argument>("Remote", "EVPathRemote", "ReadResponseHandler",
"Invalid operator type " +
std::to_string(read_response_msg->OperatorType) +
" received in response");
}
CMCondition_signal(cm, read_response_msg->ReadResponseCondition);
return;
};
Expand Down Expand Up @@ -157,7 +185,7 @@ void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port
}

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
Dims &Start, Accuracy &accuracy, void *dest)
{
EVPathRemoteCommon::_GetRequestMsg GetMsg;
memset(&GetMsg, 0, sizeof(GetMsg));
Expand All @@ -169,6 +197,9 @@ EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t Blo
GetMsg.DimCount = (int)Count.size();
GetMsg.Count = Count.data();
GetMsg.Start = Start.data();
GetMsg.Error = accuracy.error;
GetMsg.Norm = accuracy.norm;
GetMsg.Relative = accuracy.relative;
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
return (Remote::GetHandle)(intptr_t)GetMsg.GetResponseCondition;
Expand All @@ -190,7 +221,9 @@ EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest

bool EVPathRemote::WaitForGet(GetHandle handle)
{
return CMCondition_wait(ev_state.cm, (int)(intptr_t)handle);
int result = CMCondition_wait(ev_state.cm, (int)(intptr_t)handle);

return result;
}
#else

Expand Down
3 changes: 2 additions & 1 deletion source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class EVPathRemote : public Remote

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);
GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
Accuracy &accuracy, void *dest);

bool WaitForGet(GetHandle handle);

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void Remote::OpenSimpleFile(const std::string hostname, const int32_t port,
};

Remote::GetHandle Remote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest)
Accuracy &accuracy, void *dest)
{
ThrowUp("RemoteGet");
return (Remote::GetHandle)(intptr_t)0;
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Remote
typedef void *GetHandle;

virtual GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest);
Accuracy &accuracy, void *dest);

virtual bool WaitForGet(GetHandle handle);

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/XrootdRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ bool XrootdRemote::WaitForGet(GetHandle handle)
}

Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
Dims &Start, Accuracy &accuracy, void *dest)
{
#ifdef ADIOS2_HAVE_XROOTD
char rName[512] = "/etc";
Expand Down
Loading

0 comments on commit e74f184

Please sign in to comment.