Skip to content

Commit

Permalink
Merge pull request #2397 from pnorbert/bp4-stream-reader-mode
Browse files Browse the repository at this point in the history
Add stream reading mode for BP4 metadata processing. Read and parse o…
  • Loading branch information
pnorbert authored Jul 29, 2020
2 parents 5a7e488 + 5d9dd06 commit 0a0a694
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 26 deletions.
3 changes: 2 additions & 1 deletion docs/user_guide/source/engines/bp4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ This engine allows the user to fine tune the buffering operations through the fo

16. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc).


17. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open().

============================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
Expand All @@ -98,6 +98,7 @@ This engine allows the user to fine tune the buffering operations through the fo
BurstBufferPath string **""**, /mnt/bb/norbert, /ssd
BurstBufferDrain string On/Off **On**, Off
BurstBufferVerbose integer, 0-2 **0**, ``1``, ``2``
StreamReader string On/Off On, **Off**
============================== ===================== ===========================================================


Expand Down
1 change: 1 addition & 0 deletions docs/user_guide/source/engines/virtual_engines.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ These are the actual settings in ADIOS when a virtual engine is selected. The pa
============================== ===================== ===========================================================
OpenTimeoutSecs float **3600** (wait for up to an hour)
BeginStepPollingFrequencySecs float **1** (poll the file system with 1 second frequency
StreamReader bool **On** (process metadata in streaming mode)
============================== ===================== ===========================================================

3. ``InSituAnalysis``. The engine is ``SST``. The parameters are set to:
Expand Down
1 change: 1 addition & 0 deletions source/adios2/core/IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ void IO::SetEngine(const std::string engineType) noexcept
{
finalEngineType = "BP4";
lf_InsertParam("OpenTimeoutSecs", "3600");
lf_InsertParam("StreamReader", "true");
}
/* "file" is handled entirely in IO::Open() as it needs the name */
else
Expand Down
124 changes: 100 additions & 24 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,22 @@ void BP4Reader::Init()
const Seconds timeoutSeconds =
Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);

// set poll to 1/100 of timeout
Seconds pollSeconds = timeoutSeconds / 100.0;
static const auto pollSecondsMin = Seconds(1.0);
if (pollSeconds < pollSecondsMin)
{
pollSeconds = pollSecondsMin;
}
static const auto pollSecondsMax = Seconds(10.0);
if (pollSeconds > pollSecondsMax)
Seconds pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
if (pollSeconds > timeoutSeconds)
{
pollSeconds = pollSecondsMax;
pollSeconds = timeoutSeconds;
}

TimePoint timeoutInstant =
std::chrono::steady_clock::now() + timeoutSeconds;

OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds);
InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds);
if (!m_BP4Deserializer.m_Parameters.StreamReader)
{
/* non-stream reader gets as much steps as available now */
InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds);
}
}

bool BP4Reader::SleepOrQuit(const TimePoint &timeoutInstant,
Expand Down Expand Up @@ -317,6 +315,71 @@ void BP4Reader::InitTransports()
}
}

/* Count index records to minimum 1 and maximum of N records so that
* expected metadata size is less then a predetermined constant
*/
void MetadataCalculateMinFileSize(
const format::BP4Deserializer &m_BP4Deserializer,
const std::string &IdxFileName, char *buf, size_t idxsize, bool hasHeader,
const size_t mdStartPos, size_t &newIdxSize, size_t &expectedMinFileSize)
{
newIdxSize = 0;
expectedMinFileSize = 0;

if (hasHeader && idxsize < m_BP4Deserializer.m_IndexRecordSize)
{
return;
}

/* eliminate header for now for only calculating with records */
if (hasHeader)
{
buf += m_BP4Deserializer.m_IndexRecordSize;
idxsize -= m_BP4Deserializer.m_IndexRecordSize;
}

if (idxsize % m_BP4Deserializer.m_IndexRecordSize != 0)
{
throw std::runtime_error(
"FATAL CODING ERROR: ADIOS Index file " + IdxFileName +
" is assumed to always contain n*" +
std::to_string(m_BP4Deserializer.m_IndexRecordSize) +
" byte-length records. "
"Right now the length of index buffer is " +
std::to_string(idxsize) + " bytes.");
}

const size_t nTotalRecords = idxsize / m_BP4Deserializer.m_IndexRecordSize;
if (nTotalRecords == 0)
{
// no (new) step entry in the index, so no metadata is expected
newIdxSize = 0;
expectedMinFileSize = 0;
return;
}

size_t nRecords = 1;
expectedMinFileSize = *(uint64_t *)&(
buf[nRecords * m_BP4Deserializer.m_IndexRecordSize - 24]);
while (nRecords < nTotalRecords)
{
const size_t n = nRecords + 1;
const uint64_t mdEndPos =
*(uint64_t *)&(buf[n * m_BP4Deserializer.m_IndexRecordSize - 24]);
if (mdEndPos - mdStartPos > 16777216)
{
break;
}
expectedMinFileSize = mdEndPos;
++nRecords;
}
newIdxSize = nRecords * m_BP4Deserializer.m_IndexRecordSize;
if (hasHeader)
{
newIdxSize += m_BP4Deserializer.m_IndexRecordSize;
}
}

uint64_t
MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer,
const std::string &IdxFileName, bool hasHeader)
Expand All @@ -330,7 +393,9 @@ MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer,
"The file size now is " +
std::to_string(idxsize) + " bytes.");
}
if ((hasHeader && idxsize < 128) || idxsize < 64)
if ((hasHeader && idxsize < m_BP4Deserializer.m_IndexHeaderSize +
m_BP4Deserializer.m_IndexRecordSize) ||
idxsize < m_BP4Deserializer.m_IndexRecordSize)
{
// no (new) step entry in the index, so no metadata is expected
return 0;
Expand Down Expand Up @@ -450,23 +515,35 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > m_MDIndexFileProcessedSize)
{
const size_t newIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
std::vector<char> idxbuf(maxIdxSize);
m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize,
m_MDIndexFileProcessedSize);
size_t newIdxSize;
size_t expectedMinFileSize;
char *buf = idxbuf.data();

MetadataCalculateMinFileSize(
m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed,
m_MDFileProcessedSize, newIdxSize, expectedMinFileSize);

// const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize(
// m_BP4Deserializer, m_Name, !m_IdxHeaderParsed);

if (m_BP4Deserializer.m_MetadataIndex.m_Buffer.size() < newIdxSize)
{
m_BP4Deserializer.m_MetadataIndex.Resize(
newIdxSize, "re-allocating metadata index buffer, in "
"call to BP4Reader::BeginStep/UpdateBuffer");
}
m_BP4Deserializer.m_MetadataIndex.m_Position = 0;
m_MDIndexFileManager.ReadFile(
m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(), newIdxSize,
m_MDIndexFileProcessedSize);
std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize,
m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin());

/* Wait until as much metadata arrives in the file as much
* is indicated by the existing index entries
*/
uint64_t expectedMinFileSize = MetadataExpectedMinFileSize(
m_BP4Deserializer, m_Name, !m_IdxHeaderParsed);

size_t fileSize = 0;
do
{
Expand All @@ -486,7 +563,8 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
* the buffer now.
*/
const size_t fileSize = m_MDFileManager.GetFileSize(0);
const size_t newMDSize = fileSize - m_MDFileProcessedSize;
const size_t newMDSize =
expectedMinFileSize - m_MDFileProcessedSize;
if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize)
{
m_BP4Deserializer.m_Metadata.Resize(
Expand Down Expand Up @@ -549,18 +627,16 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize)
{
m_MDIndexFileProcessedSize += newIdxSize;
}
size_t idxsize = m_BP4Deserializer.m_MetadataIndex.m_Buffer.size();
uint64_t lastpos = *(uint64_t *)&(
m_BP4Deserializer.m_MetadataIndex.m_Buffer[idxsize - 24]);
}

bool BP4Reader::CheckWriterActive()
{
size_t flag = 0;
if (m_BP4Deserializer.m_RankMPI == 0)
{
std::vector<char> header(64, '\0');
m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0);
std::vector<char> header(m_BP4Deserializer.m_IndexHeaderSize, '\0');
m_MDIndexFileManager.ReadFile(
header.data(), m_BP4Deserializer.m_IndexHeaderSize, 0, 0);
bool active = m_BP4Deserializer.ReadActiveFlag(header);
flag = (active ? 1 : 0);
}
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ void BPBase::Init(const Params &parameters, const std::string hint,
static_cast<int>(helper::StringTo<int32_t>(
value, " in Parameter key=BurstBufferVerbose " + hint));
}
else if (key == "streamreader")
{
parsedParameters.StreamReader = helper::StringTo<bool>(
value, " in Parameter key=StreamReader " + hint);
}
}
if (!engineType.empty())
{
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ class BPBase
bool BurstBufferDrain = true;
/** Verbose level for burst buffer draining thread */
int BurstBufferVerbose = 0;

/** Stream reader flag: process metadata step-by-step
* instead of parsing everything available
*/
bool StreamReader = false;
};

/** Return type of the ResizeBuffer function. */
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BP4Base : virtual public BPBase
BufferSTL m_MetadataIndex;

/** Positions of flags in Index Table Header that Reader uses */
static constexpr size_t m_IndexHeaderSize = 64;
static constexpr size_t m_IndexRecordSize = 64;
static constexpr size_t m_EndianFlagPosition = 36;
static constexpr size_t m_BPVersionPosition = 37;
static constexpr size_t m_ActiveFlagPosition = 38;
Expand Down
10 changes: 10 additions & 0 deletions testing/adios2/engine/bp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

set(BP3_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp3)
set(BP4_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp4)
set(FS_DIR ${CMAKE_CURRENT_BINARY_DIR}/filestream)
file(MAKE_DIRECTORY ${BP3_DIR})
file(MAKE_DIRECTORY ${BP4_DIR})
file(MAKE_DIRECTORY ${FS_DIR})

macro(bp3_bp4_gtest_add_tests_helper testname mpi)
gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .BP3
Expand Down Expand Up @@ -68,3 +70,11 @@ gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .BP4
WORKING_DIRECTORY ${BP4_DIR} EXTRA_ARGS "BP4"
)

# FileStream is BP4 + StreamReader=true
gtest_add_tests_helper(StepsInSituGlobalArray MPI_ALLOW BP Engine.BP. .FileStream
WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream"
)
gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .FileStream
WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream"
)

34 changes: 33 additions & 1 deletion testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ if(ADIOS2_HAVE_SST)
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".BP4_stream" EXTRA_ARGS "BP4" "OpenTimeoutSecs=5")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".FileStream" EXTRA_ARGS "FileStream")
endif()

foreach(helper
Expand Down Expand Up @@ -230,7 +231,7 @@ endif()
if(NOT MSVC) # not on windows
# BP4 streaming tests start with all the simple tests, but with a timeout added on open
LIST (APPEND BP4_STREAM_TESTS ${ALL_SIMPLE_TESTS} ${SPECIAL_TESTS})
MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10" "${BP4_STREAM_TESTS}")
MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${BP4_STREAM_TESTS}")
# SharedVars fail with BP4_streaming*
list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*SharedVar.BPS$")
# Discard not a feature of BP4
Expand Down Expand Up @@ -268,6 +269,37 @@ if(NOT MSVC) # not on windows

endif()

#
# Setup streaming tests for FileStream virtual engine (BP4+StreamReader=true)
#
if(NOT MSVC) # not on windows
# FileStream streaming tests start with all the simple tests, but with a timeout added on open
LIST (APPEND FILESTREAM_TESTS ${SIMPLE_TESTS} ${SIMPLE_MPI_TESTS})
MutateTestSet( FILESTREAM_TESTS "FS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${FILESTREAM_TESTS}")
# SharedVars fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$")
# SharedVars fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$")
# Local fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*Local.FS$")
# The nobody-writes-data-in-a-timestep tests don't work for any BP-file based engine
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoData.FS$")
# Don't need to repeat tests that are identical for BP4 and FileStream
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$")
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*TimeoutOnOpen.FS$")
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$")

foreach(test ${FILESTREAM_TESTS})
add_common_test(${test} FileStream)
endforeach()

MutateTestSet( FileStream_BBSTREAM_TESTS "BB" writer "BurstBufferPath=bb,BurstBufferVerbose=2" "${FILESTREAM_TESTS}")
foreach(test ${FileStream_BBSTREAM_TESTS})
add_common_test(${test} FileStream)
endforeach()

endif()

#
# Setup tests for HDF5 engine
#
Expand Down

0 comments on commit 0a0a694

Please sign in to comment.