-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
128 lines (108 loc) · 4.92 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
from dotenv import load_dotenv
import logging
import time
import signal
import multiprocessing
from multiprocessing import Queue, Event
from threading import Thread
from queue import Empty
from Config_files.config import Config
from logo import create_logo
from database.database_manager import DatabaseManager
from network.initialize_network import initialize_network
from traffic.traffic_generator import TrafficController
from network.ue_manager import UEManager
from network.gNodeB_manager import gNodeBManager
from network.cell_manager import CellManager
from network.sector_manager import SectorManager
from network.NetworkLoadManager import NetworkLoadManager
from logs.logger_config import gnodbe_load_logger, ue_logger
from network.network_delay import NetworkDelay
from simulator_cli import SimulatorCLI
from API_Gateway import API
def run_api(queue, shutdown_event):
def start_api_server():
print("Starting API server on port 5000...")
API.app.run(port=5000, use_reloader=False)
api_server_thread = Thread(target=start_api_server)
api_server_thread.daemon = True
api_server_thread.start()
while not shutdown_event.is_set():
try:
message = queue.get(timeout=1)
if message == "SHUTDOWN":
print("Shutting down API server...")
break
except Empty:
continue
def generate_traffic_loop(traffic_controller, ue_list, network_load_manager, network_delay_calculator, db_manager, cell_manager, shutdown_event):
logging.debug("Starting traffic generation loop")
while not shutdown_event.is_set():
for ue in ue_list:
throughput_data = traffic_controller.calculate_throughput(ue)
network_load_manager.network_measurement()
time.sleep(1)
def initialize_components(base_dir):
config = Config(base_dir)
db_manager = DatabaseManager.get_instance()
if not db_manager.test_connection():
raise ConnectionError("Failed to connect to InfluxDB")
gNodeB_manager = gNodeBManager.get_instance(base_dir=base_dir)
gNodeBs, cells, sectors, ues, cell_manager = initialize_network(base_dir, num_ues_to_launch=config.ue_config.get('num_ues_to_launch', 5))
sector_manager = SectorManager.get_instance(db_manager=db_manager)
network_load_manager = NetworkLoadManager.get_instance(cell_manager, sector_manager, gNodeB_manager)
ue_manager = UEManager.get_instance(base_dir)
network_delay_calculator = NetworkDelay()
return config, db_manager, gNodeB_manager, cell_manager, sector_manager, network_load_manager, ue_manager, network_delay_calculator, gNodeBs, cells, sectors, ues
def main():
logging.basicConfig(level=logging.INFO)
base_dir = os.path.dirname(os.path.abspath(__file__))
dotenv_path = os.path.join(base_dir, '.env')
load_dotenv(dotenv_path)
logging.getLogger('sector_load_logger').setLevel(logging.WARNING)
logging.getLogger('cell_load_logger').setLevel(logging.WARNING)
logging.getLogger('gnodbe_load_logger').setLevel(logging.WARNING)
print(create_logo())
shutdown_event = Event()
ipc_queue = Queue()
api_proc = multiprocessing.Process(target=run_api, args=(ipc_queue, shutdown_event))
api_proc.start()
time.sleep(1) # Wait for API server to start
try:
config, db_manager, gNodeB_manager, cell_manager, sector_manager, network_load_manager, ue_manager, network_delay_calculator, gNodeBs, cells, sectors, ues = initialize_components(base_dir)
except Exception as e:
logging.error(f"Failed to initialize components: {e}")
return
traffic_controller_instance = TrafficController()
traffic_thread = Thread(target=generate_traffic_loop, args=(traffic_controller_instance, ues, network_load_manager, network_delay_calculator, db_manager, cell_manager, shutdown_event))
traffic_thread.start()
monitoring_thread = Thread(target=network_load_manager.monitoring, args=(shutdown_event,))
monitoring_thread.start()
def signal_handler(signum, frame):
print("Signal received, shutting down gracefully...")
logging.info("Signal received, shutting down gracefully...")
shutdown_event.set()
ipc_queue.put("SHUTDOWN")
api_proc.join(timeout=5)
if api_proc.is_alive():
logging.warning("API server didn't shut down gracefully. Terminating.")
api_proc.terminate()
logging.info("API server shutdown complete.")
traffic_thread.join()
monitoring_thread.join()
exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
cli = SimulatorCLI(
gNodeB_manager=gNodeB_manager,
cell_manager=cell_manager,
sector_manager=sector_manager,
ue_manager=ue_manager,
network_load_manager=network_load_manager,
base_dir=base_dir,
shutdown_event=shutdown_event
)
cli.cmdloop()
if __name__ == "__main__":
main()