From ab622040bf959174463ed31e2a039083121e4731 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 4 Mar 2020 14:34:50 -0500 Subject: [PATCH] Move async file open functionality to transports --- source/adios2/engine/bp3/BP3Writer.cpp | 16 +++-- source/adios2/engine/bp4/BP4Writer.cpp | 16 +++-- source/adios2/toolkit/transport/Transport.h | 7 ++- .../toolkit/transport/file/FileFStream.cpp | 59 +++++++++++++++--- .../toolkit/transport/file/FileFStream.h | 7 ++- .../toolkit/transport/file/FilePOSIX.cpp | 61 ++++++++++++++++--- .../adios2/toolkit/transport/file/FilePOSIX.h | 8 ++- .../toolkit/transport/file/FileStdio.cpp | 50 +++++++++++++-- .../adios2/toolkit/transport/file/FileStdio.h | 7 ++- .../toolkit/transport/null/NullTransport.cpp | 3 +- .../toolkit/transport/null/NullTransport.h | 3 +- .../toolkit/transport/shm/ShmSystemV.cpp | 3 +- .../adios2/toolkit/transport/shm/ShmSystemV.h | 3 +- .../toolkit/transportman/TransportMan.cpp | 61 +++---------------- .../toolkit/transportman/TransportMan.h | 16 ----- 15 files changed, 203 insertions(+), 117 deletions(-) diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index 82c52fd3da..e6028842f1 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -188,16 +188,14 @@ void BP3Writer::InitTransports() { if (m_BP3Serializer.m_Parameters.AsyncTasks) { - m_FileDataManager.OpenFilesAsync( - bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters, - m_BP3Serializer.m_Profiler.m_IsActive); - } - else - { - m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode, - m_IO.m_TransportsParameters, - m_BP3Serializer.m_Profiler.m_IsActive); + for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) + { + m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; + } } + m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode, + m_IO.m_TransportsParameters, + m_BP3Serializer.m_Profiler.m_IsActive); } } diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index c531c75f8d..7d2d2382a6 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -190,16 +190,14 @@ void BP4Writer::InitTransports() { if (m_BP4Serializer.m_Parameters.AsyncTasks) { - m_FileDataManager.OpenFilesAsync( - bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters, - m_BP4Serializer.m_Profiler.m_IsActive); - } - else - { - m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode, - m_IO.m_TransportsParameters, - m_BP4Serializer.m_Profiler.m_IsActive); + for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) + { + m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; + } } + m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode, + m_IO.m_TransportsParameters, + m_BP4Serializer.m_Profiler.m_IsActive); } if (m_BP4Serializer.m_RankMPI == 0) diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h index e25149f73f..a613282b57 100644 --- a/source/adios2/toolkit/transport/Transport.h +++ b/source/adios2/toolkit/transport/Transport.h @@ -58,11 +58,14 @@ class Transport void InitProfiler(const Mode openMode, const TimeUnit timeUnit); /** - * Opens transport, required before SetBuffer, Write, Read, Flush, Close + * Opens transport, possibly asynchronously, required before SetBuffer, + * Write, Read, Flush, Close * @param name * @param openMode + * @param async */ - virtual void Open(const std::string &name, const Mode openMode) = 0; + virtual void Open(const std::string &name, const Mode openMode, + const bool async = false) = 0; /** * If OS buffered (FILE* or fstream), sets the buffer size diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp index 3269745190..b2a8bd4578 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.cpp +++ b/source/adios2/toolkit/transport/file/FileFStream.cpp @@ -23,8 +23,31 @@ FileFStream::FileFStream(helper::Comm const &comm, const bool debugMode) { } -void FileFStream::Open(const std::string &name, const Mode openMode) +void FileFStream::WaitForOpen() { + if (m_IsOpening) + { + if (m_OpenFuture.valid()) + { + m_OpenFuture.get(); + } + m_IsOpening = false; + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to POSIX open"); + m_IsOpen = true; + } +} + +void FileFStream::Open(const std::string &name, const Mode openMode, + const bool async) +{ + auto lf_AsyncOpenWrite = [&](const std::string &name) -> void { + ProfilerStart("open"); + m_FileStream.open(name, std::fstream::out | std::fstream::binary | + std::fstream::trunc); + ProfilerStop("open"); + }; m_Name = name; CheckName(); m_OpenMode = openMode; @@ -32,10 +55,19 @@ void FileFStream::Open(const std::string &name, const Mode openMode) switch (m_OpenMode) { case (Mode::Write): - ProfilerStart("open"); - m_FileStream.open(name, std::fstream::out | std::fstream::binary | - std::fstream::trunc); - ProfilerStop("open"); + if (async) + { + m_IsOpening = true; + m_OpenFuture = + std::async(std::launch::async, lf_AsyncOpenWrite, name); + } + else + { + ProfilerStart("open"); + m_FileStream.open(name, std::fstream::out | std::fstream::binary | + std::fstream::trunc); + ProfilerStop("open"); + } break; case (Mode::Append): @@ -59,9 +91,13 @@ void FileFStream::Open(const std::string &name, const Mode openMode) ", in call to stream open"); } - CheckFile("couldn't open file " + m_Name + - ", check permissions or path existence, in call to fstream open"); - m_IsOpen = true; + if (!m_IsOpening) + { + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to fstream open"); + m_IsOpen = true; + } } void FileFStream::SetBuffer(char *buffer, size_t size) @@ -81,6 +117,7 @@ void FileFStream::Write(const char *buffer, size_t size, size_t start) ", in call to fstream write"); }; + WaitForOpen(); if (start != MaxSizeT) { m_FileStream.seekp(start); @@ -117,6 +154,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start) ", in call to fstream read"); }; + WaitForOpen(); if (start != MaxSizeT) { m_FileStream.seekg(start); @@ -145,6 +183,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start) size_t FileFStream::GetSize() { + WaitForOpen(); const auto currentPosition = m_FileStream.tellg(); m_FileStream.seekg(0, std::ios_base::end); const std::streampos size = m_FileStream.tellg(); @@ -159,6 +198,7 @@ size_t FileFStream::GetSize() void FileFStream::Flush() { + WaitForOpen(); ProfilerStart("write"); m_FileStream.flush(); ProfilerStart("write"); @@ -168,6 +208,7 @@ void FileFStream::Flush() void FileFStream::Close() { + WaitForOpen(); ProfilerStart("close"); m_FileStream.close(); ProfilerStop("close"); @@ -186,6 +227,7 @@ void FileFStream::CheckFile(const std::string hint) const void FileFStream::SeekToEnd() { + WaitForOpen(); m_FileStream.seekp(0, std::ios_base::end); CheckFile("couldn't move to the end of file " + m_Name + ", in call to fstream seekp"); @@ -193,6 +235,7 @@ void FileFStream::SeekToEnd() void FileFStream::SeekToBegin() { + WaitForOpen(); m_FileStream.seekp(0, std::ios_base::beg); CheckFile("couldn't move to the beginning of file " + m_Name + ", in call to fstream seekp"); diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h index a83af203b2..5673d993be 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.h +++ b/source/adios2/toolkit/transport/file/FileFStream.h @@ -12,6 +12,7 @@ #define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILESTREAM_H_ #include +#include //std::async, std::future #include "adios2/common/ADIOSConfig.h" #include "adios2/helper/adiosComm.h" @@ -31,7 +32,8 @@ class FileFStream : public Transport ~FileFStream() = default; - void Open(const std::string &name, const Mode openMode) final; + void Open(const std::string &name, const Mode openMode, + const bool async = false) final; void SetBuffer(char *buffer, size_t size) final; @@ -52,12 +54,15 @@ class FileFStream : public Transport private: /** file stream using fstream library */ std::fstream m_FileStream; + bool m_IsOpening = false; + std::future m_OpenFuture; /** * Check if m_FileStream is false after an operation * @param hint exception message */ void CheckFile(const std::string hint) const; + void WaitForOpen(); }; } // end namespace transport diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index 711c530390..aa15cfdb29 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -37,8 +37,32 @@ FilePOSIX::~FilePOSIX() } } -void FilePOSIX::Open(const std::string &name, const Mode openMode) +void FilePOSIX::WaitForOpen() { + if (m_IsOpening) + { + if (m_OpenFuture.valid()) + { + m_FileDescriptor = m_OpenFuture.get(); + } + m_IsOpening = false; + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to POSIX open"); + m_IsOpen = true; + } +} + +void FilePOSIX::Open(const std::string &name, const Mode openMode, + const bool async) +{ + auto lf_AsyncOpenWrite = [&](const std::string &name) -> int { + ProfilerStart("open"); + int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + ProfilerStop("open"); + return FD; + }; + m_Name = name; CheckName(); m_OpenMode = openMode; @@ -46,10 +70,19 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode) { case (Mode::Write): - ProfilerStart("open"); - m_FileDescriptor = - open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); - ProfilerStop("open"); + if (async) + { + m_IsOpening = true; + m_OpenFuture = + std::async(std::launch::async, lf_AsyncOpenWrite, name); + } + else + { + ProfilerStart("open"); + m_FileDescriptor = + open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + ProfilerStop("open"); + } break; case (Mode::Append): @@ -71,10 +104,13 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode) ", in call to POSIX open"); } - CheckFile("couldn't open file " + m_Name + - ", check permissions or path existence, in call to POSIX open"); - - m_IsOpen = true; + if (!m_IsOpening) + { + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to POSIX open"); + m_IsOpen = true; + } } void FilePOSIX::Write(const char *buffer, size_t size, size_t start) @@ -103,6 +139,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start) } }; + WaitForOpen(); if (start != MaxSizeT) { const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET); @@ -161,6 +198,8 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start) } }; + WaitForOpen(); + if (start != MaxSizeT) { const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET); @@ -197,6 +236,7 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start) size_t FilePOSIX::GetSize() { struct stat fileStat; + WaitForOpen(); if (fstat(m_FileDescriptor, &fileStat) == -1) { throw std::ios_base::failure("ERROR: couldn't get size of file " + @@ -209,6 +249,7 @@ void FilePOSIX::Flush() {} void FilePOSIX::Close() { + WaitForOpen(); ProfilerStart("close"); const int status = close(m_FileDescriptor); ProfilerStop("close"); @@ -232,6 +273,7 @@ void FilePOSIX::CheckFile(const std::string hint) const void FilePOSIX::SeekToEnd() { + WaitForOpen(); const int status = lseek(m_FileDescriptor, 0, SEEK_END); if (status == -1) { @@ -243,6 +285,7 @@ void FilePOSIX::SeekToEnd() void FilePOSIX::SeekToBegin() { + WaitForOpen(); const int status = lseek(m_FileDescriptor, 0, SEEK_SET); if (status == -1) { diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h index 33f0b558b2..7c09187586 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.h +++ b/source/adios2/toolkit/transport/file/FilePOSIX.h @@ -11,6 +11,8 @@ #ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_ #define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_ +#include //std::async, std::future + #include "adios2/common/ADIOSConfig.h" #include "adios2/toolkit/transport/Transport.h" @@ -32,7 +34,8 @@ class FilePOSIX : public Transport ~FilePOSIX(); - void Open(const std::string &name, const Mode openMode) final; + void Open(const std::string &name, const Mode openMode, + const bool async = false) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; @@ -52,12 +55,15 @@ class FilePOSIX : public Transport private: /** POSIX file handle returned by Open */ int m_FileDescriptor = -1; + bool m_IsOpening = false; + std::future m_OpenFuture; /** * Check if m_FileDescriptor is -1 after an operation * @param hint exception message */ void CheckFile(const std::string hint) const; + void WaitForOpen(); }; } // end namespace transport diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp index f95fed4511..7abba5b2db 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.cpp +++ b/source/adios2/toolkit/transport/file/FileStdio.cpp @@ -37,8 +37,28 @@ FileStdio::~FileStdio() } } -void FileStdio::Open(const std::string &name, const Mode openMode) +void FileStdio::WaitForOpen() { + if (m_IsOpening) + { + if (m_OpenFuture.valid()) + { + m_File = m_OpenFuture.get(); + } + m_IsOpening = false; + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to POSIX open"); + m_IsOpen = true; + } +} + +void FileStdio::Open(const std::string &name, const Mode openMode, + const bool async) +{ + auto lf_AsyncOpenWrite = [&](const std::string &name) -> FILE * { + return std::fopen(name.c_str(), "wb"); + }; m_Name = name; CheckName(); m_OpenMode = openMode; @@ -46,7 +66,16 @@ void FileStdio::Open(const std::string &name, const Mode openMode) switch (m_OpenMode) { case (Mode::Write): - m_File = std::fopen(name.c_str(), "wb"); + if (async) + { + m_IsOpening = true; + m_OpenFuture = + std::async(std::launch::async, lf_AsyncOpenWrite, name); + } + else + { + m_File = std::fopen(name.c_str(), "wb"); + } break; case (Mode::Append): m_File = std::fopen(name.c_str(), "rwb"); @@ -61,9 +90,13 @@ void FileStdio::Open(const std::string &name, const Mode openMode) ", in call to stdio fopen"); } - CheckFile("couldn't open file " + m_Name + - ", check permissions or path existence, in call to stdio open"); - m_IsOpen = true; + if (!m_IsOpening) + { + CheckFile( + "couldn't open file " + m_Name + + ", check permissions or path existence, in call to stdio open"); + m_IsOpen = true; + } } void FileStdio::SetBuffer(char *buffer, size_t size) @@ -98,6 +131,7 @@ void FileStdio::Write(const char *buffer, size_t size, size_t start) } }; + WaitForOpen(); if (start != MaxSizeT) { const auto status = @@ -151,6 +185,7 @@ void FileStdio::Read(char *buffer, size_t size, size_t start) } }; + WaitForOpen(); if (start != MaxSizeT) { const auto status = @@ -182,6 +217,7 @@ void FileStdio::Read(char *buffer, size_t size, size_t start) size_t FileStdio::GetSize() { + WaitForOpen(); const auto currentPosition = ftell(m_File); if (currentPosition == -1L) { @@ -204,6 +240,7 @@ size_t FileStdio::GetSize() void FileStdio::Flush() { + WaitForOpen(); ProfilerStart("write"); const int status = std::fflush(m_File); ProfilerStop("write"); @@ -217,6 +254,7 @@ void FileStdio::Flush() void FileStdio::Close() { + WaitForOpen(); ProfilerStart("close"); const int status = std::fclose(m_File); ProfilerStop("close"); @@ -240,6 +278,7 @@ void FileStdio::CheckFile(const std::string hint) const void FileStdio::SeekToEnd() { + WaitForOpen(); const auto status = std::fseek(m_File, 0, SEEK_END); if (status == -1) { @@ -251,6 +290,7 @@ void FileStdio::SeekToEnd() void FileStdio::SeekToBegin() { + WaitForOpen(); const auto status = std::fseek(m_File, 0, SEEK_SET); if (status == -1) { diff --git a/source/adios2/toolkit/transport/file/FileStdio.h b/source/adios2/toolkit/transport/file/FileStdio.h index 9b828df9d5..3ae0aa5df6 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.h +++ b/source/adios2/toolkit/transport/file/FileStdio.h @@ -12,6 +12,7 @@ #define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEPOINTER_H_ #include // FILE* +#include //std::async, std::future #include "adios2/toolkit/transport/Transport.h" @@ -33,7 +34,8 @@ class FileStdio : public Transport ~FileStdio(); - void Open(const std::string &name, const Mode openMode) final; + void Open(const std::string &name, const Mode openMode, + const bool async = false) final; void SetBuffer(char *buffer, size_t size) final; @@ -54,12 +56,15 @@ class FileStdio : public Transport private: /** C File pointer */ FILE *m_File = nullptr; + bool m_IsOpening = false; + std::future m_OpenFuture; /** * Check for std::ferror and throw an exception if true * @param hint exception message */ void CheckFile(const std::string hint) const; + void WaitForOpen(); }; } // end namespace transport diff --git a/source/adios2/toolkit/transport/null/NullTransport.cpp b/source/adios2/toolkit/transport/null/NullTransport.cpp index be6c154a9a..0cd60c60c5 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.cpp +++ b/source/adios2/toolkit/transport/null/NullTransport.cpp @@ -31,7 +31,8 @@ NullTransport::NullTransport(helper::Comm const &comm, const bool debugMode) NullTransport::~NullTransport() = default; -void NullTransport::Open(const std::string &name, const Mode openMode) +void NullTransport::Open(const std::string &name, const Mode openMode, + const bool async) { if (Impl->IsOpen) { diff --git a/source/adios2/toolkit/transport/null/NullTransport.h b/source/adios2/toolkit/transport/null/NullTransport.h index 2556488396..e45865f555 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.h +++ b/source/adios2/toolkit/transport/null/NullTransport.h @@ -34,7 +34,8 @@ class NullTransport : public Transport virtual ~NullTransport(); - void Open(const std::string &name, const Mode openMode) override; + void Open(const std::string &name, const Mode openMode, + const bool async = false) override; void SetBuffer(char *buffer, size_t size) override; diff --git a/source/adios2/toolkit/transport/shm/ShmSystemV.cpp b/source/adios2/toolkit/transport/shm/ShmSystemV.cpp index bc1b597bb6..9a637a0d23 100644 --- a/source/adios2/toolkit/transport/shm/ShmSystemV.cpp +++ b/source/adios2/toolkit/transport/shm/ShmSystemV.cpp @@ -47,7 +47,8 @@ ShmSystemV::~ShmSystemV() // this might not be correct } } -void ShmSystemV::Open(const std::string &name, const Mode openMode) +void ShmSystemV::Open(const std::string &name, const Mode openMode, + const bool async) { m_Name = name; CheckName(); diff --git a/source/adios2/toolkit/transport/shm/ShmSystemV.h b/source/adios2/toolkit/transport/shm/ShmSystemV.h index c21bbbff47..9705b9fda7 100644 --- a/source/adios2/toolkit/transport/shm/ShmSystemV.h +++ b/source/adios2/toolkit/transport/shm/ShmSystemV.h @@ -40,7 +40,8 @@ class ShmSystemV : public Transport ~ShmSystemV(); - void Open(const std::string &name, const Mode openMode) final; + void Open(const std::string &name, const Mode openMode, + const bool async = false) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 5ebd5e0ae3..7d8099511b 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -75,7 +75,6 @@ void TransportMan::OpenFiles(const std::vector &fileNames, const std::vector ¶metersVector, const bool profile) { - WaitForAsync(); for (size_t i = 0; i < fileNames.size(); ++i) { const Params ¶meters = parametersVector[i]; @@ -90,40 +89,10 @@ void TransportMan::OpenFiles(const std::vector &fileNames, } } -void TransportMan::OpenFilesAsync(const std::vector &fileNames, - const Mode openMode, - const std::vector ¶metersVector, - const bool profile) -{ - WaitForAsync(); - auto lf_OpenFiles = - [&](const std::vector &fileNames, const Mode openMode, - const std::vector ¶metersVector, const bool profile) { - for (size_t i = 0; i < fileNames.size(); ++i) - { - const Params ¶meters = parametersVector[i]; - const std::string type = parameters.at("transport"); - - if (type == "File" || type == "file") - { - std::shared_ptr file = OpenFileTransport( - fileNames[i], openMode, parameters, profile); - // TODO might need mutex for multiple files - m_Transports.insert({i, file}); - } - } - }; - - m_FutureOpenFiles = - std::async(std::launch::async, lf_OpenFiles, std::move(fileNames), - openMode, std::cref(parametersVector), profile); -} - void TransportMan::OpenFileID(const std::string &name, const size_t id, const Mode mode, const Params ¶meters, const bool profile) { - WaitForAsync(); std::shared_ptr file = OpenFileTransport(name, mode, parameters, profile); m_Transports.insert({id, file}); @@ -175,7 +144,6 @@ std::vector TransportMan::GetFilesBaseNames( std::vector TransportMan::GetTransportsTypes() noexcept { - WaitForAsync(); std::vector types; types.reserve(m_Transports.size()); @@ -190,7 +158,6 @@ std::vector TransportMan::GetTransportsTypes() noexcept std::vector TransportMan::GetTransportsProfilers() noexcept { - WaitForAsync(); std::vector profilers; profilers.reserve(m_Transports.size()); @@ -205,7 +172,6 @@ TransportMan::GetTransportsProfilers() noexcept void TransportMan::WriteFiles(const char *buffer, const size_t size, const int transportIndex) { - WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -230,7 +196,6 @@ void TransportMan::WriteFiles(const char *buffer, const size_t size, void TransportMan::WriteFileAt(const char *buffer, const size_t size, const size_t start, const int transportIndex) { - WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to WriteFileAt with index " + std::to_string(transportIndex)); @@ -239,7 +204,6 @@ void TransportMan::WriteFileAt(const char *buffer, const size_t size, void TransportMan::SeekToFileEnd(const int transportIndex) { - WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileEnd with index " + std::to_string(transportIndex)); @@ -248,7 +212,6 @@ void TransportMan::SeekToFileEnd(const int transportIndex) void TransportMan::SeekToFileBegin(const int transportIndex) { - WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileBegin with index " + std::to_string(transportIndex)); @@ -257,7 +220,6 @@ void TransportMan::SeekToFileBegin(const int transportIndex) size_t TransportMan::GetFileSize(const size_t transportIndex) const { - WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to GetFileSize with index " + std::to_string(transportIndex)); @@ -267,7 +229,6 @@ size_t TransportMan::GetFileSize(const size_t transportIndex) const void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, const size_t transportIndex) { - WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to ReadFile with index " + std::to_string(transportIndex)); @@ -276,7 +237,6 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, void TransportMan::FlushFiles(const int transportIndex) { - WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -300,7 +260,6 @@ void TransportMan::FlushFiles(const int transportIndex) void TransportMan::CloseFiles(const int transportIndex) { - WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -325,7 +284,6 @@ void TransportMan::CloseFiles(const int transportIndex) bool TransportMan::AllTransportsClosed() const noexcept { bool allClose = true; - WaitForAsync(); for (const auto &transportPair : m_Transports) { const auto &transport = transportPair.second; @@ -395,6 +353,14 @@ TransportMan::OpenFileTransport(const std::string &fileName, return helper::StringToTimeUnit(profileUnits, m_DebugMode); }; + auto lf_GetAsync = [&](const std::string defaultAsync, + const Params ¶meters) -> bool { + std::string Async = defaultAsync; + helper::SetParameterValue("AsyncTasks", parameters, Async); + helper::SetParameterValue("asynctasks", parameters, Async); + return helper::StringTo(Async, m_DebugMode, ""); + }; + // BODY OF FUNCTION starts here std::shared_ptr transport; lf_SetFileTransport(lf_GetLibrary(DefaultFileLibrary, parameters), @@ -408,7 +374,7 @@ TransportMan::OpenFileTransport(const std::string &fileName, } // open - transport->Open(fileName, openMode); + transport->Open(fileName, openMode, lf_GetAsync("false", parameters)); return transport; } @@ -434,14 +400,5 @@ void TransportMan::CheckFile( } } -void TransportMan::WaitForAsync() const -{ - // Ensure any prior OpenFilesAsync is complete - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } -} - } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index e9e9f66303..482e2e539d 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -71,21 +71,6 @@ class TransportMan const std::vector ¶metersVector, const bool profile); - /** - * Async version of OpenFiles - * @param fileNames - * @param openMode - * @param parametersVector - * @param profile - * - * Opens happen asynchronously, but any future call waits for their - * completion - */ - void OpenFilesAsync(const std::vector &fileNames, - const Mode openMode, - const std::vector ¶metersVector, - const bool profile); - /** * Used for sub-files defined by index * @param name @@ -178,7 +163,6 @@ class TransportMan protected: helper::Comm const &m_Comm; const bool m_DebugMode = false; - mutable std::future m_FutureOpenFiles; std::shared_ptr OpenFileTransport(const std::string &fileName, const Mode openMode,