Skip to content

Commit

Permalink
Add functionality to close a file (#746)
Browse files Browse the repository at this point in the history
* Add IOTask for closing file

* Add API calls for Iteration::close

* Add closeFile to ADIOS2 backend

* remove skipFlush

and other wrongly commited lines

* Add test

* add back skipFlush

* Invalidate file in ADIOS2 after closing

* Remove superfluous ; after function

* Make Iteration::close synchronous by default

Optional parameter `bool flush = true`

* Add closeFile operation to JSON backend

* Remove name parameter from CLOSE_FILE task

* Fix name shadowing

* [wip] close_file for ADIOS1

somehow still fails when creating the second iteration

* Temporary workaround for ADIOS1

* Implement file closing in HDF5

* change erase order

* run a simplified test for ADIOS1

* Split closed into closed and closedByWriter

This enables closing iterations in read-mode, too.
closedByWriter will be mainly useful in streaming mode, which this
commit is preparing

* Fix ADIOS1 test

* cleanup

* Throw if flushing a closed iteration that has been accessed

* Add verifyClosed

* Close files in read mode too

* Test throwing upon accessing an iteration post closing

* Also do sanity-checks in group-based mode

This is currently not necessary, since closing an iteration in
group-based mode is a no-op. It will become relevant in streaming-based
mode, where closing an iteration means discarding its corresponding data
packet.

* Rename m_closed* members of Iteration and update docu

* Unify m_closed(Backend|Frontend) into common enum

* Flush only the closed iteration upon Iteration::close

* Undo unnecessary whitespace changes

* Two small fixes in ADIOS1 IO Handler

1) Remove debugging output
2) Delete buffered attribute writes after performing them

* Prevent throwing in destructor in close_iteration_test

* Do not use boolean attributes

* Revert "Prevent throwing in destructor in close_iteration_test"

This reverts commit 1490db6.

* Add some documentation

* Add reviewer's suggestions

Also fix a bug in group-based mode

* Expose Iteration::close to Python API

* Run simplified test for ADIOS1 in Python

Co-authored-by: Axel Huebl <axel.huebl@plasma.ninja>
  • Loading branch information
franzpoeschel and ax3l authored Jul 27, 2020
1 parent 27e47de commit 0f38504
Show file tree
Hide file tree
Showing 24 changed files with 754 additions and 36 deletions.
1 change: 1 addition & 0 deletions include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
3 changes: 3 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class ADIOS2IOHandlerImpl
void openFile( Writable *,
Parameter< Operation::OPEN_FILE > const & ) override;

void closeFile( Writable *,
Parameter< Operation::CLOSE_FILE > const & ) override;

void openPath( Writable *,
Parameter< Operation::OPEN_PATH > const & ) override;

Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
9 changes: 9 additions & 0 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class AbstractIOHandlerImpl
case O::OPEN_FILE:
openFile(i.writable, deref_dynamic_cast< Parameter< O::OPEN_FILE > >(i.parameter.get()));
break;
case O::CLOSE_FILE:
closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get()));
break;
case O::OPEN_PATH:
openPath(i.writable, deref_dynamic_cast< Parameter< O::OPEN_PATH > >(i.parameter.get()));
break;
Expand Down Expand Up @@ -118,6 +121,12 @@ class AbstractIOHandlerImpl
return std::future< void >();
}

/**
* Close the file corresponding with the writable and release file handles.
* The operation should succeed in any access mode.
*/
virtual void
closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0;
/** Create a new file in physical storage, possibly overriding an existing file.
*
* The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY.
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
15 changes: 15 additions & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation)
{
CREATE_FILE,
OPEN_FILE,
CLOSE_FILE,
DELETE_FILE,

CREATE_PATH,
Expand Down Expand Up @@ -127,6 +128,20 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::OPEN_FILE > : public AbstractPara
std::string name = "";
};

template<>
struct OPENPMDAPI_EXPORT Parameter< Operation::CLOSE_FILE > : public AbstractParameter
{
Parameter() = default;
Parameter( Parameter const & ) : AbstractParameter() {}

std::unique_ptr< AbstractParameter >
clone() const override
{
return std::unique_ptr< AbstractParameter >(
new Parameter< Operation::CLOSE_FILE >( *this ) );
}
};

template<>
struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_FILE > : public AbstractParameter
{
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ namespace openPMD
Parameter< Operation::OPEN_FILE > const &
) override;

void closeFile(
Writable *,
Parameter< Operation::CLOSE_FILE > const &
) override;

void openPath(
Writable *,
Parameter< Operation::OPEN_PATH > const &
Expand Down
73 changes: 72 additions & 1 deletion include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,44 @@ class Iteration : public Attributable
*/
Iteration& setTimeUnitSI(double newTimeUnitSI);

/**
* @brief Close an iteration. No further (backend-propagating) accesses
* may be performed on this iteration.
* A closed iteration may not (yet) be reopened.
* @return Reference to iteration.
*/
/*
* Note: If the API is changed in future to allow reopening closed
* iterations, measures should be taken to prevent this in the streaming
* API. Currently, disallowing to reopen closed iterations satisfies
* the requirements of the streaming API.
*/
Iteration &
close( bool flush = true );

/**
* @brief Has the iteration been closed?
* A closed iteration may not (yet) be reopened.
*
* @return Whether the iteration has been closed.
*/
bool
closed() const;

/**
* @brief Has the iteration been closed by the writer?
* Background: Upon calling Iteration::close(), the openPMD API
* will add metadata to the iteration in form of an attribute,
* indicating that the iteration has indeed been closed.
* Useful mainly in streaming context when a reader inquires from
* a writer that it is done writing.
*
* @return Whether the iteration has been explicitly closed (yet) by the
* writer.
*/
bool
closedByWriter() const;

Container< Mesh > meshes;
Container< ParticleSpecies > particles; //particleSpecies?

Expand All @@ -99,8 +137,41 @@ class Iteration : public Attributable
void flush();
void read();

/**
* @brief Whether an iteration has been closed yet.
*
*/
enum class CloseStatus
{
Open, //!< Iteration has not been closed
ClosedInFrontend, /*!< Iteration has been closed, but task has not yet
been propagated to the backend */
ClosedInBackend /*!< Iteration has been closed and task has been
propagated to the backend */
};

/*
* An iteration may be logically closed in the frontend,
* but not necessarily yet in the backend.
* Will be propagated to the backend upon next flush.
* Store the current status.
*/
std::shared_ptr< CloseStatus > m_closed =
std::make_shared< CloseStatus >( CloseStatus::Open );

/*
* @brief Check recursively whether this Iteration is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;

virtual void linkHierarchy(std::shared_ptr< Writable > const& w);
}; //Iteration
}; // Iteration

extern template
float
Expand Down
11 changes: 11 additions & 0 deletions include/openPMD/ParticleSpecies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class ParticleSpecies : public Container< Record >

void read();
void flush(std::string const &) override;

/**
* @brief Check recursively whether this ParticleSpecies is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
};

namespace traits
Expand Down
11 changes: 11 additions & 0 deletions include/openPMD/RecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ class RecordComponent : public BaseRecordComponent
* @return Reference to this RecordComponent instance.
*/
RecordComponent& makeEmpty( Dataset d );

/**
* @brief Check recursively whether this RecordComponent is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
}; // RecordComponent


Expand Down
6 changes: 4 additions & 2 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ class Series : public Attributable
std::unique_ptr< ParsedInput > parseInput(std::string);
void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >);
void initDefaults();
void flushFileBased();
void flushGroupBased();
template< typename IterationsContainer >
void flushFileBased( IterationsContainer && iterationsToFlush );
template< typename IterationsContainer >
void flushGroupBased( IterationsContainer && iterationsToFlush );
void flushMeshesPath();
void flushParticlesPath();
void readFileBased();
Expand Down
33 changes: 31 additions & 2 deletions include/openPMD/backend/BaseRecord.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@ class BaseRecord : public Container< T_elem >
void flush(std::string const&) final;
virtual void flush_impl(std::string const&) = 0;
virtual void read() = 0;
}; //BaseRecord

/**
* @brief Check recursively whether this BaseRecord is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
}; // BaseRecord


template< typename T_elem >
Expand Down Expand Up @@ -295,4 +306,22 @@ BaseRecord< T_elem >::flush(std::string const& name)

this->flush_impl(name);
}
} // openPMD

template< typename T_elem >
inline bool
BaseRecord< T_elem >::dirtyRecursive() const
{
if( Attributable::dirty )
{
return true;
}
for( auto const & pair : *this )
{
if( pair.second.dirtyRecursive() )
{
return true;
}
}
return false;
}
} // namespace openPMD
3 changes: 3 additions & 0 deletions src/IO/ADIOS/ADIOS1IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ ADIOS1IOHandlerImpl::flush()
case O::OPEN_DATASET:
openDataset(i.writable, deref_dynamic_cast< Parameter< O::OPEN_DATASET > >(i.parameter.get()));
break;
case O::CLOSE_FILE:
closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get()));
break;
case O::DELETE_FILE:
deleteFile(i.writable, deref_dynamic_cast< Parameter< O::DELETE_FILE > >(i.parameter.get()));
break;
Expand Down
18 changes: 18 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,24 @@ void ADIOS2IOHandlerImpl::openFile(
writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( );
}

void
ADIOS2IOHandlerImpl::closeFile(
Writable * writable,
Parameter< Operation::CLOSE_FILE > const & )
{
auto fileIterator = m_files.find( writable );
if ( fileIterator != m_files.end( ) )
{
fileIterator->second.invalidate( );
auto it = m_fileData.find( fileIterator->second );
if ( it != m_fileData.end( ) )
{
it->second->flush( );
m_fileData.erase( it );
}
}
}

void ADIOS2IOHandlerImpl::openPath(
Writable * writable, const Parameter< Operation::OPEN_PATH > & parameters )
{
Expand Down
64 changes: 62 additions & 2 deletions src/IO/ADIOS/CommonADIOS1IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,68 @@ CommonADIOS1IOHandlerImpl::openFile(Writable* writable,
}

void
CommonADIOS1IOHandlerImpl::openPath(Writable* writable,
Parameter< Operation::OPEN_PATH > const& parameters)
CommonADIOS1IOHandlerImpl::closeFile(
Writable * writable,
Parameter< Operation::CLOSE_FILE > const & )
{
auto myFile = m_filePaths.find( writable );
if( myFile == m_filePaths.end() )
{
return;
}

// finish write operations
auto myGroup = m_groups.find( myFile->second );
if( myGroup != m_groups.end() )
{
auto attributeWrites = m_attributeWrites.find( myGroup->second );
if( this->m_handler->m_backendAccess != Access::READ_ONLY &&
attributeWrites != m_attributeWrites.end() )
{
for( auto & att : attributeWrites->second )
{
flush_attribute( myGroup->second, att.first, att.second );
}
m_attributeWrites.erase( attributeWrites );
}
m_groups.erase( myGroup );
}

auto handle_write = m_openWriteFileHandles.find( myFile->second );
if( handle_write != m_openWriteFileHandles.end() )
{
close( handle_write->second );
m_openWriteFileHandles.erase( handle_write );
}

// finish read operations
auto handle_read = m_openReadFileHandles.find( myFile->second );
if( handle_read != m_openReadFileHandles.end() )
{
auto scheduled = m_scheduledReads.find( handle_read->second );
if( scheduled != m_scheduledReads.end() )
{
auto status = adios_perform_reads( scheduled->first, 1 );
VERIFY(
status == err_no_error,
"[ADIOS1] Internal error: Failed to perform ADIOS reads during "
"dataset reading" );

for( auto & sel : scheduled->second )
adios_selection_delete( sel );
m_scheduledReads.erase( scheduled );
}
close( handle_read->second );
m_openReadFileHandles.erase( handle_read );
}
m_existsOnDisk.erase( myFile->second );
m_filePaths.erase( myFile );
}

void
CommonADIOS1IOHandlerImpl::openPath(
Writable * writable,
Parameter< Operation::OPEN_PATH > const & parameters )
{
/* Sanitize path */
std::string path = parameters.path;
Expand Down
Loading

0 comments on commit 0f38504

Please sign in to comment.