diff --git a/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp b/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp index 9cc07b923a..068b451860 100644 --- a/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp @@ -56,6 +56,7 @@ namespace openPMD void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override; void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override; void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override; + void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override; void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override; void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override; void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override; diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 539514c032..ccaf7afe26 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -128,6 +128,9 @@ class ADIOS2IOHandlerImpl void openFile( Writable *, Parameter< Operation::OPEN_FILE > const & ) override; + void closeFile( Writable *, + Parameter< Operation::CLOSE_FILE > const & ) override; + void openPath( Writable *, Parameter< Operation::OPEN_PATH > const & ) override; diff --git a/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp b/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp index 1774ebdd82..b084c48026 100644 --- a/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp +++ b/include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp @@ -57,6 +57,7 @@ namespace openPMD void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override; void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override; void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override; + void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override; void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override; void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override; void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override; diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index ded85bfa25..7de511c172 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -68,6 +68,9 @@ class AbstractIOHandlerImpl case O::OPEN_FILE: openFile(i.writable, deref_dynamic_cast< Parameter< O::OPEN_FILE > >(i.parameter.get())); break; + case O::CLOSE_FILE: + closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get())); + break; case O::OPEN_PATH: openPath(i.writable, deref_dynamic_cast< Parameter< O::OPEN_PATH > >(i.parameter.get())); break; @@ -118,6 +121,12 @@ class AbstractIOHandlerImpl return std::future< void >(); } + /** + * Close the file corresponding with the writable and release file handles. + * The operation should succeed in any access mode. + */ + virtual void + closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0; /** Create a new file in physical storage, possibly overriding an existing file. * * The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY. diff --git a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp index 396ff85e8a..94b44773da 100644 --- a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp +++ b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp @@ -43,6 +43,7 @@ namespace openPMD void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override; void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override; void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override; + void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override; void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override; void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override; void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override; diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 7ceaff95c2..3c006b5250 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -46,6 +46,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation) { CREATE_FILE, OPEN_FILE, + CLOSE_FILE, DELETE_FILE, CREATE_PATH, @@ -127,6 +128,20 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::OPEN_FILE > : public AbstractPara std::string name = ""; }; +template<> +struct OPENPMDAPI_EXPORT Parameter< Operation::CLOSE_FILE > : public AbstractParameter +{ + Parameter() = default; + Parameter( Parameter const & ) : AbstractParameter() {} + + std::unique_ptr< AbstractParameter > + clone() const override + { + return std::unique_ptr< AbstractParameter >( + new Parameter< Operation::CLOSE_FILE >( *this ) ); + } +}; + template<> struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_FILE > : public AbstractParameter { diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index 3804d39558..35ed7c8ef8 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -184,6 +184,11 @@ namespace openPMD Parameter< Operation::OPEN_FILE > const & ) override; + void closeFile( + Writable *, + Parameter< Operation::CLOSE_FILE > const & + ) override; + void openPath( Writable *, Parameter< Operation::OPEN_PATH > const & diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index 6699ed1603..7f02b2ad5d 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -87,6 +87,44 @@ class Iteration : public Attributable */ Iteration& setTimeUnitSI(double newTimeUnitSI); + /** + * @brief Close an iteration. No further (backend-propagating) accesses + * may be performed on this iteration. + * A closed iteration may not (yet) be reopened. + * @return Reference to iteration. + */ + /* + * Note: If the API is changed in future to allow reopening closed + * iterations, measures should be taken to prevent this in the streaming + * API. Currently, disallowing to reopen closed iterations satisfies + * the requirements of the streaming API. + */ + Iteration & + close( bool flush = true ); + + /** + * @brief Has the iteration been closed? + * A closed iteration may not (yet) be reopened. + * + * @return Whether the iteration has been closed. + */ + bool + closed() const; + + /** + * @brief Has the iteration been closed by the writer? + * Background: Upon calling Iteration::close(), the openPMD API + * will add metadata to the iteration in form of an attribute, + * indicating that the iteration has indeed been closed. + * Useful mainly in streaming context when a reader inquires from + * a writer that it is done writing. + * + * @return Whether the iteration has been explicitly closed (yet) by the + * writer. + */ + bool + closedByWriter() const; + Container< Mesh > meshes; Container< ParticleSpecies > particles; //particleSpecies? @@ -99,8 +137,41 @@ class Iteration : public Attributable void flush(); void read(); + /** + * @brief Whether an iteration has been closed yet. + * + */ + enum class CloseStatus + { + Open, //!< Iteration has not been closed + ClosedInFrontend, /*!< Iteration has been closed, but task has not yet + been propagated to the backend */ + ClosedInBackend /*!< Iteration has been closed and task has been + propagated to the backend */ + }; + + /* + * An iteration may be logically closed in the frontend, + * but not necessarily yet in the backend. + * Will be propagated to the backend upon next flush. + * Store the current status. + */ + std::shared_ptr< CloseStatus > m_closed = + std::make_shared< CloseStatus >( CloseStatus::Open ); + + /* + * @brief Check recursively whether this Iteration is dirty. + * It is dirty if any attribute or dataset is read from or written to + * the backend. + * + * @return true If dirty. + * @return false Otherwise. + */ + bool + dirtyRecursive() const; + virtual void linkHierarchy(std::shared_ptr< Writable > const& w); -}; //Iteration +}; // Iteration extern template float diff --git a/include/openPMD/ParticleSpecies.hpp b/include/openPMD/ParticleSpecies.hpp index 7868453046..51a8302407 100644 --- a/include/openPMD/ParticleSpecies.hpp +++ b/include/openPMD/ParticleSpecies.hpp @@ -45,6 +45,17 @@ class ParticleSpecies : public Container< Record > void read(); void flush(std::string const &) override; + + /** + * @brief Check recursively whether this ParticleSpecies is dirty. + * It is dirty if any attribute or dataset is read from or written to + * the backend. + * + * @return true If dirty. + * @return false Otherwise. + */ + bool + dirtyRecursive() const; }; namespace traits diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index b592d3f1f4..17d6309e5c 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -198,6 +198,17 @@ class RecordComponent : public BaseRecordComponent * @return Reference to this RecordComponent instance. */ RecordComponent& makeEmpty( Dataset d ); + + /** + * @brief Check recursively whether this RecordComponent is dirty. + * It is dirty if any attribute or dataset is read from or written to + * the backend. + * + * @return true If dirty. + * @return false Otherwise. + */ + bool + dirtyRecursive() const; }; // RecordComponent diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index b2a3854726..d56ce377e8 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -262,8 +262,10 @@ class Series : public Attributable std::unique_ptr< ParsedInput > parseInput(std::string); void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >); void initDefaults(); - void flushFileBased(); - void flushGroupBased(); + template< typename IterationsContainer > + void flushFileBased( IterationsContainer && iterationsToFlush ); + template< typename IterationsContainer > + void flushGroupBased( IterationsContainer && iterationsToFlush ); void flushMeshesPath(); void flushParticlesPath(); void readFileBased(); diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp index 42ed18b23c..d7afb2516e 100644 --- a/include/openPMD/backend/BaseRecord.hpp +++ b/include/openPMD/backend/BaseRecord.hpp @@ -97,7 +97,18 @@ class BaseRecord : public Container< T_elem > void flush(std::string const&) final; virtual void flush_impl(std::string const&) = 0; virtual void read() = 0; -}; //BaseRecord + + /** + * @brief Check recursively whether this BaseRecord is dirty. + * It is dirty if any attribute or dataset is read from or written to + * the backend. + * + * @return true If dirty. + * @return false Otherwise. + */ + bool + dirtyRecursive() const; +}; // BaseRecord template< typename T_elem > @@ -295,4 +306,22 @@ BaseRecord< T_elem >::flush(std::string const& name) this->flush_impl(name); } -} // openPMD + +template< typename T_elem > +inline bool +BaseRecord< T_elem >::dirtyRecursive() const +{ + if( Attributable::dirty ) + { + return true; + } + for( auto const & pair : *this ) + { + if( pair.second.dirtyRecursive() ) + { + return true; + } + } + return false; +} +} // namespace openPMD diff --git a/src/IO/ADIOS/ADIOS1IOHandler.cpp b/src/IO/ADIOS/ADIOS1IOHandler.cpp index 47d3848cd9..0622f3bd62 100644 --- a/src/IO/ADIOS/ADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS1IOHandler.cpp @@ -135,6 +135,9 @@ ADIOS1IOHandlerImpl::flush() case O::OPEN_DATASET: openDataset(i.writable, deref_dynamic_cast< Parameter< O::OPEN_DATASET > >(i.parameter.get())); break; + case O::CLOSE_FILE: + closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get())); + break; case O::DELETE_FILE: deleteFile(i.writable, deref_dynamic_cast< Parameter< O::DELETE_FILE > >(i.parameter.get())); break; diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 40c16b1d3e..909be4321e 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -324,6 +324,24 @@ void ADIOS2IOHandlerImpl::openFile( writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( ); } +void +ADIOS2IOHandlerImpl::closeFile( + Writable * writable, + Parameter< Operation::CLOSE_FILE > const & ) +{ + auto fileIterator = m_files.find( writable ); + if ( fileIterator != m_files.end( ) ) + { + fileIterator->second.invalidate( ); + auto it = m_fileData.find( fileIterator->second ); + if ( it != m_fileData.end( ) ) + { + it->second->flush( ); + m_fileData.erase( it ); + } + } +} + void ADIOS2IOHandlerImpl::openPath( Writable * writable, const Parameter< Operation::OPEN_PATH > & parameters ) { diff --git a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp index 568519d594..fdc7c13268 100644 --- a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp @@ -540,8 +540,68 @@ CommonADIOS1IOHandlerImpl::openFile(Writable* writable, } void -CommonADIOS1IOHandlerImpl::openPath(Writable* writable, - Parameter< Operation::OPEN_PATH > const& parameters) +CommonADIOS1IOHandlerImpl::closeFile( + Writable * writable, + Parameter< Operation::CLOSE_FILE > const & ) +{ + auto myFile = m_filePaths.find( writable ); + if( myFile == m_filePaths.end() ) + { + return; + } + + // finish write operations + auto myGroup = m_groups.find( myFile->second ); + if( myGroup != m_groups.end() ) + { + auto attributeWrites = m_attributeWrites.find( myGroup->second ); + if( this->m_handler->m_backendAccess != Access::READ_ONLY && + attributeWrites != m_attributeWrites.end() ) + { + for( auto & att : attributeWrites->second ) + { + flush_attribute( myGroup->second, att.first, att.second ); + } + m_attributeWrites.erase( attributeWrites ); + } + m_groups.erase( myGroup ); + } + + auto handle_write = m_openWriteFileHandles.find( myFile->second ); + if( handle_write != m_openWriteFileHandles.end() ) + { + close( handle_write->second ); + m_openWriteFileHandles.erase( handle_write ); + } + + // finish read operations + auto handle_read = m_openReadFileHandles.find( myFile->second ); + if( handle_read != m_openReadFileHandles.end() ) + { + auto scheduled = m_scheduledReads.find( handle_read->second ); + if( scheduled != m_scheduledReads.end() ) + { + auto status = adios_perform_reads( scheduled->first, 1 ); + VERIFY( + status == err_no_error, + "[ADIOS1] Internal error: Failed to perform ADIOS reads during " + "dataset reading" ); + + for( auto & sel : scheduled->second ) + adios_selection_delete( sel ); + m_scheduledReads.erase( scheduled ); + } + close( handle_read->second ); + m_openReadFileHandles.erase( handle_read ); + } + m_existsOnDisk.erase( myFile->second ); + m_filePaths.erase( myFile ); +} + +void +CommonADIOS1IOHandlerImpl::openPath( + Writable * writable, + Parameter< Operation::OPEN_PATH > const & parameters ) { /* Sanitize path */ std::string path = parameters.path; diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index 5404b6b4bd..fef2ce4ecb 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -399,8 +399,48 @@ HDF5IOHandlerImpl::openFile(Writable* writable, } void -HDF5IOHandlerImpl::openPath(Writable* writable, - Parameter< Operation::OPEN_PATH > const& parameters) +HDF5IOHandlerImpl::closeFile( + Writable * writable, + Parameter< Operation::CLOSE_FILE > const & ) +{ + auto fileID_it = m_fileIDs.find( writable ); + if( fileID_it == m_fileIDs.end() ) + { + throw std::runtime_error( + "[HDF5] Trying to close a file that is not " + "present in the backend" ); + return; + } + hid_t fileID = fileID_it->second; + H5Fclose( fileID ); + m_openFileIDs.erase( fileID ); + m_fileIDs.erase( fileID_it ); + + /* std::unordered_map::erase: + * References and iterators to the erased elements are invalidated. Other + * iterators and references are not invalidated. + */ + using iter_t = decltype( m_fileNamesWithID )::iterator; + std::vector< iter_t > deleteMe; + deleteMe.reserve( 1 ); // should normally suffice + for( auto it = m_fileNamesWithID.begin(); it != m_fileNamesWithID.end(); + ++it ) + { + if( it->second == fileID ) + { + deleteMe.push_back( it ); + } + } + for( auto iterator : deleteMe ) + { + m_fileNamesWithID.erase( iterator ); + } +} + +void +HDF5IOHandlerImpl::openPath( + Writable * writable, + Parameter< Operation::OPEN_PATH > const & parameters ) { auto res = m_fileIDs.find(writable->parent); hid_t node_id, path_id; diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index ecd40a8eff..2497ff9508 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -306,6 +306,21 @@ namespace openPMD } + void JSONIOHandlerImpl::closeFile( + Writable * writable, + Parameter< Operation::CLOSE_FILE > const & + ) + { + auto fileIterator = m_files.find( writable ); + if ( fileIterator != m_files.end( ) ) + { + putJsonContents( fileIterator->second ); + fileIterator->second.invalidate( ); + m_files.erase( fileIterator ); + } + } + + void JSONIOHandlerImpl::openPath( Writable * writable, Parameter< Operation::OPEN_PATH > const & parameters diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 4668df9277..8e02d1a0cf 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -19,12 +19,16 @@ * If not, see . */ #include "openPMD/Iteration.hpp" + #include "openPMD/Dataset.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/Series.hpp" +#include "openPMD/auxiliary/DerefDynamicCast.hpp" #include "openPMD/auxiliary/StringManip.hpp" #include "openPMD/backend/Writable.hpp" +#include + namespace openPMD { @@ -40,7 +44,8 @@ Iteration::Iteration() Iteration::Iteration(Iteration const& i) : Attributable{i}, meshes{i.meshes}, - particles{i.particles} + particles{i.particles}, + m_closed{i.m_closed} { IOHandler = i.IOHandler; parent = i.parent; @@ -57,6 +62,7 @@ Iteration& Iteration::operator=(Iteration const& i) particles = i.particles; IOHandler = i.IOHandler; parent = i.parent; + m_closed = i.m_closed; meshes.IOHandler = IOHandler; meshes.parent = this->m_writable.get(); particles.IOHandler = IOHandler; @@ -97,6 +103,72 @@ Iteration::setTimeUnitSI(double newTimeUnitSI) return *this; } +Iteration & +Iteration::close( bool _flush ) +{ + using bool_type = unsigned char; + if( this->IOHandler->m_frontendAccess != Access::READ_ONLY ) + { + setAttribute< bool_type >( "closed", 1u ); + } + *m_closed = CloseStatus::ClosedInFrontend; + if( _flush ) + { + Series * s = &auxiliary::deref_dynamic_cast< Series >( + parent->attributable->parent->attributable ); + // figure out my iteration number + uint64_t index; + bool found = false; + for( auto const & pair : s->iterations ) + { + if( pair.second.m_writable.get() == this->m_writable.get() ) + { + found = true; + index = pair.first; + break; + } + } + if( !found ) + { + throw std::runtime_error( + "[Iteration::close] Iteration not found in Series." ); + } + std::map< uint64_t, Iteration > flushOnlyThisIteration{ + { index, *this } }; + switch( *s->m_iterationEncoding ) + { + using IE = IterationEncoding; + case IE::fileBased: + s->flushFileBased( flushOnlyThisIteration ); + break; + case IE::groupBased: + s->flushGroupBased( flushOnlyThisIteration ); + break; + } + } + return *this; +} + +bool +Iteration::closed() const +{ + return *m_closed != CloseStatus::Open; +} + +bool +Iteration::closedByWriter() const +{ + using bool_type = unsigned char; + if( containsAttribute( "closed" ) ) + { + return getAttribute( "closed" ).get< bool_type >() == 0u ? false : true; + } + else + { + return false; + } +} + void Iteration::flushFileBased(std::string const& filename, uint64_t i) { @@ -359,6 +431,30 @@ Iteration::read() readAttributes(); } +bool +Iteration::dirtyRecursive() const +{ + if( dirty ) + { + return true; + } + for( auto const & pair : particles ) + { + if( pair.second.dirtyRecursive() ) + { + return true; + } + } + for( auto const & pair : meshes ) + { + if( pair.second.dirtyRecursive() ) + { + return true; + } + } + return false; +} + void Iteration::linkHierarchy(std::shared_ptr< Writable > const& w) { @@ -367,16 +463,15 @@ Iteration::linkHierarchy(std::shared_ptr< Writable > const& w) particles.linkHierarchy(m_writable); } +template float +Iteration::time< float >() const; +template double +Iteration::time< double >() const; +template long double +Iteration::time< long double >() const; -template -float Iteration::time< float >() const; -template -double Iteration::time< double >() const; -template -long double Iteration::time< long double >() const; - -template -float Iteration::dt< float >() const; +template float +Iteration::dt< float >() const; template double Iteration::dt< double >() const; template diff --git a/src/ParticleSpecies.cpp b/src/ParticleSpecies.cpp index 291a9de24a..8ce06bb82a 100644 --- a/src/ParticleSpecies.cpp +++ b/src/ParticleSpecies.cpp @@ -143,4 +143,21 @@ ParticleSpecies::flush(std::string const& path) } } } -} // openPMD + +bool +ParticleSpecies::dirtyRecursive() const +{ + if( dirty ) + { + return true; + } + for( auto const & pair : *this ) + { + if( pair.second.dirtyRecursive() ) + { + return true; + } + } + return false; +} +} // namespace openPMD diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index 8580268257..1ee776a555 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -265,5 +265,14 @@ RecordComponent::readBase() readAttributes(); } -} // openPMD +bool +RecordComponent::dirtyRecursive() const +{ + if( Attributable::dirty ) + { + return true; + } + return !m_chunks->empty(); +} +} // namespace openPMD diff --git a/src/Series.cpp b/src/Series.cpp index f7a70508a3..c4967849fb 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -369,10 +369,10 @@ Series::flush() { using IE = IterationEncoding; case IE::fileBased: - flushFileBased(); + flushFileBased( iterations ); break; case IE::groupBased: - flushGroupBased(); + flushGroupBased( iterations ); break; } @@ -550,22 +550,66 @@ Series::initDefaults() // TODO Potentially warn on flush if software and author are not user-provided (defaulted) } +template< typename IterationsContainer > void -Series::flushFileBased() +Series::flushFileBased( IterationsContainer && iterationsToFlush ) { - if( iterations.empty() ) - throw std::runtime_error("fileBased output can not be written with no iterations."); + if( iterationsToFlush.empty() ) + throw std::runtime_error( + "fileBased output can not be written with no iterations." ); - if(IOHandler->m_frontendAccess == Access::READ_ONLY ) - for( auto& i : iterations ) + if( IOHandler->m_frontendAccess == Access::READ_ONLY ) + for( auto & i : iterationsToFlush ) + { + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend ) + { + // file corresponding with the iteration has previously been + // closed and fully flushed + // verify that there have been no further accesses + if( i.second.dirtyRecursive() ) + { + throw std::runtime_error( + "[Series] Detected illegal access to iteration that " + "has been closed previously." ); + } + continue; + } i.second.flush(); + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) + { + Parameter< Operation::CLOSE_FILE > fClose; + IOHandler->enqueue( IOTask( &i.second, std::move( fClose ) ) ); + *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend; + } + IOHandler->flush(); + } else { bool allDirty = dirty; - for( auto& i : iterations ) + for( auto & i : iterationsToFlush ) { + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend ) + { + // file corresponding with the iteration has previously been + // closed and fully flushed + // verify that there have been no further accesses + if (!i.second.written) + { + throw std::runtime_error( + "[Series] Closed iteration has not been written. This " + "is an internal error."); + } + if( i.second.dirtyRecursive() ) + { + throw std::runtime_error( + "[Series] Detected illegal access to iteration that " + "has been closed previously." ); + } + continue; + } /* as there is only one series, - * emulate the file belonging to each iteration as not yet written */ + * emulate the file belonging to each iteration as not yet written + */ written = false; iterations.written = false; @@ -580,6 +624,13 @@ Series::flushFileBased() flushAttributes(); + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) + { + Parameter< Operation::CLOSE_FILE > fClose; + IOHandler->enqueue( IOTask( &i.second, std::move( fClose ) ) ); + *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend; + } + IOHandler->flush(); /* reset the dirty bit for every iteration (i.e. file) @@ -590,12 +641,39 @@ Series::flushFileBased() } } +template void +Series::flushFileBased< std::map< uint64_t, Iteration > & >( + std::map< uint64_t, Iteration > & ); + +template< typename IterationsContainer > void -Series::flushGroupBased() +Series::flushGroupBased( IterationsContainer && iterationsToFlush ) { - if(IOHandler->m_frontendAccess == Access::READ_ONLY ) - for( auto& i : iterations ) + if( IOHandler->m_frontendAccess == Access::READ_ONLY ) + for( auto & i : iterationsToFlush ) + { + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend ) + { + // file corresponding with the iteration has previously been + // closed and fully flushed + // verify that there have been no further accesses + if( i.second.dirtyRecursive() ) + { + throw std::runtime_error( + "[Series] Illegal access to iteration " + + std::to_string( i.first ) + + " that has been closed previously." ); + } + continue; + } i.second.flush(); + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) + { + // the iteration has no dedicated file in group-based mode + *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend; + } + IOHandler->flush(); + } else { if( !written ) @@ -605,22 +683,52 @@ Series::flushGroupBased() IOHandler->enqueue(IOTask(this, fCreate)); } - iterations.flush(auxiliary::replace_first(basePath(), "%T/", "")); + iterations.flush( auxiliary::replace_first( basePath(), "%T/", "" ) ); - for( auto& i : iterations ) + for( auto & i : iterationsToFlush ) { + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInBackend ) + { + // file corresponding with the iteration has previously been + // closed and fully flushed + // verify that there have been no further accesses + if (!i.second.written) + { + throw std::runtime_error( + "[Series] Closed iteration has not been written. This " + "is an internal error."); + } + if( i.second.dirtyRecursive() ) + { + throw std::runtime_error( + "[Series] Illegal access to iteration " + + std::to_string( i.first ) + + " that has been closed previously." ); + } + continue; + } if( !i.second.written ) { i.second.m_writable->parent = getWritable(&iterations); i.second.parent = getWritable(&iterations); } i.second.flushGroupBased(i.first); + if( *i.second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) + { + // the iteration has no dedicated file in group-based mode + *i.second.m_closed = Iteration::CloseStatus::ClosedInBackend; + } } flushAttributes(); + IOHandler->flush(); } } +template void +Series::flushGroupBased< std::map< uint64_t, Iteration > & >( + std::map< uint64_t, Iteration > & ); + void Series::flushMeshesPath() { diff --git a/src/binding/python/Iteration.cpp b/src/binding/python/Iteration.cpp index 822cf6022b..b1f50980b5 100644 --- a/src/binding/python/Iteration.cpp +++ b/src/binding/python/Iteration.cpp @@ -53,6 +53,7 @@ void init_Iteration(py::module &m) { .def("set_dt", &Iteration::setDt) .def("time_unit_SI", &Iteration::timeUnitSI) .def("set_time_unit_SI", &Iteration::setTimeUnitSI) + .def("close", &Iteration::close, py::arg("flush") = true) .def_readwrite("meshes", &Iteration::meshes, py::return_value_policy::reference, diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 4ef087d996..29bba8fbc3 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -91,8 +91,138 @@ TEST_CASE( "multi_series_test", "[serial]" ) allSeries.clear(); } -inline void +close_iteration_test( std::string file_ending ) +{ + std::string name = "../samples/close_iterations_%T." + file_ending; + + std::vector data{2, 4, 6, 8}; + // { // we do *not* need these parentheses + Series write(name, Access::CREATE); + bool isAdios1 = write.backend() == "ADIOS1"; + { + Iteration it0 = write.iterations[ 0 ]; + auto E_x = it0.meshes[ "E" ][ "x" ]; + E_x.resetDataset( { Datatype::INT, { 2, 2 } } ); + E_x.storeChunk( data, { 0, 0 }, { 2, 2 } ); + it0.close( /* flush = */ false ); + } + write.flush(); + // } + + if (isAdios1) + { + // run a simplified test for Adios1 since Adios1 has issues opening + // twice in the same process + REQUIRE( auxiliary::file_exists( "../samples/close_iterations_0.bp" ) ); + } + else + { + Series read( name, Access::READ_ONLY ); + Iteration it0 = read.iterations[ 0 ]; + auto E_x_read = it0.meshes[ "E" ][ "x" ]; + auto chunk = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } ); + it0.close( /* flush = */ false ); + read.flush(); + for( size_t i = 0; i < data.size(); ++i ) + { + REQUIRE( data[ i ] == chunk.get()[ i ] ); + } + } + + { + Iteration it1 = write.iterations[1]; + auto E_x = it1.meshes[ "E" ][ "x" ]; + E_x.resetDataset( { Datatype::INT, { 2, 2 } } ); + E_x.storeChunk( data, { 0, 0 }, { 2, 2 } ); + it1.close( /* flush = */ true ); + + // illegally access iteration after closing + E_x.storeChunk( data, { 0, 0 }, { 2, 2 } ); + REQUIRE_THROWS( write.flush() ); + } + + if (isAdios1) + { + // run a simplified test for Adios1 since Adios1 has issues opening + // twice in the same process + REQUIRE( auxiliary::file_exists( "../samples/close_iterations_1.bp" ) ); + } + else + { + Series read( name, Access::READ_ONLY ); + Iteration it1 = read.iterations[ 1 ]; + auto E_x_read = it1.meshes[ "E" ][ "x" ]; + auto chunk = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } ); + it1.close( /* flush = */ true ); + for( size_t i = 0; i < data.size(); ++i ) + { + REQUIRE( data[ i ] == chunk.get()[ i ] ); + } + auto read_again = E_x_read.loadChunk< int >( { 0, 0 }, { 2, 2 } ); + REQUIRE_THROWS( read.flush() ); + } +} + +TEST_CASE( "close_iteration_test", "[serial]" ) +{ + for( auto const & t : backends ) + { + close_iteration_test( std::get< 0 >( t ) ); + } +} + +#if openPMD_HAVE_ADIOS2 +TEST_CASE( "close_iteration_throws_test", "[serial" ) +{ + /* + * Iterations should not be accessed any more after closing. + * Test that the openPMD API detects that case and throws. + */ + { + Series series( "close_iteration_throws_1.bp", Access::CREATE ); + auto it0 = series.iterations[ 0 ]; + auto E_x = it0.meshes[ "E" ][ "x" ]; + E_x.resetDataset( { Datatype::INT, { 5 } } ); + std::vector< int > data{ 0, 1, 2, 3, 4 }; + E_x.storeChunk( data, { 0 }, { 5 } ); + it0.close(); + + auto B_y = it0.meshes[ "B" ][ "y" ]; + B_y.resetDataset( { Datatype::INT, { 5 } } ); + B_y.storeChunk( data, { 0 }, { 5 } ); + REQUIRE_THROWS( series.flush() ); + } + { + Series series( "close_iteration_throws_2.bp", Access::CREATE ); + auto it0 = series.iterations[ 0 ]; + auto E_x = it0.meshes[ "E" ][ "x" ]; + E_x.resetDataset( { Datatype::INT, { 5 } } ); + std::vector< int > data{ 0, 1, 2, 3, 4 }; + E_x.storeChunk( data, { 0 }, { 5 } ); + it0.close(); + + auto e_position_x = it0.particles[ "e" ][ "position" ][ "x" ]; + e_position_x.resetDataset( { Datatype::INT, { 5 } } ); + e_position_x.storeChunk( data, { 0 }, { 5 } ); + REQUIRE_THROWS( series.flush() ); + } + { + Series series( "close_iteration_throws_3.bp", Access::CREATE ); + auto it0 = series.iterations[ 0 ]; + auto E_x = it0.meshes[ "E" ][ "x" ]; + E_x.resetDataset( { Datatype::INT, { 5 } } ); + std::vector< int > data{ 0, 1, 2, 3, 4 }; + E_x.storeChunk( data, { 0 }, { 5 } ); + it0.close(); + + it0.setTimeUnitSI( 2.0 ); + REQUIRE_THROWS( series.flush() ); + } +} +#endif + +inline void empty_dataset_test( std::string file_ending ) { { diff --git a/test/python/unittest/API/APITest.py b/test/python/unittest/API/APITest.py index 79ab2c629c..4e50313a38 100644 --- a/test/python/unittest/API/APITest.py +++ b/test/python/unittest/API/APITest.py @@ -1229,6 +1229,69 @@ def testFieldRecord(self): self.assertIsInstance(Ex, io.Mesh_Record_Component) + def makeCloseIterationRoundTrip(self, backend, file_ending): + # write + series = io.Series( + "unittest_closeIteration_%T." + file_ending, + io.Access_Type.create + ) + DS = io.Dataset + data = np.array([2, 4, 6, 8], dtype=np.dtype("int")) + extent = [4] + + it0 = series.iterations[0] + E_x = it0.meshes["E"]["x"] + E_x.reset_dataset(DS(np.dtype("int"), extent)) + E_x.store_chunk(data, [0], extent) + it0.close(flush=True) + + if backend != 'adios1': + read = io.Series( + "unittest_closeIteration_%T." + file_ending, + io.Access_Type.read_only + ) + it0 = read.iterations[0] + E_x = it0.meshes["E"]["x"] + chunk = E_x.load_chunk([0], extent) + it0.close() # flush = True <- default argument + + for i in range(len(data)): + self.assertEqual(data[i], chunk[i]) + del read + + it1 = series.iterations[1] + E_x = it1.meshes["E"]["x"] + E_x.reset_dataset(DS(np.dtype("int"), extent)) + E_x.store_chunk(data, [0], extent) + it1.close(flush=False) + series.flush() + + if backend != 'adios1': + read = io.Series( + "unittest_closeIteration_%T." + file_ending, + io.Access_Type.read_only + ) + it1 = read.iterations[1] + E_x = it1.meshes["E"]["x"] + chunk = E_x.load_chunk([0], extent) + it1.close(flush=False) + read.flush() + + for i in range(len(data)): + self.assertEqual(data[i], chunk[i]) + del read + + def testCloseIteration(self): + backend_filesupport = { + 'json': 'json', + 'hdf5': 'h5', + 'adios1': 'bp', + 'adios2': 'bp' + } + for b in io.variants: + if io.variants[b] is True and b in backend_filesupport: + self.makeCloseIterationRoundTrip(b, backend_filesupport[b]) + if __name__ == '__main__': unittest.main()