forked from frequenz-floss/frequenz-channels-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_bidirectional.py
83 lines (62 loc) · 2.56 KB
/
test_bidirectional.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""Tests for the RequestResponse implementation."""
import asyncio
import pytest
from frequenz.channels import (
Bidirectional,
ChannelClosedError,
ChannelError,
ReceiverError,
SenderError,
)
async def test_request_response() -> None:
    """Verify request/response round trips over a Bidirectional channel.

    A background service task answers every number it receives with
    "positive" or "negative". The client sends the integers -5..4, checks
    each reply, and finally sends 42 to make the service task finish.
    """
    channel: Bidirectional[int, str] = Bidirectional("test_client", "test_service")

    async def service(handle: Bidirectional.Handle[str, int]) -> None:
        """Reply with the sign of each received number until told to stop."""
        while True:
            request = await handle.receive()
            # A missing value or the sentinel 42 both end the service loop.
            if request is None or request == 42:
                return
            await handle.send("positive" if request >= 0 else "negative")

    service_task = asyncio.create_task(service(channel.service_handle))

    client: Bidirectional.Handle[int, str] = channel.client_handle

    for value in range(-5, 5):
        await client.send(value)
        reply = await client.receive()
        expected = "negative" if value < 0 else "positive"
        assert reply == expected

    # Stop the service task
    await client.send(42)
    await service_task
async def test_sender_error_chaining() -> None:
    """Ensure sending on a closed channel raises a properly chained SenderError.

    The response channel is closed before the service handle sends. The
    expected exception chain (followed through ``__cause__``) is:
    SenderError -> ChannelError -> ChannelClosedError.
    """
    req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service")

    # Close the private response channel up front so the send below must fail.
    await req_resp._response_channel.close()  # pylint: disable=protected-access

    with pytest.raises(SenderError, match="The channel was closed") as exc_info:
        await req_resp.service_handle.send("I'm closed!")

    # Walk the cause chain of the raised error, one level at a time.
    err = exc_info.value
    cause = err.__cause__
    assert isinstance(cause, ChannelError)
    assert cause.args[0].startswith("Error in the underlying channel")
    assert isinstance(cause.__cause__, ChannelClosedError)
async def test_consume_error_chaining() -> None:
    """Ensure consuming from a closed channel raises a chained ReceiverError.

    The request channel is closed and the service handle's ``ready()`` is
    awaited before ``consume()`` is called. The expected exception chain
    (followed through ``__cause__``) is:
    ReceiverError -> ChannelError -> ChannelClosedError.
    """
    req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service")

    # Close the private request channel up front so consuming must fail.
    await req_resp._request_channel.close()  # pylint: disable=protected-access

    # ready() is awaited first; only the consume() call is expected to raise.
    await req_resp.service_handle.ready()

    with pytest.raises(ReceiverError, match="Receiver .* was stopped") as exc_info:
        _ = req_resp.service_handle.consume()

    # Walk the cause chain of the raised error, one level at a time.
    err = exc_info.value
    cause = err.__cause__
    assert isinstance(cause, ChannelError)
    assert cause.args[0].startswith("Error in the underlying channel")
    assert isinstance(cause.__cause__, ChannelClosedError)