diff --git a/requirements.txt b/requirements.txt index 69ab33175..de412c5a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,6 @@ maturin==0.14.12 uvloop; platform_system!="Windows" watchdog==2.2.1 multiprocess==0.70.14 +psutil==5.9.4 jinja2==3.1.2 nestd==0.3.1 diff --git a/robyn/__init__.py b/robyn/__init__.py index 071b529a5..736f8a963 100644 --- a/robyn/__init__.py +++ b/robyn/__init__.py @@ -2,17 +2,14 @@ import logging import multiprocess as mp import os -import signal from typing import Callable, List, Optional from nestd import get_all_nested -from watchdog.observers import Observer - from robyn.argument_parser import Config -from robyn.dev_event_handler import EventHandler +from robyn.reloader import setup_reloader from robyn.env_populator import load_vars from robyn.events import Events -from robyn.logger import Colors, logger +from robyn.logger import logger from robyn.processpool import run_processes from robyn.responses import jsonify, serve_file, serve_html from robyn.robyn import FunctionInfo, Request, Response @@ -25,11 +22,21 @@ class Robyn: """This is the python wrapper for the Robyn binaries.""" - def __init__(self, file_object: str) -> None: + def __init__(self, file_object: str, config: Config = Config()) -> None: directory_path = os.path.dirname(os.path.abspath(file_object)) self.file_path = file_object self.directory_path = directory_path - self.config = Config() + self.config = config + + load_vars(project_root=directory_path) + logging.basicConfig(level=self.config.log_level) + + # If we are in dev mode, we need to setup the reloader + # This process will be used by the watchdog observer while running the actual server as children processes + if self.config.dev and not os.environ.get("IS_RELOADER_SETUP", False): + setup_reloader(self.directory_path, self.file_path) + exit(0) + self.router = Router() self.middleware_router = MiddlewareRouter() self.web_socket_router = WebSocketRouter() @@ -37,8 +44,6 @@ def __init__(self, file_object: str) -> None: self.response_headers: List[Header] = [] # This needs a better type self.directories: List[Directory] = [] self.event_handlers = {} - load_vars(project_root=directory_path) - logging.basicConfig(level=self.config.log_level) def _add_route(self, route_type, endpoint, handler, is_const=False): """ @@ -119,59 +124,19 @@ def start(self, url: str = "127.0.0.1", port: int = 8080): mp.allow_connection_pickling() - if not self.config.dev: - run_processes( - url, - port, - self.directories, - self.request_headers, - self.router.get_routes(), - self.middleware_router.get_routes(), - self.web_socket_router.get_routes(), - self.event_handlers, - self.config.workers, - self.config.processes, - self.response_headers, - ) - else: - event_handler = EventHandler( - url, - port, - self.directories, - self.request_headers, - self.router.get_routes(), - self.middleware_router.get_routes(), - self.web_socket_router.get_routes(), - self.event_handlers, - self.config.workers, - self.config.processes, - self.response_headers, - ) - event_handler.start_server() - logger.info( - f"Dev server initialized with the directory_path : {self.directory_path}", - Colors.BLUE, - ) - - def terminating_signal_handler(_sig, _frame): - logger.info("Terminating server!!", bold=True) - event_handler.stop_server() - observer.stop() - observer.join() - - signal.signal(signal.SIGINT, terminating_signal_handler) - signal.signal(signal.SIGTERM, terminating_signal_handler) - - observer = Observer() - observer.schedule(event_handler, path=self.directory_path, recursive=True) - observer.start() - - try: - while observer.is_alive(): - observer.join(1) - finally: - observer.stop() - observer.join() + run_processes( + url, + port, + self.directories, + self.request_headers, + self.router.get_routes(), + self.middleware_router.get_routes(), + self.web_socket_router.get_routes(), + self.event_handlers, + self.config.workers, + self.config.processes, + self.response_headers, + ) def add_view(self, endpoint: str, view: Callable, const: bool = False): """ diff --git a/robyn/dev_event_handler.py b/robyn/dev_event_handler.py deleted file mode 100644 index e2ebc932c..000000000 --- a/robyn/dev_event_handler.py +++ /dev/null @@ -1,73 +0,0 @@ -from typing import Dict, List - -from watchdog.events import FileSystemEventHandler -from robyn.events import Events -from robyn.processpool import run_processes - -from robyn.robyn import FunctionInfo -from robyn.router import MiddlewareRoute, Route -from robyn.types import Directory, Header -from robyn.ws import WS - - -class EventHandler(FileSystemEventHandler): - def __init__( - self, - url: str, - port: int, - directories: List[Directory], - request_headers: List[Header], - routes: List[Route], - middlewares: List[MiddlewareRoute], - web_sockets: Dict[str, WS], - event_handlers: Dict[Events, FunctionInfo], - workers: int, - processes: int, - response_headers: List[Header], - ) -> None: - self.url = url - self.port = port - self.directories = directories - self.request_headers = request_headers - self.response_headers = response_headers - self.routes = routes - self.middlewares = middlewares - self.web_sockets = web_sockets - self.event_handlers = event_handlers - self.n_workers = workers - self.n_processes = processes - self.processes = [] - - def start_server(self): - processes = run_processes( - self.url, - self.port, - self.directories, - self.request_headers, - self.routes, - self.middlewares, - self.web_sockets, - self.event_handlers, - self.n_workers, - self.n_processes, - self.response_headers, - True, - ) - - self.processes.extend(processes) - - def stop_server(self): - for process in self.processes: - process.kill() - - def on_any_event(self, event) -> None: - """ - This function is a callback that will start a new server on every even change - - :param event FSEvent: a data structure with info about the events - """ - - for process in self.processes: - process.kill() - - self.start_server() diff --git a/robyn/processpool.py b/robyn/processpool.py index 0d5ea1ef1..d0ed9bc4b 100644 --- a/robyn/processpool.py +++ b/robyn/processpool.py @@ -24,7 +24,6 @@ def run_processes( workers: int, processes: int, response_headers: List[Header], - from_event_handler: bool = False, ) -> List[Process]: socket = SocketHeld(url, port) @@ -41,19 +40,17 @@ def run_processes( response_headers, ) - if not from_event_handler: - - def terminating_signal_handler(_sig, _frame): - logger.info("Terminating server!!", bold=True) - for process in process_pool: - process.kill() + def terminating_signal_handler(_sig, _frame): + logger.info("Terminating server!!", bold=True) + for process in process_pool: + process.kill() - signal.signal(signal.SIGINT, terminating_signal_handler) - signal.signal(signal.SIGTERM, terminating_signal_handler) + signal.signal(signal.SIGINT, terminating_signal_handler) + signal.signal(signal.SIGTERM, terminating_signal_handler) - logger.info("Press Ctrl + C to stop \n") - for process in process_pool: - process.join() + logger.info("Press Ctrl + C to stop \n") + for process in process_pool: + process.join() return process_pool diff --git a/robyn/reloader.py b/robyn/reloader.py new file mode 100644 index 000000000..afb2ce848 --- /dev/null +++ b/robyn/reloader.py @@ -0,0 +1,84 @@ +import os +import signal +import subprocess +import sys +import time +import psutil + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + +from robyn.logger import Colors, logger + + +def setup_reloader(directory_path: str, file_path: str): + event_handler = EventHandler(file_path) + + event_handler.reload() + + logger.info( + f"Dev server initialized with the directory_path : {directory_path}", + Colors.BLUE, + ) + + def terminating_signal_handler(_sig, _frame): + event_handler.stop_server() + logger.info("Terminating reloader", bold=True) + observer.stop() + observer.join() + + signal.signal(signal.SIGINT, terminating_signal_handler) + signal.signal(signal.SIGTERM, terminating_signal_handler) + + observer = Observer() + observer.schedule(event_handler, path=directory_path, recursive=True) + observer.start() + + try: + while observer.is_alive(): + observer.join(1) + finally: + observer.stop() + observer.join() + + +class EventHandler(FileSystemEventHandler): + def __init__(self, file_path: str) -> None: + self.file_path = file_path + + self.last_reload = time.time() + + def stop_server(self): + for process in psutil.Process().children(recursive=True): + process.kill() + process.wait() # Required to avoid zombies + + def reload(self): + self.stop_server() + + new_env = os.environ.copy() + new_env[ + "IS_RELOADER_SETUP" + ] = "True" # This is used to check if a reloader is already running + + subprocess.Popen( + [sys.executable, *sys.argv], + env=new_env, + start_new_session=False, + ) + + self.last_reload = time.time() + + def on_modified(self, event) -> None: + """ + This function is a callback that will start a new server on every even change + + :param event FSEvent: a data structure with info about the events + """ + + # Avoid reloading multiple times when watchdog detects multiple events + if time.time() - self.last_reload < 0.5: + return + + time.sleep(0.2) # Wait for the file to be fully written + self.reload()