-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathvector_zonal_stats.py
364 lines (296 loc) · 13.1 KB
/
vector_zonal_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# AUTOGENERATED! DO NOT EDIT! File to edit: ../notebooks/02_vector_zonal_stats.ipynb.
# %% auto 0
__all__ = ['create_zonal_stats', 'compute_quadkey', 'create_bingtile_zonal_stats']
# %% ../notebooks/02_vector_zonal_stats.ipynb 6
# Reserved column name used internally to carry the aoi index through spatial
# joins and groupbys; user dataframes must not contain a column with this name
# (enforced in _prep_aoi). The double-underscore prefix makes a collision with
# real data columns unlikely.
# NOTE(review): "GeoWrangleer" looks like a typo for "GeoWrangler"; the exact
# value only matters as an unlikely column name, but renaming it could affect
# callers that depend on the literal string — confirm before changing.
GEO_INDEX_NAME = "__GeoWrangleer_aoi_index"
# %% ../notebooks/02_vector_zonal_stats.ipynb 7
from functools import partial
from typing import Any, Dict, List
import geopandas as gpd
import morecantile
import numpy as np
import pandas as pd
# %% ../notebooks/02_vector_zonal_stats.ipynb 11
def _fix_agg(
agg: Dict[str, Any], # A dict containing at the minimum a 'func' key
) -> Dict[str, Any]:
"""
Fix an `agg spec`.
It outputs a dict containing the following keys:
- 'func': a list of aggregation functions (should be a valid 'agg' function)
- 'column': a column to apply the aggregation functions (should be a valid numeric column in data)
- 'output': the names of the new columns containing the application of the aggregation functions (default: concat column + '_' + func)
- 'fillna': boolean list whether to replace new columns with 'NA' values with 0 (default: False)
"""
if "func" not in agg:
return agg # skip fix as agg spec is invalid
if type(agg["func"]) == str:
agg["func"] = [agg["func"]]
# optional column, default to index count
if "column" not in agg:
agg["column"] = GEO_INDEX_NAME
if "output" not in agg:
column = "index" if agg["column"] == GEO_INDEX_NAME else agg["column"]
agg["output"] = [f"{column}_{f}" for f in agg["func"]]
if type(agg["output"]) == str:
agg["output"] = [agg["output"]]
# check matching fillna
if "fillna" not in agg:
agg["fillna"] = [False for _ in agg["func"]]
if type(agg["fillna"]) == bool:
agg["fillna"] = [agg["fillna"]]
return agg
# %% ../notebooks/02_vector_zonal_stats.ipynb 16
def _check_agg(
    agg: Dict[str, Any],  # A dict containing at the minimum a 'func' key
    i: int,  # The index into the list of aggregations
    data_cols: List[str],  # list of data columns
    dtypes: pd.Series,  # series of dtypes with column names as index
) -> None:
    """
    Validate a fixed `agg spec` against the data columns.

    Raises:
        ValueError: on the first problem found — missing 'func', an unknown
            aggregation function, a missing or non-numeric target column, or
            'output'/'fillna' lists whose lengths don't match 'func'.
    """
    if "func" not in agg:
        raise ValueError(f"Missing key 'func' in agg[{i}] {agg}")
    for fn in agg["func"]:
        # aggregation functions must exist as pd.Series methods
        if not hasattr(pd.Series, fn):
            raise ValueError(f"Unknown func '{fn}' in agg[{i}] {agg}")
    if agg["column"] != GEO_INDEX_NAME:
        # the internal index column is exempt from these data checks
        if agg["column"] not in data_cols:
            raise ValueError(
                f"Column '{agg['column']}' in agg[{i}] {agg} does not exist in the data"
            )
        if not np.issubdtype(dtypes.loc[agg["column"]], np.number):
            raise ValueError(
                f"Column '{agg['column']}' in agg[{i}] {agg} is not a numeric column in the data"
            )
    if len(agg["output"]) != len(agg["func"]):
        raise ValueError(
            f"output list {agg['output']} doesn't match func list {agg['func']} in agg[{i}] {agg}"
        )
    if len(agg["fillna"]) != len(agg["func"]):
        raise ValueError(
            f"fillna list {agg['fillna']} doesn't match func list {agg['func']} in agg[{i}] {agg}"
        )
# %% ../notebooks/02_vector_zonal_stats.ipynb 19
def _validate_aggs(
    fixed_aggs: List[Dict[str, Any]],  # A list of fixed agg specs
    data: pd.DataFrame,  # Source dataframe
) -> None:
    """
    Validate every agg spec against the data and ensure output column names
    are unique across all specs.

    Raises:
        ValueError: if any spec is invalid (see `_check_agg`) or an output
            column name is repeated — including a repeat within a single
            spec, which the previous pairwise scan missed.
    """
    data_cols = list(data.columns.values)
    seen = set()  # O(1) membership instead of scanning a growing list
    for i, agg in enumerate(fixed_aggs):
        _check_agg(agg, i, data_cols, data.dtypes)
        for output in agg["output"]:
            if output in seen:
                raise ValueError(
                    f"Duplicate output column name found for agg[{i}] {agg['output']}"
                )
            seen.add(output)
# %% ../notebooks/02_vector_zonal_stats.ipynb 24
def _validate_aoi(
aoi: pd.DataFrame, # Source dataframe
) -> None:
if isinstance(aoi.index, pd.MultiIndex):
raise ValueError(
"AOI has a pandas.MultiIndex. Please convert the index to a single level such as pd.RangeIndex"
)
# %% ../notebooks/02_vector_zonal_stats.ipynb 25
def _expand_aggs(
aggs: List[Dict[str, Any]], # List of fixed valid aggs
) -> List[Dict[str, Any]]:
"""Expands agg specs with multiple funcs each into a separate agg spec"""
expanded_aggs = []
for agg in aggs:
for i, func in enumerate(agg["func"]):
expanded_agg = {
"func": func,
"column": agg["column"],
"output": agg["output"][i],
"fillna": agg["fillna"][i],
}
expanded_aggs += [expanded_agg]
return expanded_aggs
# %% ../notebooks/02_vector_zonal_stats.ipynb 27
def _build_agg_args(
aggs: List[Dict[str, Any]], # A list of expanded aggs
) -> Dict:
"""Builds a dict of args with output as key and a tuple of column and func as value from a list of expanded aggs"""
return {agg["output"]: (agg["column"], agg["func"]) for agg in aggs}
# %% ../notebooks/02_vector_zonal_stats.ipynb 29
def _prep_aoi(
    aoi: pd.DataFrame,  # Area of interest
) -> pd.DataFrame:
    """
    Return a copy of the aoi prepared for the spatial join: its index is
    materialized as the reserved GEO_INDEX_NAME column so it can be broadcast
    onto matching features and used later as the grouping key.

    Raises:
        ValueError: if the aoi already has a column named GEO_INDEX_NAME.
    """
    if GEO_INDEX_NAME in list(aoi.columns.values):
        raise ValueError(
            f"Invalid column name error: AOI column should not match Geowrangler index column {GEO_INDEX_NAME}"
        )
    prepped = aoi.copy()
    prepped.index.name = GEO_INDEX_NAME
    # turn the (renamed) index into a regular column for broadcasting
    prepped = prepped.reset_index(level=0)
    return prepped
# %% ../notebooks/02_vector_zonal_stats.ipynb 34
def _fillnas(
expanded_aggs: List[Dict[str, Any]], # list of expanded aggs
results: pd.DataFrame, # results dataframe to be filled with NAs if flag set
aoi: pd.DataFrame, # aoi dataframe to merge it back to
) -> pd.DataFrame:
results = results.copy()
# set NAs to 0 if fillna
for agg in expanded_aggs:
if agg["fillna"]:
colname = agg["output"]
if colname in list(aoi.columns.values):
colname = colname + "_y" # try if merged df has colname + _y
if colname in list(results.columns.values):
# following this guide https://medium.com/@felipecaballero/deciphering-the-cryptic-futurewarning-for-fillna-in-pandas-2-01deb4e411a1
with pd.option_context("future.no_silent_downcasting", True):
results[colname] = (
results[colname].fillna(0).infer_objects(copy=False)
)
return results
# %% ../notebooks/02_vector_zonal_stats.ipynb 38
def _aggregate_stats(
    aoi: pd.DataFrame,  # Area of interest
    groups: pd.core.groupby.DataFrameGroupBy,  # Source data aggregated into groups by GEO_INDEX_NAME
    expanded_aggs: List[Dict[str, Any]],  # A list of expanded aggs
) -> pd.DataFrame:
    """Run every expanded agg over the groups via pandas named aggregation,
    left-merge the aggregates back onto the aoi on GEO_INDEX_NAME (suffixing
    colliding columns with '_y'), and zero-fill NA outputs where requested."""
    named_agg_args = _build_agg_args(expanded_aggs)
    aggregated = groups.agg(**named_agg_args)
    merged = aoi.merge(
        aggregated, how="left", on=GEO_INDEX_NAME, suffixes=(None, "_y")
    )
    return _fillnas(expanded_aggs, merged, aoi)
# %% ../notebooks/02_vector_zonal_stats.ipynb 44
def create_zonal_stats(
    aoi: gpd.GeoDataFrame,  # Area of interest for which zonal stats are to be computed for
    data: gpd.GeoDataFrame,  # Source gdf containing data to compute zonal stats from
    aggregations: List[  # List of agg specs, with each agg spec applied to a data column
        Dict[str, Any]
    ],
    overlap_method: str = "intersects",  # spatial predicate to used in spatial join of aoi and data [geopandas.sjoin](https://geopandas.org/en/stable/docs/user_guide/mergingdata.html#binary-predicate-joins) for more details
    # categorical_column_options: str = None,
) -> gpd.GeoDataFrame:
    """
    Create zonal stats for area of interest from data using aggregration operations on data columns.
    Returns the same aoi with additional columns containing the computed zonal features.
    """
    _validate_aoi(aoi)
    fixed_aggs = [_fix_agg(spec) for spec in aggregations]
    _validate_aggs(fixed_aggs, data)

    # remember the caller's index name so it can be restored on the result
    original_index_name = aoi.index.name
    aoi = _prep_aoi(aoi)

    # reproject data onto the aoi's CRS when they differ
    if not data.crs.equals(aoi.crs):
        data = data.to_crs(aoi.crs)

    # spatial join broadcasts each aoi index onto the data rows it overlaps
    features = gpd.sjoin(
        aoi[[GEO_INDEX_NAME, "geometry"]], data, how="inner", predicate=overlap_method
    )

    # apply all aggregations in one groupby pass
    expanded_aggs = _expand_aggs(fixed_aggs)
    results = _aggregate_stats(aoi, features.groupby(GEO_INDEX_NAME), expanded_aggs)

    # restore the caller's original index on the result
    results = results.set_index(GEO_INDEX_NAME)
    results.index.name = original_index_name
    return results
# %% ../notebooks/02_vector_zonal_stats.ipynb 60
# Module-level tile matrix set used by get_quadkey/compute_quadkey;
# "WebMercatorQuad" is the web-mercator tiling scheme that Bing Maps
# quadkeys are defined on.
tms = morecantile.tms.get("WebMercatorQuad")  # Tile Matrix for Bing Maps
# %% ../notebooks/02_vector_zonal_stats.ipynb 61
def get_quadkey(geometry, zoom_level):
    """Return the quadkey of the tile containing a point geometry at zoom_level.

    geometry.x/geometry.y are passed to tms.tile as (lon, lat);
    NOTE(review): this presumably expects coordinates in EPSG:4326, as
    produced by compute_quadkey's centroid conversion — confirm for other CRSs.
    """
    return tms.quadkey(tms.tile(geometry.x, geometry.y, zoom_level))
# %% ../notebooks/02_vector_zonal_stats.ipynb 62
def compute_quadkey(
    data: gpd.GeoDataFrame,  # The geodataframe
    zoom_level: int,  # The quadkey zoom level (1-23)
    quadkey_column: str = "quadkey",  # The name of the quadkey output column
) -> gpd.GeoDataFrame:
    """
    Computes the quadkeys for the geometries of the data.

    If geometries are not points, the quadkeys are computed
    from the centroids of the geometries. Centroids of geographic data are
    computed on a planar CRS (EPSG:3857), and all centroids are expressed in
    lon/lat (EPSG:4326) before tiling, since tms.tile takes lon/lat.

    Returns a copy of data with the quadkey column added.
    """
    data = data.copy()
    get_zoom_quadkey = partial(get_quadkey, zoom_level=zoom_level)
    if data.crs.is_geographic:
        # centroid on a geographic CRS is ill-defined; compute on a planar CRS
        centroids = data.to_crs("EPSG:3857").geometry.centroid
    else:
        centroids = data.geometry.centroid
    # always convert to EPSG:4326 before tiling: the geographic branch used to
    # convert back to data.crs, which is only correct when data.crs is already
    # EPSG:4326 (other geographic CRSs would feed non-WGS84 coords to tms.tile)
    centroids = centroids.to_crs("EPSG:4326")
    data[quadkey_column] = centroids.apply(get_zoom_quadkey)
    return data
# %% ../notebooks/02_vector_zonal_stats.ipynb 69
def validate_aoi_quadkey(aoi, aoi_quadkey_column) -> None:
    """Ensure the aoi has the quadkey column, is non-empty, and that all
    quadkeys share one zoom level (i.e. all quadkey strings have equal length).

    Raises:
        ValueError: when any of these conditions fails.
    """
    if aoi_quadkey_column not in list(aoi.columns.values):
        raise ValueError(
            f"aoi_quadkey_column '{aoi_quadkey_column}' is not in list of aoi columns: {list(aoi.columns.values)}"
        )
    if len(aoi) == 0:
        raise ValueError("aoi dataframe is empty")
    quadkey_lengths = aoi[aoi_quadkey_column].apply(len)
    # zoom level == quadkey string length; use the first row as the reference
    expected_level = quadkey_lengths.iloc[0]
    if (quadkey_lengths != expected_level).any():
        raise ValueError("aoi quadkey levels are not all at the same level")
def validate_data_quadkey(data, data_quadkey_column, min_zoom_level):
    """Ensure the data has the quadkey column, is non-empty, and that no data
    quadkey is coarser (shorter) than min_zoom_level (the aoi zoom level).

    Raises:
        ValueError: when any of these conditions fails.
    """
    if data_quadkey_column not in list(data.columns.values):
        raise ValueError(
            f"data_quadkey_column '{data_quadkey_column}' is not in list of data columns: {list(data.columns.values)}"
        )
    if len(data) == 0:
        raise ValueError("data dataframe is empty")
    # quadkeys shorter than the aoi level cannot be rolled up to an aoi tile
    too_coarse = data[data_quadkey_column].apply(len) < min_zoom_level
    if too_coarse.any():
        raise ValueError(
            f"data quadkey levels cannot be less than aoi quadkey level {min_zoom_level}"
        )
# %% ../notebooks/02_vector_zonal_stats.ipynb 70
def create_bingtile_zonal_stats(
    aoi: pd.DataFrame,  # An aoi with quadkey column
    data: pd.DataFrame,  # Data with quadkey column
    aggregations: List[  # List of agg specs, with each agg spec applied to a data column
        Dict[str, Any]
    ],
    aoi_quadkey_column: str = "quadkey",  # Column name of aoi quadkey
    data_quadkey_column: str = "quadkey",  # Column name of data quadkey
) -> pd.DataFrame:
    """
    Compute zonal stats for an aoi of bing tiles by rolling data quadkeys up
    to the aoi quadkey zoom level and aggregating the matching rows per tile.
    Returns the aoi with the computed aggregation columns appended.
    """
    aoi = aoi.copy()
    data = data.copy()
    # normalize quadkeys to strings so length/prefix operations are well defined
    aoi[aoi_quadkey_column] = aoi[aoi_quadkey_column].apply(str)
    data[data_quadkey_column] = data[data_quadkey_column].apply(str)

    # aoi quadkeys must all share one zoom level; data must be at least as fine
    validate_aoi_quadkey(aoi, aoi_quadkey_column)
    aoi_zoom_level = len(aoi[aoi_quadkey_column].iloc[0])
    validate_data_quadkey(data, data_quadkey_column, aoi_zoom_level)

    fixed_aggs = [_fix_agg(spec) for spec in aggregations]
    _validate_aggs(fixed_aggs, data)

    # roll each data quadkey up to its aoi-level tile (quadkey prefix)
    data[GEO_INDEX_NAME] = data[data_quadkey_column].str[:aoi_zoom_level]
    # keep only data rows whose rolled-up quadkey matches an aoi tile
    features = data.join(
        aoi[[aoi_quadkey_column]].set_index(aoi_quadkey_column),
        how="inner",
        on=GEO_INDEX_NAME,
    )

    # group per aoi tile and apply all aggregations at once
    expanded_aggs = _expand_aggs(fixed_aggs)
    aoi[GEO_INDEX_NAME] = aoi[aoi_quadkey_column]
    results = _aggregate_stats(aoi, features.groupby(GEO_INDEX_NAME), expanded_aggs)
    return results.drop(columns=[GEO_INDEX_NAME])