diff --git a/benchmarks/Flocking/flocking.py b/benchmarks/Flocking/flocking.py index 025bd71cd5f..fc19c6c3820 100644 --- a/benchmarks/Flocking/flocking.py +++ b/benchmarks/Flocking/flocking.py @@ -102,6 +102,7 @@ def __init__( cohere=0.03, separate=0.015, match=0.05, + simulator=None, ): """ Create a new Flockers model. @@ -118,18 +119,18 @@ def __init__( """ super().__init__(seed=seed) self.population = population - self.vision = vision - self.speed = speed - self.separation = separation + self.width = width + self.height = height + self.simulator = simulator + self.schedule = mesa.time.RandomActivation(self) - self.space = mesa.space.ContinuousSpace(width, height, True) - self.factors = {"cohere": cohere, "separate": separate, "match": match} - self.make_agents() + self.space = mesa.space.ContinuousSpace(self.width, self.height, True) + self.factors = { + "cohere": cohere, + "separate": separate, + "match": match, + } - def make_agents(self): - """ - Create self.population agents, with random positions and starting directions. - """ for i in range(self.population): x = self.random.random() * self.space.x_max y = self.random.random() * self.space.y_max @@ -138,10 +139,11 @@ def make_agents(self): boid = Boid( unique_id=i, model=self, - speed=self.speed, + pos=pos, + speed=speed, direction=direction, - vision=self.vision, - separation=self.separation, + vision=vision, + separation=separation, **self.factors, ) self.space.place_agent(boid, pos) diff --git a/benchmarks/Schelling/schelling.py b/benchmarks/Schelling/schelling.py index c7dd3bf1deb..b7702a84bed 100644 --- a/benchmarks/Schelling/schelling.py +++ b/benchmarks/Schelling/schelling.py @@ -49,6 +49,7 @@ def __init__( density=0.8, minority_pc=0.5, seed=None, + simulator=None, ): """ Create a new Schelling model. @@ -62,10 +63,8 @@ def __init__( seed: Seed for Reproducibility """ super().__init__(seed=seed) - self.height = height - self.width = width - self.density = density self.minority_pc = minority_pc + self.simulator = simulator self.schedule = RandomActivation(self) self.grid = OrthogonalMooreGrid( @@ -75,14 +74,12 @@ def __init__( random=self.random, ) - self.happy = 0 - # Set up agents # We use a grid iterator that returns # the coordinates of a cell as well as # its contents. (coord_iter) for cell in self.grid: - if self.random.random() < self.density: + if self.random.random() < density: agent_type = 1 if self.random.random() < self.minority_pc else 0 agent = SchellingAgent( self.next_id(), self, agent_type, radius, homophily diff --git a/benchmarks/WolfSheep/wolf_sheep.py b/benchmarks/WolfSheep/wolf_sheep.py index 5e1877dbed6..ecb78594272 100644 --- a/benchmarks/WolfSheep/wolf_sheep.py +++ b/benchmarks/WolfSheep/wolf_sheep.py @@ -13,7 +13,7 @@ from mesa import Model from mesa.experimental.cell_space import CellAgent, OrthogonalVonNeumannGrid -from mesa.time import RandomActivationByType +from mesa.experimental.devs import ABMSimulator class Animal(CellAgent): @@ -36,7 +36,6 @@ def spawn_offspring(self): self.energy_from_food, ) offspring.move_to(self.cell) - self.model.schedule.add(offspring) def feed(self): ... @@ -93,26 +92,41 @@ class GrassPatch(CellAgent): A patch of grass that grows at a fixed rate and it is eaten by sheep """ - def __init__(self, unique_id, model, fully_grown, countdown): + @property + def fully_grown(self): + return self._fully_grown + + @fully_grown.setter + def fully_grown(self, value: bool) -> None: + self._fully_grown = value + + if not value: + self.model.simulator.schedule_event_relative( + setattr, + self.grass_regrowth_time, + function_args=[self, "fully_grown", True], + ) + + def __init__(self, unique_id, model, fully_grown, countdown, grass_regrowth_time): """ + TODO:: fully grown can just be an int --> so one less param (i.e. countdown) + Creates a new patch of grass Args: fully_grown: (boolean) Whether the patch of grass is fully grown or not countdown: Time for the patch of grass to be fully grown again + grass_regrowth_time : time to fully regrow grass + countdown : Time for the patch of grass to be fully regrown if fully grown is False """ super().__init__(unique_id, model) - self.fully_grown = fully_grown - self.countdown = countdown + self._fully_grown = fully_grown + self.grass_regrowth_time = grass_regrowth_time - def step(self): if not self.fully_grown: - if self.countdown <= 0: - # Set as fully grown - self.fully_grown = True - self.countdown = self.model.grass_regrowth_time - else: - self.countdown -= 1 + self.model.simulator.schedule_event_relative( + setattr, countdown, function_args=[self, "fully_grown", True] + ) class WolfSheep(Model): @@ -124,6 +138,7 @@ class WolfSheep(Model): def __init__( self, + simulator, height, width, initial_sheep, @@ -139,27 +154,26 @@ def __init__( Create a new Wolf-Sheep model with the given parameters. Args: + simulator: ABMSimulator instance initial_sheep: Number of sheep to start with initial_wolves: Number of wolves to start with sheep_reproduce: Probability of each sheep reproducing each step wolf_reproduce: Probability of each wolf reproducing each step wolf_gain_from_food: Energy a wolf gains from eating a sheep - grass: Whether to have the sheep eat grass for energy grass_regrowth_time: How long it takes for a grass patch to regrow once it is eaten sheep_gain_from_food: Energy sheep gain from grass, if enabled. - moore: - seed + seed : the random seed """ super().__init__(seed=seed) # Set parameters self.height = height self.width = width + self.simulator = simulator + self.initial_sheep = initial_sheep self.initial_wolves = initial_wolves - self.grass_regrowth_time = grass_regrowth_time - self.schedule = RandomActivationByType(self) self.grid = OrthogonalVonNeumannGrid( [self.height, self.width], torus=False, @@ -175,10 +189,13 @@ def __init__( ) energy = self.random.randrange(2 * sheep_gain_from_food) sheep = Sheep( - self.next_id(), self, energy, sheep_reproduce, sheep_gain_from_food + self.next_id(), + self, + energy, + sheep_reproduce, + sheep_gain_from_food, ) sheep.move_to(self.grid[pos]) - self.schedule.add(sheep) # Create wolves for _ in range(self.initial_wolves): @@ -188,33 +205,50 @@ def __init__( ) energy = self.random.randrange(2 * wolf_gain_from_food) wolf = Wolf( - self.next_id(), self, energy, wolf_reproduce, wolf_gain_from_food + self.next_id(), + self, + energy, + wolf_reproduce, + wolf_gain_from_food, ) wolf.move_to(self.grid[pos]) - self.schedule.add(wolf) # Create grass patches possibly_fully_grown = [True, False] for cell in self.grid: fully_grown = self.random.choice(possibly_fully_grown) if fully_grown: - countdown = self.grass_regrowth_time + countdown = grass_regrowth_time else: - countdown = self.random.randrange(self.grass_regrowth_time) - patch = GrassPatch(self.next_id(), self, fully_grown, countdown) + countdown = self.random.randrange(grass_regrowth_time) + patch = GrassPatch( + self.next_id(), self, fully_grown, countdown, grass_regrowth_time + ) patch.move_to(cell) - self.schedule.add(patch) def step(self): - self.schedule.step() + self.get_agents_of_type(Sheep).shuffle(inplace=True).do("step") + self.get_agents_of_type(Wolf).shuffle(inplace=True).do("step") if __name__ == "__main__": import time - model = WolfSheep(25, 25, 60, 40, 0.2, 0.1, 20, seed=15) + simulator = ABMSimulator() + model = WolfSheep( + simulator, + 25, + 25, + 60, + 40, + 0.2, + 0.1, + 20, + seed=15, + ) + + simulator.setup(model) start_time = time.perf_counter() - for _ in range(100): - model.step() + simulator.run(100) print("Time:", time.perf_counter() - start_time) diff --git a/benchmarks/global_benchmark.py b/benchmarks/global_benchmark.py index 677d352b5c7..539e1ca4bbe 100644 --- a/benchmarks/global_benchmark.py +++ b/benchmarks/global_benchmark.py @@ -7,6 +7,8 @@ from configurations import configurations +from mesa.experimental.devs.simulator import ABMSimulator + # making sure we use this version of mesa and not one # also installed in site_packages or so. sys.path.insert(0, os.path.abspath("..")) @@ -15,13 +17,16 @@ # Generic function to initialize and run a model def run_model(model_class, seed, parameters): start_init = timeit.default_timer() - model = model_class(seed=seed, **parameters) - # time.sleep(0.001) + simulator = ABMSimulator() + model = model_class(simulator=simulator, seed=seed, **parameters) + simulator.setup(model) end_init_start_run = timeit.default_timer() - for _ in range(config["steps"]): - model.step() + simulator.run_for(config["steps"]) + + # for _ in range(config["steps"]): + # model.step() # time.sleep(0.0001) end_run = timeit.default_timer() diff --git a/mesa/experimental/devs/__init__.py b/mesa/experimental/devs/__init__.py new file mode 100644 index 00000000000..b6dca39e29c --- /dev/null +++ b/mesa/experimental/devs/__init__.py @@ -0,0 +1,4 @@ +from .eventlist import Priority, SimulationEvent +from .simulator import ABMSimulator, DEVSimulator + +__all__ = ["ABMSimulator", "DEVSimulator", "SimulationEvent", "Priority"] diff --git a/mesa/experimental/devs/eventlist.py b/mesa/experimental/devs/eventlist.py new file mode 100644 index 00000000000..48af72a4315 --- /dev/null +++ b/mesa/experimental/devs/eventlist.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +import itertools +from enum import IntEnum +from heapq import heapify, heappop, heappush +from types import MethodType +from typing import Any, Callable +from weakref import WeakMethod, ref + + +class Priority(IntEnum): + LOW = 10 + DEFAULT = 5 + HIGH = 1 + + +class SimulationEvent: + """A simulation event + + the callable is wrapped using weakref, so there is no need to explicitly cancel event if e.g., an agent + is removed from the simulation. + + Attributes: + time (float): The simulation time of the event + fn (Callable): The function to execute for this event + priority (Priority): The priority of the event + unique_id (int) the unique identifier of the event + function_args (list[Any]): Argument for the function + function_kwargs (Dict[str, Any]): Keyword arguments for the function + + """ + + _ids = itertools.count() + + @property + def CANCELED(self) -> bool: + return self._canceled + + def __init__( + self, + time: int | float, + function: Callable, + priority: Priority = Priority.DEFAULT, + function_args: list[Any] | None = None, + function_kwargs: dict[str, Any] | None = None, + ) -> None: + super().__init__() + if not callable(function): + raise Exception() + + self.time = time + self.priority = priority.value + self._canceled = False + + if isinstance(function, MethodType): + function = WeakMethod(function) + else: + function = ref(function) + + self.fn = function + self.unique_id = next(self._ids) + self.function_args = function_args if function_args else [] + self.function_kwargs = function_kwargs if function_kwargs else {} + + def execute(self): + """execute this event""" + if not self._canceled: + fn = self.fn() + if fn is not None: + fn(*self.function_args, **self.function_kwargs) + + def cancel(self) -> None: + """cancel this event""" + self._canceled = True + self.fn = None + self.function_args = [] + self.function_kwargs = {} + + def __lt__(self, other): + # Define a total ordering for events to be used by the heapq + return (self.time, self.priority, self.unique_id) < ( + other.time, + other.priority, + other.unique_id, + ) + + +class EventList: + """An event list + + This is a heap queue sorted list of events. Events are always removed from the left, so heapq is a performant and + appropriate data structure. Events are sorted based on their time stamp, their priority, and their unique_id + as a tie-breaker, guaranteeing a complete ordering. + + """ + + def __init__(self): + self._events: list[SimulationEvent] = [] + heapify(self._events) + + def add_event(self, event: SimulationEvent): + """Add the event to the event list + + Args: + event (SimulationEvent): The event to be added + + """ + + heappush(self._events, event) + + def peak_ahead(self, n: int = 1) -> list[SimulationEvent]: + """Look at the first n non-canceled event in the event list + + Args: + n (int): The number of events to look ahead + + Returns: + list[SimulationEvent] + + Raises: + IndexError: If the eventlist is empty + + Notes: + this method can return a list shorted then n if the number of non-canceled events on the event list + is less than n. + + """ + # look n events ahead + if self.is_empty(): + raise IndexError("event list is empty") + + peek: list[SimulationEvent] = [] + for event in self._events: + if not event.CANCELED: + peek.append(event) + if len(peek) >= n: + return peek + return peek + + def pop_event(self) -> SimulationEvent: + """pop the first element from the event list""" + while self._events: + event = heappop(self._events) + if not event.CANCELED: + return event + raise IndexError("Event list is empty") + + def is_empty(self) -> bool: + return len(self) == 0 + + def __contains__(self, event: SimulationEvent) -> bool: + return event in self._events + + def __len__(self) -> int: + return len(self._events) + + def remove(self, event: SimulationEvent) -> None: + """remove an event from the event list""" + # we cannot simply remove items from _eventlist because this breaks + # heap structure invariant. So, we use a form of lazy deletion. + # SimEvents have a CANCELED flag that we set to True, while popping and peak_ahead + # silently ignore canceled events + event.cancel() + + def clear(self): + self._events.clear() diff --git a/mesa/experimental/devs/examples/epstein_civil_violence.py b/mesa/experimental/devs/examples/epstein_civil_violence.py new file mode 100644 index 00000000000..9a7314e0ff3 --- /dev/null +++ b/mesa/experimental/devs/examples/epstein_civil_violence.py @@ -0,0 +1,273 @@ +import enum +import math + +from mesa import Agent, Model +from mesa.experimental.devs.simulator import ABMSimulator +from mesa.space import SingleGrid + + +class EpsteinAgent(Agent): + def __init__(self, unique_id, model, vision, movement): + super().__init__(unique_id, model) + self.vision = vision + self.movement = movement + + +class AgentState(enum.IntEnum): + QUIESCENT = enum.auto() + ARRESTED = enum.auto() + ACTIVE = enum.auto() + + +class Citizen(EpsteinAgent): + """ + A member of the general population, may or may not be in active rebellion. + Summary of rule: If grievance - risk > threshold, rebel. + + Attributes: + unique_id: unique int + model : + hardship: Agent's 'perceived hardship (i.e., physical or economic + privation).' Exogenous, drawn from U(0,1). + regime_legitimacy: Agent's perception of regime legitimacy, equal + across agents. Exogenous. + risk_aversion: Exogenous, drawn from U(0,1). + threshold: if (grievance - (risk_aversion * arrest_probability)) > + threshold, go/remain Active + vision: number of cells in each direction (N, S, E and W) that agent + can inspect + condition: Can be "Quiescent" or "Active;" deterministic function of + greivance, perceived risk, and + grievance: deterministic function of hardship and regime_legitimacy; + how aggrieved is agent at the regime? + arrest_probability: agent's assessment of arrest probability, given + rebellion + """ + + def __init__( + self, + unique_id, + model, + vision, + movement, + hardship, + regime_legitimacy, + risk_aversion, + threshold, + arrest_prob_constant, + ): + """ + Create a new Citizen. + Args: + unique_id: unique int + model : model instance + hardship: Agent's 'perceived hardship (i.e., physical or economic + privation).' Exogenous, drawn from U(0,1). + regime_legitimacy: Agent's perception of regime legitimacy, equal + across agents. Exogenous. + risk_aversion: Exogenous, drawn from U(0,1). + threshold: if (grievance - (risk_aversion * arrest_probability)) > + threshold, go/remain Active + vision: number of cells in each direction (N, S, E and W) that + agent can inspect. Exogenous. + """ + super().__init__(unique_id, model, vision, movement) + self.hardship = hardship + self.regime_legitimacy = regime_legitimacy + self.risk_aversion = risk_aversion + self.threshold = threshold + self.condition = AgentState.QUIESCENT + self.grievance = self.hardship * (1 - self.regime_legitimacy) + self.arrest_probability = None + self.arrest_prob_constant = arrest_prob_constant + + def step(self): + """ + Decide whether to activate, then move if applicable. + """ + self.update_neighbors() + self.update_estimated_arrest_probability() + net_risk = self.risk_aversion * self.arrest_probability + if self.grievance - net_risk > self.threshold: + self.condition = AgentState.ACTIVE + else: + self.condition = AgentState.QUIESCENT + if self.movement and self.empty_neighbors: + new_pos = self.random.choice(self.empty_neighbors) + self.model.grid.move_agent(self, new_pos) + + def update_neighbors(self): + """ + Look around and see who my neighbors are + """ + self.neighborhood = self.model.grid.get_neighborhood( + self.pos, moore=True, radius=self.vision + ) + self.neighbors = self.model.grid.get_cell_list_contents(self.neighborhood) + self.empty_neighbors = [ + c for c in self.neighborhood if self.model.grid.is_cell_empty(c) + ] + + def update_estimated_arrest_probability(self): + """ + Based on the ratio of cops to actives in my neighborhood, estimate the + p(Arrest | I go active). + """ + cops_in_vision = len([c for c in self.neighbors if isinstance(c, Cop)]) + actives_in_vision = 1.0 # citizen counts herself + for c in self.neighbors: + if isinstance(c, Citizen) and c.condition == AgentState.ACTIVE: + actives_in_vision += 1 + self.arrest_probability = 1 - math.exp( + -1 * self.arrest_prob_constant * (cops_in_vision / actives_in_vision) + ) + + def sent_to_jail(self, value): + self.model.active_agents.remove(self) + self.condition = AgentState.ARRESTED + self.model.simulator.schedule_event_relative(self.release_from_jail, value) + + def release_from_jail(self): + self.model.active_agents.add(self) + self.condition = AgentState.QUIESCENT + + +class Cop(EpsteinAgent): + """ + A cop for life. No defection. + Summary of rule: Inspect local vision and arrest a random active agent. + + Attributes: + unique_id: unique int + x, y: Grid coordinates + vision: number of cells in each direction (N, S, E and W) that cop is + able to inspect + """ + + def __init__(self, unique_id, model, vision, movement, max_jail_term): + super().__init__(unique_id, model, vision, movement) + self.max_jail_term = max_jail_term + + def step(self): + """ + Inspect local vision and arrest a random active agent. Move if + applicable. + """ + self.update_neighbors() + active_neighbors = [] + for agent in self.neighbors: + if isinstance(agent, Citizen) and agent.condition == "Active": + active_neighbors.append(agent) + if active_neighbors: + arrestee = self.random.choice(active_neighbors) + arrestee.sent_to_jail(self.random.randint(0, self.max_jail_term)) + if self.movement and self.empty_neighbors: + new_pos = self.random.choice(self.empty_neighbors) + self.model.grid.move_agent(self, new_pos) + + def update_neighbors(self): + """ + Look around and see who my neighbors are. + """ + self.neighborhood = self.model.grid.get_neighborhood( + self.pos, moore=True, radius=self.vision + ) + self.neighbors = self.model.grid.get_cell_list_contents(self.neighborhood) + self.empty_neighbors = [ + c for c in self.neighborhood if self.model.grid.is_cell_empty(c) + ] + + +class EpsteinCivilViolence(Model): + """ + Model 1 from "Modeling civil violence: An agent-based computational + approach," by Joshua Epstein. + http://www.pnas.org/content/99/suppl_3/7243.full + Attributes: + height: grid height + width: grid width + citizen_density: approximate % of cells occupied by citizens. + cop_density: approximate % of cells occupied by cops. + citizen_vision: number of cells in each direction (N, S, E and W) that + citizen can inspect + cop_vision: number of cells in each direction (N, S, E and W) that cop + can inspect + legitimacy: (L) citizens' perception of regime legitimacy, equal + across all citizens + max_jail_term: (J_max) + active_threshold: if (grievance - (risk_aversion * arrest_probability)) + > threshold, citizen rebels + arrest_prob_constant: set to ensure agents make plausible arrest + probability estimates + movement: binary, whether agents try to move at step end + max_iters: model may not have a natural stopping point, so we set a + max. + """ + + def __init__( + self, + width=40, + height=40, + citizen_density=0.7, + cop_density=0.074, + citizen_vision=7, + cop_vision=7, + legitimacy=0.8, + max_jail_term=1000, + active_threshold=0.1, + arrest_prob_constant=2.3, + movement=True, + max_iters=1000, + seed=None, + ): + super().__init__(seed) + if cop_density + citizen_density > 1: + raise ValueError("Cop density + citizen density must be less than 1") + + self.width = width + self.height = height + self.citizen_density = citizen_density + self.cop_density = cop_density + + self.max_iters = max_iters + + self.grid = SingleGrid(self.width, self.height, torus=True) + + for _, pos in self.grid.coord_iter(): + if self.random.random() < self.cop_density: + agent = Cop( + self.next_id(), + self, + cop_vision, + movement, + max_jail_term, + ) + elif self.random.random() < (self.cop_density + self.citizen_density): + agent = Citizen( + self.next_id(), + self, + citizen_vision, + movement, + hardship=self.random.random(), + regime_legitimacy=legitimacy, + risk_aversion=self.random.random(), + threshold=active_threshold, + arrest_prob_constant=arrest_prob_constant, + ) + else: + continue + self.grid.place_agent(agent, pos) + + self.active_agents = self.agents + + def step(self): + self.active_agents.shuffle(inplace=True).do("step") + + +if __name__ == "__main__": + model = EpsteinCivilViolence(seed=15) + simulator = ABMSimulator() + + simulator.setup(model) + + simulator.run(time_delta=100) diff --git a/mesa/experimental/devs/examples/wolf_sheep.py b/mesa/experimental/devs/examples/wolf_sheep.py new file mode 100644 index 00000000000..9fa6b3d96c7 --- /dev/null +++ b/mesa/experimental/devs/examples/wolf_sheep.py @@ -0,0 +1,250 @@ +""" +Wolf-Sheep Predation Model +================================ + +Replication of the model found in NetLogo: + Wilensky, U. (1997). NetLogo Wolf Sheep Predation model. + http://ccl.northwestern.edu/netlogo/models/WolfSheepPredation. + Center for Connected Learning and Computer-Based Modeling, + Northwestern University, Evanston, IL. +""" + +import mesa +from mesa.experimental.devs.simulator import ABMSimulator + + +class Animal(mesa.Agent): + def __init__(self, unique_id, model, moore, energy, p_reproduce, energy_from_food): + super().__init__(unique_id, model) + self.energy = energy + self.p_reproduce = p_reproduce + self.energy_from_food = energy_from_food + self.moore = moore + + def random_move(self): + next_moves = self.model.grid.get_neighborhood(self.pos, self.moore, True) + next_move = self.random.choice(next_moves) + # Now move: + self.model.grid.move_agent(self, next_move) + + def spawn_offspring(self): + self.energy /= 2 + offspring = self.__class__( + self.model.next_id(), + self.model, + self.moore, + self.energy, + self.p_reproduce, + self.energy_from_food, + ) + self.model.grid.place_agent(offspring, self.pos) + + def feed(self): ... + + def die(self): + self.model.grid.remove_agent(self) + self.remove() + + def step(self): + self.random_move() + self.energy -= 1 + + self.feed() + + if self.energy < 0: + self.die() + elif self.random.random() < self.p_reproduce: + self.spawn_offspring() + + +class Sheep(Animal): + """ + A sheep that walks around, reproduces (asexually) and gets eaten. + + The init is the same as the RandomWalker. + """ + + def feed(self): + # If there is grass available, eat it + agents = self.model.grid.get_cell_list_contents(self.pos) + grass_patch = next(obj for obj in agents if isinstance(obj, GrassPatch)) + if grass_patch.fully_grown: + self.energy += self.energy_from_food + grass_patch.fully_grown = False + + +class Wolf(Animal): + """ + A wolf that walks around, reproduces (asexually) and eats sheep. + """ + + def feed(self): + agents = self.model.grid.get_cell_list_contents(self.pos) + sheep = [obj for obj in agents if isinstance(obj, Sheep)] + if len(sheep) > 0: + sheep_to_eat = self.random.choice(sheep) + self.energy += self.energy + + # Kill the sheep + sheep_to_eat.die() + + +class GrassPatch(mesa.Agent): + """ + A patch of grass that grows at a fixed rate and it is eaten by sheep + """ + + @property + def fully_grown(self) -> bool: + return self._fully_grown + + @fully_grown.setter + def fully_grown(self, value: bool): + self._fully_grown = value + + if not value: + self.model.simulator.schedule_event_relative( + setattr, + self.grass_regrowth_time, + function_args=[self, "fully_grown", True], + ) + + def __init__(self, unique_id, model, fully_grown, countdown, grass_regrowth_time): + """ + Creates a new patch of grass + + Args: + grown: (boolean) Whether the patch of grass is fully grown or not + countdown: Time for the patch of grass to be fully grown again + """ + super().__init__(unique_id, model) + self._fully_grown = fully_grown + self.grass_regrowth_time = grass_regrowth_time + + if not self.fully_grown: + self.model.simulator.schedule_event_relative( + setattr, countdown, function_args=[self, "fully_grown", True] + ) + + def set_fully_grown(self): + self.fully_grown = True + + +class WolfSheep(mesa.Model): + """ + Wolf-Sheep Predation Model + + A model for simulating wolf and sheep (predator-prey) ecosystem modelling. + """ + + def __init__( + self, + height, + width, + initial_sheep, + initial_wolves, + sheep_reproduce, + wolf_reproduce, + grass_regrowth_time, + wolf_gain_from_food=13, + sheep_gain_from_food=5, + moore=False, + simulator=None, + seed=None, + ): + """ + Create a new Wolf-Sheep model with the given parameters. + + Args: + initial_sheep: Number of sheep to start with + initial_wolves: Number of wolves to start with + sheep_reproduce: Probability of each sheep reproducing each step + wolf_reproduce: Probability of each wolf reproducing each step + wolf_gain_from_food: Energy a wolf gains from eating a sheep + grass: Whether to have the sheep eat grass for energy + grass_regrowth_time: How long it takes for a grass patch to regrow + once it is eaten + sheep_gain_from_food: Energy sheep gain from grass, if enabled. + moore: + """ + super().__init__(seed=seed) + # Set parameters + self.height = height + self.width = width + self.initial_sheep = initial_sheep + self.initial_wolves = initial_wolves + self.simulator = simulator + + # self.sheep_reproduce = sheep_reproduce + # self.wolf_reproduce = wolf_reproduce + # self.grass_regrowth_time = grass_regrowth_time + # self.wolf_gain_from_food = wolf_gain_from_food + # self.sheep_gain_from_food = sheep_gain_from_food + # self.moore = moore + + self.grid = mesa.space.MultiGrid(self.height, self.width, torus=False) + + for _ in range(self.initial_sheep): + pos = ( + self.random.randrange(self.width), + self.random.randrange(self.height), + ) + energy = self.random.randrange(2 * sheep_gain_from_food) + sheep = Sheep( + self.next_id(), + self, + moore, + energy, + sheep_reproduce, + sheep_gain_from_food, + ) + self.grid.place_agent(sheep, pos) + + # Create wolves + for _ in range(self.initial_wolves): + pos = ( + self.random.randrange(self.width), + self.random.randrange(self.height), + ) + energy = self.random.randrange(2 * wolf_gain_from_food) + wolf = Wolf( + self.next_id(), + self, + moore, + energy, + wolf_reproduce, + wolf_gain_from_food, + ) + self.grid.place_agent(wolf, pos) + + # Create grass patches + possibly_fully_grown = [True, False] + for _agent, pos in self.grid.coord_iter(): + fully_grown = self.random.choice(possibly_fully_grown) + if fully_grown: + countdown = grass_regrowth_time + else: + countdown = self.random.randrange(grass_regrowth_time) + patch = GrassPatch( + self.next_id(), self, fully_grown, countdown, grass_regrowth_time + ) + self.grid.place_agent(patch, pos) + + def step(self): + self.get_agents_of_type(Sheep).shuffle(inplace=True).do("step") + self.get_agents_of_type(Wolf).shuffle(inplace=True).do("step") + + +if __name__ == "__main__": + import time + + simulator = ABMSimulator() + + model = WolfSheep(25, 25, 60, 40, 0.2, 0.1, 20, simulator=simulator, seed=15) + + simulator.setup(model) + + start_time = time.perf_counter() + simulator.run(100) + print(simulator.time) + print("Time:", time.perf_counter() - start_time) diff --git a/mesa/experimental/devs/simulator.py b/mesa/experimental/devs/simulator.py new file mode 100644 index 00000000000..11c33f4e074 --- /dev/null +++ b/mesa/experimental/devs/simulator.py @@ -0,0 +1,293 @@ +from __future__ import annotations + +import numbers +from typing import Any, Callable + +from mesa import Model + +from .eventlist import EventList, Priority, SimulationEvent + + +class Simulator: + """The Simulator controls the time advancement of the model. + + The simulator uses next event time progression to advance the simulation time, and execute the next event + + Attributes: + event_list (EventList): The list of events to execute + time (float | int): The current simulation time + time_unit (type) : The unit of the simulation time + model (Model): The model to simulate + + + """ + + # TODO: add replication support + # TODO: add experimentation support + + def __init__(self, time_unit: type, start_time: int | float): + # should model run in a separate thread, + # and we can then interact with start, stop, run_until, and step? + self.event_list = EventList() + self.start_time = start_time + self.time_unit = time_unit + + self.time = self.start_time + self.model = None + + def check_time_unit(self, time: int | float) -> bool: ... + + def setup(self, model: Model) -> None: + """Set up the simulator with the model to simulate + + Args: + model (Model): The model to simulate + + """ + self.event_list.clear() + self.model = model + + def reset(self): + """Reset the simulator by clearing the event list and removing the model to simulate""" + self.event_list.clear() + self.model = None + self.time = self.start_time + + def run_until(self, end_time: int | float) -> None: + while True: + try: + event = self.event_list.pop_event() + except IndexError: # event list is empty + self.time = end_time + break + + if event.time <= end_time: + self.time = event.time + event.execute() + else: + self.time = end_time + self._schedule_event(event) # reschedule event + break + + def run_for(self, time_delta: int | float): + """run the simulator for the specified time delta + + Args: + time_delta (float| int): The time delta. The simulator is run from the current time to the current time + plus the time delta + + """ + end_time = self.time + time_delta + self.run_until(end_time) + + def schedule_event_now( + self, + function: Callable, + priority: Priority = Priority.DEFAULT, + function_args: list[Any] | None = None, + function_kwargs: dict[str, Any] | None = None, + ) -> SimulationEvent: + """Schedule event for the current time instant + + Args: + function (Callable): The callable to execute for this event + priority (Priority): the priority of the event, optional + function_args (List[Any]): list of arguments for function + function_kwargs (Dict[str, Any]): dict of keyword arguments for function + + Returns: + SimulationEvent: the simulation event that is scheduled + + """ + return self.schedule_event_relative( + function, + 0.0, + priority=priority, + function_args=function_args, + function_kwargs=function_kwargs, + ) + + def schedule_event_absolute( + self, + function: Callable, + time: int | float, + priority: Priority = Priority.DEFAULT, + function_args: list[Any] | None = None, + function_kwargs: dict[str, Any] | None = None, + ) -> SimulationEvent: + """Schedule event for the specified time instant + + Args: + function (Callable): The callable to execute for this event + time (int | float): the time for which to schedule the event + priority (Priority): the priority of the event, optional + function_args (List[Any]): list of arguments for function + function_kwargs (Dict[str, Any]): dict of keyword arguments for function + + Returns: + SimulationEvent: the simulation event that is scheduled + + """ + if self.time > time: + raise ValueError("trying to schedule an event in the past") + + event = SimulationEvent( + time, + function, + priority=priority, + function_args=function_args, + function_kwargs=function_kwargs, + ) + self._schedule_event(event) + return event + + def schedule_event_relative( + self, + function: Callable, + time_delta: int | float, + priority: Priority = Priority.DEFAULT, + function_args: list[Any] | None = None, + function_kwargs: dict[str, Any] | None = None, + ) -> SimulationEvent: + """Schedule event for the current time plus the time delta + + Args: + function (Callable): The callable to execute for this event + time_delta (int | float): the time delta + priority (Priority): the priority of the event, optional + function_args (List[Any]): list of arguments for function + function_kwargs (Dict[str, Any]): dict of keyword arguments for function + + Returns: + SimulationEvent: the simulation event that is scheduled + + """ + event = SimulationEvent( + self.time + time_delta, + function, + priority=priority, + function_args=function_args, + function_kwargs=function_kwargs, + ) + self._schedule_event(event) + return event + + def cancel_event(self, event: SimulationEvent) -> None: + """remove the event from the event list + + Args: + event (SimulationEvent): The simulation event to remove + + """ + + self.event_list.remove(event) + + def _schedule_event(self, event: SimulationEvent): + if not self.check_time_unit(event.time): + raise ValueError( + f"time unit mismatch {event.time} is not of time unit {self.time_unit}" + ) + + # check timeunit of events + self.event_list.add_event(event) + + +class ABMSimulator(Simulator): + """This simulator uses incremental time progression, while allowing for additional event scheduling. + + The basic time unit of this simulator is an integer. It schedules `model.step` for each tick with the + highest priority. This implies that by default, `model.step` is the first event executed at a specific tick. + In addition, discrete event scheduling, using integer as the time unit is fully supported, paving the way + for hybrid ABM-DEVS simulations. + + """ + + def __init__(self): + super().__init__(int, 0) + + def setup(self, model): + super().setup(model) + self.schedule_event_now(self.model.step, priority=Priority.HIGH) + + def check_time_unit(self, time) -> bool: + if isinstance(time, int): + return True + if isinstance(time, float): + return time.is_integer() + else: + return False + + def schedule_event_next_tick( + self, + function: Callable, + priority: Priority = Priority.DEFAULT, + function_args: list[Any] | None = None, + function_kwargs: dict[str, Any] | None = None, + ) -> SimulationEvent: + """Schedule a SimulationEvent for the next tick + + Args + function (Callable): the callable to execute + priority (Priority): the priority of the event + function_args (List[Any]): List of arguments to pass to the callable + function_kwargs (Dict[str, Any]): List of keyword arguments to pass to the callable + + """ + return self.schedule_event_relative( + function, + 1, + priority=priority, + function_args=function_args, + function_kwargs=function_kwargs, + ) + + def run_until(self, end_time: int) -> None: + """run the simulator up to and included the specified end time + + Args: + end_time (float| int): The end_time delta. The simulator is until the specified end time + + """ + while True: + try: + event = self.event_list.pop_event() + except IndexError: + self.time = end_time + break + + if event.time <= end_time: + self.time = event.time + if event.fn() == self.model.step: + self.schedule_event_next_tick( + self.model.step, priority=Priority.HIGH + ) + + event.execute() + else: + self.time = end_time + self._schedule_event(event) + break + + def run_for(self, time_delta: int): + """run the simulator for the specified time delta + + Args: + time_delta (float| int): The time delta. The simulator is run from the current time to the current time + plus the time delta + + """ + end_time = self.time + time_delta - 1 + self.run_until(end_time) + + +class DEVSimulator(Simulator): + """A simulator where the unit of time is a float. Can be used for full-blown discrete event simulating using + event scheduling. + + """ + + def __init__(self): + super().__init__(float, 0.0) + + def check_time_unit(self, time) -> bool: + return isinstance(time, numbers.Number) diff --git a/mesa/time.py b/mesa/time.py index 50d564be551..10fa4005ac2 100644 --- a/mesa/time.py +++ b/mesa/time.py @@ -25,9 +25,7 @@ # Remove this __future__ import once the oldest supported Python is 3.10 from __future__ import annotations -import heapq import warnings -import weakref from collections import defaultdict from collections.abc import Iterable @@ -393,46 +391,7 @@ def get_type_count(self, agenttype: type[Agent]) -> int: class DiscreteEventScheduler(BaseScheduler): """ - A scheduler for discrete event simulation in Mesa. - - This scheduler manages events where each event is associated with a - specific time and agent. The scheduler advances time not in fixed - increments, but to the moment the next event is scheduled to occur. - - This implementation uses a priority queue (heapq) to manage events. Each - event is a tuple of the form (time, random_value, agent), where: - - time (float): The scheduled time for the event. - - random_value (float): A secondary sorting criterion to randomize - the order of events that are scheduled for the same time. - - agent (Agent): The agent associated with the event. - - The random value for secondary sorting ensures that when two events are - scheduled for the same time, their execution order is randomized, thus - preventing direct comparison issues between different types of agents and - maintaining the integrity of the simulation's randomness. - - Attributes: - model (Model): The model instance associated with the scheduler. - event_queue (list): A priority queue of scheduled events. - time_step (int or float): The fixed time period by which the model advances - on each step. Defaults to 1. - - Methods: - schedule_event(time, agent): Schedule an event for a specific time. - schedule_in(delay, agent): Schedule an event after a specified delay. - step(): Execute all events within the next time_step period. - get_next_event_time(): Returns the time of the next scheduled event. - - Usage: - 1. Instantiate the DiscreteEventScheduler with a model instance and a time_step period. - 2. Add agents to the scheduler using schedule.add(). With schedule_now=True (default), - the first event for the agent will be scheduled immediately. - 3. In the Agent step() method, schedule the next event for the agent - (using schedule_in or schedule_event). - 3. Add self.schedule.step() to the model's step() method, as usual. - - Now, with each model step, the scheduler will execute all events within the - next time_step period, and advance time one time_step forward. + This class has been deprecated and replaced by the functionality provided by experimental.devs """ def __init__(self, model: Model, time_step: TimeT = 1) -> None: @@ -444,72 +403,6 @@ def __init__(self, model: Model, time_step: TimeT = 1) -> None: """ super().__init__(model) - self.event_queue: list[tuple[TimeT, float, weakref.ref]] = [] - self.time_step: TimeT = time_step # Fixed time period for each step - - warnings.warn( - "The DiscreteEventScheduler is experimental. It may be changed or removed in any and all future releases, including patch releases.\n" - "We would love to hear what you think about this new feature. If you have any thoughts, share them with us here: https://github.com/projectmesa/mesa/discussions/1923", - FutureWarning, - stacklevel=2, + raise Exception( + "DiscreteEventScheduler is deprecated in favor of the functionality provided by experimental.devs" ) - - def schedule_event(self, time: TimeT, agent: Agent) -> None: - """Schedule an event for an agent at a specific time.""" - if time < self.time: - raise ValueError( - f"Scheduled time ({time}) must be >= the current time ({self.time})" - ) - if agent not in self._agents: - raise ValueError( - "trying to schedule an event for agent which is not known to the scheduler" - ) - - # Create an event, sorted first on time, secondary on a random value - event = (time, self.model.random.random(), weakref.ref(agent)) - heapq.heappush(self.event_queue, event) - - def schedule_in(self, delay: TimeT, agent: Agent) -> None: - """Schedule an event for an agent after a specified delay.""" - if delay < 0: - raise ValueError("Delay must be non-negative") - event_time = self.time + delay - self.schedule_event(event_time, agent) - - def step(self) -> None: - """Execute the next event and advance the time.""" - end_time = self.time + self.time_step - - while self.event_queue and self.event_queue[0][0] <= end_time: - # Get the next event (ignore the random value during unpacking) - time, _, agent = heapq.heappop(self.event_queue) - agent = agent() # unpack weakref - - if agent: - # Advance time to the event's time - self.time = time - # Execute the event - agent.step() - - # After processing events, advance time by the time_step - self.time = end_time - self.steps += 1 - - def get_next_event_time(self) -> TimeT | None: - """Returns the time of the next scheduled event.""" - if not self.event_queue: - return None - return self.event_queue[0][0] - - def add(self, agent: Agent, schedule_now: bool = True) -> None: - """Add an Agent object to the schedule and optionally schedule its first event. - - Args: - agent: An Agent to be added to the schedule. Must have a step() method. - schedule_now: If True, schedules the first event for the agent immediately. - """ - super().add(agent) # Call the add method from BaseScheduler - - if schedule_now: - # Schedule the first event immediately - self.schedule_event(self.time, agent) diff --git a/tests/test_devs.py b/tests/test_devs.py new file mode 100644 index 00000000000..06d3629ed16 --- /dev/null +++ b/tests/test_devs.py @@ -0,0 +1,282 @@ +from unittest.mock import MagicMock + +import pytest + +from mesa import Model +from mesa.experimental.devs.eventlist import EventList, Priority, SimulationEvent +from mesa.experimental.devs.simulator import ABMSimulator, DEVSimulator + + +def test_devs_simulator(): + simulator = DEVSimulator() + + # setup + model = MagicMock(spec=Model) + simulator.setup(model) + + assert len(simulator.event_list) == 0 + assert simulator.model == model + assert simulator.time == 0 + + # schedule_event_now + fn1 = MagicMock() + event1 = simulator.schedule_event_now(fn1) + assert event1 in simulator.event_list + assert len(simulator.event_list) == 1 + + # schedule_event_absolute + fn2 = MagicMock() + event2 = simulator.schedule_event_absolute(fn2, 1.0) + assert event2 in simulator.event_list + assert len(simulator.event_list) == 2 + + # schedule_event_relative + fn3 = MagicMock() + event3 = simulator.schedule_event_relative(fn3, 0.5) + assert event3 in simulator.event_list + assert len(simulator.event_list) == 3 + + # run_for + simulator.run_for(0.8) + fn1.assert_called_once() + fn3.assert_called_once() + assert simulator.time == 0.8 + + simulator.run_for(0.2) + fn2.assert_called_once() + assert simulator.time == 1.0 + + simulator.run_for(0.2) + assert simulator.time == 1.2 + + with pytest.raises(ValueError): + simulator.schedule_event_absolute(fn2, 0.5) + + # cancel_event + simulator = DEVSimulator() + model = MagicMock(spec=Model) + simulator.setup(model) + fn = MagicMock() + event = simulator.schedule_event_relative(fn, 0.5) + simulator.cancel_event(event) + assert event.CANCELED + + # simulator reset + simulator.reset() + assert len(simulator.event_list) == 0 + assert simulator.model is None + assert simulator.time == 0.0 + + +def test_abm_simulator(): + simulator = ABMSimulator() + + # setup + model = MagicMock(spec=Model) + simulator.setup(model) + + # schedule_event_next_tick + fn = MagicMock() + simulator.schedule_event_next_tick(fn) + assert len(simulator.event_list) == 2 + + simulator.run_for(3) + assert model.step.call_count == 3 + assert simulator.time == 2 + + +def test_simulation_event(): + some_test_function = MagicMock() + + time = 10 + event = SimulationEvent( + time, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + + assert event.time == time + assert event.fn() is some_test_function + assert event.function_args == [] + assert event.function_kwargs == {} + assert event.priority == Priority.DEFAULT + + # execute + event.execute() + some_test_function.assert_called_once() + + with pytest.raises(Exception): + SimulationEvent( + time, None, priority=Priority.DEFAULT, function_args=[], function_kwargs={} + ) + + # check calling with arguments + some_test_function = MagicMock() + event = SimulationEvent( + time, + some_test_function, + priority=Priority.DEFAULT, + function_args=["1"], + function_kwargs={"x": 2}, + ) + event.execute() + some_test_function.assert_called_once_with("1", x=2) + + # check if we pass over deletion of callable silently because of weakrefs + def some_test_function(x, y): + return x + y + + event = SimulationEvent(time, some_test_function, priority=Priority.DEFAULT) + del some_test_function + event.execute() + + # cancel + some_test_function = MagicMock() + event = SimulationEvent( + time, + some_test_function, + priority=Priority.DEFAULT, + function_args=["1"], + function_kwargs={"x": 2}, + ) + event.cancel() + assert event.fn is None + assert event.function_args == [] + assert event.function_kwargs == {} + assert event.priority == Priority.DEFAULT + assert event.CANCELED + + # comparison for sorting + event1 = SimulationEvent( + 10, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event2 = SimulationEvent( + 10, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + assert event1 < event2 # based on just unique_id as tiebraker + + event1 = SimulationEvent( + 11, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event2 = SimulationEvent( + 10, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + assert event1 > event2 + + event1 = SimulationEvent( + 10, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event2 = SimulationEvent( + 10, + some_test_function, + priority=Priority.HIGH, + function_args=[], + function_kwargs={}, + ) + assert event1 > event2 + + +def test_eventlist(): + event_list = EventList() + + assert len(event_list._events) == 0 + assert isinstance(event_list._events, list) + assert event_list.is_empty() + + # add event + some_test_function = MagicMock() + event = SimulationEvent( + 1, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event_list.add_event(event) + assert len(event_list) == 1 + assert event in event_list + + # remove event + event_list.remove(event) + assert len(event_list) == 1 + assert event.CANCELED + + # peak ahead + event_list = EventList() + for i in range(10): + event = SimulationEvent( + i, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event_list.add_event(event) + events = event_list.peak_ahead(2) + assert len(events) == 2 + assert events[0].time == 0 + assert events[1].time == 1 + + events = event_list.peak_ahead(11) + assert len(events) == 10 + + event_list._events[6].cancel() + events = event_list.peak_ahead(10) + assert len(events) == 9 + + event_list = EventList() + with pytest.raises(Exception): + event_list.peak_ahead() + + # pop event + event_list = EventList() + for i in range(10): + event = SimulationEvent( + i, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event_list.add_event(event) + event = event_list.pop_event() + assert event.time == 0 + + event_list = EventList() + event = SimulationEvent( + 9, + some_test_function, + priority=Priority.DEFAULT, + function_args=[], + function_kwargs={}, + ) + event_list.add_event(event) + event.cancel() + with pytest.raises(Exception): + event_list.pop_event() + + # clear + event_list.clear() + assert len(event_list) == 0 diff --git a/tests/test_examples.py b/tests/test_examples.py index 1c149da4b75..e5c0381f065 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -51,12 +51,12 @@ def test_examples(self): print(f"testing example {example!r}") with self.active_example_dir(example): try: - # model.py at the top level + # epstein_civil_violence.py at the top level mod = importlib.import_module("model") server = importlib.import_module("server") server.server.render_model() except ImportError: - # /model.py + # /epstein_civil_violence.py mod = importlib.import_module(f"{example.replace('-', '_')}.model") server = importlib.import_module( f"{example.replace('-', '_')}.server" diff --git a/tests/test_time.py b/tests/test_time.py index 53967d1a8ef..a9d8e2e6055 100644 --- a/tests/test_time.py +++ b/tests/test_time.py @@ -8,7 +8,6 @@ from mesa import Agent, Model from mesa.time import ( BaseScheduler, - DiscreteEventScheduler, RandomActivation, RandomActivationByType, SimultaneousActivation, @@ -339,119 +338,5 @@ def test_random_activation_counts(self): # model.schedule.add(b) -class TestDiscreteEventScheduler(TestCase): - def setUp(self): - self.model = MockModel() - self.scheduler = DiscreteEventScheduler(self.model, time_step=1) - self.model.schedule = self.scheduler - self.agent1 = MockAgent(1, self.model) - self.agent2 = MockAgent(2, self.model) - self.model.schedule.add(self.agent1, schedule_now=False) - self.model.schedule.add(self.agent2, schedule_now=False) - - # Testing Initialization and Attributes - def test_initialization(self): - self.assertIsInstance(self.scheduler.event_queue, list) - self.assertEqual(self.scheduler.time_step, 1) - - # Testing Event Scheduling - def test_schedule_event(self): - self.scheduler.schedule_event(5, self.agent1) - self.assertEqual(len(self.scheduler.event_queue), 1) - event_time, _, event_agent = self.scheduler.event_queue[0] - self.assertEqual(event_time, 5) - self.assertEqual(event_agent(), self.agent1) - - def test_schedule_event_with_float_time(self): - self.scheduler.schedule_event(5.5, self.agent1) - self.assertEqual(len(self.scheduler.event_queue), 1) - event_time, _, event_agent = self.scheduler.event_queue[0] - self.assertEqual(event_time, 5.5) - self.assertEqual(event_agent(), self.agent1) - - def test_schedule_in(self): - self.scheduler.schedule_in(3, self.agent2) - _, _, event_agent = self.scheduler.event_queue[0] - self.assertEqual(event_agent(), self.agent2) - self.assertEqual(self.scheduler.get_next_event_time(), self.scheduler.time + 3) - - # Testing Event Execution and Time Advancement - def test_step_function(self): - self.scheduler.schedule_event(1, self.agent1) - self.scheduler.schedule_event(2, self.agent2) - self.scheduler.step() - self.assertEqual(self.scheduler.time, 1) - self.assertEqual(self.agent1.steps, 1) - self.assertEqual(self.agent2.steps, 0) - - def test_time_advancement(self): - self.scheduler.schedule_event(5, self.agent1) - self.scheduler.step() - self.assertEqual(self.scheduler.time, 1) - self.scheduler.step() - self.assertEqual(self.scheduler.time, 2) - - def test_no_events(self): - self.scheduler.step() - self.assertEqual(self.scheduler.time, 1) - - # Testing Edge Cases and Error Handling - def test_invalid_event_time(self): - with self.assertRaises(ValueError): - self.scheduler.schedule_event(-1, self.agent1) - - def test_invalid_aget_time(self): - with self.assertRaises(ValueError): - agent3 = MockAgent(3, self.model) - self.scheduler.schedule_event(2, agent3) - - def test_immediate_event_execution(self): - # Current time of the scheduler - current_time = self.scheduler.time - - # Schedule an event at the current time - self.scheduler.schedule_event(current_time, self.agent1) - - # Step the scheduler and check if the event is executed immediately - self.scheduler.step() - self.assertEqual(self.agent1.steps, 1) - - # The time should advance to the next time step after execution - self.assertEqual(self.scheduler.time, current_time + 1) - - # Testing Utility Functions - def test_get_next_event_time(self): - self.scheduler.schedule_event(10, self.agent1) - self.assertEqual(self.scheduler.get_next_event_time(), 10) - - # Test add() method with and without immediate scheduling - def test_add_with_immediate_scheduling(self): - # Add an agent with schedule_now set to True (default behavior) - new_agent = MockAgent(3, self.model) - self.scheduler.add(new_agent) - - # Check if the agent's first event is scheduled immediately - self.assertEqual(len(self.scheduler.event_queue), 1) - event_time, _, event_agent = self.scheduler.event_queue[0] - self.assertEqual(event_time, self.scheduler.time) - self.assertEqual(event_agent(), new_agent) - - # Step the scheduler and check if the agent's step method is executed - self.scheduler.step() - self.assertEqual(new_agent.steps, 1) - - def test_add_without_immediate_scheduling(self): - # Add an agent with schedule_now set to False - new_agent = MockAgent(4, self.model) - self.scheduler.add(new_agent, schedule_now=False) - - # Check if the event queue is not updated - self.assertEqual(len(self.scheduler.event_queue), 0) - - # Step the scheduler and verify that the agent's step method is not executed - self.scheduler.step() - self.assertEqual(new_agent.steps, 0) - - if __name__ == "__main__": unittest.main()