Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutable Attributes: Allow redefining attributes. BP4 stream reading will provide the late… #2827

Merged
merged 4 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ void Engine::Init() {}
void Engine::InitParameters() {}
void Engine::InitTransports() {}

void Engine::NotifyEngineAttribute(std::string name, DataType type) noexcept {}

// DoPut*
#define declare_type(T) \
void Engine::DoPut(Variable<T> &, typename Variable<T>::Span &, \
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,11 @@ class Engine
return nullptr;
}

/** Notify the engine when a new attribute is defined. Called from IO.tcc
*/
virtual void NotifyEngineAttribute(std::string name,
DataType type) noexcept;

protected:
/** from ADIOS class passed to Engine created with Open
* if no communicator is passed */
Expand Down
64 changes: 39 additions & 25 deletions source/adios2/core/IO.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "adios2/helper/adiosFunctions.h"
#include "adios2/helper/adiosType.h"
#include <adios2-perfstubs-interface.h>
#include <adios2/core/Engine.h>

namespace adios2
{
Expand Down Expand Up @@ -117,25 +118,31 @@ Attribute<T> &IO::DefineAttribute(const std::string &name, const T &value,
auto itExistingAttribute = m_Attributes.find(globalName);
if (itExistingAttribute != m_Attributes.end())
{
if (helper::ValueToString(value) ==
if (helper::ValueToString(value) !=
itExistingAttribute->second->GetInfo()["Value"])
{
return static_cast<Attribute<T> &>(*itExistingAttribute->second);
itExistingAttribute->second = std::unique_ptr<AttributeBase>(
new Attribute<T>(globalName, value));
for (auto &e : m_Engines)
{
e.second->NotifyEngineAttribute(
globalName, itExistingAttribute->second->m_Type);
}
}
else
return static_cast<Attribute<T> &>(*itExistingAttribute->second);
}
else
{
auto itAttributePair = m_Attributes.emplace(
globalName, std::unique_ptr<AttributeBase>(
new Attribute<T>(globalName, value)));
for (auto &e : m_Engines)
{
throw std::invalid_argument(
"ERROR: attribute " + globalName +
" has been defined and its value cannot be changed, in call to "
"DefineAttribute\n");
e.second->NotifyEngineAttribute(
globalName, itAttributePair.first->second->m_Type);
}
return static_cast<Attribute<T> &>(*itAttributePair.first->second);
}

auto itAttributePair = m_Attributes.emplace(
globalName,
std::unique_ptr<AttributeBase>(new Attribute<T>(globalName, value)));

return static_cast<Attribute<T> &>(*itAttributePair.first->second);
}

template <class T>
Expand Down Expand Up @@ -165,23 +172,30 @@ Attribute<T> &IO::DefineAttribute(const std::string &name, const T *array,
helper::VectorToCSV(std::vector<T>(array, array + elements)) +
" }");

if (itExistingAttribute->second->GetInfo()["Value"] == arrayValues)
if (itExistingAttribute->second->GetInfo()["Value"] != arrayValues)
{
return static_cast<Attribute<T> &>(*itExistingAttribute->second);
itExistingAttribute->second = std::unique_ptr<AttributeBase>(
new Attribute<T>(globalName, array, elements));
for (auto &e : m_Engines)
{
e.second->NotifyEngineAttribute(
globalName, itExistingAttribute->second->m_Type);
}
}
else
return static_cast<Attribute<T> &>(*itExistingAttribute->second);
}
else
{
auto itAttributePair = m_Attributes.emplace(
globalName, std::unique_ptr<AttributeBase>(
new Attribute<T>(globalName, array, elements)));
for (auto &e : m_Engines)
{
throw std::invalid_argument(
"ERROR: attribute " + globalName +
" has been defined and its value cannot be changed, in call to "
"DefineAttribute\n");
e.second->NotifyEngineAttribute(
globalName, itAttributePair.first->second->m_Type);
}
return static_cast<Attribute<T> &>(*itAttributePair.first->second);
}

auto itAttributePair = m_Attributes.emplace(
globalName, std::unique_ptr<AttributeBase>(
new Attribute<T>(globalName, array, elements)));
return static_cast<Attribute<T> &>(*itAttributePair.first->second);
}

template <class T>
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,11 @@ size_t BP4Writer::DebugGetDataBufferSize() const
return m_BP4Serializer.DebugGetDataBufferSize();
}

void BP4Writer::NotifyEngineAttribute(std::string name, DataType type) noexcept
{
m_BP4Serializer.m_SerializedAttributes.erase(name);
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
2 changes: 2 additions & 0 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ class BP4Writer : public core::Engine

template <class T>
void PerformPutCommon(Variable<T> &variable);

void NotifyEngineAttribute(std::string name, DataType type) noexcept;
};

} // end namespace engine
Expand Down
176 changes: 175 additions & 1 deletion testing/adios2/engine/bp/TestBPWriteReadAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,15 @@ TEST_F(BPWriteReadAttributes, WriteReadStreamVar)

adios2::IO io = adios.DeclareIO("TestIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
io.SetEngine("FileStream");
}

auto var1 = io.DefineVariable<int32_t>("var1");
auto var2 = io.DefineVariable<int32_t>("var2", shape, start, count);

Expand Down Expand Up @@ -1021,6 +1030,14 @@ TEST_F(BPWriteReadAttributes, WriteReadStreamVar)
};

adios2::IO io = adios.DeclareIO("ReaderIO");
if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
io.SetEngine("FileStream");
}
adios2::Engine bpReader = io.Open(fName, adios2::Mode::Read);

while (bpReader.BeginStep() == adios2::StepStatus::OK)
Expand All @@ -1035,14 +1052,171 @@ TEST_F(BPWriteReadAttributes, WriteReadStreamVar)
auto var2 = io.InquireVariable<int32_t>("var2");
if (var2)
{
lf_VerifyAttributes("var1", separator, io, false);
lf_VerifyAttributes("var2", separator, io, false);
lf_VerifyAttributes("var2", separator, io, true);
}

bpReader.EndStep();
}
}
}

TEST_F(BPWriteReadAttributes, WriteReadStreamMutable)
{
const std::string fName = "foo" + std::string(&adios2::PathSeparator, 1) +
"AttributesWriteReadMutable.bp";

const std::string separator = "\\";

int mpiRank = 0, mpiSize = 1;
// Number of rows
const size_t Nx = 8;

// Number of steps
const size_t NSteps = 3;

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
#endif

const double d3[3] = {-1.1, -1.2, -1.3};
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, 0, 0, 0);

// Write test data using BP
#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
const adios2::Dims shape{static_cast<size_t>(Nx * mpiSize)};
const adios2::Dims start{static_cast<size_t>(Nx * mpiRank)};
const adios2::Dims count{Nx};

adios2::IO io = adios.DeclareIO("TestIO");
if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
io.SetEngine("FileStream");
}

auto var1 = io.DefineVariable<int32_t>("var1");
auto var2 = io.DefineVariable<int32_t>("var2", shape, start, count);

io.DefineAttribute<double>("dArray", d3, 3, var1.Name(), separator);
io.DefineAttribute<double>("dArray", d3, 3, var2.Name(), separator);

io.DefineAttribute<int32_t>("i32Value", -1, var1.Name(), separator);
io.DefineAttribute<int32_t>("i32Value", -1, var2.Name(), separator);

adios2::Engine bpWriter = io.Open(fName, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
// Generate test data for each process uniquely
SmallTestData currentTestData = generateNewSmallTestData(
m_TestData, static_cast<int>(step), mpiRank, mpiSize);

const int32_t step32 = static_cast<int32_t>(step);
const double stepD = static_cast<double>(step);
double d[3] = {stepD + 0.1, stepD + 0.2, stepD + 0.3};

bpWriter.BeginStep();

io.DefineAttribute<double>("dArray", d, 3, var1.Name(), separator);
io.DefineAttribute<uint32_t>("i32Value", step32, var1.Name(),
separator);
bpWriter.Put(var1, step32);

if (step % 2 == 0)
{
bpWriter.Put(var2, currentTestData.I32.data());
io.DefineAttribute<double>("dArray", d, 3, var2.Name(),
separator);
io.DefineAttribute<uint32_t>("i32Value", step32, var2.Name(),
separator);
}

bpWriter.EndStep();
}
bpWriter.Close();
}

// reader
{
auto lf_VerifyAttributes = [](const int32_t step,
const std::string &variableName,
const std::string separator,
adios2::IO &io) {
const std::map<std::string, adios2::Params> attributesInfo =
io.AvailableAttributes(variableName, separator, false);

const double stepD = static_cast<double>(step);
const double d[3] = {stepD + 0.1, stepD + 0.2, stepD + 0.3};

auto itDArray = attributesInfo.find("dArray");
EXPECT_NE(itDArray, attributesInfo.end());
EXPECT_EQ(itDArray->second.at("Type"), "double");
EXPECT_EQ(itDArray->second.at("Elements"), "3");

auto a =
io.InquireAttribute<double>("dArray", variableName, separator);
auto adata = a.Data();
for (int i = 0; i < 3; ++i)
{
EXPECT_EQ(adata[i], d[i]);
}

const std::string stepS = std::to_string(step);
auto itU32Value = attributesInfo.find("i32Value");
EXPECT_NE(itU32Value, attributesInfo.end());
EXPECT_EQ(itU32Value->second.at("Type"), "uint32_t");
EXPECT_EQ(itU32Value->second.at("Elements"), "1");
EXPECT_EQ(itU32Value->second.at("Value"), stepS);
};

adios2::IO io = adios.DeclareIO("ReaderIO");
if (!engineName.empty())
{
io.SetEngine(engineName);
io.SetParameter("StreamReader", "ON");
}
else
{
io.SetEngine("FileStream");
}
adios2::Engine bpReader = io.Open(fName, adios2::Mode::Read);

while (bpReader.BeginStep() == adios2::StepStatus::OK)
{
int32_t step = static_cast<int32_t>(bpReader.CurrentStep());
if (engineName == "BP3")
{
// BP3 does not support changing attributes
step = 0;
}
auto var1 = io.InquireVariable<int32_t>("var1");
if (var1)
{
lf_VerifyAttributes(step, "var1", separator, io);
}

auto var2 = io.InquireVariable<int32_t>("var2");
if (var2)
{
lf_VerifyAttributes(step, "var2", separator, io);
}

bpReader.EndStep();
}
}
}

//******************************************************************************
// main
//******************************************************************************
Expand Down