diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index e9c2c4b..9a26be5 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -167,13 +167,14 @@ def _run(self): logger.debug("Multicast socket timed out on recv!") continue else: - raise + return except ZMQError: return finally: - self._check_age(pub, min_interval=self._max_age / 20) - if self._do_heartbeat: - pub.heartbeat(min_interval=29) + if self._do_run: + self._check_age(pub, min_interval=self._max_age / 20) + if self._do_heartbeat: + pub.heartbeat(min_interval=29) if self._multicast_enabled: ip_, port = fromaddr if self._restrict_to_localhost and ip_ not in self._local_ips: @@ -204,7 +205,7 @@ def set_up_address_receiver(self, port): """Set up the address receiver depending on if it is multicast or not.""" nameservers = False if self._multicast_enabled: - while True: + while True: # should this really be tried forever? wouldn't it be enough with 3? try: recv = MulticastReceiver(port) except IOError as err: diff --git a/posttroll/backends/zmq/subscriber.py b/posttroll/backends/zmq/subscriber.py index 836f590..4e7c8cf 100644 --- a/posttroll/backends/zmq/subscriber.py +++ b/posttroll/backends/zmq/subscriber.py @@ -108,7 +108,8 @@ def add_hook_pull(self, address, callback): specified subscription. Good for pushed 'inproc' messages from another thread. """ LOGGER.info("Subscriber adding PULL hook %s", str(address)) - socket = self._create_socket(PULL, address) + options = get_tcp_keepalive_options() + socket = self._create_socket(PULL, address, options) if self._sock_receiver: self._sock_receiver.register(socket) self._add_hook(socket, callback) @@ -167,13 +168,13 @@ def __call__(self, **kwargs): """Handle calls with class instance.""" return self.recv(**kwargs) - def stop(self): + def _stop(self): """Stop the subscriber.""" self._loop = False def close(self): """Close the subscriber: stop it and close the local subscribers.""" - self.stop() + self._stop() for sub in list(self.subscribers) + self._hooks: try: close_socket(sub) diff --git a/posttroll/subscriber.py b/posttroll/subscriber.py index b6a3e52..52852b7 100644 --- a/posttroll/subscriber.py +++ b/posttroll/subscriber.py @@ -125,7 +125,7 @@ def __call__(self, **kwargs): def stop(self): """Stop the subscriber.""" - return self._subscriber.stop() + return self._subscriber.close() def close(self): """Close the subscriber: stop it and close the local subscribers.""" diff --git a/posttroll/tests/test_nameserver.py b/posttroll/tests/test_nameserver.py index af49e3c..90e370e 100644 --- a/posttroll/tests/test_nameserver.py +++ b/posttroll/tests/test_nameserver.py @@ -28,17 +28,17 @@ def new_context(monkeypatch): def get_context(): return context monkeypatch.setattr(posttroll.backends.zmq, "get_context", get_context) + yield + context.term() @pytest.fixture(autouse=True) def new_mc_group(): """Create a unique mc group for each test.""" mc_group = random_valid_mc_address() - print(mc_group) config.set(mc_group=mc_group) - def free_port() -> int: """Get a free port. @@ -109,7 +109,8 @@ def test_localhost_restriction(self, mcrec, pub, msg): @pytest.mark.parametrize( "multicast_enabled", - [True, False] + [True, False], + ids=["mc on", "mc off"] ) def test_pub_addresses(multicast_enabled): """Test retrieving addresses.""" @@ -148,7 +149,8 @@ def test_pub_addresses(multicast_enabled): @pytest.mark.parametrize( "multicast_enabled", - [True, False] + [True, False], + ids=["mc on", "mc off"] ) def test_pub_sub_ctx(multicast_enabled): """Test publish and subscribe.""" @@ -176,7 +178,8 @@ def test_pub_sub_ctx(multicast_enabled): @pytest.mark.parametrize( "multicast_enabled", - [True, False] + [True, False], + ids=["mc on", "mc off"] ) def test_pub_sub_add_rm(multicast_enabled): """Test adding and removing publishers.""" @@ -359,5 +362,4 @@ def test_message_version_compatibility(tmp_path, version): finally: nserver.stop() thr.join() - print(response) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 38f595f..9919c83 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -31,9 +31,11 @@ from unittest import mock import pytest +import zmq from donfig import Config import posttroll +import posttroll.backends.zmq from posttroll import config from posttroll.message import Message from posttroll.publisher import Publish, Publisher, create_publisher_from_dict_config @@ -42,6 +44,17 @@ test_lock = Lock() +@pytest.fixture(autouse=True) +def new_context(monkeypatch): + """Create a new context for each test.""" + context = zmq.Context() + def get_context(): + return context + monkeypatch.setattr(posttroll.backends.zmq, "get_context", get_context) + yield + context.term() + + def free_port(): """Get a free port. @@ -426,7 +439,7 @@ def test_dict_config_full_subscriber(): _ = create_subscriber_from_dict_config(settings) -@pytest.fixture() +@pytest.fixture def _tcp_keepalive_settings(monkeypatch): """Set TCP Keepalive settings.""" with config.set(tcp_keepalive=1, tcp_keepalive_cnt=10, tcp_keepalive_idle=1, tcp_keepalive_intvl=1): @@ -442,7 +455,7 @@ def reset_config_for_tests(): posttroll.config = old_config -@pytest.fixture() +@pytest.fixture def _tcp_keepalive_no_settings(): """Set TCP Keepalive settings.""" with config.set(tcp_keepalive=None, tcp_keepalive_cnt=None, tcp_keepalive_idle=None, tcp_keepalive_intvl=None): @@ -474,7 +487,7 @@ def test_subscriber_tcp_keepalive(): sub = ZMQSubscriber(f"tcp://127.0.0.1:{str(free_port())}") assert len(sub.addr_sub.values()) == 1 _assert_tcp_keepalive(list(sub.addr_sub.values())[0]) - sub.stop() + sub._stop() @pytest.mark.usefixtures("_tcp_keepalive_no_settings")