From ced17c6881883ed5f07c3dc43e5d79d702bd73c1 Mon Sep 17 00:00:00 2001 From: Ulrik Mikaelsson Date: Tue, 26 Nov 2019 23:55:59 +0100 Subject: [PATCH 1/2] stream/ws_client: Use StringIO for WSClient._all bytes() += bytes() copies both buffers into a new one, causing quadratic cost and gradual slow-down. Replacing it with StringIO avoids that. --- stream/ws_client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 7f041206..775849d0 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -24,6 +24,7 @@ import yaml from six.moves.urllib.parse import urlencode, quote_plus, urlparse, urlunparse +from six import StringIO from websocket import WebSocket, ABNF, enableTrace @@ -47,7 +48,7 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} - self._all = "" + self._all = StringIO() # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -157,8 +158,8 @@ def read_all(self): TODO: Maybe we can process this and return a more meaningful map with channels mapped for each input. """ - out = self._all - self._all = "" + out = self._all.getvalue() + self._all = self._all.__class__() self._channels = {} return out @@ -195,7 +196,7 @@ def update(self, timeout=0): if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: # keeping all messages in the order they received # for non-blocking call. 
- self._all += data + self._all.write(data) if channel not in self._channels: self._channels[channel] = data else: From 368d0d7b1e764adf5269b2f66ec0777331eccd82 Mon Sep 17 00:00:00 2001 From: Ulrik Mikaelsson Date: Wed, 27 Nov 2019 11:38:20 +0100 Subject: [PATCH 2/2] ws_client: Add option to disable capture-all --- stream/stream.py | 3 ++- stream/ws_client.py | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/stream/stream.py b/stream/stream.py index a9d0b402..6d5f05f8 100644 --- a/stream/stream.py +++ b/stream/stream.py @@ -16,7 +16,8 @@ def stream(func, *args, **kwargs): - """Stream given API call using websocket""" + """Stream given API call using websocket. + Extra kwarg: capture_all=True - captures all stdout+stderr for use with WSClient.read_all()""" def _intercept_request_call(*args, **kwargs): # old generated code's api client has config. new ones has diff --git a/stream/ws_client.py b/stream/ws_client.py index 775849d0..0a8426d9 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -34,9 +34,16 @@ ERROR_CHANNEL = 3 RESIZE_CHANNEL = 4 +class _IgnoredIO: + def write(self, _x): + pass + + def getvalue(self): + raise TypeError("Tried to read_all() from a WSClient configured to not capture. Did you mean `capture_all=True`?") + class WSClient: - def __init__(self, configuration, url, headers): + def __init__(self, configuration, url, headers, capture_all): """A websocket client with support for channels. Exec command uses different channels for different streams. 
for @@ -48,7 +55,10 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} - self._all = StringIO() + if capture_all: + self._all = StringIO() + else: + self._all = _IgnoredIO() # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -258,6 +268,7 @@ def websocket_call(configuration, *args, **kwargs): url = args[1] _request_timeout = kwargs.get("_request_timeout", 60) _preload_content = kwargs.get("_preload_content", True) + capture_all = kwargs.get("capture_all", True) headers = kwargs.get("headers") # Expand command parameter list to indivitual command params @@ -273,7 +284,7 @@ def websocket_call(configuration, *args, **kwargs): url += '?' + urlencode(query_params) try: - client = WSClient(configuration, get_websocket_url(url), headers) + client = WSClient(configuration, get_websocket_url(url), headers, capture_all) if not _preload_content: return client client.run_forever(timeout=_request_timeout)