Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graph Dependencies #1182

Merged
merged 15 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion meshroom/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def getName(self):
def getType(self):
return self.attributeDesc.__class__.__name__

def getBaseType(self):
return self.getType()

def getLabel(self):
return self._label

Expand Down Expand Up @@ -137,7 +140,7 @@ def _set_value(self, value):
self.valueChanged.emit()

def resetValue(self):
self._value = ""
self._value = self.attributeDesc.value

def requestGraphUpdate(self):
if self.node.graph:
Expand Down Expand Up @@ -258,6 +261,7 @@ def updateInternals(self):
fullName = Property(str, getFullName, constant=True)
label = Property(str, getLabel, constant=True)
type = Property(str, getType, constant=True)
baseType = Property(str, getType, constant=True)
desc = Property(desc.Attribute, lambda self: self.attributeDesc, constant=True)
valueChanged = Signal()
value = Property(Variant, _get_value, _set_value, notify=valueChanged)
Expand Down Expand Up @@ -292,6 +296,9 @@ def __init__(self, node, attributeDesc, isOutput, root=None, parent=None):
def __len__(self):
return len(self._value)

def getBaseType(self):
return self.attributeDesc.elementDesc.__class__.__name__

def at(self, idx):
""" Returns child attribute at index 'idx' """
# implement 'at' rather than '__getitem__'
Expand Down Expand Up @@ -396,6 +403,7 @@ def updateInternals(self):
# Override value property setter
value = Property(Variant, Attribute._get_value, _set_value, notify=Attribute.valueChanged)
isDefault = Property(bool, _isDefault, notify=Attribute.valueChanged)
baseType = Property(str, getBaseType, constant=True)


class GroupAttribute(Attribute):
Expand Down
81 changes: 49 additions & 32 deletions meshroom/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ class Visitor(object):
Base class for Graph Visitors that does nothing.
Sub-classes can override any method to implement specific algorithms.
"""
def __init__(self, reverse):
def __init__(self, reverse, dependenciesOnly):
super(Visitor, self).__init__()
self.reverse = reverse
self.dependenciesOnly = dependenciesOnly

# def initializeVertex(self, s, g):
# '''is invoked on every vertex of the graph before the start of the graph search.'''
Expand Down Expand Up @@ -383,7 +384,7 @@ def duplicateNodesFromNode(self, fromNode):
Returns:
OrderedDict[Node, Node]: the source->duplicate map
"""
srcNodes, srcEdges = self.dfsOnDiscover(startNodes=[fromNode], reverse=True)
srcNodes, srcEdges = self.dfsOnDiscover(startNodes=[fromNode], reverse=True, dependenciesOnly=True)
# use OrderedDict to keep duplicated nodes creation order
duplicates = OrderedDict()

Expand Down Expand Up @@ -581,13 +582,13 @@ def findNodes(self, nodesExpr):
def edge(self, dstAttributeName):
return self._edges.get(dstAttributeName)

def getLeafNodes(self):
nodesWithOutput = set([edge.src.node for edge in self.edges])
return set(self._nodes) - nodesWithOutput
def getLeafNodes(self, dependenciesOnly):
nodesWithOutputLink = set([edge.src.node for edge in self.getEdges(dependenciesOnly)])
return set(self._nodes) - nodesWithOutputLink

def getRootNodes(self):
nodesWithInput = set([edge.dst.node for edge in self.edges])
return set(self._nodes) - nodesWithInput
def getRootNodes(self, dependenciesOnly):
nodesWithInputLink = set([edge.dst.node for edge in self.getEdges(dependenciesOnly)])
return set(self._nodes) - nodesWithInputLink

@changeTopology
def addEdge(self, srcAttr, dstAttr):
Expand Down Expand Up @@ -635,29 +636,29 @@ def getDepth(self, node, minimal=False):
minDepth, maxDepth = self._nodesMinMaxDepths[node]
return minDepth if minimal else maxDepth

def getInputEdges(self, node):
return set([edge for edge in self.edges if edge.dst.node is node])
def getInputEdges(self, node, dependenciesOnly):
return set([edge for edge in self.getEdges(dependenciesOnly=dependenciesOnly) if edge.dst.node is node])

def _getInputEdgesPerNode(self):
def _getInputEdgesPerNode(self, dependenciesOnly):
nodeEdges = defaultdict(set)

for edge in self.edges:
for edge in self.getEdges(dependenciesOnly=dependenciesOnly):
nodeEdges[edge.dst.node].add(edge.src.node)

return nodeEdges

def _getOutputEdgesPerNode(self):
def _getOutputEdgesPerNode(self, dependenciesOnly):
nodeEdges = defaultdict(set)

for edge in self.edges:
for edge in self.getEdges(dependenciesOnly=dependenciesOnly):
nodeEdges[edge.src.node].add(edge.dst.node)

return nodeEdges

def dfs(self, visitor, startNodes=None, longestPathFirst=False):
# Default direction (visitor.reverse=False): from node to root
# Reverse direction (visitor.reverse=True): from node to leaves
nodeChildren = self._getOutputEdgesPerNode() if visitor.reverse else self._getInputEdgesPerNode()
nodeChildren = self._getOutputEdgesPerNode(visitor.dependenciesOnly) if visitor.reverse else self._getInputEdgesPerNode(visitor.dependenciesOnly)
# Initialize color map
colors = {}
for u in self._nodes:
Expand All @@ -668,7 +669,7 @@ def dfs(self, visitor, startNodes=None, longestPathFirst=False):
# it is not possible to handle this case at the moment
raise NotImplementedError("Graph.dfs(): longestPathFirst=True and visitor.reverse=True are not compatible yet.")

nodes = startNodes or (self.getRootNodes() if visitor.reverse else self.getLeafNodes())
nodes = startNodes or (self.getRootNodes(visitor.dependenciesOnly) if visitor.reverse else self.getLeafNodes(visitor.dependenciesOnly))

if longestPathFirst:
# Graph topology must be known and node depths up-to-date
Expand Down Expand Up @@ -711,7 +712,7 @@ def _dfsVisit(self, u, visitor, colors, nodeChildren, longestPathFirst):
colors[u] = BLACK
visitor.finishVertex(u, self)

def dfsOnFinish(self, startNodes=None, longestPathFirst=False, reverse=False):
def dfsOnFinish(self, startNodes=None, longestPathFirst=False, reverse=False, dependenciesOnly=False):
"""
Return the node chain from startNodes to the graph roots/leaves.
Order is defined by the visit and finishVertex event.
Expand All @@ -728,13 +729,13 @@ def dfsOnFinish(self, startNodes=None, longestPathFirst=False, reverse=False):
"""
nodes = []
edges = []
visitor = Visitor(reverse=reverse)
visitor = Visitor(reverse=reverse, dependenciesOnly=dependenciesOnly)
visitor.finishVertex = lambda vertex, graph: nodes.append(vertex)
visitor.finishEdge = lambda edge, graph: edges.append(edge)
self.dfs(visitor=visitor, startNodes=startNodes, longestPathFirst=longestPathFirst)
return nodes, edges

def dfsOnDiscover(self, startNodes=None, filterTypes=None, longestPathFirst=False, reverse=False):
def dfsOnDiscover(self, startNodes=None, filterTypes=None, longestPathFirst=False, reverse=False, dependenciesOnly=False):
"""
Return the node chain from startNodes to the graph roots/leaves.
Order is defined by the visit and discoverVertex event.
Expand All @@ -753,7 +754,7 @@ def dfsOnDiscover(self, startNodes=None, filterTypes=None, longestPathFirst=Fals
"""
nodes = []
edges = []
visitor = Visitor(reverse=reverse)
visitor = Visitor(reverse=reverse, dependenciesOnly=dependenciesOnly)

def discoverVertex(vertex, graph):
if not filterTypes or vertex.nodeType in filterTypes:
Expand All @@ -777,7 +778,7 @@ def dfsToProcess(self, startNodes=None):
"""
nodes = []
edges = []
visitor = Visitor(reverse=False)
visitor = Visitor(reverse=False, dependenciesOnly=True)

def discoverVertex(vertex, graph):
if vertex.hasStatus(Status.SUCCESS):
Expand Down Expand Up @@ -832,7 +833,7 @@ def updateNodesTopologicalData(self):
self._computationBlocked.clear()

compatNodes = []
visitor = Visitor(reverse=False)
visitor = Visitor(reverse=False, dependenciesOnly=True)

def discoverVertex(vertex, graph):
# initialize depths
Expand Down Expand Up @@ -866,7 +867,7 @@ def finishEdge(edge, graph):
# propagate inputVertex computability
self._computationBlocked[currentVertex] |= self._computationBlocked[inputVertex]

leaves = self.getLeafNodes()
leaves = self.getLeafNodes(visitor.dependenciesOnly)
visitor.finishEdge = finishEdge
visitor.discoverVertex = discoverVertex
self.dfs(visitor=visitor, startNodes=leaves)
Expand All @@ -890,7 +891,7 @@ def dfsMaxEdgeLength(self, startNodes=None):
"""
nodesStack = []
edgesScore = defaultdict(lambda: 0)
visitor = Visitor(reverse=False)
visitor = Visitor(reverse=False, dependenciesOnly=False)

def finishEdge(edge, graph):
u, v = edge
Expand Down Expand Up @@ -926,18 +927,34 @@ def flowEdges(self, startNodes=None):
flowEdges.append(link)
return flowEdges

def getInputNodes(self, node, recursive=False):
def getEdges(self, dependenciesOnly=False):
if not dependenciesOnly:
return self.edges

outEdges = []
for e in self.edges:
attr = e.src
if dependenciesOnly:
if attr.isLink:
attr = attr.getLinkParam(recursive=True)
if not attr.isOutput:
continue
newE = Edge(attr, e.dst)
outEdges.append(newE)
return outEdges

def getInputNodes(self, node, recursive, dependenciesOnly):
""" Return either the first level input nodes of a node or the whole chain. """
if not recursive:
return set([edge.src.node for edge in self.edges if edge.dst.node is node])
return set([edge.src.node for edge in self.getEdges(dependenciesOnly) if edge.dst.node is node])

inputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=False)
return inputNodes[1:] # exclude current node

def getOutputNodes(self, node, recursive=False):
def getOutputNodes(self, node, recursive, dependenciesOnly):
""" Return either the first level output nodes of a node or the whole chain. """
if not recursive:
return set([edge.dst.node for edge in self.edges if edge.src.node is node])
return set([edge.dst.node for edge in self.getEdges(dependenciesOnly) if edge.src.node is node])

outputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=True)
return outputNodes[1:] # exclude current node
Expand All @@ -957,8 +974,8 @@ def canSubmitOrCompute(self, startNode):
return 0

class SCVisitor(Visitor):
def __init__(self, reverse):
super(SCVisitor, self).__init__(reverse)
def __init__(self, reverse, dependenciesOnly):
super(SCVisitor, self).__init__(reverse, dependenciesOnly)

canCompute = True
canSubmit = True
Expand All @@ -969,7 +986,7 @@ def discoverVertex(self, vertex, graph):
if vertex.isExtern():
self.canCompute = False

visitor = SCVisitor(reverse=False)
visitor = SCVisitor(reverse=False, dependenciesOnly=True)
self.dfs(visitor=visitor, startNodes=[startNode])
return visitor.canCompute + (2 * visitor.canSubmit)

Expand Down Expand Up @@ -1131,7 +1148,7 @@ def clearSubmittedNodes(self):

@Slot(Node)
def clearDataFrom(self, startNode):
for node in self.dfsOnDiscover(startNodes=[startNode], reverse=True)[0]:
for node in self.dfsOnDiscover(startNodes=[startNode], reverse=True, dependenciesOnly=True)[0]:
node.clearData()

def iterChunksByStatus(self, status):
Expand Down
16 changes: 8 additions & 8 deletions meshroom/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,11 @@ def depth(self):
def minDepth(self):
return self.graph.getDepth(self, minimal=True)

def getInputNodes(self, recursive=False):
return self.graph.getInputNodes(self, recursive=recursive)
def getInputNodes(self, recursive, dependenciesOnly):
return self.graph.getInputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)

def getOutputNodes(self, recursive=False):
return self.graph.getOutputNodes(self, recursive=recursive)
def getOutputNodes(self, recursive, dependenciesOnly):
return self.graph.getOutputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)

def toDict(self):
pass
Expand Down Expand Up @@ -883,7 +883,7 @@ def updateLocked(self):
# Warning: we must handle some specific cases for global start/stop
if self._locked and currentStatus in (Status.ERROR, Status.STOPPED, Status.NONE):
self.setLocked(False)
inputNodes = self.getInputNodes(recursive=True)
inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)

for node in inputNodes:
if node.getGlobalStatus() == Status.RUNNING:
Expand All @@ -901,8 +901,8 @@ def updateLocked(self):

if currentStatus == Status.SUCCESS:
# At this moment, the node is necessarily locked because of previous if statement
inputNodes = self.getInputNodes(recursive=True)
outputNodes = self.getOutputNodes(recursive=True)
inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
outputNodes = self.getOutputNodes(recursive=True, dependenciesOnly=True)
stayLocked = None

# Check if at least one dependentNode is submitted or currently running
Expand All @@ -918,7 +918,7 @@ def updateLocked(self):
return
elif currentStatus in lockedStatus and self._chunks.at(0).statusNodeName == self.name:
self.setLocked(True)
inputNodes = self.getInputNodes(recursive=True)
inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
for node in inputNodes:
node.setLocked(True)
return
Expand Down
6 changes: 3 additions & 3 deletions meshroom/core/taskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def restart(self):
self.removeNode(node, displayList=False, processList=True)

# Remove output nodes from display and computing lists
outputNodes = node.getOutputNodes(recursive=True)
outputNodes = node.getOutputNodes(recursive=True, dependenciesOnly=True)
for n in outputNodes:
if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
n.upgradeStatusTo(Status.NONE)
Expand Down Expand Up @@ -184,7 +184,7 @@ def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=Fals
else:
# Check dependencies of toNodes
if not toNodes:
toNodes = graph.getLeafNodes()
toNodes = graph.getLeafNodes(dependenciesOnly=True)
toNodes = list(toNodes)
allReady = self.checkNodesDependencies(graph, toNodes, "COMPUTATION")

Expand Down Expand Up @@ -402,7 +402,7 @@ def submit(self, graph=None, submitter=None, toNodes=None):

# Check dependencies of toNodes
if not toNodes:
toNodes = graph.getLeafNodes()
toNodes = graph.getLeafNodes(dependenciesOnly=True)
toNodes = list(toNodes)
allReady = self.checkNodesDependencies(graph, toNodes, "SUBMITTING")

Expand Down
Loading