Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DiscreteEventScheduler #1890

Merged
merged 10 commits into from
Nov 29, 2023
106 changes: 106 additions & 0 deletions mesa/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Remove this __future__ import once the oldest supported Python is 3.10
from __future__ import annotations

import heapq
from collections import defaultdict

# mypy
Expand Down Expand Up @@ -327,3 +328,108 @@
Returns the current number of agents of certain type in the queue.
"""
return len(self.agents_by_type[type_class])


class DiscreteEventScheduler(BaseScheduler):
"""
A scheduler for discrete event simulation in Mesa.

This scheduler manages events where each event is associated with a
specific time and agent. The scheduler advances time not in fixed
increments, but to the moment the next event is scheduled to occur.

This implementation uses a priority queue (heapq) to manage events. Each
event is a tuple of the form (time, random_value, agent), where:
- time (float): The scheduled time for the event.
- random_value (float): A secondary sorting criterion to randomize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would have been clearer to say that the random_value is used only once for the heapq.push operation. I had to check elsewhere in the code for its usage, until I realized of this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read it for again, I think it’s quite clear, especially together with the comments lower in the code..

the order of events that are scheduled for the same time.
- agent (Agent): The agent associated with the event.

The random value for secondary sorting ensures that when two events are
scheduled for the same time, their execution order is randomized, thus
preventing direct comparison issues between different types of agents and
maintaining the integrity of the simulation's randomness.

Attributes:
model (Model): The model instance associated with the scheduler.
event_queue (list): A priority queue of scheduled events.
time_step (int or float): The fixed time period by which the model advances
on each step. Defaults to 1.

Methods:
schedule_event(time, agent): Schedule an event for a specific time.
schedule_in(delay, agent): Schedule an event after a specified delay.
step(): Execute all events within the next time_step period.
get_next_event_time(): Returns the time of the next scheduled event.

Usage:
1. Instantiate the DiscreteEventScheduler with a model instance and a time_step period.
2. Add agents to the scheduler using schedule.add(). With schedule_now=True (default),
the first event for the agent will be scheduled immediately.
3. In the Agent step() method, schedule the next event for the agent
(using schedule_in or schedule_event).
3. Add self.schedule.step() to the model's step() method, as usual.

Now, with each model step, the scheduler will execute all events within the
next time_step period, and advance time one time_step forward.
"""

def __init__(self, model: Model, time_step: TimeT = 1) -> None:
super().__init__(model)
self.event_queue: list[tuple[TimeT, float, Agent]] = []
self.time_step: TimeT = time_step # Fixed time period for each step

def schedule_event(self, time: TimeT, agent: Agent) -> None:
"""Schedule an event for an agent at a specific time."""
if time < self.time:
raise ValueError(
f"Scheduled time ({time}) must be >= the current time ({self.time})"
)
# Create an event, sorted first on time, secondary on a random value
event = (time, self.model.random.random(), agent)
heapq.heappush(self.event_queue, event)

def schedule_in(self, delay: TimeT, agent: Agent) -> None:
"""Schedule an event for an agent after a specified delay."""
if delay < 0:
raise ValueError("Delay must be non-negative")

Check warning on line 395 in mesa/time.py

View check run for this annotation

Codecov / codecov/patch

mesa/time.py#L395

Added line #L395 was not covered by tests
event_time = self.time + delay
self.schedule_event(event_time, agent)

def step(self) -> None:
"""Execute the next event and advance the time."""
end_time = self.time + self.time_step

while self.event_queue and self.event_queue[0][0] <= end_time:
# Get the next event (ignore the random value during unpacking)
time, _, agent = heapq.heappop(self.event_queue)

# Advance time to the event's time
self.time = time

# Execute the event
if agent.unique_id in self._agents:
agent.step()

# After processing events, advance time by the time_step
self.time = end_time
self.steps += 1

def get_next_event_time(self) -> TimeT | None:
"""Returns the time of the next scheduled event."""
if not self.event_queue:
return None

Check warning on line 421 in mesa/time.py

View check run for this annotation

Codecov / codecov/patch

mesa/time.py#L421

Added line #L421 was not covered by tests
return self.event_queue[0][0]

def add(self, agent: Agent, schedule_now: bool = True) -> None:
"""Add an Agent object to the schedule and optionally schedule its first event.

Args:
agent: An Agent to be added to the schedule. Must have a step() method.
schedule_now: If True, schedules the first event for the agent immediately.
"""
super().add(agent) # Call the add method from BaseScheduler

if schedule_now:
# Schedule the first event immediately
self.schedule_event(self.time, agent)
110 changes: 110 additions & 0 deletions tests/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from mesa import Agent, Model
from mesa.time import (
BaseScheduler,
DiscreteEventScheduler,
RandomActivation,
RandomActivationByType,
SimultaneousActivation,
Expand Down Expand Up @@ -274,5 +275,114 @@ def test_add_non_unique_ids(self):
model.schedule.add(b)


class TestDiscreteEventScheduler(TestCase):
def setUp(self):
self.model = MockModel()
self.scheduler = DiscreteEventScheduler(self.model, time_step=1)
self.model.schedule = self.scheduler
self.agent1 = MockAgent(1, self.model)
self.agent2 = MockAgent(2, self.model)
self.model.schedule.add(self.agent1, schedule_now=False)
self.model.schedule.add(self.agent2, schedule_now=False)

# Testing Initialization and Attributes
def test_initialization(self):
self.assertIsInstance(self.scheduler.event_queue, list)
self.assertEqual(self.scheduler.time_step, 1)

# Testing Event Scheduling
def test_schedule_event(self):
self.scheduler.schedule_event(5, self.agent1)
self.assertEqual(len(self.scheduler.event_queue), 1)
event_time, _, event_agent = self.scheduler.event_queue[0]
self.assertEqual(event_time, 5)
self.assertEqual(event_agent, self.agent1)

def test_schedule_event_with_float_time(self):
self.scheduler.schedule_event(5.5, self.agent1)
self.assertEqual(len(self.scheduler.event_queue), 1)
event_time, _, event_agent = self.scheduler.event_queue[0]
self.assertEqual(event_time, 5.5)
self.assertEqual(event_agent, self.agent1)

def test_schedule_in(self):
self.scheduler.schedule_in(3, self.agent2)
_, _, event_agent = self.scheduler.event_queue[0]
self.assertEqual(event_agent, self.agent2)
self.assertEqual(self.scheduler.get_next_event_time(), self.scheduler.time + 3)

# Testing Event Execution and Time Advancement
def test_step_function(self):
self.scheduler.schedule_event(1, self.agent1)
self.scheduler.schedule_event(2, self.agent2)
self.scheduler.step()
self.assertEqual(self.scheduler.time, 1)
self.assertEqual(self.agent1.steps, 1)
self.assertEqual(self.agent2.steps, 0)

def test_time_advancement(self):
self.scheduler.schedule_event(5, self.agent1)
self.scheduler.step()
self.assertEqual(self.scheduler.time, 1)
self.scheduler.step()
self.assertEqual(self.scheduler.time, 2)

def test_no_events(self):
self.scheduler.step()
self.assertEqual(self.scheduler.time, 1)

# Testing Edge Cases and Error Handling
def test_invalid_event_time(self):
with self.assertRaises(ValueError):
self.scheduler.schedule_event(-1, self.agent1)

def test_immediate_event_execution(self):
# Current time of the scheduler
current_time = self.scheduler.time

# Schedule an event at the current time
self.scheduler.schedule_event(current_time, self.agent1)

# Step the scheduler and check if the event is executed immediately
self.scheduler.step()
self.assertEqual(self.agent1.steps, 1)

# The time should advance to the next time step after execution
self.assertEqual(self.scheduler.time, current_time + 1)

# Testing Utility Functions
def test_get_next_event_time(self):
self.scheduler.schedule_event(10, self.agent1)
self.assertEqual(self.scheduler.get_next_event_time(), 10)

# Test add() method with and without immediate scheduling
def test_add_with_immediate_scheduling(self):
# Add an agent with schedule_now set to True (default behavior)
new_agent = MockAgent(3, self.model)
self.scheduler.add(new_agent)

# Check if the agent's first event is scheduled immediately
self.assertEqual(len(self.scheduler.event_queue), 1)
event_time, _, event_agent = self.scheduler.event_queue[0]
self.assertEqual(event_time, self.scheduler.time)
self.assertEqual(event_agent, new_agent)

# Step the scheduler and check if the agent's step method is executed
self.scheduler.step()
self.assertEqual(new_agent.steps, 1)

def test_add_without_immediate_scheduling(self):
# Add an agent with schedule_now set to False
new_agent = MockAgent(4, self.model)
self.scheduler.add(new_agent, schedule_now=False)

# Check if the event queue is not updated
self.assertEqual(len(self.scheduler.event_queue), 0)

# Step the scheduler and verify that the agent's step method is not executed
self.scheduler.step()
self.assertEqual(new_agent.steps, 0)


if __name__ == "__main__":
unittest.main()