Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move async file open functionality to transports #2007

Merged
merged 1 commit into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,14 @@ void BP3Writer::InitTransports()
{
if (m_BP3Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
}

Expand Down
16 changes: 7 additions & 9 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,14 @@ void BP4Writer::InitTransports()
{
if (m_BP4Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}

if (m_BP4Serializer.m_RankMPI == 0)
Expand Down
7 changes: 5 additions & 2 deletions source/adios2/toolkit/transport/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ class Transport
void InitProfiler(const Mode openMode, const TimeUnit timeUnit);

/**
* Opens transport, required before SetBuffer, Write, Read, Flush, Close
* Opens transport, possibly asynchronously, required before SetBuffer,
* Write, Read, Flush, Close
* @param name
* @param openMode
* @param async
*/
virtual void Open(const std::string &name, const Mode openMode) = 0;
virtual void Open(const std::string &name, const Mode openMode,
const bool async = false) = 0;

/**
* If OS buffered (FILE* or fstream), sets the buffer size
Expand Down
59 changes: 51 additions & 8 deletions source/adios2/toolkit/transport/file/FileFStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,51 @@ FileFStream::FileFStream(helper::Comm const &comm, const bool debugMode)
{
}

void FileFStream::Open(const std::string &name, const Mode openMode)
void FileFStream::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FileFStream::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> void {
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
};
chuckatkins marked this conversation as resolved.
Show resolved Hide resolved
m_Name = name;
CheckName();
m_OpenMode = openMode;

switch (m_OpenMode)
{
case (Mode::Write):
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
}
break;

case (Mode::Append):
Expand All @@ -59,9 +91,13 @@ void FileFStream::Open(const std::string &name, const Mode openMode)
", in call to stream open");
}

CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
}
}

void FileFStream::SetBuffer(char *buffer, size_t size)
Expand All @@ -81,6 +117,7 @@ void FileFStream::Write(const char *buffer, size_t size, size_t start)
", in call to fstream write");
};

WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekp(start);
Expand Down Expand Up @@ -117,6 +154,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)
", in call to fstream read");
};

WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekg(start);
Expand Down Expand Up @@ -145,6 +183,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)

size_t FileFStream::GetSize()
{
WaitForOpen();
const auto currentPosition = m_FileStream.tellg();
m_FileStream.seekg(0, std::ios_base::end);
const std::streampos size = m_FileStream.tellg();
Expand All @@ -159,6 +198,7 @@ size_t FileFStream::GetSize()

void FileFStream::Flush()
{
WaitForOpen();
ProfilerStart("write");
m_FileStream.flush();
ProfilerStart("write");
Expand All @@ -168,6 +208,7 @@ void FileFStream::Flush()

void FileFStream::Close()
{
WaitForOpen();
ProfilerStart("close");
m_FileStream.close();
ProfilerStop("close");
Expand All @@ -186,13 +227,15 @@ void FileFStream::CheckFile(const std::string hint) const

void FileFStream::SeekToEnd()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::end);
CheckFile("couldn't move to the end of file " + m_Name +
", in call to fstream seekp");
}

void FileFStream::SeekToBegin()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::beg);
CheckFile("couldn't move to the beginning of file " + m_Name +
", in call to fstream seekp");
Expand Down
7 changes: 6 additions & 1 deletion source/adios2/toolkit/transport/file/FileFStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILESTREAM_H_

#include <fstream>
#include <future> //std::async, std::future

#include "adios2/common/ADIOSConfig.h"
#include "adios2/helper/adiosComm.h"
Expand All @@ -31,7 +32,8 @@ class FileFStream : public Transport

~FileFStream() = default;

void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;

void SetBuffer(char *buffer, size_t size) final;

Expand All @@ -52,12 +54,15 @@ class FileFStream : public Transport
private:
/** file stream using fstream library */
std::fstream m_FileStream;
bool m_IsOpening = false;
std::future<void> m_OpenFuture;

/**
* Check if m_FileStream is false after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};

} // end namespace transport
Expand Down
61 changes: 52 additions & 9 deletions source/adios2/toolkit/transport/file/FilePOSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,52 @@ FilePOSIX::~FilePOSIX()
}
}

void FilePOSIX::Open(const std::string &name, const Mode openMode)
void FilePOSIX::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_FileDescriptor = m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FilePOSIX::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> int {
ProfilerStart("open");
int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
return FD;
};

m_Name = name;
CheckName();
m_OpenMode = openMode;
switch (m_OpenMode)
{

case (Mode::Write):
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
}
break;

case (Mode::Append):
Expand All @@ -71,10 +104,13 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode)
", in call to POSIX open");
}

CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");

m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
Expand Down Expand Up @@ -103,6 +139,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
}
};

WaitForOpen();
if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
Expand Down Expand Up @@ -161,6 +198,8 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
}
};

WaitForOpen();

if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
Expand Down Expand Up @@ -197,6 +236,7 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
size_t FilePOSIX::GetSize()
{
struct stat fileStat;
WaitForOpen();
if (fstat(m_FileDescriptor, &fileStat) == -1)
{
throw std::ios_base::failure("ERROR: couldn't get size of file " +
Expand All @@ -209,6 +249,7 @@ void FilePOSIX::Flush() {}

void FilePOSIX::Close()
{
WaitForOpen();
ProfilerStart("close");
const int status = close(m_FileDescriptor);
ProfilerStop("close");
Expand All @@ -232,6 +273,7 @@ void FilePOSIX::CheckFile(const std::string hint) const

void FilePOSIX::SeekToEnd()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_END);
if (status == -1)
{
Expand All @@ -243,6 +285,7 @@ void FilePOSIX::SeekToEnd()

void FilePOSIX::SeekToBegin()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_SET);
if (status == -1)
{
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/toolkit/transport/file/FilePOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_

#include <future> //std::async, std::future

#include "adios2/common/ADIOSConfig.h"
#include "adios2/toolkit/transport/Transport.h"

Expand All @@ -32,7 +34,8 @@ class FilePOSIX : public Transport

~FilePOSIX();

void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;

void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;

Expand All @@ -52,12 +55,15 @@ class FilePOSIX : public Transport
private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
bool m_IsOpening = false;
std::future<int> m_OpenFuture;

/**
* Check if m_FileDescriptor is -1 after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};

} // end namespace transport
Expand Down
Loading