Skip to content

Commit

Permalink
Merge pull request #4 from PyMoDAQ/feature/serializer
Browse files Browse the repository at this point in the history
moved the base serializer, factory and built in serializables in pymo…
  • Loading branch information
seb5g authored Dec 3, 2024
2 parents e8d0580 + 2807d2e commit ba83755
Show file tree
Hide file tree
Showing 9 changed files with 1,069 additions and 14 deletions.
23 changes: 9 additions & 14 deletions src/pymodaq_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
import importlib.util
import os
import sys
from pathlib import Path

import warnings

try:
# with open(str(Path(__file__).parent.joinpath('resources/VERSION')), 'r') as fvers:
# __version__ = fvers.read().strip()

from pymodaq_utils.logger import set_logger
from pymodaq_utils.utils import get_version, PackageNotFoundError
try:
__version__ = get_version('pymodaq_utils')
except PackageNotFoundError:
__version__ = '0.0.0dev'
try:
LOGGER = set_logger('pymodaq', add_handler=True, base_logger=True)
logger_var = set_logger('pymodaq', add_handler=True, base_logger=True)
logger_var.info('')
logger_var.info('')
logger_var.info('************************')
logger_var.info(f"Registering Serializables...")
from pymodaq_utils.serialize.serializer import SerializableFactory, SERIALIZABLE
logger_var.info(f"Done")
logger_var.info('************************')
except Exception:
print("Couldn't create the local folder to store logs , presets...")

LOGGER.info('')
LOGGER.info('')

from pymodaq_utils.config import Config

CONFIG = Config() # to ckeck for config file existence, otherwise create one


except Exception as e:
try:
LOGGER.exception(str(e))
logger_var.exception(str(e))
except Exception as e:
print(str(e))
Empty file.
231 changes: 231 additions & 0 deletions src/pymodaq_utils/serialize/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from abc import ABCMeta, abstractmethod
from typing import Callable, List, Any, Optional, Tuple, TypeVar, Union

from numpy.typing import NDArray

from . import utils


class SerializableBase(metaclass=ABCMeta):
"""Base class for a Serializer. """

@classmethod
def name(cls):
"""str: the object class name"""
return cls.__class__.__name__

@classmethod
def type(cls):
"""object: the type of the object"""
raise cls.__class__

@staticmethod
@abstractmethod
def serialize(obj: "SerializableBase") -> bytes:
""" Implements self serialization into bytes
Parameters
----------
obj: SerializableBase
Returns
-------
bytes
Notes
-----
The actual serialization should be done using the SerializableFactory and its method
:meth:SerializableFactory.get_apply_serializer
"""
...

@staticmethod
@abstractmethod
def deserialize(bytes_str: bytes) -> Tuple["SerializableBase", bytes]:
""" Implements deserialization into self type from bytes
Parameters
----------
bytes_str: bytes
Returns
-------
SerializableBase: object to reconstruct
bytes: leftover bytes to deserialize
Notes
-----
The actual deserialization should be done using the SerializableFactory and its method
:meth:SerializableFactory.get_apply_deserializer
"""
...


# List of all objects serializable via the serializer
SERIALIZABLE = Union[bytes, str, int, float, complex, list, NDArray, SerializableBase]

Serializable = TypeVar("Serializable", bound=SERIALIZABLE)
_SerializableClass = TypeVar("_SerializableClass", bound=SerializableBase)

Serializer = Callable[[Serializable], bytes]
Deserializer = Callable[[bytes], Tuple[Serializable, bytes]]


class SerializableFactory:
"""The factory class for creating executors"""

serializable_registry: dict[type[SERIALIZABLE], dict[str, Union[Serializer, Deserializer]]] = {}

@classmethod
def add_type_to_serialize(
cls, serialize_method: Callable[[Serializable], bytes]
) -> Callable[[Serializable], bytes]:
def wrap(obj: Serializable) -> bytes:
bytes_str = b''
type_as_bytes, len_as_bytes = utils.str_len_to_bytes(obj.__class__.__name__)
bytes_str += len_as_bytes
bytes_str += type_as_bytes
bytes_str += serialize_method(obj)
return bytes_str
return wrap

@classmethod
def register_from_obj(cls, obj: Serializable, serialize_method: Serializer[Serializable],
deserialize_method: Optional[Deserializer[Serializable]] = None):
"""Method to register a serializable object class to the internal registry.
"""
obj_type = obj.__class__

cls.register_from_type(
obj_type=obj_type,
serialize_method=serialize_method,
deserialize_method=deserialize_method,
)

@classmethod
def register_decorator(cls) -> Callable[[type[_SerializableClass]], type[_SerializableClass]]:
"""Class decorator method to register exporter class to the internal registry. Must be used as
decorator above the definition of an H5Exporter class. H5Exporter must implement specific class
attributes and methods, see definition: h5node_exporter.H5Exporter
See h5node_exporter.H5txtExporter and h5node_exporter.H5txtExporter for usage examples.
returns:
the exporter class
"""

def inner_wrapper(
wrapped_class: type[_SerializableClass],
) -> type[_SerializableClass]:
cls.register_from_type(wrapped_class,
wrapped_class.serialize,
wrapped_class.deserialize)

# Return wrapped_class
return wrapped_class
return inner_wrapper

@classmethod
def register_from_type(cls, obj_type: type[Serializable], serialize_method: Serializer[Serializable],
deserialize_method: Deserializer[Serializable]):
"""Method to register a serializable object class to the internal registry.
"""
if obj_type not in cls.serializable_registry:
cls.serializable_registry[obj_type] = dict(
serializer=cls.add_type_to_serialize(serialize_method),
deserializer=deserialize_method)

def get_type_from_str(self, obj_type_str: str) -> type:
for k in self.serializable_registry:
if obj_type_str in str(k):
return k
raise ValueError("Unknown type")

def get_serializables(self) -> List[type]:
return list(self.serializable_registry.keys())

def get_serializer(self, obj_type: type) -> Serializer:
entry_dict = self.serializable_registry.get(obj_type, None)
if entry_dict is not None:
return entry_dict['serializer'] # type: ignore
else:
raise NotImplementedError(f'There is no known method to serialize {obj_type}')

def get_apply_serializer(self, obj: Any, append_length=False) -> bytes:
"""
Parameters
----------
obj: object
should be a serializable object (see get_serializables)
append_length: bool
if True will append the length of the bytes string in the beginning of the returned
bytes
Returns
-------
bytes: the encoded object
Notes
-----
Symmetric method of :meth:SerializableFactory.get_apply_deserializer
Examples
--------
>>> ser_factory = SerializableFactory()
>>> s = [23, 'a']
>>>> ser_factory.get_apply_deserializer(ser_factory.get_apply_serializer(s) == s
"""
serializer = self.get_serializer(obj.__class__)
bytes_str = serializer(obj)
if not append_length:
return bytes_str
else:
bytes_str = utils.int_to_bytes(len(bytes_str)) + bytes_str
return bytes_str

def get_deserializer(self, obj_type: type[Serializable]) -> Deserializer[Serializable]:
entry_dict = self.serializable_registry.get(obj_type, None)
if entry_dict is not None:
return entry_dict['deserializer'] # type: ignore
else:
raise NotImplementedError(f'There is no known method to deserialize an {obj_type} type')

def get_apply_deserializer(
self, bytes_str: bytes, only_object: bool = True
) -> Union[SERIALIZABLE, Tuple[SERIALIZABLE, bytes]]:
""" Infer which object is to be deserialized from the first bytes
The type has been encoded by the get_apply_serializer method
Parameters
----------
bytes_str: bytes
The bytes to convert back to an object
only_object: bool (default False)
if False, return the object and the remaining bytes if any
if True return only the object
Returns
-------
object: the reconstructed object
optional bytes: only if only_object parameter is False, will be the leftover bytes
Notes
-----
Symmetric method of :meth:SerializableFactory.get_apply_serializer
Examples
--------
>>> ser_factory = SerializableFactory()
>>> s = [23, 'a']
>>>> ser_factory.get_apply_deserializer(ser_factory.get_apply_serializer(s) == s
"""
obj_type_str, remaining_bytes = self.get_deserializer(str)(bytes_str)

obj_type = self.get_type_from_str(obj_type_str)
if obj_type is None:
raise NotImplementedError(f'There is no known method to deserialize an {obj_type_str} '
f'type')
result = self.get_deserializer(obj_type)(remaining_bytes)
return result[0] if only_object else result
102 changes: 102 additions & 0 deletions src/pymodaq_utils/serialize/mysocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Any, Union, TYPE_CHECKING

from pymodaq_utils.mysocket import Socket
from . import utils
from .factory import SerializableFactory, SERIALIZABLE

ser_factory = SerializableFactory()


class SocketString:
"""Mimic the Socket object but actually using a bytes string not a socket connection
Implements a minimal interface of two methods
Parameters
----------
bytes_string: bytes
See Also
--------
:class:`~pymodaq.utils.tcp_ip.mysocket.Socket`
"""
def __init__(self, bytes_string: bytes):
self._bytes_string = bytes_string

def to_bytes(self):
return self._bytes_string

def check_received_length(self, length: int) -> bytes:
"""
Make sure all bytes (length) that should be received are received through the socket.
Here just read the content of the underlying bytes string
Parameters
----------
length: int
The number of bytes to be read from the socket
Returns
-------
bytes
"""
data = self._bytes_string[0:length]
self._bytes_string = self._bytes_string[length:]
return data

def get_first_nbytes(self, length: int) -> bytes:
""" Read the first N bytes from the socket
Parameters
----------
length: int
The number of bytes to be read from the socket
Returns
-------
bytes
the read bytes string
"""
return self.check_received_length(length)


class Socket(Socket):
"""Custom Socket wrapping the built-in one and added functionalities to
make sure message have been sent and received entirely"""

def check_sended(self, data_bytes: bytes):
"""
Make sure all bytes are sent through the socket
Parameters
----------
data_bytes: bytes
"""
if not isinstance(data_bytes, bytes):
raise TypeError(f'{data_bytes} should be an bytes string, not a {type(data_bytes)}')
sended = 0
while sended < len(data_bytes):
sended += self.socket.send(data_bytes[sended:])

def check_sended_with_serializer(self, obj: SERIALIZABLE):
""" Convenience function to convert permitted objects to bytes and then use
the check_sended method
Appends to bytes the length of the message to make sure the reception knows how much bytes
to expect
For a list of allowed objects, see :meth:`Serializer.to_bytes`
"""
# do not use Serializer anymore but mimic its behavior
self.check_sended(ser_factory.get_apply_serializer(obj, append_length=True))

def check_receiving(self, bytes_str: bytes):
""" First read the 4th first bytes to get the total message length
Make sure to read that much bytes before processing the message
See check_sended and check_sended_with_serializer for a symmetric action
"""

bytes_len_bytes, remaining_bytes = utils.split_nbytes(bytes_str, 4)
bytes_len = utils.bytes_to_int(bytes_len_bytes)
self.check_received_length(bytes_len)
Loading

0 comments on commit ba83755

Please sign in to comment.