From 2e78940728f2b946abcdca571fd4e85343849a50 Mon Sep 17 00:00:00 2001 From: Corvince Date: Fri, 19 Jan 2024 14:32:56 +0100 Subject: [PATCH] Add draft implementation for CellSpace --- mesa/agent.py | 7 +- mesa/gridspace.py | 197 ++++++++++++++++++++++++++++++++++++++++++++++ mesa/schelling.py | 86 ++++++++++++++++++++ 3 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 mesa/gridspace.py create mode 100644 mesa/schelling.py diff --git a/mesa/agent.py b/mesa/agent.py index 8688dc702a4..97c323cfcb5 100644 --- a/mesa/agent.py +++ b/mesa/agent.py @@ -22,6 +22,7 @@ if TYPE_CHECKING: # We ensure that these are not imported during runtime to prevent cyclic # dependency. + from mesa.gridspace import Cell from mesa.model import Model from mesa.space import Position @@ -46,7 +47,7 @@ def __init__(self, unique_id: int, model: Model) -> None: """ self.unique_id = unique_id self.model = model - self.pos: Position | None = None + self.cell: Cell | None = None # register agent try: @@ -62,6 +63,10 @@ def __init__(self, unique_id: int, model: Model) -> None: stacklevel=2, ) + @property + def pos(self): + return self.cell.coords + def remove(self) -> None: """Remove and delete the agent from the model.""" with contextlib.suppress(KeyError): diff --git a/mesa/gridspace.py b/mesa/gridspace.py new file mode 100644 index 00000000000..d4ce654b3e3 --- /dev/null +++ b/mesa/gridspace.py @@ -0,0 +1,197 @@ +import itertools +import random +from functools import cache +from typing import Any + +from mesa import Agent + +Coordinates = tuple[int, int] + + +def create_neighborhood_getter(moore=True, include_center=False, radius=1): + @cache + def of(cell: Cell): + if radius == 0: + return {cell: cell.content} + + neighborhood = {} + for neighbor in cell.connections: + if ( + moore + or neighbor.coords[0] == cell.coords[0] + or neighbor.coords[1] == cell.coords[1] + ): + neighborhood[neighbor] = neighbor.content + + if radius > 1: + for neighbor in list(neighborhood.keys()): + neighborhood.update( + create_neighborhood_getter(moore, include_center, radius - 1)( + neighbor + ) + ) + + if not include_center: + neighborhood.pop(cell, None) + + return CellCollection(neighborhood) + + return of + + +class Cell: + __slots__ = ["coords", "connections", "content"] + + def __init__(self, i: int, j: int) -> None: + self.coords = (i, j) + self.connections: list[Cell] = [] + self.content: list[Agent] = [] + + def connect(self, other) -> None: + """Connects this cell to another cell.""" + self.connections.append(other) + + def disconnect(self, other) -> None: + """Disconnects this cell from another cell.""" + self.connections.remove(other) + + def add_agent(self, agent: Agent) -> None: + """Adds an agent to the cell.""" + self.content.append(agent) + agent.cell = self + + def remove_agent(self, agent: Agent) -> None: + """Removes an agent from the cell.""" + if agent in self.content: + self.content.remove(agent) + + def __repr__(self): + return f"Cell({self.coords})" + + +class CellCollection: + def __init__(self, cells: dict[Cell, list[Agent]]) -> None: + self.cells = cells + + def __iter__(self): + return iter(self.cells) + + def __getitem__(self, key): + return self.cells[key] + + def __len__(self): + return len(self.cells) + + def __repr__(self): + return f"CellCollection({self.cells})" + + @property + def agents(self): + return itertools.chain.from_iterable(self.cells.values()) + + def select_random(self): + return random.choice(list(self.cells.keys())) + + +class Space: + cells: dict[Coordinates, Cell] + + def _connect_single_cell(self, cell): # <= different for every concrete Space + ... + + def __iter__(self): + return iter(self.cells.values()) + + def get_neighborhood(self, coords: Coordinates, neighborhood_getter: Any): + return neighborhood_getter(self.cells[coords]) + + def move_agent(self, agent: Agent, pos) -> None: + """Move an agent from its current position to a new position.""" + if (old_cell := agent.cell) is not None: + old_cell.remove_agent(agent) + if self._empties_built: + self._empties.add(old_cell.coords) + + new_cell = self.cells[pos] + new_cell.add_agent(agent) + if self._empties_built: + self._empties.discard(new_cell.coords) + + @property + def empties(self) -> CellCollection: + if not self._empties_built: + self.build_empties() + + return CellCollection( + { + self.cells[coords]: self.cells[coords].content + for coords in sorted(self._empties) + } + ) + + def build_empties(self) -> None: + self._empties = set(filter(self.is_cell_empty, self.cells.keys())) + self._empties_built = True + + def move_to_empty(self, agent: Agent) -> None: + """Moves agent to a random empty cell, vacating agent's old cell.""" + num_empty_cells = len(self.empties) + if num_empty_cells == 0: + raise Exception("ERROR: No empty cells") + + # This method is based on Agents.jl's random_empty() implementation. See + # https://github.com/JuliaDynamics/Agents.jl/pull/541. For the discussion, see + # https://github.com/projectmesa/mesa/issues/1052 and + # https://github.com/projectmesa/mesa/pull/1565. The cutoff value provided + # is the break-even comparison with the time taken in the else branching point. + if num_empty_cells > self.cutoff_empties: + while True: + new_pos = ( + agent.random.randrange(self.width), + agent.random.randrange(self.height), + ) + if self.is_cell_empty(new_pos): + break + else: + new_pos = self.empties.select_random().coords + self.move_agent(agent, new_pos) + + def is_cell_empty(self, pos) -> bool: + """Returns a bool of the contents of a cell.""" + return len(self.cells[pos].content) == 0 + + +class Grid(Space): + def __init__(self, width: int, height: int, torus: bool = False) -> None: + self.width = width + self.height = height + self.torus = torus + self.cells = {(i, j): Cell(i, j) for j in range(width) for i in range(height)} + + self._empties_built = False + self.cutoff_empties = 7.953 * len(self.cells) ** 0.384 + + for cell in self.cells.values(): + self._connect_single_cell(cell) + + def _connect_single_cell(self, cell): + i, j = cell.coords + directions = [ + (-1, -1), + (-1, 0), + (-1, 1), + (0, -1), + (0, 1), + (1, -1), + (1, 0), + (1, 1), + ] + for di, dj in directions: + ni, nj = (i + di, j + dj) + if self.torus: + ni, nj = ni % self.height, nj % self.width + if 0 <= ni < self.height and 0 <= nj < self.width: + cell.connect(self.cells[ni, nj]) + + def get_neighborhood(self, coords, neighborhood_getter: Any) -> CellCollection: + return neighborhood_getter(self.cells[coords]) diff --git a/mesa/schelling.py b/mesa/schelling.py new file mode 100644 index 00000000000..43aa90e3d3d --- /dev/null +++ b/mesa/schelling.py @@ -0,0 +1,86 @@ +import mesa +from mesa.gridspace import Grid, create_neighborhood_getter + + +class SchellingAgent(mesa.Agent): + """ + Schelling segregation agent + """ + + def __init__(self, pos, model, agent_type, cell): + """ + Create a new Schelling agent. + + Args: + unique_id: Unique identifier for the agent. + x, y: Agent initial location. + agent_type: Indicator for the agent's type (minority=1, majority=0) + """ + super().__init__(pos, model) + self.pos = pos + self.cell = cell + self.type = agent_type + self.get_neighborhood = create_neighborhood_getter() + + def step(self): + similar = 0 + for neighbor in self.get_neighborhood(self.cell).agents: + if neighbor.type == self.type: + similar += 1 + + # If unhappy, move: + if similar < self.model.homophily: + self.model.grid.move_to_empty(self) + else: + self.model.happy += 1 + + +class Schelling(mesa.Model): + """ + Model class for the Schelling segregation model. + """ + + def __init__(self, width=20, height=20, density=0.8, minority_pc=0.2, homophily=3): + """ """ + + self.width = width + self.height = height + self.density = density + self.minority_pc = minority_pc + self.homophily = homophily + + self.schedule = mesa.time.RandomActivation(self) + self.grid = Grid(width, height, torus=True) + + self.happy = 0 + self.datacollector = mesa.DataCollector( + {"happy": "happy"}, # Model-level count of happy agents + # For testing purposes, agent's individual x and y + # {"x": lambda a: a.pos.coords[0], "y": lambda a: a.pos.coords[1]}, + ) + + # Set up agents + # We use a grid iterator that returns + # the coordinates of a cell as well as + # its contents. (coord_iter) + for cell in self.grid: + if self.random.random() < self.density: + agent_type = 1 if self.random.random() < self.minority_pc else 0 + agent = SchellingAgent(None, self, agent_type, cell) + self.grid.move_agent(agent, cell.coords) + self.schedule.add(agent) + + self.running = True + self.datacollector.collect(self) + + def step(self): + """ + Run one step of the model. If All agents are happy, halt the model. + """ + self.happy = 0 # Reset counter of happy agents + self.schedule.step() + # collect data + # self.datacollector.collect(self) + + if self.happy == self.schedule.get_agent_count(): + self.running = False