Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the SID Viz workflow shutdown process with the new pipeline shutdown process #1392

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 48 additions & 35 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def __init__(self, config: Config):

self._mrc_executor: mrc.Executor = None

self._loop: asyncio.AbstractEventLoop = None

@property
def is_built(self) -> bool:
return self._is_built
Expand Down Expand Up @@ -317,7 +319,8 @@ def inner_build(builder: mrc.Builder, segment_id: str):
for port in typing.cast(StageBase, stage).input_ports:
port.link_node(builder=builder)

asyncio.run(self._async_start(segment_graph.nodes()))
# Call the start method for the stages in this segment. Must run on the loop and wait for the result
asyncio.run_coroutine_threadsafe(self._async_start(segment_graph.nodes()), self._loop).result()

logger.info("====Building Segment Complete!====")

Expand All @@ -339,9 +342,47 @@ def inner_build(builder: mrc.Builder, segment_id: str):

logger.info("====Registering Pipeline Complete!====")

def _start(self):
async def _start(self):
assert self._is_built, "Pipeline must be built before starting"

# Only execute this once
if (self._is_started):
return

# Stop from running this twice
self._is_started = True

# Save off the current loop so we can use it in async_start
self._loop = asyncio.get_running_loop()

# Setup error handling and cancellation of the pipeline
def error_handler(_, context: dict):

msg = f"Unhandled exception in async loop! Exception: \n{context['message']}"
exception = context.get("exception", Exception())

logger.critical(msg, exc_info=exception)

self._loop.set_exception_handler(error_handler)

exit_count = 0

# Handles Ctrl+C for graceful shutdown
def term_signal():

nonlocal exit_count
exit_count = exit_count + 1

if (exit_count == 1):
tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.")
self.stop()
else:
tqdm.write("Killing")
sys.exit(1)

for sig in [signal.SIGINT, signal.SIGTERM]:
self._loop.add_signal_handler(sig, term_signal)

logger.info("====Starting Pipeline====")

self._mrc_executor.start()
Expand Down Expand Up @@ -372,7 +413,10 @@ async def join(self):
if self._mrc_executor is None:
raise RuntimeError("Pipeline failed pre-flight checks.")

await self._mrc_executor.join_async()
# Make a local reference so the object doesn't go out of scope from a call to stop()
executor = self._mrc_executor

await executor.join_async()
except Exception:
logger.exception("Exception occurred in pipeline. Rethrowing")
raise
Expand Down Expand Up @@ -408,16 +452,14 @@ async def _build_and_start(self):
logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True)
return

self._start()
await self._start()

async def _async_start(self, stages: networkx.classes.reportviews.NodeView):
# This method is called once for each segment in the pipeline executed on this host
for stage in stages:
if (isinstance(stage, Stage)):
await stage.start_async()

self._is_started = True

def visualize(self, filename: str = None, **graph_kwargs):
"""
Output a pipeline diagram to `filename`. The file format of the diagram is inferred from the extension of
Expand Down Expand Up @@ -578,35 +620,6 @@ async def run_async(self):
"""
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
"""
loop = asyncio.get_running_loop()

def error_handler(_, context: dict):

msg = f"Unhandled exception in async loop! Exception: \n{context['message']}"
exception = context.get("exception", Exception())

logger.critical(msg, exc_info=exception)

loop.set_exception_handler(error_handler)

exit_count = 0

# Handles Ctrl+C for graceful shutdown
def term_signal():

nonlocal exit_count
exit_count = exit_count + 1

if (exit_count == 1):
tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.")
self.stop()
else:
tqdm.write("Killing")
sys.exit(1)

for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, term_signal)

try:
await self._build_and_start()

Expand Down
23 changes: 20 additions & 3 deletions morpheus/stages/postprocess/generate_viz_frames_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,28 @@ async def run_server():

return await super().start_async()

def stop(self):
"""
Stages can implement this to perform cleanup steps when pipeline is stopped.
"""

if (self._loop is not None):
asyncio.run_coroutine_threadsafe(self._stop_server(), loop=self._loop)
pass

async def _stop_server(self):

# Only run this once
if (self._buffer_queue.is_closed()):
return

logger.info("Shutting down queue")

await self._buffer_queue.close()

self._server_close_event.set()

# Wait for it to
# Wait for it to fully shut down
await self._server_task

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
Expand Down Expand Up @@ -274,8 +287,12 @@ def write_batch(x: MultiResponseMessage):

out_buf = sink.getvalue()

# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
try:
# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
except Closed:
# Ignore closed errors. Likely the pipeline is shutting down
pass

input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)

Expand Down