-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
467 lines (377 loc) · 16 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# main.py
"""
Paraglider Flight Simulation - Main Execution Script
Manages the entire simulation lifecycle, coordinating initialization,
execution, and graceful shutdown of the paraglider flight simulation system.
"""
import sys
import os
import traceback
import numpy as np
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List
# Ensure project root is in Python path
project_root = Path(__file__).parent
sys.path.append(str(project_root))
# Core simulation components
from models.paraglider import Paraglider
from models.environment import Environment
from controllers.flight_controller import FlightController
from visualization.visualizer import FlightVisualizer
from utils.logger import FlightLogger
from utils.time_manager import time_manager
# Configuration imports
from config.general_config import sim, safety, vis
from config.vehicle_config import vehicle
from config.flight_config import WAYPOINTS
class SimulationError(Exception):
"""Custom exception for simulation-specific critical errors."""
pass
class ParagliderSimulation:
"""
Comprehensive simulation manager responsible for coordinating
all aspects of the paraglider flight simulation.
"""
def __init__(self, log_dir: str = "logs"):
"""
Initialize simulation environment with robust configuration management.
Args:
log_dir: Base directory for logging simulation data
"""
# Session and logging configuration
self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
self.log_dir = os.path.join(log_dir, self.session_id)
os.makedirs(self.log_dir, exist_ok=True)
# Initialize core simulation state
self._reset_simulation_state()
# Error tracking
self.initialization_errors: List[str] = []
self.runtime_errors: List[str] = []
def _reset_simulation_state(self):
"""Reset all simulation state variables to initial conditions."""
# Simulation control flags
self.initialized = False
self.running = False
self.paused = False
# Simulation components
self.vehicle: Optional[Paraglider] = None
self.environment: Optional[Environment] = None
self.controller: Optional[FlightController] = None
self.visualizer: Optional[FlightVisualizer] = None
self.logger: Optional[FlightLogger] = None
# Performance and tracking
self.frame_count = 0
self.start_time = 0.0
self.last_viz_update = 0.0
# Performance statistics
self.statistics = {
'max_speed': 0.0,
'min_altitude': float('inf'),
'max_altitude': 0.0,
'ground_contacts': 0,
'waypoints_reached': 0,
'runtime_errors': 0
}
def initialize(self) -> bool:
"""
Comprehensive initialization of all simulation components.
Returns:
bool: Indicates successful initialization
"""
print("\nInitializing Paraglider Flight Simulation...")
try:
# Validate configuration parameters
self._validate_configuration()
# Reset time management
time_manager.reset()
# Initialize critical simulation components
self._initialize_components()
# Validate initial simulation state
if not self._validate_initial_state():
raise SimulationError("Invalid initial simulation configuration")
self.initialized = True
self.start_time = time_manager.get_time()
print("✓ Simulation Initialized Successfully")
return True
except Exception as e:
self._log_error(f"Initialization Failed: {str(e)}")
return False
def _validate_configuration(self):
"""Validate and set default configuration parameters."""
configuration_defaults = [
('MAX_POSITION_LIMIT', 50000.0, safety),
('MAX_VELOCITY_LIMIT', 50.0, safety),
('MAX_ACCELERATION_LIMIT', 20.0, safety),
('DEBUG_MODE', False, sim)
]
for attr, default_value, config_module in configuration_defaults:
if not hasattr(config_module, attr):
setattr(config_module, attr, default_value)
print(f"Set default {attr}: {default_value}")
def _initialize_components(self):
"""Initialize all critical simulation components."""
component_initializers = [
("Environment", self._initialize_environment),
("Vehicle", self._initialize_vehicle),
("Flight Controller", self._initialize_flight_controller),
("Visualization", self._initialize_visualization),
("Logger", self._initialize_logger)
]
for name, initializer in component_initializers:
try:
print(f"Initializing {name}...")
initializer()
except Exception as e:
error_msg = f"Failed to initialize {name}: {str(e)}"
self._log_error(error_msg)
raise SimulationError(error_msg)
def _initialize_environment(self):
"""Initialize atmospheric and environmental model."""
self.environment = Environment()
def _initialize_vehicle(self):
"""Initialize paraglider vehicle with initial conditions."""
self.vehicle = Paraglider(
position=vehicle.INITIAL_POSITION.copy(),
velocity=vehicle.INITIAL_VELOCITY.copy(),
orientation=vehicle.INITIAL_ORIENTATION.copy()
)
def _initialize_flight_controller(self):
"""Initialize autonomous flight control system."""
self.controller = FlightController()
def _initialize_visualization(self):
"""Initialize visualization if enabled in configuration."""
if sim.ENABLE_VISUALIZATION:
self.visualizer = FlightVisualizer()
def _initialize_logger(self):
"""Initialize logging system and record initial state."""
if sim.ENABLE_LOGGING:
self.logger = FlightLogger(self.log_dir)
self._log_initial_state()
def _log_initial_state(self):
"""Log comprehensive initial simulation metadata."""
if not self.logger:
return
initial_state = {
'session_id': self.session_id,
'timestamp': datetime.now().isoformat(),
'configuration': {
'simulation': {k: v for k, v in vars(sim).items() if not k.startswith('__')},
'vehicle': {k: v for k, v in vars(vehicle).items() if not k.startswith('__')},
'visualization': {k: v for k, v in vars(vis).items() if not k.startswith('__')}
},
'initial_conditions': {
'position': vehicle.INITIAL_POSITION.tolist(),
'velocity': vehicle.INITIAL_VELOCITY.tolist(),
'orientation': vehicle.INITIAL_ORIENTATION.tolist()
},
'waypoints': [
{'position': pos.tolist(), 'mode': mode}
for pos, mode in WAYPOINTS
]
}
self.logger.log_metadata(initial_state)
def _validate_initial_state(self) -> bool:
"""
Validate critical simulation parameters and initial vehicle state.
Returns:
bool: Indicates if initial state is valid
"""
validation_checks = [
(self.environment is not None, "Environment not initialized"),
(self.vehicle is not None, "Vehicle not initialized"),
(self.controller is not None, "Flight controller not initialized"),
(self._validate_initial_position(), "Invalid initial vehicle position"),
(self._validate_initial_velocity(), "Invalid initial vehicle velocity"),
(self._validate_waypoints(), "Invalid waypoint configuration")
]
for condition, error_message in validation_checks:
if not condition:
self._log_error(error_message)
return False
return True
def _validate_initial_position(self) -> bool:
"""Validate initial vehicle position."""
position = self.vehicle.position
return (isinstance(position, np.ndarray) and
position.shape == (3,) and
np.all(np.abs(position) < safety.MAX_POSITION_LIMIT))
def _validate_initial_velocity(self) -> bool:
"""Validate initial vehicle velocity."""
velocity = self.vehicle.velocity
speed = np.linalg.norm(velocity)
return (isinstance(velocity, np.ndarray) and
velocity.shape == (3,) and
speed <= safety.MAX_VELOCITY_LIMIT)
def _validate_waypoints(self) -> bool:
"""Validate waypoint configuration."""
return len(WAYPOINTS) > 0 and all(
len(wp) == 2 and
isinstance(wp[0], np.ndarray) and
wp[0].shape == (3,)
for wp in WAYPOINTS
)
def run(self):
"""
Execute main simulation loop with comprehensive monitoring.
Manages time progression, component updates,
performance tracking, and error handling.
"""
if not self.initialized:
print("Error: Simulation not initialized. Call initialize() first.")
return
print("\nStarting Paraglider Simulation...")
self.running = True
try:
while self.running:
current_time = time_manager.get_time()
# Check simulation termination conditions
if current_time >= sim.SIMULATION_TIME:
print("\nSimulation time limit reached")
break
# Update simulation state
if not self._update_simulation():
print("\nSimulation update failed")
break
# Update visualization
if (self.visualizer and
current_time - self.last_viz_update >= 1.0/vis.DISPLAY_UPDATE_RATE):
self._update_visualization()
# Update performance metrics
self._update_performance_metrics()
# Increment frame counter
self.frame_count += 1
# Manage time scaling
if sim.TIME_SCALING > 0:
self._manage_time_scaling(current_time)
except KeyboardInterrupt:
print("\nSimulation interrupted by user")
except Exception as e:
self._handle_runtime_error(e)
finally:
self._finalize_simulation()
def _update_simulation(self) -> bool:
"""Update single simulation time step."""
try:
current_time = time_manager.get_time()
# Update environment
self.environment.update(current_time)
# Get environmental conditions
conditions = self.environment.get_conditions(
self.vehicle.position,
current_time
)
# Update flight controller
left_brake, right_brake = self.controller.update(
self.vehicle,
conditions
)
# Update vehicle controls
self.vehicle.set_control_inputs(left_brake, right_brake)
# Update vehicle physics
if not self.vehicle.update(sim.DT, conditions):
print("❌ Vehicle physics update failed")
return False
# Log data if logging is enabled
if self.logger:
self._log_frame_data(current_time, conditions)
# Update simulation time
time_manager.update(sim.DT)
return True
except Exception as e:
self._handle_runtime_error(f"Simulation update error: {str(e)}")
return False
def _update_visualization(self):
"""Update visualization with error handling."""
try:
self.visualizer.update(self.vehicle, self.controller)
self.last_viz_update = time_manager.get_time()
except Exception as e:
print(f"Visualization update error: {str(e)}")
self.visualizer = None
def _update_performance_metrics(self):
"""Track and update simulation performance statistics."""
speed = np.linalg.norm(self.vehicle.velocity)
self.statistics['max_speed'] = max(
self.statistics['max_speed'], speed
)
altitude = self.vehicle.position[2]
self.statistics['min_altitude'] = min(
self.statistics['min_altitude'], altitude
)
self.statistics['max_altitude'] = max(
self.statistics['max_altitude'], altitude
)
if getattr(self.vehicle, 'ground_contact', False):
self.statistics['ground_contacts'] += 1
def _manage_time_scaling(self, current_time: float):
"""Manage real-time simulation scaling and performance."""
target_frame_time = sim.DT / sim.TIME_SCALING
elapsed = time_manager.get_time() - current_time
if elapsed < target_frame_time:
time_manager.pause(target_frame_time - elapsed)
def _handle_runtime_error(self, error):
"""Comprehensive runtime error handling and logging."""
print(f"\nRuntime Error: {str(error)}")
if sim.DEBUG_MODE:
traceback.print_exc()
self.statistics['runtime_errors'] += 1
self.runtime_errors.append(str(error))
def _finalize_simulation(self):
"""Perform comprehensive simulation shutdown and reporting."""
print("\nFinalizing Simulation...")
# Stop visualization
if self.visualizer:
self.visualizer.quit()
# Close logger
if self.logger:
self.logger.close()
# Print final simulation summary
self._print_simulation_summary()
def _print_simulation_summary(self):
"""Generate and print comprehensive simulation report."""
print("\nSimulation Summary:")
print(f"Total Simulation Time: {time_manager.get_time():.2f} seconds")
print(f"Total Frames: {self.frame_count}")
print(f"Maximum Speed: {self.statistics['max_speed']:.2f} m/s")
print(f"Altitude Range: {self.statistics['min_altitude']:.2f} to {self.statistics['max_altitude']:.2f} m")
print(f"Ground Contacts: {self.statistics['ground_contacts']}")
print(f"Runtime Errors: {self.statistics['runtime_errors']}")
def _log_frame_data(self, current_time: float, conditions: Dict):
"""Log comprehensive frame data for analysis."""
if not self.logger:
return
# Log vehicle state
self.logger.log_vehicle_state(
self.vehicle.get_state(),
current_time
)
# Log environmental conditions
self.logger.log_environment(
conditions,
current_time
)
# Log control state
if self.controller:
self.logger.log_control(
self.controller.get_state(),
current_time
)
def _log_error(self, message: str):
"""Log simulation errors with timestamp."""
print(f"❌ ERROR: {message}")
self.initialization_errors.append(message)
def main():
"""Main entry point for paraglider flight simulation."""
try:
simulation = ParagliderSimulation()
if not simulation.initialize():
print("Simulation initialization failed. Exiting.")
sys.exit(1)
simulation.run()
except Exception as e:
print(f"Unhandled simulation error: {e}")
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()