Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing multi-sender stage configurations #951

Merged
merged 2 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ def inner_build(builder: mrc.Builder, segment_id: str):
# Finally, execute the link phase (only necessary for circular pipelines)
# for s in source_and_stages:
for stage in segment_graph.nodes():
for port in stage.input_ports:
port.link()
for port in typing.cast(StreamWrapper, stage).input_ports:
port.link(builder=builder)

logger.info("====Building Segment Complete!====")

Expand Down
21 changes: 10 additions & 11 deletions morpheus/pipeline/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import typing

import mrc
import typing_utils

import morpheus.pipeline as _pipeline
Expand Down Expand Up @@ -80,7 +81,7 @@ def in_stream(self):
def in_type(self):
return self._input_type

def get_input_pair(self) -> StreamPair:
def get_input_pair(self, builder: mrc.Builder) -> StreamPair:
"""
Returns the input `StreamPair` which is a tuple consisting of the parent node and the parent node's output type.
"""
Expand All @@ -99,17 +100,15 @@ def get_input_pair(self) -> StreamPair:
self._is_linked = True
else:
# We have multiple senders. Create a dummy stream to connect all senders
self._input_stream = builder.make_node_component(
f"{self.parent.unique_name}-reciever[{self.port_number}]", mrc.core.operators.map(lambda x: x))

if (self.is_complete):
# Connect all streams now
# self._input_stream = streamz.Stream(upstreams=[x.out_stream for x in self._input_senders],
# asynchronous=True,
# loop=IOLoop.current())
raise NotImplementedError("Still using streamz")
for input_sender in self._input_senders:
builder.make_edge(input_sender.out_stream, self._input_stream)

self._is_linked = True
else:
# Create a dummy stream that needs to be linked later
# self._input_stream = streamz.Stream(asynchronous=True, loop=IOLoop.current())
raise NotImplementedError("Still using streamz")

# Now determine the output type from what we have
great_ancestor = greatest_ancestor(*[x.out_type for x in self._input_senders if x.is_complete])
Expand All @@ -123,7 +122,7 @@ def get_input_pair(self) -> StreamPair:

return (self._input_stream, self._input_type)

def link(self):
def link(self, builder: mrc.Builder):
"""
The linking phase determines the final type of the `Receiver` and connects all underlying stages.

Expand All @@ -146,6 +145,6 @@ def link(self):
"Invalid linking phase. Input port type does not match predicted type determined during build phase")

for out_stream in [x.out_stream for x in self._input_senders]:
out_stream.connect(self._input_stream)
builder.make_edge(out_stream, self._input_stream)

self._is_linked = True
4 changes: 2 additions & 2 deletions morpheus/pipeline/single_port_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def accepted_types(self) -> typing.Tuple:
"""
pass

def _pre_build(self) -> typing.List[StreamPair]:
in_ports_pairs = super()._pre_build()
def _pre_build(self, builder: mrc.Builder) -> typing.List[StreamPair]:
in_ports_pairs = super()._pre_build(builder=builder)

# Check the types of all inputs
for x in in_ports_pairs:
Expand Down
10 changes: 5 additions & 5 deletions morpheus/pipeline/stream_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ def build(self, builder: mrc.Builder, do_propagate=True):
assert self._pipeline is not None, "Must be attached to a pipeline before building!"

# Pre-Build returns the input pairs for each port
in_ports_pairs = self._pre_build()
in_ports_pairs = self._pre_build(builder=builder)

out_ports_pair = self._build(builder, in_ports_pairs)
out_ports_pair = self._build(builder=builder, in_ports_streams=in_ports_pairs)

# Allow stages to do any post build steps (i.e., for sinks, or timing functions)
out_ports_pair = self._post_build(builder, out_ports_pair)
out_ports_pair = self._post_build(builder=builder, out_ports_pair=out_ports_pair)

assert len(out_ports_pair) == len(self.output_ports), \
"Build must return same number of output pairs as output ports"
Expand All @@ -348,8 +348,8 @@ def build(self, builder: mrc.Builder, do_propagate=True):

dep.build(builder, do_propagate=do_propagate)

def _pre_build(self) -> typing.List[StreamPair]:
in_pairs: typing.List[StreamPair] = [x.get_input_pair() for x in self.input_ports]
def _pre_build(self, builder: mrc.Builder) -> typing.List[StreamPair]:
in_pairs: typing.List[StreamPair] = [x.get_input_pair(builder=builder) for x in self.input_ports]

return in_pairs

Expand Down
25 changes: 25 additions & 0 deletions tests/test_nonlinear_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import mrc
import mrc.core.operators as ops
import pytest
from mrc.core.node import Broadcast

from morpheus.config import Config
Expand All @@ -27,6 +28,7 @@
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from utils import assert_results
from utils.dataset_manager import DatasetManager

Expand Down Expand Up @@ -95,3 +97,26 @@ def test_forking_pipeline(config, dataset_cudf: DatasetManager):
# Get the results
assert_results(comp_higher.get_results())
assert_results(comp_lower.get_results())


@pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)])
def test_port_multi_sender(config, dataset_cudf: DatasetManager, source_count, expected_count):
    """
    Check that a single sink stage can receive from several source stages at once.

    The pipeline is built with one `InMemorySinkStage` and `source_count` separate
    `InMemorySourceStage` instances, each given the same single DataFrame and each
    connected directly to the sink. After the pipeline runs, the number of messages
    held by the sink must equal `expected_count`.

    Parameters
    ----------
    config
        Pipeline configuration fixture passed to the pipeline and to every stage.
    dataset_cudf : DatasetManager
        Dataset fixture; indexed with "filter_probs.csv" to obtain the input DataFrame.
    source_count : int
        Number of source stages to attach to the sink.
    expected_count : int
        Number of messages the sink is expected to hold after the run.
    """
    filter_probs_df = dataset_cudf["filter_probs.csv"]

    pipe = Pipeline(config)

    sink_stage = pipe.add_stage(InMemorySinkStage(config))

    # Every source feeds the same sink, so the sink's input port ends up with
    # `source_count` senders.
    for _ in range(source_count):
        source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
        pipe.add_edge(source_stage, sink_stage)

    pipe.run()

    assert len(sink_stage.get_messages()) == expected_count