Skip to content

Commit

Permalink
Merge pull request #6232 from johnhaddon/sceneWriterPerformance
Browse files Browse the repository at this point in the history
SceneWriter : Improve performance
  • Loading branch information
johnhaddon authored Feb 4, 2025
2 parents 74ba658 + a309dfa commit 2008b9f
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 218 deletions.
10 changes: 10 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ API

- ScriptWindow : Added `instanceCreatedSignal()`.

Improvements
------------

- SceneWriter : Improved performance. Benchmarks rewriting complex scenes via a SceneReader->SceneWriter graph show around a 2x speedup.

API
---

- SceneAlgo : Added `parallelGatherLocations()` function.

1.5.4.0 (relative to 1.5.3.0)
=======

Expand Down
11 changes: 11 additions & 0 deletions include/GafferScene/SceneAlgo.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ void filteredParallelTraverse( const ScenePlug *scene, const FilterPlug *filterP
template <class ThreadableFunctor>
void filteredParallelTraverse( const ScenePlug *scene, const IECore::PathMatcher &filter, ThreadableFunctor &f, const ScenePlug::ScenePath &root = ScenePlug::ScenePath() );

/// Calls `locationFunctor` in parallel for all locations in the scene, passing the results
/// serially to `gatherFunctor`. Parent locations are guaranteed to be passed to `gatherFunctor`
/// before their children, but otherwise the order of execution is unspecified.
///
/// `gatherFunctor` is called on the calling thread, and the function only returns (or throws)
/// once the underlying traversal has finished. An exception thrown by either functor is
/// propagated to the caller; once `gatherFunctor` has thrown, it is not called again.
///
/// `T` may be any type returned by `locationFunctor`. `root` is forwarded to the
/// `parallelTraverse()` call used to visit the locations.
template <class LocationFunctor, class GatherFunctor>
void parallelGatherLocations(
const ScenePlug *scene,
LocationFunctor &&locationFunctor, // Signature : T locationFunctor( const ScenePlug *scene, const ScenePlug::ScenePath &path )
GatherFunctor &&gatherFunctor, // Signature : void gatherFunctor( T &locationFunctorResult )
const ScenePlug::ScenePath &root = ScenePlug::ScenePath()
);

/// Searching
/// =========

Expand Down
81 changes: 81 additions & 0 deletions include/GafferScene/SceneAlgo.inl
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@

#include "Gaffer/Context.h"

#include "tbb/concurrent_queue.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/task_arena.h"

#include <variant>

namespace GafferScene
{
Expand Down Expand Up @@ -180,6 +184,83 @@ void filteredParallelTraverse( const ScenePlug *scene, const IECore::PathMatcher
parallelTraverse( scene, ff, root );
}


template <class LocationFunctor, class GatherFunctor>
void parallelGatherLocations( const ScenePlug *scene, LocationFunctor &&locationFunctor, GatherFunctor &&gatherFunctor, const ScenePlug::ScenePath &root )
{
	// Producer/consumer arrangement : a `parallelTraverse()` enqueued on the
	// current task arena evaluates `locationFunctor` and pushes each result
	// onto a bounded queue, while the calling thread pops from that queue and
	// feeds `gatherFunctor`. In testing, this proved to have lower overhead
	// than using TBB's `parallel_pipeline()`.

	using LocationResult = std::invoke_result_t<LocationFunctor, const ScenePlug *, const ScenePlug::ScenePath &>;
	// Queue entries are one of :
	//
	// - `LocationResult` : the result for a single location.
	// - `std::exception_ptr` : the traversal terminated with an exception.
	// - `std::monostate` : the traversal completed successfully.
	using QueueValue = std::variant<std::monostate, LocationResult, std::exception_ptr>;

	tbb::concurrent_bounded_queue<QueueValue> results;
	results.set_capacity( tbb::this_task_arena::max_concurrency() );

	// Used to abort the traversal early if `gatherFunctor` throws.
	IECore::Canceller traverseCanceller;

	auto produce = [&] ( const ScenePlug *traversedScene, const ScenePlug::ScenePath &path ) {
		IECore::Canceller::check( &traverseCanceller );
		results.push( std::move( locationFunctor( traversedScene, path ) ) );
		// Always continue on to the children of this location.
		return true;
	};

	// The traversal runs on another thread, so hand it the thread state of
	// the calling thread.
	auto &callerThreadState = Gaffer::ThreadState::current();

	tbb::task_arena( tbb::task_arena::attach() ).enqueue(

		[&] () {

			Gaffer::ThreadState::Scope threadStateScope( callerThreadState );
			try
			{
				SceneAlgo::parallelTraverse( scene, produce, root );
			}
			catch( ... )
			{
				// Hand the failure over to the gathering thread. This is
				// the final value pushed in the failure case.
				results.push( std::current_exception() );
				return;
			}
			// Final value pushed in the success case.
			results.push( std::monostate() );
		}

	);

	while( true )
	{
		QueueValue value;
		results.pop( value );

		if( auto *traversalError = std::get_if<std::exception_ptr>( &value ) )
		{
			// The traversal has already ended, so it is safe to leave
			// this stack frame.
			std::rethrow_exception( *traversalError );
		}

		auto *locationResult = std::get_if<LocationResult>( &value );
		if( !locationResult )
		{
			// `monostate` signals completion.
			return;
		}

		try
		{
			gatherFunctor( *locationResult );
		}
		catch( ... )
		{
			// The traversal references `results` and `traverseCanceller`
			// from this stack frame, so we must not rethrow until it has
			// finished. Cancel it, then discard queued values until one
			// of the two terminating values arrives.
			traverseCanceller.cancel();
			bool traversalEnded = false;
			while( !traversalEnded )
			{
				results.pop( value );
				traversalEnded =
					std::holds_alternative<std::exception_ptr>( value ) ||
					std::holds_alternative<std::monostate>( value )
				;
			}
			throw;
		}
	}
}

template<typename Predicate>
IECore::PathMatcher findAll( const ScenePlug *scene, Predicate &&predicate, const ScenePlug::ScenePath &root )
{
Expand Down
78 changes: 78 additions & 0 deletions python/GafferSceneTest/SceneAlgoTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,84 @@ def testFindAllWithAttribute( self ) :
IECore.PathMatcher( [ "/group/light1" ] )
)

def testParallelGatherLocations( self ) :

	# Build `/group/plane`, then duplicate `/group` 100 times so the
	# traversal has many sibling locations to visit.

	planeNode = GafferScene.Plane()
	groupNode = GafferScene.Group()
	groupNode["in"][0].setInput( planeNode["out"] )

	pathFilter = GafferScene.PathFilter()
	pathFilter["paths"].setValue( IECore.StringVectorData( [ "/group" ] ) )

	duplicateNode = GafferScene.Duplicate()
	duplicateNode["in"].setInput( groupNode["out"] )
	duplicateNode["filter"].setInput( pathFilter["out"] )
	duplicateNode["copies"].setValue( 100 )

	# The location functor just returns the path, and the gather
	# functor records the order in which paths arrive.

	gathered = []
	GafferScene.SceneAlgo.parallelGatherLocations(
		duplicateNode["out"],
		lambda scene, path : path,
		lambda path : gathered.append( path )
	)

	# Every location should have been visited.

	parents = [ "/group" ] + [ f"/group{i}" for i in range( 1, 101 ) ]

	expected = { "/" }
	for parent in parents :
		expected.add( parent )
		expected.add( parent + "/plane" )

	self.assertEqual( set( gathered ), expected )

	# And each parent should have been gathered before its child.

	indices = dict(
		( path, position )
		for position, path in enumerate( gathered )
	)

	self.assertEqual( gathered[0], "/" )
	for parent in parents :
		self.assertGreater( indices[parent + "/plane"], indices[parent] )

def testParallelGatherExceptionHandling( self ) :

	# An exception raised by the gather functor should reach the
	# caller as the original Python exception.

	def failingGather( x ) :

		return 1/0

	plane = GafferScene.Plane()

	with self.assertRaisesRegex( ZeroDivisionError, "division by zero" ) :
		GafferScene.SceneAlgo.parallelGatherLocations(
			plane["out"],
			lambda scene, path : path,
			failingGather
		)

def testParallelGatherLocationExceptionHandling( self ) :

	# An exception raised by the location functor should be reported
	# to the caller, and nothing should have been gathered.

	def failingLocation( scene, path ) :

		return 1/0

	plane = GafferScene.Plane()
	gathered = []

	with self.assertRaisesRegex( Exception, "ZeroDivisionError" ) :
		GafferScene.SceneAlgo.parallelGatherLocations(
			plane["out"],
			failingLocation,
			lambda x : gathered.append( x )
		)

	self.assertEqual( gathered, [] )

def tearDown( self ) :

GafferSceneTest.SceneTestCase.tearDown( self )
Expand Down
Loading

0 comments on commit 2008b9f

Please sign in to comment.