Remove loop from several asyncio API calls #1033

Merged · 3 commits · Jul 11, 2023
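For background, the explicit `loop` argument to asyncio primitives was deprecated in Python 3.8 and removed in Python 3.10, so objects such as `asyncio.Event` and `asyncio.Queue` must now be created without it and bind to the running loop on first use. A minimal sketch of the before/after pattern this PR applies (the producer/consumer names below are illustrative only, not taken from the diff):

```python
import asyncio

# Old pattern (rejected on Python 3.10+):
#     loop = asyncio.get_event_loop()
#     event = asyncio.Event(loop=loop)
#     queue = asyncio.Queue(maxsize=2, loop=loop)

# New pattern: create the primitives with no loop argument; they bind to the
# running loop the first time they are used from a coroutine.
async def main() -> None:
    event = asyncio.Event()
    queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=2)

    async def producer() -> None:
        await queue.put(b"payload")
        event.set()

    asyncio.create_task(producer())
    await event.wait()
    print(await queue.get())


asyncio.run(main())
```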
50 changes: 27 additions & 23 deletions morpheus/stages/postprocess/generate_viz_frames_stage.py
@@ -69,6 +69,11 @@ def __init__(self, c: Config, server_url: str = "0.0.0.0", server_port: int = 87

self._replay_buffer = []

# Properties set on start
self._loop: asyncio.AbstractEventLoop = None
self._server_task: asyncio.Task = None
self._server_close_event: asyncio.Event = None

@property
def name(self) -> str:
return "gen_viz"
@@ -89,13 +94,13 @@ def supports_cpp_node(self):
return False

@staticmethod
def round_to_sec(x):
def round_to_sec(x: int | float):
"""
Round to even seconds

Parameters
----------
x : int/float
x : int | float
The value to round

Returns
@@ -126,7 +131,7 @@ def _to_vis_df(self, x: MultiResponseMessage):
def indent_data(y: str):
try:
return json.dumps(json.loads(y), indent=3)
except: # noqa: E722
except Exception:
return y

df["data"] = df["data"].apply(indent_data)
@@ -146,7 +151,7 @@ def indent_data(y: str):
df["ts_round_sec"] = (df["timestamp"] / 1000.0).astype(int) * 1000

# Return a list of tuples of (ts_round_sec, dataframe)
return [(key, group) for key, group in df.groupby(df.ts_round_sec)]
return list(df.groupby(df.ts_round_sec))

def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):

@@ -159,28 +164,27 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):

offset = (curr_timestamp - self._first_timestamp) / 1000

fn = os.path.join(self._out_dir, "{}.csv".format(offset))
out_file = os.path.join(self._out_dir, f"{offset}.csv")

assert not os.path.exists(fn)
assert not os.path.exists(out_file)

in_df.to_csv(fn, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])
in_df.to_csv(out_file, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])

async def start_async(self):
"""
Launch the Websocket server and asynchronously send messages via Websocket.
"""

loop = asyncio.get_event_loop()
self._loop = loop
self._loop = asyncio.get_event_loop()

self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2, loop=loop)
self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2)

async def client_connected(websocket: websockets.legacy.server.WebSocketServerProtocol):
"""
Handles a new client connection to the WebSocket server.
"""

logger.info("Got connection from: {}:{}".format(*websocket.remote_address))
logger.info("Got connection from: %s:%s", *websocket.remote_address)

while True:
try:
@@ -191,7 +195,7 @@ async def client_connected(websocket: websockets.legacy.server.WebSocketServerPr
except Exception as ex:
logger.exception("Error occurred trying to send message over socket", exc_info=ex)

logger.info("Disconnected from: {}:{}".format(*websocket.remote_address))
logger.info("Disconnected from: %s:%s", *websocket.remote_address)

async def run_server():
"""
@@ -205,20 +209,20 @@ async def run_server():
listening_on = [":".join([str(y) for y in x.getsockname()]) for x in server.sockets]
listening_on_str = [f"'{x}'" for x in listening_on]

logger.info("Websocket server listening at: {}".format(", ".join(listening_on_str)))
logger.info("Websocket server listening at: %s", ", ".join(listening_on_str))

await self._server_close_event.wait()

logger.info("Server shut down")

logger.info("Server shut down. Is queue empty: {}".format(self._buffer_queue.empty()))
logger.info("Server shut down. Is queue empty: %s", self._buffer_queue.empty())
except Exception as e:
logger.error("Error during serve", exc_info=e)
raise

self._server_task = loop.create_task(run_server())
self._server_task = self._loop.create_task(run_server())

self._server_close_event = asyncio.Event(loop=loop)
self._server_close_event = asyncio.Event()

await asyncio.sleep(1.0)

@@ -235,24 +239,24 @@ async def _stop_server(self):
# Wait for it to finish
await self._server_task

def _build_single(self, seg: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:

stream = input_stream[0]

def node_fn(input, output):
def node_fn(input_obs, output_obs):

def write_batch(x: MultiResponseMessage):

sink = pa.BufferOutputStream()

# This is the timestamp of the earliest message
t0 = x.get_meta("timestamp").min()
time0 = x.get_meta("timestamp").min()

df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"])

out_df = cudf.DataFrame()

out_df["dt"] = (df["timestamp"] - t0).astype(np.int32)
out_df["dt"] = (df["timestamp"] - time0).astype(np.int32)
out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32)
out_df["dst"] = df["dest_ip"].str.ip_to_int().astype(np.int32)
out_df["lvl"] = df["secret_keys"].astype(np.int32)
@@ -268,7 +272,7 @@ def write_batch(x: MultiResponseMessage):
# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()

input.pipe(ops.map(write_batch)).subscribe(output)
input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)

logger.info("Gen-viz stage completed. Waiting for shutdown")

@@ -280,8 +284,8 @@ def write_batch(x: MultiResponseMessage):
logger.info("Gen-viz shutdown complete")

# Sink to file
to_file = seg.make_node(self.unique_name, ops.build(node_fn))
seg.make_edge(stream, to_file)
to_file = builder.make_node(self.unique_name, ops.build(node_fn))
builder.make_edge(stream, to_file)
stream = to_file

# Return input unchanged to allow passthrough
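For reference, a minimal standalone sketch of the thread-to-loop hand-off this stage relies on (the worker and queue names below are assumptions for illustration, not Morpheus APIs): a non-asyncio thread enqueues work onto the event loop with `asyncio.run_coroutine_threadsafe` and signals shutdown through an `asyncio.Event` created without the removed `loop=` argument.

```python
import asyncio
import threading


async def main() -> None:
    loop = asyncio.get_event_loop()  # the running loop, saved for use by other threads
    buffer_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    close_event = asyncio.Event()

    def worker() -> None:
        # Runs on a plain thread; each put blocks until the loop accepts the item.
        for item in ("a", "b", "c"):
            asyncio.run_coroutine_threadsafe(buffer_queue.put(item), loop=loop).result()
        loop.call_soon_threadsafe(close_event.set)

    threading.Thread(target=worker, daemon=True).start()

    # Drain the queue until the worker has signalled shutdown and nothing is left.
    while not (close_event.is_set() and buffer_queue.empty()):
        try:
            print(await asyncio.wait_for(buffer_queue.get(), timeout=0.5))
        except asyncio.TimeoutError:
            continue


asyncio.run(main())
```

`start_async` above follows the same shape: it stores the loop returned by `asyncio.get_event_loop()`, and `write_batch`, which runs on a pipeline thread, blocks on `run_coroutine_threadsafe(...).result()` to enqueue each buffer. The logging calls are also switched from `str.format` to `%`-style arguments so formatting is deferred to the logging framework.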
10 changes: 5 additions & 5 deletions morpheus/utils/producer_consumer_queue.py
@@ -138,10 +138,10 @@ class AsyncIOProducerConsumerQueue(asyncio.Queue, typing.Generic[_T]):
Custom queue.Queue implementation which supports closing and uses recursive locks
"""

def __init__(self, maxsize=0, *, loop=None) -> None:
super().__init__(maxsize=maxsize, loop=loop)
def __init__(self, maxsize=0) -> None:
super().__init__(maxsize=maxsize)

self._closed = asyncio.Event(loop=loop)
self._closed = asyncio.Event()
self._is_closed = False

async def join(self):
@@ -166,7 +166,7 @@ async def put(self, item):
slot is available before adding item.
"""
while self.full() and not self._is_closed:
putter = self._loop.create_future()
putter = self._get_loop().create_future()
self._putters.append(putter)
try:
await putter
@@ -196,7 +196,7 @@ async def get(self) -> _T:
If queue is empty, wait until an item is available.
"""
while self.empty() and not self._is_closed:
getter = self._loop.create_future()
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
await getter
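A minimal sketch of the queue change, assuming Python 3.10+ where `asyncio.Queue` exposes the private `_get_loop()` helper; `ClosableQueue`, its `close()` behaviour, and the `RuntimeError` it raises are illustrative simplifications, not the actual `AsyncIOProducerConsumerQueue`:

```python
import asyncio


class ClosableQueue(asyncio.Queue):
    """Queue that can be closed; waiter futures come from the queue's own loop."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize=maxsize)
        self._closed = asyncio.Event()  # no loop= argument on modern Python
        self._is_closed = False

    async def put(self, item) -> None:
        # _get_loop() replaces the loop captured in __init__; _putters is the
        # internal waiter deque of asyncio.Queue, used here to mirror the diff.
        while self.full() and not self._is_closed:
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            except asyncio.CancelledError:
                putter.cancel()
                raise
        if self._is_closed:
            raise RuntimeError("Queue is closed")
        self.put_nowait(item)

    def close(self) -> None:
        self._is_closed = True
        self._closed.set()
        # Wake any producers/consumers parked on internal waiter futures.
        for waiter in (*self._putters, *self._getters):
            if not waiter.done():
                waiter.set_result(None)
```

On interpreters that predate `_get_loop()`, calling `asyncio.get_running_loop()` inside the coroutine yields the same loop, so the pattern can be backported if needed.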