Skip to content

Commit

Permalink
Merge pull request #5636 from johnhaddon/dispatchersInTheGraph
Browse files Browse the repository at this point in the history
Dispatcher : Rederive from TaskNode
  • Loading branch information
johnhaddon authored Jan 31, 2024
2 parents c6eace7 + 27e0ee4 commit 5e5808b
Show file tree
Hide file tree
Showing 33 changed files with 1,714 additions and 942 deletions.
10 changes: 10 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
Features
--------

- Dispatcher : Dispatchers are now TaskNodes, allowing them to be nested in a task graph. Possibilities include :
- Using a LocalDispatcher and a Wedge to launch multiple TractorDispatcher jobs.
- Using a nested LocalDispatcher to perform a group of tasks on a single blade within a TractorDispatcher job.
- DeepSlice : Added a new node for clipping out part of an image based on depth.
- ImageInspector : Added a new panel for inspecting image format, metadata and channel statistics.

Expand Down Expand Up @@ -58,6 +61,7 @@ Fixes
-----

- BackgroundTask : Fixed potential deadlock caused by destroying a BackgroundTask from Python while it was still running.
- Dispatcher : The job directory is no longer created when dispatch is cancelled by a slot connected to `preDispatchSignal()`.
- LocalDispatcher :
- Fixed delays and zombie processes caused by shutting down Gaffer while background jobs were running. Background jobs are now killed before Gaffer exits instead.
- Stopped failed jobs jumping to the end of the Local Jobs UI.
Expand Down Expand Up @@ -117,6 +121,12 @@ Breaking Changes
- Removed `createMatching()` method.
- Removed non-const TaskBatch accessors `frames()` and `preTasks()`.
- Made `TaskBatch` constructors private.
- The job directory is no longer available in slots connected to `preDispatchSignal()`.
- Removed `nodes` arguments from dispatch signals. Use the `dispatcher["tasks"]` plug instead.
- Removed `script` and `context` arguments from `frameRange()` method. The current frame and full frame range are now queried from the current context.
- DispatcherUI :
- Removed `appendMenuDefinitions()`, `appendNodeContextMenuDefinitions()`, `executeSelected()` and `repeatPrevious()` functions.
- Removed `DispatcherWindow` class.
- LocalDispatcher :
- Removed `JobPool.jobFailedSignal()`.
- Removed `JobPool.failedJobs()` method. Failed jobs now remain in place in the main `jobs()` container.
Expand Down
7 changes: 4 additions & 3 deletions apps/stats/stats-1.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,23 +628,24 @@ def __writeTask( self, script, args ) :
import GafferDispatch

task = script.descendant( args["task"].value )
if isinstance( task, GafferDispatch.TaskNode.TaskPlug ) :
task = task.node()
if isinstance( task, Gaffer.Node ) :
task = next( GafferDispatch.TaskNode.TaskPlug.RecursiveOutputRange( task ), None )

if task is None :
IECore.msg( IECore.Msg.Level.Error, "stats", "Task \"%s\" does not exist" % args["task"].value )
return

dispatcher = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
dispatcher["jobsDirectory"].setValue( tempfile.mkdtemp( prefix = "gafferStats" ) )
dispatcher["tasks"][0].setInput( task )

memory = _Memory.maxRSS()
with _Timer() as taskTimer :
with self.__performanceMonitor or contextlib.nullcontext(), self.__contextMonitor or contextlib.nullcontext(), self.__vtuneMonitor or contextlib.nullcontext() :
with self.__context( script, args ) as context :
for frame in self.__frames( script, args ) :
context.setFrame( frame )
dispatcher.dispatch( [ task ] )
dispatcher["task"].execute()

self.__timers["Task execution"] = taskTimer
self.__memory["Task execution"] = _Memory.maxRSS() - memory
Expand Down
42 changes: 20 additions & 22 deletions include/GafferDispatch/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,39 @@ IE_CORE_FORWARDDECLARE( Dispatcher )
/// of Context specific Tasks from TaskNodes which exist within a ScriptNode.
/// Dispatchers can also modify TaskNodes during construction, adding
/// plugs which affect Task execution.
class GAFFERDISPATCH_API Dispatcher : public Gaffer::Node
class GAFFERDISPATCH_API Dispatcher : public TaskNode
{
public :

explicit Dispatcher( const std::string &name=defaultName<Dispatcher>() );
~Dispatcher() override;

GAFFER_NODE_DECLARE_TYPE( GafferDispatch::Dispatcher, DispatcherTypeId, Gaffer::Node );
GAFFER_NODE_DECLARE_TYPE( GafferDispatch::Dispatcher, DispatcherTypeId, TaskNode );

using PreDispatchSignal = Gaffer::Signals::Signal<bool ( const Dispatcher *, const std::vector<TaskNodePtr> & ), Detail::PreDispatchSignalCombiner>;
using DispatchSignal = Gaffer::Signals::Signal<void ( const Dispatcher *, const std::vector<TaskNodePtr> & ), Gaffer::Signals::CatchingCombiner<void>>;
using PostDispatchSignal = Gaffer::Signals::Signal<void ( const Dispatcher *, const std::vector<TaskNodePtr> &, bool ), Gaffer::Signals::CatchingCombiner<void>>;
using PreDispatchSignal = Gaffer::Signals::Signal<bool ( const Dispatcher * ), Detail::PreDispatchSignalCombiner>;
using DispatchSignal = Gaffer::Signals::Signal<void ( const Dispatcher * ), Gaffer::Signals::CatchingCombiner<void>>;
using PostDispatchSignal = Gaffer::Signals::Signal<void ( const Dispatcher *, bool ), Gaffer::Signals::CatchingCombiner<void>>;
//! @name Dispatch Signals
/// These signals are emitted on dispatch events for any registered Dispatcher instance.
////////////////////////////////////////////////////////////////////////////////////////
//@{
/// Called when any dispatcher might begin to dispatch nodes. Slots should have the
/// signature `bool slot( dispatcher, nodes )`, and may return True to cancel
/// Called when any dispatcher might begin to dispatch tasks. Slots should have the
/// signature `bool slot( dispatcher )`, and may return True to cancel
/// the dispatch, or False to allow it to continue.
static PreDispatchSignal &preDispatchSignal();
/// Called when any dispatcher is going to dispatch nodes. Slots should have the
/// signature `bool slot( dispatcher, nodes )`. This differs from the preDispatchSignal
/// Called when any dispatcher is going to dispatch tasks. Slots should have the
/// signature `bool slot( dispatcher )`. This differs from the `preDispatchSignal`
/// in that it is triggered when dispatching is imminent and non-cancellable.
static DispatchSignal &dispatchSignal();
/// Called after any dispatcher has finished dispatching nodes, or after a pending dispatch
/// has been cancelled by the preDispatchSignal slots. Slots should have the signature
/// `void slot( dispatcher, nodes, bool )`. The third argument will be True if the process
/// `void slot( dispatcher, bool )`. The third argument will be True if the process
/// was successful, and False otherwise.
static PostDispatchSignal &postDispatchSignal();
//@}

/// Calls doDispatch, taking care to trigger the dispatch signals at the appropriate times.
/// Note that this will throw unless all of the nodes are either TaskNodes or Boxes,
/// and it will also throw if cycles are detected in the resulting TaskBatch graph.
/// \todo Replace this with a version taking vector<TaskPlugPtr>. This will plug the
/// type safety issue whereby currently any old node can be passed to dispatch.
/// Alternatively, perhaps the tasks to dispatch should be specified via connections
/// into a "tasks" ArrayPlug, so dispatchers can optionally live directly in the node
/// graph.
void dispatch( const std::vector<Gaffer::NodePtr> &nodes ) const;
Gaffer::ArrayPlug *tasksPlug();
const Gaffer::ArrayPlug *tasksPlug() const;

enum FramesMode
{
Expand All @@ -155,9 +148,9 @@ class GAFFERDISPATCH_API Dispatcher : public Gaffer::Node
/// Returns frame range to be used when framesModePlug is set to CustomRange.
Gaffer::StringPlug *frameRangePlug();
const Gaffer::StringPlug *frameRangePlug() const;
/// Returns the FrameList that will be used during dispatch() to create the TaskBatches.
/// Derived classes which reimplement this must call the base class first.
virtual IECore::FrameListPtr frameRange( const Gaffer::ScriptNode *script, const Gaffer::Context *context ) const;
/// Returns the frame range that would be used by a dispatch in the
/// current context.
virtual IECore::FrameListPtr frameRange() const;
//@}

//! @name Dispatcher Jobs
Expand Down Expand Up @@ -287,6 +280,11 @@ class GAFFERDISPATCH_API Dispatcher : public Gaffer::Node

private :

void preTasks( const Gaffer::Context *context, Tasks &tasks ) const final;
void postTasks( const Gaffer::Context *context, Tasks &tasks ) const final;
IECore::MurmurHash hash( const Gaffer::Context *context ) const final;
void execute() const final;

void createJobDirectory( const Gaffer::ScriptNode *script, Gaffer::Context *context ) const;
mutable std::filesystem::path m_jobDirectory;

Expand Down
4 changes: 4 additions & 0 deletions python/GafferArnold/ArnoldTextureBake.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ def __init__( self, name = "ArnoldTextureBake" ) :
# First, setup python commands which will dispatch a chunk of a render or image tasks as
# immediate execution once they reach the farm - this allows us to run multiple tasks in
# one farm process.
## \todo Ideally we would host LocalDispatcher nodes in our internal graph, instead of
# constructing them inside of PythonCommands. That's not currently possible because the
# hash for the tasks being dispatched relies on the index file, which doesn't exist
# prior to dispatch.
self["__RenderDispatcher"] = GafferDispatch.PythonCommand()
self["__RenderDispatcher"]["preTasks"][0].setInput( self["__CleanPreTasks"]["out"] )
self["__RenderDispatcher"]["command"].setValue( inspect.cleandoc(
Expand Down
11 changes: 6 additions & 5 deletions python/GafferArnoldTest/ArnoldRenderTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,13 @@ def testWedge( self ) :
s["fileName"].setValue( self.temporaryDirectory() / "test.gfr" )
s.save()

dispatcher = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
dispatcher["jobsDirectory"].setValue( self.temporaryDirectory() / "testJobDirectory" )
dispatcher["framesMode"].setValue( GafferDispatch.Dispatcher.FramesMode.CurrentFrame )
dispatcher["executeInBackground"].setValue( False )
s["dispatcher"] = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
s["dispatcher"]["tasks"][0].setInput( s["wedge"]["task"] )
s["dispatcher"]["jobsDirectory"].setValue( self.temporaryDirectory() / "testJobDirectory" )
s["dispatcher"]["framesMode"].setValue( GafferDispatch.Dispatcher.FramesMode.CurrentFrame )
s["dispatcher"]["executeInBackground"].setValue( False )

dispatcher.dispatch( [ s["wedge"] ] )
s["dispatcher"]["task"].execute()

hidden = GafferImage.ImageReader()
hidden["fileName"].setValue( self.temporaryDirectory() / "hidden.exr" )
Expand Down
22 changes: 13 additions & 9 deletions python/GafferArnoldTest/ArnoldTextureBakeTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ def testManyImages( self ):
# Dispatch the bake
script = Gaffer.ScriptNode()
script.addChild( arnoldTextureBake )
dispatcher = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
dispatcher["jobsDirectory"].setValue( self.temporaryDirectory() )
dispatcher.dispatch( [ arnoldTextureBake ] )
script["dispatcher"] = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
script["dispatcher"]["tasks"][0].setInput( arnoldTextureBake["task"] )
script["dispatcher"]["jobsDirectory"].setValue( self.temporaryDirectory() )
script["dispatcher"]["task"].execute()

# Test that we are writing all expected files, and that we have cleaned up all temp files
expectedUdims = [ i + j for j in [ 1001, 1033 ] for i in [ 0, 1, 10, 11 ] ]
Expand Down Expand Up @@ -306,9 +307,10 @@ def testTasks( self ):
# Dispatch the bake
script = Gaffer.ScriptNode()
script.addChild( arnoldTextureBake )
dispatcher = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
dispatcher["jobsDirectory"].setValue( self.temporaryDirectory() )
dispatcher.dispatch( [ arnoldTextureBake ] )
script["dispatcher"] = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
script["dispatcher"]["tasks"][0].setInput( arnoldTextureBake["task"] )
script["dispatcher"]["jobsDirectory"].setValue( self.temporaryDirectory() )
script["dispatcher"]["task"].execute()

self.assertEqual( sorted( [ x.name for x in ( self.temporaryDirectory() / 'bakeSpheres' ).iterdir() ] ),
[ "BAKE_FILE_INDEX_0.0001.txt", "BAKE_FILE_INDEX_1.0001.txt", "BAKE_FILE_INDEX_2.0001.txt", "beauty", "diffuse" ] )
Expand Down Expand Up @@ -450,9 +452,11 @@ def testMerging( self ):
# Dispatch the bake
script = Gaffer.ScriptNode()
script.addChild( arnoldTextureBake )
dispatcher = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
dispatcher["jobsDirectory"].setValue( self.temporaryDirectory() )
dispatcher.dispatch( [ arnoldTextureBake ] )

script["dispatcher"] = GafferDispatch.LocalDispatcher( jobPool = GafferDispatch.LocalDispatcher.JobPool() )
script["dispatcher"]["tasks"][0].setInput( arnoldTextureBake["task"] )
script["dispatcher"]["jobsDirectory"].setValue( self.temporaryDirectory() )
script["dispatcher"]["task"].execute()

# Check results
imageReader = GafferImage.ImageReader()
Expand Down
20 changes: 13 additions & 7 deletions python/GafferDispatch/LocalDispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,25 @@ def __init__( self, batch, dispatcher ) :
# Store all dispatcher settings now, as we can't access the dispatcher
# again if we're executing in the background (as it may be modified on
# the main thread).
self.__name = Gaffer.Context.current().substitute( dispatcher["jobName"].getValue() )
self.__name = dispatcher["jobName"].getValue()
self.__directory = Gaffer.Context.current()["dispatcher:jobDirectory"]
self.__scriptFile = Gaffer.Context.current()["dispatcher:scriptFileName"]
self.__frameRange = dispatcher.frameRange( script, self.__context )
self.__frameRange = dispatcher.frameRange()
self.__id = os.path.basename( self.__directory )
self.__ignoreScriptLoadErrors = dispatcher["ignoreScriptLoadErrors"].getValue()
## \todo Make `Dispatcher::dispatch()` use a Process, so we don't need to
# do substitutions manually like this.
self.__environmentCommand = Gaffer.Context.current().substitute(
dispatcher["environmentCommand"].getValue()
)
self.__environmentCommand = dispatcher["environmentCommand"].getValue()
self.__executeInBackground = dispatcher["executeInBackground"].getValue()

if self.__executeInBackground :
application = script.ancestor( Gaffer.ApplicationRoot )
if application is not None and application.getName() == "execute" :
# Background execution makes no sense within the `execute`
# app, since the app will exit as soon as `_doDispatch()`
# returns, and the background job will be killed before it
# can complete.
IECore.msg( IECore.Msg.Level.Warning, "LocalDispatcher", "Forcing foreground execution" )
self.__executeInBackground = False

self.__startTime = datetime.datetime.now( datetime.timezone.utc )
self.__endTime = None

Expand Down
Loading

0 comments on commit 5e5808b

Please sign in to comment.