Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iteration::close #746

Merged
merged 37 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2bc80ac
Add IOTask for closing file
franzpoeschel May 26, 2020
dacc28b
Add API calls for Iteration::close
franzpoeschel May 26, 2020
5fa9285
Add closeFile to ADIOS2 backend
franzpoeschel May 26, 2020
b08211a
remove skipFlush
franzpoeschel May 26, 2020
68aec92
Add test
franzpoeschel May 26, 2020
3bda4e6
add back skipFlush
franzpoeschel May 26, 2020
c6a894d
Invalidate file in ADIOS2 after closing
franzpoeschel May 26, 2020
38aaef7
Remove superfluous ; after function
ax3l May 27, 2020
791f106
Make Iteration::close synchronous by default
franzpoeschel May 27, 2020
7cca69b
Add closeFile operation to JSON backend
franzpoeschel Jun 2, 2020
ed62754
Remove name parameter from CLOSE_FILE task
franzpoeschel Jun 2, 2020
5fec37f
Fix name shadowing
franzpoeschel Jun 2, 2020
626b112
[wip] close_file for ADIOS1
franzpoeschel Jun 3, 2020
2a8fedc
Temporary workaround for ADIOS1
franzpoeschel Jun 3, 2020
56b44b5
Implement file closing in HDF5
franzpoeschel Jun 3, 2020
3f0f2e1
change erase order
ax3l Jun 8, 2020
199f56e
run a simplified test for ADIOS1
franzpoeschel Jun 9, 2020
5195621
Split closed into closed and closedByWriter
franzpoeschel Jun 9, 2020
ebffb2a
Fix ADIOS1 test
franzpoeschel Jun 12, 2020
95a8885
cleanup
franzpoeschel Jun 12, 2020
947c69c
Throw if flushing a closed iteration that has been accessed
franzpoeschel Jun 19, 2020
49a06f8
Add verifyClosed
franzpoeschel Jun 19, 2020
26d1157
Close files in read mode too
franzpoeschel Jun 19, 2020
e26037a
Test throwing upon accessing an iteration post closing
franzpoeschel Jun 19, 2020
d091b89
Also do sanity-checks in group-based mode
franzpoeschel Jun 19, 2020
597ae56
Rename m_closed* members of Iteration and update docu
franzpoeschel Jun 21, 2020
d57f900
Unify m_closed(Backend|Frontend) into common enum
franzpoeschel Jun 21, 2020
20da16e
Flush only the closed iteration upon Iteration::close
franzpoeschel Jun 24, 2020
3861c8c
Undo unnecessary whitespace changes
franzpoeschel Jun 24, 2020
91c7946
Two small fixes in ADIOS1 IO Handler
franzpoeschel Jun 24, 2020
49d5c78
Prevent throwing in destructor in close_iteration_test
franzpoeschel Jul 3, 2020
9cc7259
Do not use boolean attributes
franzpoeschel Jul 3, 2020
c320171
Revert "Prevent throwing in destructor in close_iteration_test"
franzpoeschel Jul 10, 2020
9a702f2
Add some documentation
franzpoeschel Jul 21, 2020
8df6ebb
Add reviewer's suggestions
franzpoeschel Jul 23, 2020
22e8fba
Expose Iteration::close to Python API
franzpoeschel Jul 23, 2020
55c2041
Run simplified test for ADIOS1 in Python
franzpoeschel Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
3 changes: 3 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class ADIOS2IOHandlerImpl
void openFile( Writable *,
Parameter< Operation::OPEN_FILE > const & ) override;

void closeFile( Writable *,
Parameter< Operation::CLOSE_FILE > const & ) override;

void openPath( Writable *,
Parameter< Operation::OPEN_PATH > const & ) override;

Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
9 changes: 9 additions & 0 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class AbstractIOHandlerImpl
case O::OPEN_FILE:
openFile(i.writable, deref_dynamic_cast< Parameter< O::OPEN_FILE > >(i.parameter.get()));
break;
case O::CLOSE_FILE:
closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get()));
break;
case O::OPEN_PATH:
openPath(i.writable, deref_dynamic_cast< Parameter< O::OPEN_PATH > >(i.parameter.get()));
break;
Expand Down Expand Up @@ -118,6 +121,12 @@ class AbstractIOHandlerImpl
return std::future< void >();
}

/**
* Close the file corresponding with the writable and release file handles.
* The operation should succeed in any access mode.
*/
virtual void
closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0;
/** Create a new file in physical storage, possibly overriding an existing file.
*
* The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY.
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace openPMD
void createDataset(Writable*, Parameter< Operation::CREATE_DATASET > const&) override;
void extendDataset(Writable*, Parameter< Operation::EXTEND_DATASET > const&) override;
void openFile(Writable*, Parameter< Operation::OPEN_FILE > const&) override;
void closeFile(Writable*, Parameter< Operation::CLOSE_FILE > const&) override;
void openPath(Writable*, Parameter< Operation::OPEN_PATH > const&) override;
void openDataset(Writable*, Parameter< Operation::OPEN_DATASET > &) override;
void deleteFile(Writable*, Parameter< Operation::DELETE_FILE > const&) override;
Expand Down
15 changes: 15 additions & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation)
{
CREATE_FILE,
OPEN_FILE,
CLOSE_FILE,
DELETE_FILE,

CREATE_PATH,
Expand Down Expand Up @@ -127,6 +128,20 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::OPEN_FILE > : public AbstractPara
std::string name = "";
};

template<>
struct OPENPMDAPI_EXPORT Parameter< Operation::CLOSE_FILE > : public AbstractParameter
{
Parameter() = default;
Parameter( Parameter const & ) : AbstractParameter() {}

std::unique_ptr< AbstractParameter >
clone() const override
{
return std::unique_ptr< AbstractParameter >(
new Parameter< Operation::CLOSE_FILE >( *this ) );
}
};

template<>
struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_FILE > : public AbstractParameter
{
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ namespace openPMD
Parameter< Operation::OPEN_FILE > const &
) override;

void closeFile(
Writable *,
Parameter< Operation::CLOSE_FILE > const &
) override;

void openPath(
Writable *,
Parameter< Operation::OPEN_PATH > const &
Expand Down
73 changes: 72 additions & 1 deletion include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,44 @@ class Iteration : public Attributable
*/
Iteration& setTimeUnitSI(double newTimeUnitSI);

/**
* @brief Close an iteration. No further (backend-propagating) accesses
* may be performed on this iteration.
* A closed iteration may not (yet) be reopened.
* @return Reference to iteration.
*/
/*
* Note: If the API is changed in future to allow reopening closed
* iterations, measures should be taken to prevent this in the streaming
* API. Currently, disallowing to reopen closed iterations satisfies
* the requirements of the streaming API.
*/
Iteration &
close( bool flush = true );

/**
* @brief Has the iteration been closed?
* A closed iteration may not (yet) be reopened.
*
* @return Whether the iteration has been closed.
*/
bool
closed() const;

/**
* @brief Has the iteration been closed by the writer?
* Background: Upon calling Iteration::close(), the openPMD API
* will add metadata to the iteration in form of an attribute,
* indicating that the iteration has indeed been closed.
* Useful mainly in streaming context when a reader inquires from
* a writer that it is done writing.
*
* @return Whether the iteration has been explicitly closed (yet) by the
* writer.
*/
bool
closedByWriter() const;

Container< Mesh > meshes;
Container< ParticleSpecies > particles; //particleSpecies?

Expand All @@ -99,8 +137,41 @@ class Iteration : public Attributable
void flush();
void read();

/**
* @brief Whether an iteration has been closed yet.
*
*/
enum class CloseStatus
ax3l marked this conversation as resolved.
Show resolved Hide resolved
{
Open, //!< Iteration has not been closed
ClosedInFrontend, /*!< Iteration has been closed, but task has not yet
been propagated to the backend */
ClosedInBackend /*!< Iteration has been closed and task has been
propagated to the backend */
};

/*
* An iteration may be logically closed in the frontend,
* but not necessarily yet in the backend.
* Will be propagated to the backend upon next flush.
* Store the current status.
*/
std::shared_ptr< CloseStatus > m_closed =
std::make_shared< CloseStatus >( CloseStatus::Open );

/*
* @brief Check recursively whether this Iteration is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;

virtual void linkHierarchy(std::shared_ptr< Writable > const& w);
}; //Iteration
}; // Iteration

extern template
float
Expand Down
11 changes: 11 additions & 0 deletions include/openPMD/ParticleSpecies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class ParticleSpecies : public Container< Record >

void read();
void flush(std::string const &) override;

/**
* @brief Check recursively whether this ParticleSpecies is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
};

namespace traits
Expand Down
11 changes: 11 additions & 0 deletions include/openPMD/RecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ class RecordComponent : public BaseRecordComponent
* @return Reference to this RecordComponent instance.
*/
RecordComponent& makeEmpty( Dataset d );

/**
* @brief Check recursively whether this RecordComponent is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
}; // RecordComponent


Expand Down
6 changes: 4 additions & 2 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ class Series : public Attributable
std::unique_ptr< ParsedInput > parseInput(std::string);
void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >);
void initDefaults();
void flushFileBased();
void flushGroupBased();
template< typename IterationsContainer >
void flushFileBased( IterationsContainer && iterationsToFlush );
template< typename IterationsContainer >
void flushGroupBased( IterationsContainer && iterationsToFlush );
void flushMeshesPath();
void flushParticlesPath();
void readFileBased();
Expand Down
33 changes: 31 additions & 2 deletions include/openPMD/backend/BaseRecord.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@ class BaseRecord : public Container< T_elem >
void flush(std::string const&) final;
virtual void flush_impl(std::string const&) = 0;
virtual void read() = 0;
}; //BaseRecord

/**
* @brief Check recursively whether this BaseRecord is dirty.
* It is dirty if any attribute or dataset is read from or written to
* the backend.
*
* @return true If dirty.
* @return false Otherwise.
*/
bool
dirtyRecursive() const;
}; // BaseRecord


template< typename T_elem >
Expand Down Expand Up @@ -295,4 +306,22 @@ BaseRecord< T_elem >::flush(std::string const& name)

this->flush_impl(name);
}
} // openPMD

template< typename T_elem >
inline bool
BaseRecord< T_elem >::dirtyRecursive() const
{
if( Attributable::dirty )
{
return true;
}
for( auto const & pair : *this )
{
if( pair.second.dirtyRecursive() )
{
return true;
}
}
return false;
}
} // namespace openPMD
3 changes: 3 additions & 0 deletions src/IO/ADIOS/ADIOS1IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ ADIOS1IOHandlerImpl::flush()
case O::OPEN_DATASET:
openDataset(i.writable, deref_dynamic_cast< Parameter< O::OPEN_DATASET > >(i.parameter.get()));
break;
case O::CLOSE_FILE:
closeFile(i.writable, *dynamic_cast< Parameter< O::CLOSE_FILE >* >(i.parameter.get()));
break;
case O::DELETE_FILE:
deleteFile(i.writable, deref_dynamic_cast< Parameter< O::DELETE_FILE > >(i.parameter.get()));
break;
Expand Down
18 changes: 18 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,24 @@ void ADIOS2IOHandlerImpl::openFile(
writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( );
}

void
ADIOS2IOHandlerImpl::closeFile(
Writable * writable,
Parameter< Operation::CLOSE_FILE > const & )
{
auto fileIterator = m_files.find( writable );
if ( fileIterator != m_files.end( ) )
{
fileIterator->second.invalidate( );
auto it = m_fileData.find( fileIterator->second );
if ( it != m_fileData.end( ) )
{
it->second->flush( );
m_fileData.erase( it );
}
}
}

void ADIOS2IOHandlerImpl::openPath(
Writable * writable, const Parameter< Operation::OPEN_PATH > & parameters )
{
Expand Down
64 changes: 62 additions & 2 deletions src/IO/ADIOS/CommonADIOS1IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,68 @@ CommonADIOS1IOHandlerImpl::openFile(Writable* writable,
}

void
CommonADIOS1IOHandlerImpl::openPath(Writable* writable,
Parameter< Operation::OPEN_PATH > const& parameters)
CommonADIOS1IOHandlerImpl::closeFile(
Writable * writable,
Parameter< Operation::CLOSE_FILE > const & )
{
auto myFile = m_filePaths.find( writable );
if( myFile == m_filePaths.end() )
{
return;
}

// finish write operations
auto myGroup = m_groups.find( myFile->second );
if( myGroup != m_groups.end() )
{
auto attributeWrites = m_attributeWrites.find( myGroup->second );
if( this->m_handler->m_backendAccess != Access::READ_ONLY &&
attributeWrites != m_attributeWrites.end() )
{
for( auto & att : attributeWrites->second )
{
flush_attribute( myGroup->second, att.first, att.second );
}
m_attributeWrites.erase( attributeWrites );
}
m_groups.erase( myGroup );
}

auto handle_write = m_openWriteFileHandles.find( myFile->second );
if( handle_write != m_openWriteFileHandles.end() )
{
close( handle_write->second );
m_openWriteFileHandles.erase( handle_write );
}

// finish read operations
auto handle_read = m_openReadFileHandles.find( myFile->second );
if( handle_read != m_openReadFileHandles.end() )
{
auto scheduled = m_scheduledReads.find( handle_read->second );
if( scheduled != m_scheduledReads.end() )
{
auto status = adios_perform_reads( scheduled->first, 1 );
VERIFY(
status == err_no_error,
"[ADIOS1] Internal error: Failed to perform ADIOS reads during "
"dataset reading" );

for( auto & sel : scheduled->second )
adios_selection_delete( sel );
m_scheduledReads.erase( scheduled );
}
close( handle_read->second );
m_openReadFileHandles.erase( handle_read );
}
m_existsOnDisk.erase( myFile->second );
m_filePaths.erase( myFile );
}

void
CommonADIOS1IOHandlerImpl::openPath(
Writable * writable,
Parameter< Operation::OPEN_PATH > const & parameters )
{
/* Sanitize path */
std::string path = parameters.path;
Expand Down
Loading