Skip to content

Commit

Permalink
Merge pull request #29 from JezaChen/features/common_msg_types
Browse files Browse the repository at this point in the history
[Feature] Add common message type class to reduce hard-coding
  • Loading branch information
JezaChen authored Jan 6, 2024
2 parents b7ef199 + d8c4a5a commit df31899
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 27 deletions.
29 changes: 15 additions & 14 deletions frontend/client/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import frontend.client.services.file_manager as FileManager
import schemes
from frontend.common.constants import MsgType
# bits represents status
from frontend.constants import KEY_TYPE, KEY_SID, TYPE_INIT
from global_config import ClientConfig
Expand Down Expand Up @@ -166,15 +167,15 @@ def __init__(self, sid=""):
self._load_config_object()

self.recv_msg_handler = {
"config": self.handle_upload_config_echo,
"upload_edb": self.handle_upload_encrypted_database_echo,
"result": self.handle_result,
"control": self.handle_control_message,
MsgType.CONFIG: self.handle_upload_config_echo,
MsgType.UPLOAD_DB: self.handle_upload_encrypted_database_echo,
MsgType.RESULT: self.handle_result,
MsgType.CONTROL: self.handle_control_message,
}

self.echo_handler = {
"config": [],
"upload_edb": []
MsgType.CONFIG: [],
MsgType.UPLOAD_DB: []
}

self.echo_futures = {}
Expand Down Expand Up @@ -276,7 +277,7 @@ async def _recv_message(self):
self.echo_futures[msg_type] = [] # clear

# result future handler
if msg_type == "result":
if msg_type == MsgType.RESULT:
token_digest = message_dict.get("token_digest")
for fut in self.result_futures.get(token_digest, []):
fut.set_result(content_byte)
Expand Down Expand Up @@ -424,9 +425,9 @@ async def handle_upload_config(self,
# Create a new Future object.
fut = loop.create_future()
fut.add_done_callback(wait_callback_func)
self.register_upload_echo_future_once("config", fut)
self.register_upload_echo_future_once(MsgType.CONFIG, fut)

await self._send_message("config", pickle.dumps(self.config))
await self._send_message(MsgType.CONFIG, pickle.dumps(self.config))
logger.info(f"[{self.sid}] Uploading config.")

if wait:
Expand Down Expand Up @@ -524,9 +525,9 @@ async def handle_upload_encrypted_database(self,
# Create a new Future object.
fut = loop.create_future()
fut.add_done_callback(wait_callback_func)
self.register_upload_echo_future_once("upload_edb", fut)
self.register_upload_echo_future_once(MsgType.UPLOAD_DB, fut)

await self._send_message("upload_edb", self.edb.serialize())
await self._send_message(MsgType.UPLOAD_DB, self.edb.serialize())
logger.info(f"[{self.sid}] Uploading encrypted database.")

if wait:
Expand Down Expand Up @@ -554,13 +555,13 @@ async def handle_keyword_search(self, keyword: bytes,
# Create a new Future object.
fut = loop.create_future()
fut.add_done_callback(wait_callback_func)
self.register_upload_echo_future_once("result", fut)
self.register_upload_echo_future_once(MsgType.RESULT, fut)

token = self.sse_scheme.TokenGen(self.key, keyword)
token_bytes = token.serialize()
token_digest = hashlib.sha256(token_bytes).digest()

await self._send_message("token",
await self._send_message(MsgType.TOKEN,
token_bytes,
token_digest=token_digest)
logger.info(f"[{self.sid}] Uploading search token.")
Expand Down Expand Up @@ -627,7 +628,7 @@ def _compare_result(fut: asyncio.Future, actual_result):
assert return_result.result == actual_result

for keyword in itertools.islice(db.keys(), 10):
# service.register_echo_handler_once("result", functools.partial(_compare_result, actual_result=db[keyword]))
# service.register_echo_handler_once(MsgType.RESULT, functools.partial(_compare_result, actual_result=db[keyword]))
await service.handle_keyword_search(keyword,
wait=True,
wait_callback_func=functools.partial(_compare_result,
Expand Down
12 changes: 12 additions & 0 deletions frontend/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding:utf-8 _*-
"""
LIB-SSE CODE
@author: Jeza Chen
@license: GPL-3.0 License
@file: services_manager.py
@time: 2023/12/17
@contact: jeza@vip.qq.com
@site:
@software: PyCharm
@description: common constants and functions for client and server endpoints.
"""
27 changes: 27 additions & 0 deletions frontend/common/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# -*- coding:utf-8 _*-
"""
LIB-SSE CODE
@author: Jeza Chen
@license: GPL-3.0 License
@file: services_manager.py
@time: 2023/12/17
@contact: jeza@vip.qq.com
@site:
@software: PyCharm
@description: constants shared by clients and servers
"""


# Types of messages that can be sent between the client and server
class MsgType:
# init echo
INIT = "init"
# service config
CONFIG = "config"
# upload encrypted databases
UPLOAD_DB = "upload_edb"
# for search request
TOKEN = "token"
RESULT = "result"
# for debug
CONTROL = "control"
25 changes: 13 additions & 12 deletions frontend/server/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import asyncio
import pickle

from frontend.common.constants import MsgType
from frontend.server.services.comm import send_message
from toolkit.logger.logger import getSSELogger
from websockets.legacy.server import WebSocketServerProtocol
Expand Down Expand Up @@ -84,9 +85,9 @@ def __init__(self, sid, websocket: WebSocketServerProtocol):
self.service_meta = {"state": SERVICE_STATE.NOT_EXISTS}

self.recv_msg_handler = {
"config": self.handle_upload_config,
"upload_edb": self.handle_upload_encrypted_database,
"token": self.handle_search_token
MsgType.CONFIG: self.handle_upload_config,
MsgType.UPLOAD_DB: self.handle_upload_encrypted_database,
MsgType.TOKEN: self.handle_search_token
}

if self.get_current_service_state() == SERVICE_STATE.ALL_READY:
Expand Down Expand Up @@ -168,15 +169,15 @@ def get_current_service_state(self):
return self.service_meta["state"]

def send_init_echo(self):
self.send_message("init", pickle.dumps({"ok": True, "state": self.get_current_service_state()}))
self.send_message(MsgType.INIT, pickle.dumps({"ok": True, "state": self.get_current_service_state()}))
logger.info(f"Send initialization echo of service {self.sid}.")

def handle_upload_config(self, config_bytes: bytes, raw_msg_dict: dict):
logger.info(f"Receive config file from service {self.sid}.")

if self.get_current_service_state() != SERVICE_STATE.NOT_EXISTS:
reason = f"The config of service {self.sid} has been already uploaded."
self.send_message("config", pickle.dumps({"ok": False, "reason": reason}))
self.send_message(MsgType.CONFIG, pickle.dumps({"ok": False, "reason": reason}))
logger.error(reason)
raise ValueError(reason)

Expand All @@ -187,42 +188,42 @@ def handle_upload_config(self, config_bytes: bytes, raw_msg_dict: dict):
self.config = config
self.service_meta["state"] = SERVICE_STATE.CONFIG_UPLOADED_BUT_EDB_NOT_UPLOADED
FileManager.write_service_meta(self.sid, self.service_meta)
self.send_message("config", pickle.dumps({"ok": True}))
self.send_message(MsgType.CONFIG, pickle.dumps({"ok": True}))
logger.info(f"Store config for service {self.sid} successfully.")

def handle_upload_encrypted_database(self, edb_bytes: bytes, raw_msg_dict: dict):
logger.info(f"Receive encrypted database from service {self.sid}.")

if self.get_current_service_state() == SERVICE_STATE.NOT_EXISTS:
reason = f"The config of service {self.sid} has not been uploaded."
self.send_message("upload_edb", pickle.dumps({"ok": False, "reason": reason}))
self.send_message(MsgType.UPLOAD_DB, pickle.dumps({"ok": False, "reason": reason}))
logger.error(reason)
raise ValueError(reason)

if self.get_current_service_state() == SERVICE_STATE.ALL_READY:
reason = f"The database of service {self.sid} has been already uploaded."
self.send_message("upload_edb", pickle.dumps({"ok": False, "reason": reason}))
self.send_message(MsgType.UPLOAD_DB, pickle.dumps({"ok": False, "reason": reason}))
logger.error(reason)
raise ValueError(reason)

FileManager.write_encrypted_database(self.sid, edb_bytes)
self.service_meta["state"] = SERVICE_STATE.ALL_READY
FileManager.write_service_meta(self.sid, self.service_meta)
self.send_message("upload_edb", pickle.dumps({"ok": True}))
self.send_message(MsgType.UPLOAD_DB, pickle.dumps({"ok": True}))
logger.info(f"Store encrypted database for service {self.sid} successfully.")

def handle_search_token(self, token_bytes: bytes, raw_msg_dict: dict):
logger.info(f"Receive search token from service {self.sid}.")

if self.get_current_service_state() == SERVICE_STATE.NOT_EXISTS:
reason = f"The config of service {self.sid} has not been uploaded."
self.send_message("result", pickle.dumps({"ok": False, "reason": reason}))
self.send_message(MsgType.RESULT, pickle.dumps({"ok": False, "reason": reason}))
logger.error(reason)
raise ValueError(reason)

if self.get_current_service_state() == SERVICE_STATE.CONFIG_UPLOADED_BUT_EDB_NOT_UPLOADED:
reason = f"The encrypted database of service {self.sid} has not been uploaded."
self.send_message("result", pickle.dumps({"ok": False, "reason": reason}))
self.send_message(MsgType.RESULT, pickle.dumps({"ok": False, "reason": reason}))
logger.error(reason)
raise ValueError(reason)

Expand All @@ -233,7 +234,7 @@ def handle_search_token(self, token_bytes: bytes, raw_msg_dict: dict):
tk_digest = raw_msg_dict.get("token_digest")
tk_object = self.sse_module_loader.SSEToken.deserialize(token_bytes, self.config_object)
result = self.sse_scheme.Search(self.edb, tk_object)
self.send_message("result", content=result.serialize(), token_digest=tk_digest)
self.send_message(MsgType.RESULT, content=result.serialize(), token_digest=tk_digest)
logger.info(f"Search for service {self.sid} successfully.")

def close_service(self):
Expand Down
3 changes: 2 additions & 1 deletion frontend/server/services/services_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from websockets.legacy.server import WebSocketServerProtocol

from frontend.common.constants import MsgType
from frontend.server.services.comm import send_message
from frontend.server.services.service import Service
from toolkit.logger.logger import getSSELogger
Expand All @@ -35,7 +36,7 @@ async def create_service(self, sid: str, websocket: WebSocketServerProtocol):
prev_server = self._service_dict[sid]
reason = f"Service {sid} is already running, we need to wait for the previous connection to close..."
logger.warning(reason)
service.send_message("control", reason.encode('utf8'))
service.send_message(MsgType.CONTROL, reason.encode('utf8'))
await prev_server.wait_closed() # wait for the previous socket to close

async with self._access_dict_lock:
Expand Down

0 comments on commit df31899

Please sign in to comment.