Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor python code to reflect SDK changes #4

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
- uses: actions/checkout@v4
with:
repository: p2p-industries/hyveos_ros_msgs
ref: ${{ github.head_ref || github.ref_name }}
path: src/hyveos_msgs
- name: Build and test workspace
uses: ichiro-its/ros2-ws-action@v1.0.1
Expand Down
9 changes: 5 additions & 4 deletions hyveos_bridge/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ class Bridge(Node):
def __init__(self, connection: OpenedConnection):
super().__init__('hyveos_bridge')

from .dht import DHTClient # noqa: F401
from .discovery import DiscoveryClient # noqa: F401
from .pubsub import PubsubClient # noqa: F401
from .reqres import ReqResClient # noqa: F401
from .control import ControlClient # noqa: F401
from .kv import KVClient # noqa: F401
from .neighbours import NeighboursClient # noqa: F401
from .pub_sub import PubsubClient # noqa: F401
from .req_res import ReqResClient # noqa: F401

self.connection = connection
self.bridge_clients = [client(self) for client in BridgeClient.__subclasses__()]
Expand Down
38 changes: 38 additions & 0 deletions hyveos_bridge/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from hyveos_msgs.srv import GetId
from hyveos_sdk import OpenedConnection
from rclpy.impl.rcutils_logger import RcutilsLogger

from .bridge import Bridge, BridgeClient, service_callback


class ControlClient(BridgeClient):
logger: RcutilsLogger
connection: OpenedConnection

def __init__(self, node: Bridge):
def namespaced(name: str) -> str:
return f'{node.get_name()}/{name}'

self.get_id_service = node.create_service(
GetId,
namespaced('get_id'),
self._get_id_callback
)

self.logger = node.get_logger()
self.connection = node.connection

@service_callback
async def _get_id_callback(
self,
_: GetId.Request,
response: GetId.Response
):
self.logger.info('Getting own ID')

response.id = await self.connection.get_id()
response.success = True
return response

async def run(self):
pass
28 changes: 14 additions & 14 deletions hyveos_bridge/dht.py → hyveos_bridge/kv.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
from hyveos_msgs.srv import DHTGetRecord, DHTPutRecord
from hyveos_sdk import DHTService
from hyveos_msgs.srv import GetKVRecord, PutKVRecord
from hyveos_sdk import KVService
from rclpy.impl.rcutils_logger import RcutilsLogger

from .bridge import Bridge, BridgeClient, service_callback


class DHTClient(BridgeClient):
class KVClient(BridgeClient):
logger: RcutilsLogger
dht: DHTService
kv: KVService

def __init__(self, node: Bridge):
def namespaced(name: str) -> str:
return f'{node.get_name()}/dht/{name}'
return f'{node.get_name()}/kv/{name}'

self.get_record_service = node.create_service(
DHTGetRecord,
GetKVRecord,
namespaced('get_record'),
self._get_record_callback
)
self.put_record_service = node.create_service(
DHTPutRecord,
PutKVRecord,
namespaced('put_record'),
self._put_record_callback
)

self.logger = node.get_logger()
self.dht = node.connection.get_dht_service()
self.kv = node.connection.get_kv_service()

@service_callback
async def _get_record_callback(
self,
request: DHTGetRecord.Request,
response: DHTGetRecord.Response
request: GetKVRecord.Request,
response: GetKVRecord.Response
):
self.logger.info(f'Getting record in topic {request.topic} with key {request.key}')

record = await self.dht.get_record(request.topic, request.key)
record = await self.kv.get_record(request.topic, request.key)

if record.data is None:
response.success = False
Expand All @@ -49,12 +49,12 @@ async def _get_record_callback(
@service_callback
async def _put_record_callback(
self,
request: DHTPutRecord.Request,
response: DHTPutRecord.Response
request: PutKVRecord.Request,
response: PutKVRecord.Response
):
self.logger.info(f'Putting record in topic {request.topic} with key {request.key}')

await self.dht.put_record(request.topic, request.key, request.value)
await self.kv.put_record(request.topic, request.key, request.value)

response.success = True
return response
Expand Down
59 changes: 12 additions & 47 deletions hyveos_bridge/discovery.py → hyveos_bridge/neighbours.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import asyncio

from hyveos_msgs.msg import NeighbourEvent
from hyveos_msgs.srv import GetId, GetNeighbours
from hyveos_sdk import DiscoveryService
from hyveos_msgs.srv import GetNeighbours
from hyveos_sdk import NeighboursService
from rclpy.impl.rcutils_logger import RcutilsLogger

from .bridge import Bridge, BridgeClient, service_callback


class DiscoveryClient(BridgeClient):
class NeighboursClient(BridgeClient):
logger: RcutilsLogger
discovery: DiscoveryService
neighbours: set[str]
neighbours_lock: asyncio.Lock
neighbours: NeighboursService

def __init__(self, node: Bridge):
def namespaced(name: str) -> str:
Expand All @@ -23,33 +19,14 @@ def namespaced(name: str) -> str:
namespaced('neighbour_events'),
10
)
self.get_id_service = node.create_service(
GetId,
namespaced('get_id'),
self._get_id_callback
)
self.get_neighbours_service = node.create_service(
GetNeighbours,
namespaced('get_neighbours'),
self._get_neighbours_callback
)

self.logger = node.get_logger()
self.discovery = node.connection.get_discovery_service()
self.neighbours = set()
self.neighbours_lock = asyncio.Lock()

@service_callback
async def _get_id_callback(
self,
_: GetId.Request,
response: GetId.Response
):
self.logger.info('Getting own ID')

response.id = await self.discovery.get_own_id()
response.success = True
return response
self.neighbours = node.connection.get_neighbours_service()

@service_callback
async def _get_neighbours_callback(
Expand All @@ -59,37 +36,25 @@ async def _get_neighbours_callback(
):
self.logger.info('Getting neighbours')

async with self.neighbours_lock:
response.success = True
response.neighbour_ids = list(self.neighbours)
return response
response.success = True
response.neighbour_ids = await self.neighbours.get()
return response

async def run(self):
async with self.discovery.discovery_events() as events:
async with self.neighbours.subscribe() as events:
async for event in events:
event_type = event.WhichOneof('event')
if event_type == 'init':
async with self.neighbours_lock:
self.neighbours = {peer.peer_id for peer in event.init.peers}
self.logger.info('Neighbours initialized')
elif event_type == 'discovered':
peer_id = event.discovered.peer_id

async with self.neighbours_lock:
self.neighbours.add(peer_id)

msg = NeighbourEvent()
msg.event = NeighbourEvent.DISCOVERED
msg.neighbour_id = peer_id
msg.neighbour_id = event.discovered.peer_id
self.neighbour_events_publisher.publish(msg)
elif event_type == 'lost':
peer_id = event.lost.peer_id

async with self.neighbours_lock:
self.neighbours.discard(peer_id)

msg = NeighbourEvent()
msg.event = NeighbourEvent.LOST
msg.neighbour_id = peer_id
msg.neighbour_id = event.lost.peer_id
self.neighbour_events_publisher.publish(msg)
else:
self.logger.warn(f'Unknown event type: {event_type}')
16 changes: 8 additions & 8 deletions hyveos_bridge/pubsub.py → hyveos_bridge/pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from hyveos_msgs.msg import ReceivedPubsubMessage
from hyveos_msgs.srv import PubsubPublish, PubsubSubscription
from hyveos_sdk import GossipSubService, ManagedStream
from hyveos_sdk.protocol.script_pb2 import GossipSubRecvMessage
from hyveos_sdk import ManagedStream, PubSubService
from hyveos_sdk.protocol.bridge_pb2 import PubSubRecvMessage
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.publisher import Publisher

Expand All @@ -17,15 +17,15 @@ class Subscription:

def __init__(
self,
stream: ManagedStream[GossipSubRecvMessage],
stream: ManagedStream[PubSubRecvMessage],
publisher: Publisher,
logger: RcutilsLogger
):
self.event = asyncio.Event()
self.task = asyncio.create_task(self.run(stream, publisher))
self.logger = logger

async def run(self, stream: ManagedStream[GossipSubRecvMessage], publisher: Publisher):
async def run(self, stream: ManagedStream[PubSubRecvMessage], publisher: Publisher):
async with stream:
iterator = stream.__aiter__()

Expand Down Expand Up @@ -65,7 +65,7 @@ async def cancel(self):

class PubsubClient(BridgeClient):
logger: RcutilsLogger
gos: GossipSubService
pub_sub: PubSubService
subscriptions: dict[str, Subscription]
subscriptions_lock: asyncio.Lock

Expand Down Expand Up @@ -95,7 +95,7 @@ def namespaced(name: str) -> str:
)

self.logger = node.get_logger()
self.gos = node.connection.get_gossip_sub_service()
self.pub_sub = node.connection.get_pub_sub_service()
self.subscriptions = {}
self.subscriptions_lock = asyncio.Lock()

Expand All @@ -109,7 +109,7 @@ async def _publish_callback(

data = prepare_data(request.data)

msg_id = await self.gos.publish(data, request.topic)
msg_id = await self.pub_sub.publish(data, request.topic)

response.success = True
response.msg_id = msg_id
Expand All @@ -126,7 +126,7 @@ async def _subscribe_callback(

async with self.subscriptions_lock:
if topic not in self.subscriptions:
stream = await self.gos.subscribe(topic)
stream = await self.pub_sub.subscribe(topic)
self.subscriptions[topic] = Subscription(
stream,
self.received_messages_publisher,
Expand Down
2 changes: 1 addition & 1 deletion hyveos_bridge/reqres.py → hyveos_bridge/req_res.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from hyveos_msgs.msg import ReceivedRequest
from hyveos_msgs.srv import RequestSubscription, Respond, SendRequest
from hyveos_sdk import ManagedStream, RequestResponseService
from hyveos_sdk.protocol.script_pb2 import RecvRequest
from hyveos_sdk.protocol.bridge_pb2 import RecvRequest
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.publisher import Publisher

Expand Down