Skip to content

Commit

Permalink
debug KNN, add KNN section to examples notebook
Browse files Browse the repository at this point in the history
  • Loading branch information
wpbonelli committed Feb 16, 2022
1 parent aef3169 commit 2fdd486
Show file tree
Hide file tree
Showing 3 changed files with 374 additions and 47 deletions.
56 changes: 44 additions & 12 deletions cactice/knn.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List, Dict, Tuple
from itertools import islice
from collections import Counter
import logging

import numpy as np

# TODO: more distance function options?
from cactice.neighbors import get_neighborhood, Neighbors
from cactice.distance import hamming_distance
import cactice.stats as stats
Expand All @@ -22,6 +22,8 @@ def __init__(
:param neighbors: Which adjacent cells to consider neighbors.
:param layers: How many layers of adjacent cells to consider neighbors.
"""

self.__logger = logging.getLogger(__name__)
self.__k: int = k
self.__neighbors: Neighbors = neighbors
self.__layers: int = layers
Expand Down Expand Up @@ -66,26 +68,56 @@ def predict(self, grids: List[np.ndarray] = None) -> List[np.ndarray]:
# find cells to predict (missing locations)
rows = range(0, grid.shape[0])
cols = range(0, grid.shape[1])
missing = [(i, j) for i in rows for j in cols if grid[i, j] == 0]
grid_pred = grid_predictions[gi].copy()
missing = [(i, j) for i in rows for j in cols if grid_pred[i, j] == 0]

# if this grid has no missing locations, skip it
if len(missing) == 0: continue

# predict cells one by one
for i, j in missing:
# get the missing location's neighbors
neighborhood = get_neighborhood(grid, i, j, self.__neighbors, self.__layers)
neighborhood = get_neighborhood(
grid=grid,
i=i,
j=j,
neighbors=self.__neighbors,
layers=self.__layers,
exclude_zero=True)

# ignore central cell
del neighborhood[(0, 0)]

# pull out neighbor cell values
neighbors = list(neighborhood.values())

# compute distance from this neighborhood to every training neighborhood
distances = {nh[(0, 0)]: hamming_distance(list(neighborhood.values()), list(nh.values())) for nh in neighborhoods}
if len(neighbors) > 0:
self.__logger.debug(f"Assigning location ({i}, {j}) via KNN")

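# sort distances ascending so the nearest neighborhoods come first (NOTE(review): `reverse=True` on the next line sorts descending — confirm)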
distances = dict(sorted(distances.items(), key=lambda k, v: v, reverse=True))
# compute distance from this neighborhood to every training neighborhood
distances = {nh[(0, 0)]: hamming_distance(list(neighborhood.values()), list(nh.values())) for nh in neighborhoods}

# keep k most similar neighborhoods (k nearest neighbor neighborhoods)
distances = dict(islice(distances, self.__k))
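# sort distances ascending so the nearest neighborhoods come first (NOTE(review): `reverse=True` on the next line sorts descending — confirm)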
distances = dict(sorted(distances.items(), key=lambda k, v: v, reverse=True))

# count the frequency of each cell value among the kept neighborhoods and pick the most common (on ties, Counter.most_common returns the first value encountered rather than a random one)
cell_prediction = Counter(distances.values()).most_common(1)[0][0]
# keep k most similar neighborhoods (k nearest neighbor neighborhoods)
distances = dict(islice(distances, self.__k))

# count the frequency of each cell value among the kept neighborhoods and pick the most common (on ties, Counter.most_common returns the first value encountered rather than a random one)
cell_pred = Counter(distances.values()).most_common(1)[0][0]
else:
self.__logger.debug(
f"Location ({i}, {j}) has no neighbors, assigning by sampling from cell distribution")

# sample randomly according to cell class distribution
cell_pred = np.random.choice(
a=list(self.__cell_distribution.keys()),
p=list(self.__cell_distribution.values()))

# set the cell in the corresponding grid
grid_predictions[gi][i, j] = cell_prediction
grid_pred[i, j] = cell_pred

# set the predicted grid
grid_predictions[gi] = grid_pred

return grid_predictions
10 changes: 7 additions & 3 deletions cactice/rns.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ def __init__(
:param neighbors: The cells to consider part of the neighborhood.
:param layers: The width of the neighborhood
"""

self.__logger = logging.getLogger(__name__)
self.__neighbors: Neighbors = neighbors
self.__layers: int = layers
self.__train: List[np.ndarray] = []
self.__cell_distribution: Dict[str, float] = {}
self.__logger = logging.getLogger(__name__)

def fit(self, grids: List[np.ndarray] = None):
"""
Expand Down Expand Up @@ -78,10 +79,13 @@ def predict(self, grids: List[np.ndarray] = None) -> List[np.ndarray]:
# predict the cell value by randomly selecting one of its neighbors, if it has any;
# otherwise, sample according to the observed cell class distribution
if len(neighbors) > 0:
self.__logger.debug(f"Assigning location ({i}, {j}) from neighbors")
self.__logger.debug(f"Assigning location ({i}, {j}) via RNS")
cell_pred = random.choice(neighbors)
else:
self.__logger.debug(f"Location ({i}, {j}) has no neighbors, sampling from distribution")
self.__logger.debug(
f"Location ({i}, {j}) has no neighbors, assigning by sampling from cell distribution")

# sample randomly according to cell class distribution
cell_pred = np.random.choice(
a=list(self.__cell_distribution.keys()),
p=list(self.__cell_distribution.values()))
Expand Down
Loading

0 comments on commit 2fdd486

Please sign in to comment.