From 68cc88897a4e3f41b1e18131cd3fb949a79d295c Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 22 Nov 2018 10:53:40 +0000 Subject: [PATCH 1/4] Create a session by running PID to ensure different processes have different session Ensure the lock is released just after the request has completed. Close all session upon deletion of the Communicator --- egcg_core/rest_communication.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/egcg_core/rest_communication.py b/egcg_core/rest_communication.py index 0a9e0cc..5656153 100644 --- a/egcg_core/rest_communication.py +++ b/egcg_core/rest_communication.py @@ -1,5 +1,6 @@ import json import mimetypes +import os from urllib.parse import urljoin import requests from multiprocessing import Lock @@ -18,7 +19,7 @@ def __init__(self, auth=None, baseurl=None, retries=5): self._baseurl = baseurl self._auth = auth self.retries = retries - self._session = None + self._sessions = {} self.lock = Lock() def begin_session(self): @@ -36,9 +37,10 @@ def begin_session(self): @property def session(self): - if self._session is None: - self._session = self.begin_session() - return self._session + pid = os.getpid() + if pid not in self._sessions: + self._sessions[pid] = self.begin_session() + return self._sessions[pid] @staticmethod def serialise(queries): @@ -137,23 +139,22 @@ def _req(self, method, url, quiet=False, **kwargs): kwargs['data'] = kwargs.pop('json') self.lock.acquire() - r = self.session.request(method, url, **kwargs) + try: + r = self.session.request(method, url, **kwargs) + finally: + self.lock.release() kwargs.pop('files', None) # e.g: 'POST ({"some": "args"}) -> {"some": "content"}. Status code 201. Reason: CREATED report = '%s %s (%s) -> %s. Status code %s. Reason: %s' % ( r.request.method, r.request.path_url, kwargs, r.content.decode('utf-8'), r.status_code, r.reason ) - if r.status_code in self.successful_statuses: if not quiet: self.debug(report) - - self.lock.release() return r else: self.error(report) - self.lock.release() raise RestCommunicationError('Encountered a %s status code: %s' % (r.status_code, r.reason)) def get_content(self, endpoint, paginate=True, quiet=False, **query_args): @@ -258,6 +259,13 @@ def post_or_patch(self, endpoint, input_json, id_field=None, update_lists=None): else: self.post_entry(endpoint, _payload) + def close(self): + for s in self._sessions: + s.close() + + def __del__(self): + self.close() + default = Communicator() api_url = default.api_url From 2f39d75b5b40beb8c6aad187d3a2a18c629fe3ff Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 22 Nov 2018 11:08:38 +0000 Subject: [PATCH 2/4] Add test for new finally and multi session --- egcg_core/rest_communication.py | 2 +- tests/test_rest_communication.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/egcg_core/rest_communication.py b/egcg_core/rest_communication.py index 5656153..1b0ea15 100644 --- a/egcg_core/rest_communication.py +++ b/egcg_core/rest_communication.py @@ -260,7 +260,7 @@ def post_or_patch(self, endpoint, input_json, id_field=None, update_lists=None): self.post_entry(endpoint, _payload) def close(self): - for s in self._sessions: + for s in self._sessions.values(): s.close() def __del__(self): diff --git a/tests/test_rest_communication.py b/tests/test_rest_communication.py index ddb3fdb..bac2a80 100644 --- a/tests/test_rest_communication.py +++ b/tests/test_rest_communication.py @@ -1,7 +1,12 @@ import os import json + +import pytest from requests import Session from unittest.mock import Mock, patch, call + +from requests.exceptions import SSLError + from tests import FakeRestResponse, TestEGCG from egcg_core import rest_communication from egcg_core.util import check_if_nested @@ -30,6 +35,7 @@ def fake_request(method, url, **kwargs): patched_request = patch.object(Session, 'request', side_effect=fake_request) +patched_failed_request = patch.object(Session, 'request', side_effect=SSLError('SSL error')) auth = ('a_user', 'a_password') @@ -97,6 +103,28 @@ def test_req(self, mocked_request): assert json.loads(response.content.decode('utf-8')) == response.json() == test_nested_request_content mocked_request.assert_called_with('METHOD', rest_url(test_endpoint), json=json_content) + @patched_failed_request + def test_failed_req(self, mocked_request): + json_content = ['some', {'test': 'json'}] + self.comm.lock = Mock() + self.comm.lock.acquire.assert_not_called() + self.comm.lock.release.assert_not_called() + + with pytest.raises(SSLError): + _ = self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) + + self.comm.lock.acquire.assert_called_once() + self.comm.lock.release.assert_called_once() # exception raised, but lock still released + + @patched_request + def test_multi_session(self, mocked_request): + json_content = ['some', {'test': 'json'}] + with patch('os.getpid', return_value=1): + _ = self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) + with patch('os.getpid', return_value=2): + _ = self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) + assert len(self.comm._sessions) == 2 + @patch.object(Session, '__exit__') @patch.object(Session, '__enter__') @patched_request From 997cb7034f30a06ac9843ace5d9716c143ebba32 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 22 Nov 2018 13:45:42 +0000 Subject: [PATCH 3/4] Make sure the del does not crash when some references are missing --- egcg_core/rest_communication.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/egcg_core/rest_communication.py b/egcg_core/rest_communication.py index 1b0ea15..86bfca6 100644 --- a/egcg_core/rest_communication.py +++ b/egcg_core/rest_communication.py @@ -264,8 +264,10 @@ def close(self): s.close() def __del__(self): - self.close() - + try: + self.close() + except ReferenceError: + pass default = Communicator() api_url = default.api_url From 7c52ac9bf40a1b5fbe1b32be1d493b93778121ba Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 22 Nov 2018 13:58:27 +0000 Subject: [PATCH 4/4] Add simple test for multiprocessing --- egcg_core/rest_communication.py | 1 + tests/test_rest_communication.py | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/egcg_core/rest_communication.py b/egcg_core/rest_communication.py index 86bfca6..096648e 100644 --- a/egcg_core/rest_communication.py +++ b/egcg_core/rest_communication.py @@ -37,6 +37,7 @@ def begin_session(self): @property def session(self): + """Create and return a session per PID so each sub-processes will use their own session""" pid = os.getpid() if pid not in self._sessions: self._sessions[pid] = self.begin_session() diff --git a/tests/test_rest_communication.py b/tests/test_rest_communication.py index bac2a80..e3f372b 100644 --- a/tests/test_rest_communication.py +++ b/tests/test_rest_communication.py @@ -1,3 +1,4 @@ +import multiprocessing import os import json @@ -125,6 +126,25 @@ def test_multi_session(self, mocked_request): _ = self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) assert len(self.comm._sessions) == 2 + @patched_request + def test_with_multiprocessing(self, mocked_request): + json_content = ['some', {'test': 'json'}] + + def assert_request(): + _ = self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) + assert mocked_request.call_count == 2 + assert len(self.comm._sessions) == 2 + + # initiate in the Session in the main thread + self.comm._req('METHOD', rest_url(test_endpoint), json=json_content) + procs = [] + for i in range(10): + procs.append(multiprocessing.Process(target=assert_request)) + for p in procs: + p.start() + for p in procs: + p.join() + @patch.object(Session, '__exit__') @patch.object(Session, '__enter__') @patched_request