Skip to content

Commit

Permalink
Merge pull request #5648 from johnhaddon/localJobsFixesPR
Browse files Browse the repository at this point in the history
LocalJobs fixes
  • Loading branch information
johnhaddon authored Jan 31, 2024
2 parents b327a2b + 6021361 commit a606da4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
27 changes: 19 additions & 8 deletions python/GafferDispatch/LocalDispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
##########################################################################

import atexit
import collections
import datetime
import enum
import functools
Expand Down Expand Up @@ -101,7 +102,7 @@ def __init__( self, batch, dispatcher ) :

self.__messageHandler = _MessageHandler()
self.__messagesChangedSignal = Gaffer.Signal1()
self.__messageHandler.messagesChangedSignal().connect( Gaffer.WeakMethod( self.__messagesChanged ), scoped = False )
self.__messageHandler.messagesChangedSignal().connect( Gaffer.WeakMethod( self.__messagesChanged, fallbackResult = None ), scoped = False )

self.__initBatchWalk( batch )

Expand Down Expand Up @@ -407,17 +408,25 @@ class JobPool :

def __init__( self ) :

self.__jobs = []
self.__jobs = collections.OrderedDict()
self.__nextId = 0

self.__jobAddedSignal = Gaffer.Signals.Signal1()
self.__jobRemovedSignal = Gaffer.Signals.Signal1()

# Returns a list of jobs in the order they were added.
def jobs( self ) :

return list(self.__jobs)
return list( self.__jobs.values() )

# Returns an ordered dictionary mapping from unique IDs to jobs.
def jobsDict( self ) :

return self.__jobs

def waitForAll( self ) :

while any( j.status() in ( j.Status.Waiting, j.Status.Running ) for j in self.__jobs ) :
while any( j.status() in ( j.Status.Waiting, j.Status.Running ) for j in self.__jobs.values() ) :
time.sleep( 0.2 )

def jobAddedSignal( self ) :
Expand All @@ -432,14 +441,16 @@ def addJob( self, job ) :

assert( isinstance( job, LocalDispatcher.Job ) )

self.__jobs.append( job )
job.__jobPoolId = self.__nextId
self.__nextId += 1
self.__jobs[job.__jobPoolId] = job

self.jobAddedSignal()( job )

def removeJob( self, job ) :

if job in self.__jobs :
self.__jobs.remove( job )
self.jobRemovedSignal()( job )
del self.__jobs[job.__jobPoolId]
self.jobRemovedSignal()( job )

__defaultJobPool = None

Expand Down
50 changes: 23 additions & 27 deletions python/GafferDispatchUI/LocalJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,15 @@

class _LocalJobsPath( Gaffer.Path ) :

def __init__( self, jobPool, job = None, path = None, root = "/" ) :
def __init__( self, jobPool, path = None, root = "/", filter = None ) :

Gaffer.Path.__init__( self, path = path, root = root )
Gaffer.Path.__init__( self, path = path, root = root, filter = filter )

self.__jobPool = jobPool
self.__job = job

def copy( self ) :

c = self.__class__( self.__jobPool, self.__job )

return c
return self.__class__( self.__jobPool, self[:], self.root(), self.getFilter() )

def propertyNames( self, canceller = None ) :

Expand All @@ -84,31 +81,35 @@ def property( self, name, canceller = None ) :
if result is not None :
return result

if self.__job is None :
job = self.job()
if job is None :
return None

if name == "localDispatcher:status" :
return self.__job.status().name
return job.status().name
elif name == "localDispatcher:id" :
return self.__job.id()
return job.id()
elif name == "localDispatcher:jobName" :
return self.__job.name()
return job.name()
elif name == "localDispatcher:directory" :
return self.__job.directory()
return job.directory()
elif name == "localDispatcher:startTime" :
return self.__job.startTime()
return job.startTime()
elif name == "localDispatcher:runningTime" :
return self.__job.runningTime().total_seconds()
return job.runningTime().total_seconds()
elif name == "localDispatcher:cpuUsage" :
return self.__job.cpuUsage()
return job.cpuUsage()
elif name == "localDispatcher:memoryUsage" :
return self.__job.memoryUsage()
return job.memoryUsage()

return None

def job( self ) :

return self.__job
if len( self ) != 1 :
return None

return self.__jobPool.jobsDict().get( int( self[0] ) )

def jobPool( self ) :

Expand All @@ -123,17 +124,10 @@ def _children( self, canceller ) :
if self.isLeaf() :
return []

result = []
for index, job in enumerate( self.__jobPool.jobs() ) :
result.append(
_LocalJobsPath(
jobPool = self.__jobPool,
job = job,
path = [ str( index ) ],
)
)

return result
return [
_LocalJobsPath( self.__jobPool, [ str( x ) ], self.root(), self.getFilter() )
for x in self.__jobPool.jobsDict().keys()
]

class _StatusColumn( GafferUI.PathColumn ) :

Expand Down Expand Up @@ -293,6 +287,7 @@ def __jobRemoved( self, job ) :

assert( threading.current_thread() is threading.main_thread() )
self.__jobListingWidget.getPath()._emitPathChanged()
self.__jobSelectionChanged( self.__jobListingWidget )

def __jobStatusChanged( self, job ) :

Expand All @@ -315,6 +310,7 @@ def __removeClicked( self, button ) :
for job in self.__selectedJobs() :
job.kill()
jobPool.removeJob( job )
self.__jobSelectionChanged( self.__jobListingWidget )

def __selectedJobs( self ) :

Expand Down

0 comments on commit a606da4

Please sign in to comment.