diff --git a/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp b/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
index 9cc07b923a..068b451860 100644
--- a/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
+++ b/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
@@ -56,6 +56,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
+ void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
index 539514c032..ccaf7afe26 100644
--- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
+++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
@@ -128,6 +128,9 @@ class ADIOS2IOHandlerImpl
void openFile( Writable *,
Parameter< Operation::OPEN_FILE > const & ) override;
+ void closeFile( Writable *,
+ Parameter< Operation::CLOSE_FILE > const & ) override;
+
void openPath( Writable *,
Parameter< Operation::OPEN_PATH > const & ) override;
diff --git a/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp b/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
index 1774ebdd82..b084c48026 100644
--- a/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
+++ b/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
@@ -57,6 +57,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
+ void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp
index ded85bfa25..7de511c172 100644
--- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp
+++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp
@@ -68,6 +68,9 @@ class AbstractIOHandlerImpl
case O::OPEN_FILE:
openFile(i.writable, deref_dynamic_cast< Parameter< O::OPEN_FILE > >(i.parameter.get()));
break;
+ case O::CLOSE_FILE:
+ closeFile(i.writable, deref_dynamic_cast< Parameter< O::CLOSE_FILE > >(i.parameter.get()));
+ break;
case O::OPEN_PATH:
openPath(i.writable, deref_dynamic_cast< Parameter< O::OPEN_PATH > >(i.parameter.get()));
break;
@@ -118,6 +121,12 @@ class AbstractIOHandlerImpl
return std::future< void >();
}
+ /**
+ * Close the file corresponding with the writable and release file handles.
+ * The operation should succeed in any access mode.
+ */
+ virtual void
+ closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0;
/** Create a new file in physical storage, possibly overriding an existing file.
*
* The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY.
diff --git a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
index 396ff85e8a..94b44773da 100644
--- a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
+++ b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
@@ -43,6 +43,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
+ void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp
index 7ceaff95c2..3c006b5250 100644
--- a/include/openPMD/IO/IOTask.hpp
+++ b/include/openPMD/IO/IOTask.hpp
@@ -46,6 +46,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation)
{
CREATE_FILE,
OPEN_FILE,
+ CLOSE_FILE,
DELETE_FILE,
CREATE_PATH,
@@ -127,6 +128,20 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::OPEN_FILE > : public AbstractPara
std::string name = "";
};
+template<>
+struct OPENPMDAPI_EXPORT Parameter< Operation::CLOSE_FILE > : public AbstractParameter
+{
+ Parameter() = default;
+ Parameter( Parameter const & ) : AbstractParameter() {}
+
+ std::unique_ptr< AbstractParameter >
+ clone() const override
+ {
+ return std::unique_ptr< AbstractParameter >(
+ new Parameter< Operation::CLOSE_FILE >( *this ) );
+ }
+};
+
template<>
struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_FILE > : public AbstractParameter
{
diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
index 3804d39558..35ed7c8ef8 100644
--- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
+++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
@@ -184,6 +184,11 @@ namespace openPMD
Parameter< Operation::OPEN_FILE > const &
) override;
+ void closeFile(
+ Writable *,
+ Parameter< Operation::CLOSE_FILE > const &
+ ) override;
+
void openPath(
Writable *,
Parameter< Operation::OPEN_PATH > const &
diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp
index 6699ed1603..7f02b2ad5d 100644
--- a/include/openPMD/Iteration.hpp
+++ b/include/openPMD/Iteration.hpp
@@ -87,6 +87,44 @@ class Iteration : public Attributable
*/
Iteration& setTimeUnitSI(double newTimeUnitSI);
+ /**
+ * @brief Close an iteration. No further (backend-propagating) accesses
+ * may be performed on this iteration.
+ * A closed iteration may not (yet) be reopened.
+ * @return Reference to iteration.
+ */
+ /*
+ * Note: If the API is changed in future to allow reopening closed
+ * iterations, measures should be taken to prevent this in the streaming
+ * API. Currently, disallowing to reopen closed iterations satisfies
+ * the requirements of the streaming API.
+ */
+ Iteration &
+ close( bool flush = true );
+
+ /**
+ * @brief Has the iteration been closed?
+ * A closed iteration may not (yet) be reopened.
+ *
+ * @return Whether the iteration has been closed.
+ */
+ bool
+ closed() const;
+
+ /**
+ * @brief Has the iteration been closed by the writer?
+ * Background: Upon calling Iteration::close(), the openPMD API
+ * will add metadata to the iteration in form of an attribute,
+ * indicating that the iteration has indeed been closed.
+ * Useful mainly in streaming context when a reader inquires from
+ * a writer that it is done writing.
+ *
+ * @return Whether the iteration has been explicitly closed (yet) by the
+ * writer.
+ */
+ bool
+ closedByWriter() const;
+
Container< Mesh > meshes;
Container< ParticleSpecies > particles; //particleSpecies?
@@ -99,8 +137,41 @@ class Iteration : public Attributable
void flush();
void read();
+ /**
+ * @brief Whether an iteration has been closed yet.
+ *
+ */
+ enum class CloseStatus
+ {
+ Open, //!< Iteration has not been closed
+ ClosedInFrontend, /*!< Iteration has been closed, but task has not yet
+ been propagated to the backend */
+ ClosedInBackend /*!< Iteration has been closed and task has been
+ propagated to the backend */
+ };
+
+ /*
+ * An iteration may be logically closed in the frontend,
+ * but not necessarily yet in the backend.
+ * Will be propagated to the backend upon next flush.
+ * Store the current status.
+ */
+ std::shared_ptr< CloseStatus > m_closed =
+ std::make_shared< CloseStatus >( CloseStatus::Open );
+
+ /*
+ * @brief Check recursively whether this Iteration is dirty.
+ * It is dirty if any attribute or dataset is read from or written to
+ * the backend.
+ *
+ * @return true If dirty.
+ * @return false Otherwise.
+ */
+ bool
+ dirtyRecursive() const;
+
virtual void linkHierarchy(std::shared_ptr< Writable > const& w);
-}; //Iteration
+}; // Iteration
extern template
float
diff --git a/include/openPMD/ParticleSpecies.hpp b/include/openPMD/ParticleSpecies.hpp
index 7868453046..51a8302407 100644
--- a/include/openPMD/ParticleSpecies.hpp
+++ b/include/openPMD/ParticleSpecies.hpp
@@ -45,6 +45,17 @@ class ParticleSpecies : public Container< Record >
void read();
void flush(std::string const &) override;
+
+ /**
+ * @brief Check recursively whether this ParticleSpecies is dirty.
+ * It is dirty if any attribute or dataset is read from or written to
+ * the backend.
+ *
+ * @return true If dirty.
+ * @return false Otherwise.
+ */
+ bool
+ dirtyRecursive() const;
};
namespace traits
diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp
index 90867143f4..80d8173866 100644
--- a/include/openPMD/RecordComponent.hpp
+++ b/include/openPMD/RecordComponent.hpp
@@ -198,6 +198,17 @@ class RecordComponent : public BaseRecordComponent
* @return Reference to this RecordComponent instance.
*/
RecordComponent& makeEmpty( Dataset d );
+
+ /**
+ * @brief Check recursively whether this RecordComponent is dirty.
+ * It is dirty if any attribute or dataset is read from or written to
+ * the backend.
+ *
+ * @return true If dirty.
+ * @return false Otherwise.
+ */
+ bool
+ dirtyRecursive() const;
}; // RecordComponent
diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp
index b2a3854726..d56ce377e8 100644
--- a/include/openPMD/Series.hpp
+++ b/include/openPMD/Series.hpp
@@ -262,8 +262,10 @@ class Series : public Attributable
std::unique_ptr< ParsedInput > parseInput(std::string);
void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >);
void initDefaults();
- void flushFileBased();
- void flushGroupBased();
+ template< typename IterationsContainer >
+ void flushFileBased( IterationsContainer && iterationsToFlush );
+ template< typename IterationsContainer >
+ void flushGroupBased( IterationsContainer && iterationsToFlush );
void flushMeshesPath();
void flushParticlesPath();
void readFileBased();
diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp
index 42ed18b23c..d7afb2516e 100644
--- a/include/openPMD/backend/BaseRecord.hpp
+++ b/include/openPMD/backend/BaseRecord.hpp
@@ -97,7 +97,18 @@ class BaseRecord : public Container< T_elem >
void flush(std::string const&) final;
virtual void flush_impl(std::string const&) = 0;
virtual void read() = 0;
-}; //BaseRecord
+
+ /**
+ * @brief Check recursively whether this BaseRecord is dirty.
+ * It is dirty if any attribute or dataset is read from or written to
+ * the backend.
+ *
+ * @return true If dirty.
+ * @return false Otherwise.
+ */
+ bool
+ dirtyRecursive() const;
+}; // BaseRecord
template< typename T_elem >
@@ -295,4 +306,22 @@ BaseRecord< T_elem >::flush(std::string const& name)
this->flush_impl(name);
}
-} // openPMD
+
+template< typename T_elem >
+inline bool
+BaseRecord< T_elem >::dirtyRecursive() const
+{
+ if( Attributable::dirty )
+ {
+ return true;
+ }
+ for( auto const & pair : *this )
+ {
+ if( pair.second.dirtyRecursive() )
+ {
+ return true;
+ }
+ }
+ return false;
+}
+} // namespace openPMD
diff --git a/src/IO/ADIOS/ADIOS1IOHandler.cpp b/src/IO/ADIOS/ADIOS1IOHandler.cpp
index 47d3848cd9..0622f3bd62 100644
--- a/src/IO/ADIOS/ADIOS1IOHandler.cpp
+++ b/src/IO/ADIOS/ADIOS1IOHandler.cpp
@@ -135,6 +135,9 @@ ADIOS1IOHandlerImpl::flush()
case O::OPEN_DATASET:
openDataset(i.writable, deref_dynamic_cast< Parameter< O::OPEN_DATASET > >(i.parameter.get()));
break;
+ case O::CLOSE_FILE:
+ closeFile(i.writable, deref_dynamic_cast< Parameter< O::CLOSE_FILE > >(i.parameter.get()));
+ break;
case O::DELETE_FILE:
deleteFile(i.writable, deref_dynamic_cast< Parameter< O::DELETE_FILE > >(i.parameter.get()));
break;
diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp
index 40c16b1d3e..909be4321e 100644
--- a/src/IO/ADIOS/ADIOS2IOHandler.cpp
+++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp
@@ -324,6 +324,24 @@ void ADIOS2IOHandlerImpl::openFile(
writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( );
}
+void
+ADIOS2IOHandlerImpl::closeFile(
+ Writable * writable,
+ Parameter< Operation::CLOSE_FILE > const & )
+{
+ auto fileIterator = m_files.find( writable );
+ if ( fileIterator != m_files.end( ) )
+ {
+ fileIterator->second.invalidate( );
+ auto it = m_fileData.find( fileIterator->second );
+ if ( it != m_fileData.end( ) )
+ {
+ it->second->flush( );
+ m_fileData.erase( it );
+ }
+ }
+}
+
void ADIOS2IOHandlerImpl::openPath(
Writable * writable, const Parameter< Operation::OPEN_PATH > & parameters )
{
diff --git a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp
index 568519d594..fdc7c13268 100644
--- a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp
+++ b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp
@@ -540,8 +540,68 @@ CommonADIOS1IOHandlerImpl::openFile(Writable* writable,
}
void
-CommonADIOS1IOHandlerImpl::openPath(Writable* writable,
- Parameter< Operation::OPEN_PATH > const& parameters)
+CommonADIOS1IOHandlerImpl::closeFile(
+ Writable * writable,
+ Parameter< Operation::CLOSE_FILE > const & )
+{
+ auto myFile = m_filePaths.find( writable );
+ if( myFile == m_filePaths.end() )
+ {
+ return;
+ }
+
+ // finish write operations
+ auto myGroup = m_groups.find( myFile->second );
+ if( myGroup != m_groups.end() )
+ {
+ auto attributeWrites = m_attributeWrites.find( myGroup->second );
+ if( this->m_handler->m_backendAccess != Access::READ_ONLY &&
+ attributeWrites != m_attributeWrites.end() )
+ {
+ for( auto & att : attributeWrites->second )
+ {
+ flush_attribute( myGroup->second, att.first, att.second );
+ }
+ m_attributeWrites.erase( attributeWrites );
+ }
+ m_groups.erase( myGroup );
+ }
+
+ auto handle_write = m_openWriteFileHandles.find( myFile->second );
+ if( handle_write != m_openWriteFileHandles.end() )
+ {
+ close( handle_write->second );
+ m_openWriteFileHandles.erase( handle_write );
+ }
+
+ // finish read operations
+ auto handle_read = m_openReadFileHandles.find( myFile->second );
+ if( handle_read != m_openReadFileHandles.end() )
+ {
+ auto scheduled = m_scheduledReads.find( handle_read->second );
+ if( scheduled != m_scheduledReads.end() )
+ {
+ auto status = adios_perform_reads( scheduled->first, 1 );
+ VERIFY(
+ status == err_no_error,
+ "[ADIOS1] Internal error: Failed to perform ADIOS reads during "
+ "dataset reading" );
+
+ for( auto & sel : scheduled->second )
+ adios_selection_delete( sel );
+ m_scheduledReads.erase( scheduled );
+ }
+ close( handle_read->second );
+ m_openReadFileHandles.erase( handle_read );
+ }
+ m_existsOnDisk.erase( myFile->second );
+ m_filePaths.erase( myFile );
+}
+
+void
+CommonADIOS1IOHandlerImpl::openPath(
+ Writable * writable,
+ Parameter< Operation::OPEN_PATH > const & parameters )
{
/* Sanitize path */
std::string path = parameters.path;
diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp
index 5404b6b4bd..fef2ce4ecb 100644
--- a/src/IO/HDF5/HDF5IOHandler.cpp
+++ b/src/IO/HDF5/HDF5IOHandler.cpp
@@ -399,8 +399,48 @@ HDF5IOHandlerImpl::openFile(Writable* writable,
}
void
-HDF5IOHandlerImpl::openPath(Writable* writable,
- Parameter< Operation::OPEN_PATH > const& parameters)
+HDF5IOHandlerImpl::closeFile(
+ Writable * writable,
+ Parameter< Operation::CLOSE_FILE > const & )
+{
+ auto fileID_it = m_fileIDs.find( writable );
+ if( fileID_it == m_fileIDs.end() )
+ {
+ throw std::runtime_error(
+ "[HDF5] Trying to close a file that is not "
+ "present in the backend" );
+ return;
+ }
+ hid_t fileID = fileID_it->second;
+ H5Fclose( fileID );
+ m_openFileIDs.erase( fileID );
+ m_fileIDs.erase( fileID_it );
+
+ /* std::unordered_map::erase:
+ * References and iterators to the erased elements are invalidated. Other
+ * iterators and references are not invalidated.
+ */
+ using iter_t = decltype( m_fileNamesWithID )::iterator;
+ std::vector< iter_t > deleteMe;
+ deleteMe.reserve( 1 ); // should normally suffice
+ for( auto it = m_fileNamesWithID.begin(); it != m_fileNamesWithID.end();
+ ++it )
+ {
+ if( it->second == fileID )
+ {
+ deleteMe.push_back( it );
+ }
+ }
+ for( auto iterator : deleteMe )
+ {
+ m_fileNamesWithID.erase( iterator );
+ }
+}
+
+void
+HDF5IOHandlerImpl::openPath(
+ Writable * writable,
+ Parameter< Operation::OPEN_PATH > const & parameters )
{
auto res = m_fileIDs.find(writable->parent);
hid_t node_id, path_id;
diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp
index ecd40a8eff..2497ff9508 100644
--- a/src/IO/JSON/JSONIOHandlerImpl.cpp
+++ b/src/IO/JSON/JSONIOHandlerImpl.cpp
@@ -306,6 +306,21 @@ namespace openPMD
}
+ void JSONIOHandlerImpl::closeFile(
+ Writable * writable,
+ Parameter< Operation::CLOSE_FILE > const &
+ )
+ {
+ auto fileIterator = m_files.find( writable );
+ if ( fileIterator != m_files.end( ) )
+ {
+ putJsonContents( fileIterator->second );
+ fileIterator->second.invalidate( );
+ m_files.erase( fileIterator );
+ }
+ }
+
+
void JSONIOHandlerImpl::openPath(
Writable * writable,
Parameter< Operation::OPEN_PATH > const & parameters
diff --git a/src/Iteration.cpp b/src/Iteration.cpp
index 4668df9277..8e02d1a0cf 100644
--- a/src/Iteration.cpp
+++ b/src/Iteration.cpp
@@ -19,12 +19,16 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/Iteration.hpp"
+
#include "openPMD/Dataset.hpp"
#include "openPMD/Datatype.hpp"
#include "openPMD/Series.hpp"
+#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/backend/Writable.hpp"
+#include <stdexcept>
+
namespace openPMD
{
@@ -40,7 +44,8 @@ Iteration::Iteration()
Iteration::Iteration(Iteration const& i)
: Attributable{i},
meshes{i.meshes},
- particles{i.particles}
+ particles{i.particles},
+ m_closed{i.m_closed}
{
IOHandler = i.IOHandler;
parent = i.parent;
@@ -57,6 +62,7 @@ Iteration& Iteration::operator=(Iteration const& i)
particles = i.particles;
IOHandler = i.IOHandler;
parent = i.parent;
+ m_closed = i.m_closed;
meshes.IOHandler = IOHandler;
meshes.parent = this->m_writable.get();
particles.IOHandler = IOHandler;
@@ -97,6 +103,72 @@ Iteration::setTimeUnitSI(double newTimeUnitSI)
return *this;
}
+Iteration &
+Iteration::close( bool _flush )
+{
+ using bool_type = unsigned char;
+ if( this->IOHandler->m_frontendAccess != Access::READ_ONLY )
+ {
+ setAttribute< bool_type >( "closed", 1u );
+ }
+ *m_closed = CloseStatus::ClosedInFrontend;
+ if( _flush )
+ {
+ Series * s = &auxiliary::deref_dynamic_cast< Series >(
+ parent->attributable->parent->attributable );
+ // figure out my iteration number
+ uint64_t index;
+ bool found = false;
+ for( auto const & pair : s->iterations )
+ {
+ if( pair.second.m_writable.get() == this->m_writable.get() )
+ {
+ found = true;
+ index = pair.first;
+ break;
+ }
+ }
+ if( !found )
+ {
+ throw std::runtime_error(
+ "[Iteration::close] Iteration not found in Series." );
+ }
+ std::map< uint64_t, Iteration > flushOnlyThisIteration{
+ { index, *this } };
+ switch( *s->m_iterationEncoding )
+ {
+ using IE = IterationEncoding;
+ case IE::fileBased:
+ s->flushFileBased( flushOnlyThisIteration );
+ break;
+ case IE::groupBased:
+ s->flushGroupBased( flushOnlyThisIteration );
+ break;
+ }
+ }
+ return *this;
+}
+
+bool
+Iteration::closed() const
+{
+ return *m_closed != CloseStatus::Open;
+}
+
+bool
+Iteration::closedByWriter() const
+{
+ using bool_type = unsigned char;
+ if( containsAttribute( "closed" ) )
+ {
+ return getAttribute( "closed" ).get< bool_type >() == 0u ? false : true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
void
Iteration::flushFileBased(std::string const& filename, uint64_t i)
{
@@ -359,6 +431,30 @@ Iteration::read()
readAttributes();
}
+bool
+Iteration::dirtyRecursive() const
+{
+ if( dirty )
+ {
+ return true;
+ }
+ for( auto const & pair : particles )
+ {
+ if( pair.second.dirtyRecursive() )
+ {
+ return true;
+ }
+ }
+ for( auto const & pair : meshes )
+ {
+ if( pair.second.dirtyRecursive() )
+ {
+ return true;
+ }
+ }
+ return false;
+}
+
void
Iteration::linkHierarchy(std::shared_ptr< Writable > const& w)
{
@@ -367,16 +463,15 @@ Iteration::linkHierarchy(std::shared_ptr< Writable > const& w)
particles.linkHierarchy(m_writable);
}
+template float
+Iteration::time< float >() const;
+template double
+Iteration::time< double >() const;
+template long double
+Iteration::time< long double >() const;
-template
-float Iteration::time< float >() const;
-template
-double Iteration::time< double >() const;
-template
-long double Iteration::time< long double >() const;
-
-template
-float Iteration::dt< float >() const;
+template float
+Iteration::dt< float >() const;
template
double Iteration::dt< double >() const;
template
diff --git a/src/ParticleSpecies.cpp b/src/ParticleSpecies.cpp
index 291a9de24a..8ce06bb82a 100644
--- a/src/ParticleSpecies.cpp
+++ b/src/ParticleSpecies.cpp
@@ -143,4 +143,21 @@ ParticleSpecies::flush(std::string const& path)
}
}
}
-} // openPMD
+
+bool
+ParticleSpecies::dirtyRecursive() const
+{
+ if( dirty )
+ {
+ return true;
+ }
+ for( auto const & pair : *this )
+ {
+ if( pair.second.dirtyRecursive() )
+ {
+ return true;
+ }
+ }
+ return false;
+}
+} // namespace openPMD
diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp
index 8580268257..1ee776a555 100644
--- a/src/RecordComponent.cpp
+++ b/src/RecordComponent.cpp
@@ -265,5 +265,14 @@ RecordComponent::readBase()
readAttributes();
}
-} // openPMD
+bool
+RecordComponent::dirtyRecursive() const
+{
+ if( Attributable::dirty )
+ {
+ return true;
+ }
+ return !m_chunks->empty();
+}
+} // namespace openPMD
diff --git a/src/Series.cpp b/src/Series.cpp
index f7a70508a3..c4967849fb 100644
--- a/src/Series.cpp
+++ b/src/Series.cpp
@@ -369,10 +369,10 @@ Series::flush()
{
using IE = IterationEncoding;
case IE::fileBased:
- flushFileBased();
+ flushFileBased( iterations );
break;
case IE::groupBased:
- flushGroupBased();
+ flushGroupBased( iterations );
break;
}
@@ -550,22 +550,66 @@ Series::initDefaults()
// TODO Potentially warn on flush if software and author are not user-provided (defaulted)
}
+template< typename IterationsContainer >
void
-Series::flushFileBased()
+Series::flushFileBased( IterationsContainer && iterationsToFlush )
{
- if( iterations.empty() )
- throw std::runtime_error("fileBased output can not be written with no iterations.");
+ if( iterationsToFlush.empty() )
+ throw std::runtime_error(
+ "fileBased output can not be written with no iterations." );
- if(IOHandler->m_frontendAccess == Access::READ_ONLY )
- for( auto& i : iterations )
+ if( IOHandler->m_frontendAccess == Access::READ_ONLY )
+ for( auto & i : iterationsToFlush )
+ {
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend )
+ {
+ // file corresponding with the iteration has previously been
+ // closed and fully flushed
+ // verify that there have been no further accesses
+ if( i.second.dirtyRecursive() )
+ {
+ throw std::runtime_error(
+ "[Series] Detected illegal access to iteration that "
+ "has been closed previously." );
+ }
+ continue;
+ }
i.second.flush();
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend )
+ {
+ Parameter< Operation::CLOSE_FILE > fClose;
+ IOHandler->enqueue( IOTask( &i.second, std::move( fClose ) ) );
+ *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend;
+ }
+ IOHandler->flush();
+ }
else
{
bool allDirty = dirty;
- for( auto& i : iterations )
+ for( auto & i : iterationsToFlush )
{
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend )
+ {
+ // file corresponding with the iteration has previously been
+ // closed and fully flushed
+ // verify that there have been no further accesses
+ if (!i.second.written)
+ {
+ throw std::runtime_error(
+ "[Series] Closed iteration has not been written. This "
+ "is an internal error.");
+ }
+ if( i.second.dirtyRecursive() )
+ {
+ throw std::runtime_error(
+ "[Series] Detected illegal access to iteration that "
+ "has been closed previously." );
+ }
+ continue;
+ }
/* as there is only one series,
- * emulate the file belonging to each iteration as not yet written */
+ * emulate the file belonging to each iteration as not yet written
+ */
written = false;
iterations.written = false;
@@ -580,6 +624,13 @@ Series::flushFileBased()
flushAttributes();
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend )
+ {
+ Parameter< Operation::CLOSE_FILE > fClose;
+ IOHandler->enqueue( IOTask( &i.second, std::move( fClose ) ) );
+ *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend;
+ }
+
IOHandler->flush();
/* reset the dirty bit for every iteration (i.e. file)
@@ -590,12 +641,39 @@ Series::flushFileBased()
}
}
+template void
+Series::flushFileBased< std::map< uint64_t, Iteration > & >(
+ std::map< uint64_t, Iteration > & );
+
+template< typename IterationsContainer >
void
-Series::flushGroupBased()
+Series::flushGroupBased( IterationsContainer && iterationsToFlush )
{
- if(IOHandler->m_frontendAccess == Access::READ_ONLY )
- for( auto& i : iterations )
+ if( IOHandler->m_frontendAccess == Access::READ_ONLY )
+ for( auto & i : iterationsToFlush )
+ {
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend )
+ {
+ // file corresponding with the iteration has previously been
+ // closed and fully flushed
+ // verify that there have been no further accesses
+ if( i.second.dirtyRecursive() )
+ {
+ throw std::runtime_error(
+ "[Series] Illegal access to iteration " +
+ std::to_string( i.first ) +
+ " that has been closed previously." );
+ }
+ continue;
+ }
i.second.flush();
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend )
+ {
+ // the iteration has no dedicated file in group-based mode
+ *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend;
+ }
+ IOHandler->flush();
+ }
else
{
if( !written )
@@ -605,22 +683,52 @@ Series::flushGroupBased()
IOHandler->enqueue(IOTask(this, fCreate));
}
- iterations.flush(auxiliary::replace_first(basePath(), "%T/", ""));
+ iterations.flush( auxiliary::replace_first( basePath(), "%T/", "" ) );
- for( auto& i : iterations )
+ for( auto & i : iterationsToFlush )
{
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend )
+ {
+ // file corresponding with the iteration has previously been
+ // closed and fully flushed
+ // verify that there have been no further accesses
+ if (!i.second.written)
+ {
+ throw std::runtime_error(
+ "[Series] Closed iteration has not been written. This "
+ "is an internal error.");
+ }
+ if( i.second.dirtyRecursive() )
+ {
+ throw std::runtime_error(
+ "[Series] Illegal access to iteration " +
+ std::to_string( i.first ) +
+ " that has been closed previously." );
+ }
+ continue;
+ }
if( !i.second.written )
{
i.second.m_writable->parent = getWritable(&iterations);
i.second.parent = getWritable(&iterations);
}
i.second.flushGroupBased(i.first);
+ if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend )
+ {
+ // the iteration has no dedicated file in group-based mode
+ *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend;
+ }
}
flushAttributes();
+ IOHandler->flush();
}
}
+template void
+Series::flushGroupBased< std::map< uint64_t, Iteration > & >(
+ std::map< uint64_t, Iteration > & );
+
void
Series::flushMeshesPath()
{
diff --git a/src/binding/python/Iteration.cpp b/src/binding/python/Iteration.cpp
index 822cf6022b..b1f50980b5 100644
--- a/src/binding/python/Iteration.cpp
+++ b/src/binding/python/Iteration.cpp
@@ -53,6 +53,7 @@ void init_Iteration(py::module &m) {
.def("set_dt", &Iteration::setDt)
.def("time_unit_SI", &Iteration::timeUnitSI)
.def("set_time_unit_SI", &Iteration::setTimeUnitSI)
+ .def("close", &Iteration::close, py::arg("flush") = true)
.def_readwrite("meshes", &Iteration::meshes,
py::return_value_policy::reference,
diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp
index 4ef087d996..29bba8fbc3 100644
--- a/test/SerialIOTest.cpp
+++ b/test/SerialIOTest.cpp
@@ -91,8 +91,138 @@ TEST_CASE( "multi_series_test", "[serial]" )
allSeries.clear();
}
-inline
void
+close_iteration_test( std::string file_ending )
+{
+ std::string name = "../samples/close_iterations_%T." + file_ending;
+
+ std::vector< int > data{2, 4, 6, 8};
+ // { // we do *not* need these parentheses
+ Series write(name, Access::CREATE);
+ bool isAdios1 = write.backend() == "ADIOS1";
+ {
+ Iteration it0 = write.iterations[ 0 ];
+ auto E_x = it0.meshes[ "E" ][ "x" ];
+ E_x.resetDataset( { Datatype::INT, { 2, 2 } } );
+ E_x.storeChunk( data, { 0, 0 }, { 2, 2 } );
+ it0.close( /* flush = */ false );
+ }
+ write.flush();
+ // }
+
+ if (isAdios1)
+ {
+ // run a simplified test for Adios1 since Adios1 has issues opening
+ // twice in the same process
+ REQUIRE( auxiliary::file_exists( "../samples/close_iterations_0.bp" ) );
+ }
+ else
+ {
+ Series read( name, Access::READ_ONLY );
+ Iteration it0 = read.iterations[ 0 ];
+ auto E_x_read = it0.meshes[ "E" ][ "x" ];
+ auto chunk = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } );
+ it0.close( /* flush = */ false );
+ read.flush();
+ for( size_t i = 0; i < data.size(); ++i )
+ {
+ REQUIRE( data[ i ] == chunk.get()[ i ] );
+ }
+ }
+
+ {
+ Iteration it1 = write.iterations[1];
+ auto E_x = it1.meshes[ "E" ][ "x" ];
+ E_x.resetDataset( { Datatype::INT, { 2, 2 } } );
+ E_x.storeChunk( data, { 0, 0 }, { 2, 2 } );
+ it1.close( /* flush = */ true );
+
+ // illegally access iteration after closing
+ E_x.storeChunk( data, { 0, 0 }, { 2, 2 } );
+ REQUIRE_THROWS( write.flush() );
+ }
+
+ if (isAdios1)
+ {
+ // run a simplified test for Adios1 since Adios1 has issues opening
+ // twice in the same process
+ REQUIRE( auxiliary::file_exists( "../samples/close_iterations_1.bp" ) );
+ }
+ else
+ {
+ Series read( name, Access::READ_ONLY );
+ Iteration it1 = read.iterations[ 1 ];
+ auto E_x_read = it1.meshes[ "E" ][ "x" ];
+ auto chunk = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } );
+ it1.close( /* flush = */ true );
+ for( size_t i = 0; i < data.size(); ++i )
+ {
+ REQUIRE( data[ i ] == chunk.get()[ i ] );
+ }
+ auto read_again = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } );
+ REQUIRE_THROWS( read.flush() );
+ }
+}
+
+TEST_CASE( "close_iteration_test", "[serial]" )
+{
+ for( auto const & t : backends )
+ {
+ close_iteration_test( std::get< 0 >( t ) );
+ }
+}
+
+#if openPMD_HAVE_ADIOS2
+TEST_CASE( "close_iteration_throws_test", "[serial]" )
+{
+ /*
+ * Iterations should not be accessed any more after closing.
+ * Test that the openPMD API detects that case and throws.
+ */
+ {
+ Series series( "close_iteration_throws_1.bp", Access::CREATE );
+ auto it0 = series.iterations[ 0 ];
+ auto E_x = it0.meshes[ "E" ][ "x" ];
+ E_x.resetDataset( { Datatype::INT, { 5 } } );
+ std::vector< int > data{ 0, 1, 2, 3, 4 };
+ E_x.storeChunk( data, { 0 }, { 5 } );
+ it0.close();
+
+ auto B_y = it0.meshes[ "B" ][ "y" ];
+ B_y.resetDataset( { Datatype::INT, { 5 } } );
+ B_y.storeChunk( data, { 0 }, { 5 } );
+ REQUIRE_THROWS( series.flush() );
+ }
+ {
+ Series series( "close_iteration_throws_2.bp", Access::CREATE );
+ auto it0 = series.iterations[ 0 ];
+ auto E_x = it0.meshes[ "E" ][ "x" ];
+ E_x.resetDataset( { Datatype::INT, { 5 } } );
+ std::vector< int > data{ 0, 1, 2, 3, 4 };
+ E_x.storeChunk( data, { 0 }, { 5 } );
+ it0.close();
+
+ auto e_position_x = it0.particles[ "e" ][ "position" ][ "x" ];
+ e_position_x.resetDataset( { Datatype::INT, { 5 } } );
+ e_position_x.storeChunk( data, { 0 }, { 5 } );
+ REQUIRE_THROWS( series.flush() );
+ }
+ {
+ Series series( "close_iteration_throws_3.bp", Access::CREATE );
+ auto it0 = series.iterations[ 0 ];
+ auto E_x = it0.meshes[ "E" ][ "x" ];
+ E_x.resetDataset( { Datatype::INT, { 5 } } );
+ std::vector< int > data{ 0, 1, 2, 3, 4 };
+ E_x.storeChunk( data, { 0 }, { 5 } );
+ it0.close();
+
+ it0.setTimeUnitSI( 2.0 );
+ REQUIRE_THROWS( series.flush() );
+ }
+}
+#endif
+
+inline void
empty_dataset_test( std::string file_ending )
{
{
diff --git a/test/python/unittest/API/APITest.py b/test/python/unittest/API/APITest.py
index 79ab2c629c..4e50313a38 100644
--- a/test/python/unittest/API/APITest.py
+++ b/test/python/unittest/API/APITest.py
@@ -1229,6 +1229,69 @@ def testFieldRecord(self):
self.assertIsInstance(Ex, io.Mesh_Record_Component)
+ def makeCloseIterationRoundTrip(self, backend, file_ending):
+ # write
+ series = io.Series(
+ "unittest_closeIteration_%T." + file_ending,
+ io.Access_Type.create
+ )
+ DS = io.Dataset
+ data = np.array([2, 4, 6, 8], dtype=np.dtype("int"))
+ extent = [4]
+
+ it0 = series.iterations[0]
+ E_x = it0.meshes["E"]["x"]
+ E_x.reset_dataset(DS(np.dtype("int"), extent))
+ E_x.store_chunk(data, [0], extent)
+ it0.close(flush=True)
+
+ if backend != 'adios1':
+ read = io.Series(
+ "unittest_closeIteration_%T." + file_ending,
+ io.Access_Type.read_only
+ )
+ it0 = read.iterations[0]
+ E_x = it0.meshes["E"]["x"]
+ chunk = E_x.load_chunk([0], extent)
+ it0.close() # flush = True <- default argument
+
+ for i in range(len(data)):
+ self.assertEqual(data[i], chunk[i])
+ del read
+
+ it1 = series.iterations[1]
+ E_x = it1.meshes["E"]["x"]
+ E_x.reset_dataset(DS(np.dtype("int"), extent))
+ E_x.store_chunk(data, [0], extent)
+ it1.close(flush=False)
+ series.flush()
+
+ if backend != 'adios1':
+ read = io.Series(
+ "unittest_closeIteration_%T." + file_ending,
+ io.Access_Type.read_only
+ )
+ it1 = read.iterations[1]
+ E_x = it1.meshes["E"]["x"]
+ chunk = E_x.load_chunk([0], extent)
+ it1.close(flush=False)
+ read.flush()
+
+ for i in range(len(data)):
+ self.assertEqual(data[i], chunk[i])
+ del read
+
+ def testCloseIteration(self):
+ backend_filesupport = {
+ 'json': 'json',
+ 'hdf5': 'h5',
+ 'adios1': 'bp',
+ 'adios2': 'bp'
+ }
+ for b in io.variants:
+ if io.variants[b] is True and b in backend_filesupport:
+ self.makeCloseIterationRoundTrip(b, backend_filesupport[b])
+
if __name__ == '__main__':
unittest.main()