diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index aab245698c..6287cd5c83 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -53,7 +53,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) } else { - m_BP5Serializer.InitStep(new ChunkV("BP5Writer", true /* always copy */, + m_BP5Serializer.InitStep(new ChunkV("BP5Writer", + false /* always copy */, m_Parameters.BufferChunkSize)); } return StepStatus::OK; @@ -64,6 +65,7 @@ size_t BP5Writer::CurrentStep() const { return m_WriterStep; } void BP5Writer::PerformPuts() { PERFSTUBS_SCOPED_TIMER("BP5Writer::PerformPuts"); + m_BP5Serializer.PerformPuts(); return; } @@ -692,7 +694,6 @@ void BP5Writer::DoFlush(const bool isFinal, const int transportIndex) void BP5Writer::DoClose(const int transportIndex) { PERFSTUBS_SCOPED_TIMER("BP5Writer::Close"); - PerformPuts(); DoFlush(true, transportIndex); diff --git a/source/adios2/engine/bp5/BP5Writer.tcc b/source/adios2/engine/bp5/BP5Writer.tcc index 5adf31ac66..c810245112 100644 --- a/source/adios2/engine/bp5/BP5Writer.tcc +++ b/source/adios2/engine/bp5/BP5Writer.tcc @@ -4,8 +4,6 @@ * * BP5Writer.tcc implementation of template functions with known type * - * Created on: Aug 1, 2018 - * Author: Lipeng Wan wanl@ornl.gov */ #ifndef ADIOS2_ENGINE_BP5_BP5WRITER_TCC_ #define ADIOS2_ENGINE_BP5_BP5WRITER_TCC_ diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index 4d5bb01100..5185e1dc2f 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -10,6 +10,7 @@ #include "adios2/core/IO.h" #include "adios2/helper/adiosMemory.h" #include "adios2/toolkit/format/buffer/ffs/BufferFFS.h" +#include // max_align_t #include @@ -429,6 +430,8 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals) return Elems; } +void BP5Serializer::PerformPuts() { CurDataBuffer->CopyExternalToInternal(); } + void BP5Serializer::Marshal(void *Variable, const char *Name, const DataType Type, size_t ElemSize, size_t DimCount, const size_t *Shape, @@ -681,7 +684,7 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) "BP5Serializer:: CloseTimestep without Prior Init"); } MBase->DataBlockSize = CurDataBuffer->AddToVec( - 0, NULL, 8, true); // output block size multiple of 8, offset is size + 0, NULL, sizeof(max_align_t), true); // output block size aligned void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat, MetadataBuf, &MetaDataSize); diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.h b/source/adios2/toolkit/format/bp5/BP5Serializer.h index 23e79b8d53..f82f6b1d32 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.h @@ -68,6 +68,7 @@ class BP5Serializer : virtual public BP5Base void InitStep(BufferV *DataBuffer); TimestepInfo CloseTimestep(int timestep); + void PerformPuts(); core::Engine *m_Engine = NULL; diff --git a/source/adios2/toolkit/format/buffer/BufferV.h b/source/adios2/toolkit/format/buffer/BufferV.h index 136d041a91..8931217a1c 100644 --- a/source/adios2/toolkit/format/buffer/BufferV.h +++ b/source/adios2/toolkit/format/buffer/BufferV.h @@ -9,6 +9,7 @@ #include "adios2/common/ADIOSConfig.h" #include "adios2/common/ADIOSTypes.h" +#include namespace adios2 { @@ -34,6 +35,12 @@ class BufferV virtual BufferV_iovec DataVec() noexcept = 0; + /* + * This is used in PerformPuts() to copy externally referenced data so that + * it can be modified by the application + */ + virtual void CopyExternalToInternal() = 0; + /** * Reset the buffer to initial state (without freeing internal buffers) */ diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp index 30f3615460..198e103839 100644 --- a/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp +++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp @@ -8,7 +8,9 @@ #include "ChunkV.h" #include "adios2/toolkit/format/buffer/BufferV.h" +#include #include +#include // max_align_t #include namespace adios2 @@ -30,6 +32,53 @@ ChunkV::~ChunkV() } } +void ChunkV::CopyExternalToInternal() +{ + for (std::size_t i = 0; i < DataV.size(); ++i) + { + if (DataV[i].External) + { + size_t size = DataV[i].Size; + // we can possibly append this entry to the tail if the tail entry + // is internal + bool AppendPossible = DataV.size() && !DataV.back().External; + + if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize)) + { + // No room in current chunk, close it out + // realloc down to used size (helpful?) and set size in array + m_Chunks.back() = + (char *)realloc(m_Chunks.back(), m_TailChunkPos); + + m_TailChunkPos = 0; + m_TailChunk = NULL; + AppendPossible = false; + } + if (AppendPossible) + { + // We can use current chunk, just append the data and modify the + // DataV entry + memcpy(m_TailChunk + m_TailChunkPos, DataV[i].Base, size); + DataV[i].External = false; + DataV[i].Base = m_TailChunk + m_TailChunkPos; + m_TailChunkPos += size; + } + else + { + // We need a new chunk, get the larger of size or m_ChunkSize + size_t NewSize = m_ChunkSize; + if (size > m_ChunkSize) + NewSize = size; + m_TailChunk = (char *)malloc(NewSize); + m_Chunks.push_back(m_TailChunk); + memcpy(m_TailChunk, DataV[i].Base, size); + m_TailChunkPos = size; + DataV[i] = {false, m_TailChunk, 0, size}; + } + } + } +} + size_t ChunkV::AddToVec(const size_t size, const void *buf, int align, bool CopyReqd) { @@ -37,7 +86,8 @@ size_t ChunkV::AddToVec(const size_t size, const void *buf, int align, if (badAlign) { int addAlign = align - badAlign; - char zero[16] = {0}; + assert(addAlign < sizeof(max_align_t)); + static char zero[sizeof(max_align_t)] = {0}; AddToVec(addAlign, zero, 1, true); } size_t retOffset = CurOffset; @@ -81,6 +131,7 @@ size_t ChunkV::AddToVec(const size_t size, const void *buf, int align, if (size > m_ChunkSize) NewSize = size; m_TailChunk = (char *)malloc(NewSize); + m_Chunks.push_back(m_TailChunk); memcpy(m_TailChunk, buf, size); m_TailChunkPos = size; VecEntry entry = {false, m_TailChunk, 0, size}; diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.h b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h index c0eb02a8bf..fb34a53128 100644 --- a/source/adios2/toolkit/format/buffer/chunk/ChunkV.h +++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h @@ -33,6 +33,8 @@ class ChunkV : public BufferV virtual size_t AddToVec(const size_t size, const void *buf, int align, bool CopyReqd); + void CopyExternalToInternal(); + private: std::vector m_Chunks; size_t m_TailChunkPos = 0; diff --git a/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp b/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp index 2528b7dfcc..efe5520ee2 100644 --- a/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp +++ b/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp @@ -8,6 +8,8 @@ #include "MallocV.h" #include "adios2/toolkit/format/buffer/BufferV.h" +#include +#include // max_align_t #include namespace adios2 @@ -35,6 +37,51 @@ void MallocV::Reset() DataV.clear(); } +/* + * This is used in PerformPuts() to copy externally referenced data + * so that it can be modified by the application. It does *not* + * change the metadata offset that was originally returned by + * AddToVec. That is, it relocates the data from application memory + * into the internal buffer, but it does not change the position of + * that data in the write order, which may result in non-contiguous + * writes from the internal buffer. + */ +void MallocV::CopyExternalToInternal() +{ + for (std::size_t i = 0; i < DataV.size(); ++i) + { + if (DataV[i].External) + { + size_t size = DataV[i].Size; + + /* force internal buffer alignment */ + (void)AddToVec(0, NULL, sizeof(max_align_t), true); + + if (m_internalPos + size > m_AllocatedSize) + { + // need to resize + size_t NewSize; + if (m_internalPos + size > m_AllocatedSize * m_GrowthFactor) + { + // just grow as needed (more than GrowthFactor) + NewSize = m_internalPos + size; + } + else + { + NewSize = (size_t)(m_AllocatedSize * m_GrowthFactor); + } + m_InternalBlock = (char *)realloc(m_InternalBlock, NewSize); + m_AllocatedSize = NewSize; + } + memcpy(m_InternalBlock + m_internalPos, DataV[i].Base, size); + DataV[i].External = false; + DataV[i].Base = NULL; + DataV[i].Offset = m_internalPos; + m_internalPos += size; + } + } +} + size_t MallocV::AddToVec(const size_t size, const void *buf, int align, bool CopyReqd) { @@ -42,7 +89,8 @@ size_t MallocV::AddToVec(const size_t size, const void *buf, int align, if (badAlign) { int addAlign = align - badAlign; - char zero[16] = {0}; + assert(addAlign < sizeof(max_align_t)); + static char zero[sizeof(max_align_t)] = {0}; AddToVec(addAlign, zero, 1, true); } size_t retOffset = CurOffset; diff --git a/source/adios2/toolkit/format/buffer/malloc/MallocV.h b/source/adios2/toolkit/format/buffer/malloc/MallocV.h index 52eedddead..cadf4cd90b 100644 --- a/source/adios2/toolkit/format/buffer/malloc/MallocV.h +++ b/source/adios2/toolkit/format/buffer/malloc/MallocV.h @@ -37,6 +37,8 @@ class MallocV : public BufferV virtual size_t AddToVec(const size_t size, const void *buf, int align, bool CopyReqd); + void CopyExternalToInternal(); + private: char *m_InternalBlock = NULL; size_t m_AllocatedSize = 0; diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index 1796197a51..d2432ea0e5 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -21,6 +21,7 @@ foreach(helper TestCommonWriteAttrs TestCommonWriteLocal TestCommonWriteShared + TestDefSyncWrite TestCommonRead TestCommonReadR64 TestCommonReadLocal @@ -149,9 +150,9 @@ if(ADIOS2_HAVE_SST) endif() -# For the moment, only test the default comm pattern (Peer) +# For the moment, only test the default comm pattern (Min) MutateTestSet( COMM_MIN_SST_TESTS "CommMin" writer "CPCommPattern=Min" "${BASIC_SST_TESTS}" ) -MutateTestSet( COMM_PEER_SST_TESTS "CommPeer" writer "CPCommPattern=Peer" "${BASIC_SST_TESTS}" ) +#MutateTestSet( COMM_PEER_SST_TESTS "CommPeer" writer "CPCommPattern=Peer" "${BASIC_SST_TESTS}" ) # temporarily remove PreciousTimestep CommPeer tests list (REMOVE_ITEM COMM_PEER_SST_TESTS "PreciousTimestep") diff --git a/testing/adios2/engine/staging-common/ParseArgs.h b/testing/adios2/engine/staging-common/ParseArgs.h index d4fb4aa80a..08f52836a3 100644 --- a/testing/adios2/engine/staging-common/ParseArgs.h +++ b/testing/adios2/engine/staging-common/ParseArgs.h @@ -38,6 +38,8 @@ int NoData = 0; int NoDataNode = -1; int EarlyExit = 0; int LocalCount = 1; +int DataSize = 4 * 1024 * 1024 / 8; /* DefaultMinDeferredSize is 4*1024*1024 + This should be more than that. */ std::string shutdown_name = "DieTest"; adios2::Mode GlobalWriteMode = adios2::Mode::Deferred; diff --git a/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp b/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp new file mode 100644 index 0000000000..39568aed04 --- /dev/null +++ b/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp @@ -0,0 +1,222 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + */ +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +#include "TestData.h" + +#include "ParseArgs.h" +int StartStep = 0; +int EndStep = 4 * 4 * 4 * 4 * 4; // all 4 possibilities for all 5 variables +int SmallSize = 100; + +// ADIOS2 write +TEST(CommonWriteTest, ADIOS2CommonWrite) +{ + adios2::ADIOS adios; + + adios2::IO io = adios.DeclareIO("TestIO"); + + adios2::Dims big_shape{static_cast(DataSize)}; + adios2::Dims big_start{static_cast(0)}; + adios2::Dims big_count{static_cast(DataSize)}; + + adios2::Dims small_shape{static_cast(SmallSize)}; + adios2::Dims small_start{static_cast(0)}; + adios2::Dims small_count{static_cast(SmallSize)}; + + std::vector> vars; + vars.push_back( + io.DefineVariable("big1", big_shape, big_start, big_count)); + vars.push_back(io.DefineVariable("small1", small_shape, small_start, + small_count)); + vars.push_back( + io.DefineVariable("big2", big_shape, big_start, big_count)); + vars.push_back(io.DefineVariable("small2", small_shape, small_start, + small_count)); + vars.push_back( + io.DefineVariable("big3", big_shape, big_start, big_count)); + + std::vector> data; + for (int i = 0; i < 5; i++) + { + int size = DataSize; + if ((i == 1) || (i == 3)) + { + size = SmallSize; + } + std::vector tmp(size); + data.push_back(tmp); + } + + // Create the Engine + io.SetEngine(engine); + io.SetParameters(engineParams); + + adios2::Engine engine = io.Open(fname, adios2::Mode::Write); + + /* + * write possibilities: + * Don't write + * Sync - always destroy data afterwards + * Deferred + * Deferred with immediate PerformPuts() - Destroy all prior data + * + */ + for (int step = StartStep; step < EndStep; ++step) + { + int mask = step; + engine.BeginStep(); + + std::cout << "Begin Write Step " << step << " writing vars : "; + for (int j = 0; j < 5; j++) + { + std::fill(data[j].begin(), data[j].end(), (double)j + 1.0); + } + for (int j = 0; j < 5; j++) + { + adios2::Mode write_mode; + char c; + int this_var_mask = (mask & 0x3); + mask >>= 2; + switch (this_var_mask) + { + case 0: + continue; + case 1: + write_mode = adios2::Mode::Sync; + c = 's'; + break; + case 2: + case 3: + write_mode = adios2::Mode::Deferred; + c = 'd'; + break; + } + std::cout << j << c << " "; + engine.Put(vars[j], data[j].data(), write_mode); + if (this_var_mask == 1) + { + std::fill(data[j].begin(), data[j].end(), -100.0); + } + else if (this_var_mask == 3) + { + std::cout << "P "; + engine.PerformPuts(); + for (int k = 0; k <= j; k++) + std::fill(data[k].begin(), data[k].end(), -100.0); + } + } + std::cout << std::endl; + engine.EndStep(); + } + + // Close the file + engine.Close(); +} + +// ADIOS2 write +TEST(CommonWriteTest, ADIOS2CommonRead) +{ + adios2::ADIOS adios; + + adios2::IO io = adios.DeclareIO("TestIO"); + + std::vector> data; + for (int i = 0; i < 5; i++) + { + int size = DataSize; + if ((i == 1) || (i == 3)) + { + size = SmallSize; + } + std::vector tmp(size); + data.push_back(tmp); + } + + // Create the Engine + io.SetEngine(engine); + io.SetParameters(engineParams); + + adios2::Engine engine = io.Open(fname, adios2::Mode::Read); + EXPECT_TRUE(engine); + + /* + * write possibilities: + * Don't write + * Sync - always destroy data afterwards + * Deferred + * Deferred with immediate PerformPuts() - Destroy all prior data + * + */ + for (int step = StartStep; step < EndStep; ++step) + { + EXPECT_TRUE(engine.BeginStep() == adios2::StepStatus::OK); + + std::vector> vars; + vars.push_back(io.InquireVariable("big1")); + vars.push_back(io.InquireVariable("small1")); + vars.push_back(io.InquireVariable("big2")); + vars.push_back(io.InquireVariable("small2")); + vars.push_back(io.InquireVariable("big3")); + + std::vector var_present(vars.size()); + for (int i = 0; i <= 5; i++) + std::fill(data[i].begin(), data[i].end(), -200.0); + std::cout << "Variables Read in TS " << step << ": "; + for (int j = 0; j < 5; j++) + { + var_present[j] = (bool)vars[j]; + if (vars[j]) + { + std::cout << " " << j; + var_present.push_back(true); + engine.Get(vars[j], data[j].data()); + } + } + std::cout << std::endl; + engine.EndStep(); + for (int j = 0; j < 5; j++) + { + if (var_present[j]) + { + for (std::size_t index = 0; index < data[j].size(); ++index) + { + EXPECT_EQ(data[j][index], j + 1.0) + << "Data isn't correct, for " << vars[j].Name() << "[" + << index << "]"; + } + } + } + } + + // Close the file + engine.Close(); +} + +int main(int argc, char **argv) +{ + + int result; + ::testing::InitGoogleTest(&argc, argv); + + DelayMS = 0; // zero for common writer + + ParseArgs(argc, argv); + + result = RUN_ALL_TESTS(); + + return result; +}