Skip to content

Commit

Permalink
Merge pull request #2392 from NAThompson/identify_via_mode
Browse files Browse the repository at this point in the history
Identify via mode
  • Loading branch information
pnorbert authored Jul 29, 2020
2 parents da37d50 + dac365e commit b79bb92
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 239 deletions.
35 changes: 15 additions & 20 deletions docs/user_guide/source/engines/inline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,44 @@
Inline for zero-copy
********************

The Inline engine provides in-process communication between the writer and reader, and seeks to avoid copying data buffers.
The ``Inline`` engine provides in-process communication between the writer and reader, and seeks to avoid copying data buffers.

This engine is experimental, and is focused on the N -> N case: N writers share a process with N readers, and the analysis happens 'inline' without writing the data to a file or copying to another buffer. It has similar considerations to the streaming SST engine, since analysis must happen per step.
This engine is focused on the N -> N case: N writers share a process with N readers, and the analysis happens 'inline' without writing the data to a file or copying to another buffer.
It has similar considerations to the streaming SST engine, since analysis must happen per step.

To use this engine, you can either specify it in your XML config file, with tag ``<engine type=Inline>`` or set it in your application code:

.. code-block:: c++

adios2::IO inlineIO = adios.DeclareIO("ioName");
inlineIO.SetEngine("Inline");
inlineIO.SetParameters({{"writerID", "inline_write"}, {"readerID", "inline_read"}});
adios2::Engine inlineWriter = inlineIO.Open("inline_write", adios2::Mode::Write);
adios2::Engine inlineReader = inlineIO.Open("inline_read", adios2::Mode::Read);

Notice that unlike other engines, the reader and writer share an IO instance. Also, the ``writerID`` parameter allows the reader to connect to the writer, and ``readerID`` allows writer to connect to the reader. Both the writer and reader must be opened before either tries to call BeginStep/PerformPuts/PeformGets.
Notice that unlike other engines, the reader and writer share an IO instance.
Both the writer and reader must be opened before either tries to call ``BeginStep()``/``PerformPuts()``/``PerformGets()``.
There must be exactly one writer, and exactly one reader.

For successful operation, the writer will perform a step, then the reader will perform a step in the same process. Data is decomposed between processes, and the writer can write its portion of the data like other ADIOS engines. When the reader starts its step, the only data it has available is that written by the writer in its process. To select this data in ADIOS, use a block selection. The reader then can retrieve whatever data was written by the writer. The reader does require the use of a new ``Get()`` call that was added to the API:
For successful operation, the writer will perform a step, then the reader will perform a step in the same process.
Data is decomposed between processes, and the writer can write its portion of the data like other ADIOS2 engines.
When the reader starts its step, the only data it has available is that written by the writer in its process.
To select this data in ADIOS2, use a block selection.
The reader then can retrieve whatever data was written by the writer.
The reader requires the use of a new ``Get()`` call that was added to the API:

.. code-block:: c++

void Engine::Get<T>( \
Variable<T>, typename Variable<T>::Info & info, const Mode);

This version of ``Get`` is only used for the inline engine and requires passing a ``Variable<T>::Info`` object, which can be obtained from calling the reader's ``BlocksInfo()``. See the example below for details.
This version of ``Get`` is only used for the inline engine and requires passing a ``Variable<T>::Info`` object, which can be obtained from calling the reader's ``BlocksInfo()``.
See the example below for details.

.. note::
This ``Get()`` method is preliminary and may be removed in the future when the span interface on the read side becomes available.

.. note::
The inline engine does not support Sync mode for writing. In addition, since the inline engine does not do any data copy, the writer should avoid changing the data contents before the reader has read the data.
The inline engine does not support ``Sync`` mode for writing. In addition, since the inline engine does not do any data copy, the writer should avoid changing the data contents before the reader has read the data.

Typical access pattern:

Expand Down Expand Up @@ -59,16 +67,3 @@ Typical access pattern:
// use info.Data() to get the pointer for each element in blocksInfo

// After any desired analysis is finished, writer can now reuse data pointer


Parameters:

1. **writerID**: Match the string passed to the ``IO::Open()`` call when creating the writer. The reader uses this parameter to fetch the correct writer.
2. **readerID**: Match the string passed to the ``IO::Open()`` call when creating the reader. The writer uses this parameter to fetch the correct reader.

=========== ===================== ===============================
**Key** **Value Type** **Default** and Examples
=========== ===================== ===============================
writerID string none, match the writer name
readerID string none, match the reader name
=========== ===================== ===============================
264 changes: 109 additions & 155 deletions examples/hello/inlineReaderWriter/helloInlineReaderWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,84 +22,61 @@
void DoAnalysis(adios2::IO &inlineIO, adios2::Engine &inlineReader, int rank,
unsigned int step)
{
try
{
inlineReader.BeginStep();
/////////////////////READ
adios2::Variable<float> inlineFloats000 =
inlineIO.InquireVariable<float>("inlineFloats000");
inlineReader.BeginStep();
/////////////////////READ
adios2::Variable<float> inlineFloats000 =
inlineIO.InquireVariable<float>("inlineFloats000");

adios2::Variable<std::string> inlineString =
inlineIO.InquireVariable<std::string>("inlineString");
adios2::Variable<std::string> inlineString =
inlineIO.InquireVariable<std::string>("inlineString");

if (inlineFloats000)
{
auto blocksInfo = inlineReader.BlocksInfo(inlineFloats000, step);

std::cout << "Data StepsStart " << inlineFloats000.StepsStart()
<< " from rank " << rank << ": ";
for (auto &info : blocksInfo)
{
// bp file reader would see all blocks, inline only sees local
// writer's block(s).
size_t myBlock = info.BlockID;
inlineFloats000.SetBlockSelection(myBlock);

// info passed by reference
// engine must remember data pointer (or info) to fill it out at
// PerformGets()
inlineReader.Get<float>(inlineFloats000, info,
adios2::Mode::Deferred);
}
inlineReader.PerformGets();
if (inlineFloats000)
{
auto blocksInfo = inlineReader.BlocksInfo(inlineFloats000, step);

for (const auto &info : blocksInfo)
{
adios2::Dims count = info.Count;
const float *vectData = info.Data();
for (size_t i = 0; i < count[0]; ++i)
{
float datum = vectData[i];
std::cout << datum << " ";
}
std::cout << "\n";
}
}
else
std::cout << "Data StepsStart " << inlineFloats000.StepsStart()
<< " from rank " << rank << ": ";
for (auto &info : blocksInfo)
{
std::cout << "Variable inlineFloats000 not found\n";
// bp file reader would see all blocks, inline only sees local
// writer's block(s).
size_t myBlock = info.BlockID;
inlineFloats000.SetBlockSelection(myBlock);

// info passed by reference
// engine must remember data pointer (or info) to fill it out at
// PerformGets()
inlineReader.Get<float>(inlineFloats000, info,
adios2::Mode::Deferred);
}
inlineReader.PerformGets();

if (inlineString && rank == 0)
for (const auto &info : blocksInfo)
{
// inlineString.SetStepSelection({step, 1});

std::string myString;
inlineReader.Get(inlineString, myString, adios2::Mode::Sync);
std::cout << "inlineString: " << myString << "\n";
adios2::Dims count = info.Count;
const float *vectData = info.Data();
for (size_t i = 0; i < count[0]; ++i)
{
float datum = vectData[i];
std::cout << datum << " ";
}
std::cout << "\n";
}
inlineReader.EndStep();
// all deferred block info are now valid - need data pointers to be
// valid, filled with data
}
catch (std::invalid_argument &e)
else
{
std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
std::cout << "Variable inlineFloats000 not found\n";
}
catch (std::ios_base::failure &e)
{
std::cout << "IO System base failure exception, STOPPING PROGRAM "
"from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
}
catch (std::exception &e)

if (inlineString && rank == 0)
{
std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
std::cout << e.what() << "\n";
std::string myString;
inlineReader.Get(inlineString, myString, adios2::Mode::Sync);
std::cout << "inlineString: " << myString << "\n";
}
inlineReader.EndStep();
// all deferred block info are now valid - need data pointers to be
// valid, filled with data
}

int main(int argc, char *argv[])
Expand All @@ -109,6 +86,9 @@ int main(int argc, char *argv[])
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif

// Application variable
Expand All @@ -117,114 +97,88 @@ int main(int argc, char *argv[])

try
{
#if ADIOS2_USE_MPI
/** ADIOS class factory of IO class objects */
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
/*** IO class object: settings and factory of Settings: Variables,
* Parameters, Transports, and Execution: Engines
* Inline uses single IO for write/read */
// Inline uses single IO for write/read
adios2::IO inlineIO = adios.DeclareIO("InlineReadWrite");
/// WRITE
{
inlineIO.SetEngine("Inline");
inlineIO.SetParameters({{"verbose", "4"},
{"writerID", "myWriteID"},
{"readerID", "myReadID"}});
inlineIO.SetEngine("Inline");
inlineIO.SetParameter("verbose", "4");

/** global array: name, { shape (total dimensions) }, { start
* (local) },
* { count (local) }, all are constant dimensions */
const unsigned int variablesSize = 10;
std::vector<adios2::Variable<float>> inlineFloats(variablesSize);
/** global array: name, { shape (total dimensions) }, { start
* (local) },
* { count (local) }, all are constant dimensions */
const unsigned int variablesSize = 10;
std::vector<adios2::Variable<float>> inlineFloats(variablesSize);

adios2::Variable<std::string> inlineString =
inlineIO.DefineVariable<std::string>("inlineString");
adios2::Variable<std::string> inlineString =
inlineIO.DefineVariable<std::string>("inlineString");

for (unsigned int v = 0; v < variablesSize; ++v)
for (unsigned int v = 0; v < variablesSize; ++v)
{
std::string namev("inlineFloats");
if (v < 10)
{
namev += "00";
}
else if (v < 100)
{
std::string namev("inlineFloats");
if (v < 10)
{
namev += "00";
}
else if (v < 100)
{
namev += "0";
}
namev += std::to_string(v);

inlineFloats[v] = inlineIO.DefineVariable<float>(
namev, {size * Nx}, {rank * Nx}, {Nx},
adios2::ConstantDims);
namev += "0";
}
namev += std::to_string(v);

/** global single value variable: name */
adios2::Variable<unsigned int> inlineTimeStep =
inlineIO.DefineVariable<unsigned int>("timeStep");
inlineFloats[v] = inlineIO.DefineVariable<float>(
namev, {size * Nx}, {rank * Nx}, {Nx}, adios2::ConstantDims);
}

/** global single value variable: name */
adios2::Variable<unsigned int> inlineTimeStep =
inlineIO.DefineVariable<unsigned int>("timeStep");

/** Engine derived class, spawned to start IO operations */
adios2::Engine inlineWriter =
inlineIO.Open("myWriteID", adios2::Mode::Write);
adios2::Engine inlineWriter =
inlineIO.Open("myWriteID", adios2::Mode::Write);

adios2::Engine inlineReader =
inlineIO.Open("myReadID", adios2::Mode::Read);
adios2::Engine inlineReader =
inlineIO.Open("myReadID", adios2::Mode::Read);

for (unsigned int timeStep = 0; timeStep < 3; ++timeStep)
for (unsigned int timeStep = 0; timeStep < 3; ++timeStep)
{
inlineWriter.BeginStep();
if (rank == 0) // global single value, only saved by rank 0
{
inlineWriter.BeginStep();
if (rank == 0) // global single value, only saved by rank 0
{
inlineWriter.Put<unsigned int>(inlineTimeStep, timeStep);
}

// template type is optional, but recommended
for (unsigned int v = 0; v < variablesSize; ++v)
{
// Note: Put is deferred, so all variables will see v == 9
// and myFloats[0] == 9, 10, or 11
myFloats[rank] = static_cast<float>(v + timeStep + rank);
inlineWriter.Put(inlineFloats[v], myFloats.data());
}

const std::string myString(
"Hello from rank: " + std::to_string(rank) +
" and timestep: " + std::to_string(timeStep));

if (rank == 0)
{
inlineWriter.Put(inlineString, myString);
}

inlineWriter.EndStep();

DoAnalysis(inlineIO, inlineReader, rank, timeStep);
inlineWriter.Put<unsigned int>(inlineTimeStep, timeStep);
}

inlineWriter.Close();
inlineReader.Close();
// template type is optional, but recommended
for (unsigned int v = 0; v < variablesSize; ++v)
{
// Note: Put is deferred, so all variables will see v == 9
// and myFloats[0] == 9, 10, or 11
myFloats[rank] = static_cast<float>(v + timeStep + rank);
inlineWriter.Put(inlineFloats[v], myFloats.data());
}

const std::string myString(
"Hello from rank: " + std::to_string(rank) +
" and timestep: " + std::to_string(timeStep));

if (rank == 0)
{
inlineWriter.Put(inlineString, myString);
}

inlineWriter.EndStep();

DoAnalysis(inlineIO, inlineReader, rank, timeStep);
}
// MPI_Barrier(MPI_COMM_WORLD);
}
catch (std::invalid_argument &e)
{
std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
}
catch (std::ios_base::failure &e)
catch (std::exception const &e)
{
std::cout << "IO System base failure exception, STOPPING PROGRAM "
"from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
}
catch (std::exception &e)
{
std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
std::cout << "Caught exception from rank " << rank << "\n";
std::cout << e.what() << "\n";
#if ADIOS2_USE_MPI
return MPI_Abort(MPI_COMM_WORLD, 1);
#else
return 1;
#endif
}

#if ADIOS2_USE_MPI
Expand Down
Loading

0 comments on commit b79bb92

Please sign in to comment.