From 1afc307a16b1cdb2c05d0d1cbf8cdc78ecebf5f1 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Thu, 17 Oct 2024 17:19:21 +0200 Subject: [PATCH 01/16] =?UTF-8?q?=E2=9C=A8=20Add=20data=20gap=20utils?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/scr/normalize_data.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 Database/scr/normalize_data.py diff --git a/Database/scr/normalize_data.py b/Database/scr/normalize_data.py new file mode 100644 index 000000000..62adf91f0 --- /dev/null +++ b/Database/scr/normalize_data.py @@ -0,0 +1,14 @@ +from .log_utils import Logging + + +class DataGapUtils: + def __init__(self): + self.logger = Logging.get_logger("data-gap-utils") + + @staticmethod + def fill_date(row: dict, replace_with_date: dict): + date_cols = [x for x in row.keys() if "_Date_" in x] + if all([True if row[d] is None else False for d in date_cols]): + for c in date_cols: + row[c] = replace_with_date[c] + return row From 0c8e0c14381f90c8348755408fa9d0605a367945 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 12:14:42 +0200 Subject: [PATCH 02/16] =?UTF-8?q?=F0=9F=A9=B9=20Return=20empty=20list=20if?= =?UTF-8?q?=20no=20=5FNorm/=5FType/=5FGeoJson?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 187e40fc4..6077a87cf 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -134,7 +134,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): [i[2] for i in x], ) if isinstance(x, list) - else (None, None, None) + else ([], [], []) ) ) .progress_apply(pd.Series) @@ -190,7 +190,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): [i[2] for i in x], ) if isinstance(x, list) - else (None, None, None) + else ([], [], []) ) ) .progress_apply(pd.Series) @@ -307,7 +307,6 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): logger.info(f"Normalizing nulls for {level} {col}") sub_event = utils.replace_nulls(sub_event) - _yes, _no = re.compile(r"^(yes)$|^(y)$|^(true)$", re.IGNORECASE | re.MULTILINE), re.compile( r"^(no)$|^(n)$|^(false)$", re.IGNORECASE | re.MULTILINE ) @@ -319,8 +318,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): if value and not isinstance(value, bool) and re.match(_yes, value) else (False if value and not isinstance(value, bool) and re.match(_no, value) else value) ) - - + ) logger.info(f"Normalizing dates for subevet {col}") start_date_col, end_date_col = [col for col in sub_event.columns if col.startswith("Start_Date")], [ col for col in sub_event.columns if col.startswith("End_Date") @@ -377,7 +375,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): [i[2] for i in x], ) if isinstance(x, list) - else (None, None, None) + else ([], [], []) ) ) .progress_apply(pd.Series) @@ -478,7 +476,7 @@ def get_gid(admin_area: str | None): [i[2] for i in x], ) if isinstance(x, list) - else (None, None, None) + else ([], [], []) ) ) .progress_apply(pd.Series) @@ -559,6 +557,7 @@ def df_to_parquet( pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True) slc.to_parquet(fname, engine="fastparquet", **parquet_wargs) + def get_target_cols() -> tuple[list]: date_cols = [ "Start_Date_Day", From 77957342b3087cce25d8a03955393c23896c9dce Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 
12:15:10 +0200 Subject: [PATCH 03/16] =?UTF-8?q?=F0=9F=94=A8=20Avoid=20re-using=20var=20n?= =?UTF-8?q?ames?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 6077a87cf..3bd0ae4dc 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -320,8 +320,8 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ) ) logger.info(f"Normalizing dates for subevet {col}") - start_date_col, end_date_col = [col for col in sub_event.columns if col.startswith("Start_Date")], [ - col for col in sub_event.columns if col.startswith("End_Date") + start_date_col, end_date_col = [c for c in sub_event.columns if c.startswith("Start_Date")], [ + c for c in sub_event.columns if c.startswith("End_Date") ] assert len(start_date_col) == len(end_date_col), "Check the start and end date columns" assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many" From b0ebcdf9a7b34d1e3fc0d02d37c9c922676ce9d Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 12:15:40 +0200 Subject: [PATCH 04/16] =?UTF-8?q?=F0=9F=9A=91=EF=B8=8F=20Fix=20no=20GIDs?= =?UTF-8?q?=20returned?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Database/parse_events.py b/Database/parse_events.py index 3bd0ae4dc..d1411c06f 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -428,11 +428,13 @@ def get_gid(admin_area: str | None): try: res = norm_loc.get_gadm_gid(country=admin_area) assert res + return res except BaseException as err: logger.warning(f"Could not get gadm as country. Admin area: {admin_area} Error: {err}") res = norm_loc.get_gadm_gid(area=admin_area) try: assert res + return res except BaseException as err: logger.warning(f"Could not get gadm as area. Error: {err}") return [] From 96dde8ce13a8f788f95cc705a27c65920dc7fcbf Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 14:41:45 +0200 Subject: [PATCH 05/16] =?UTF-8?q?=F0=9F=94=A5=20Remove=20unused=20func?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index d1411c06f..78de7f0fc 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -539,27 +539,6 @@ def get_gid(admin_area: str | None): ) -def df_to_parquet( - df: pd.DataFrame, - target_dir: str, - chunk_size: int = 2000, - **parquet_wargs, -): - """Writes pandas DataFrame to parquet format with pyarrow. - Credit: https://stackoverflow.com/a/72010262/14123992 - Args: - df: DataFrame - target_dir: local directory where parquet files are written to - chunk_size: number of rows stored in one chunk of parquet file. Defaults to 2000.
- """ - for i in range(0, len(df), chunk_size): - slc = df.iloc[i : i + chunk_size] - chunk = int(i / chunk_size) - fname = os.path.join(target_dir, f"{chunk:04d}.parquet") - pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True) - slc.to_parquet(fname, engine="fastparquet", **parquet_wargs) - - def get_target_cols() -> tuple[list]: date_cols = [ "Start_Date_Day", From 7ebef674d1989fa804eabcd79c574daf3447bfba Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 19:06:28 +0200 Subject: [PATCH 06/16] =?UTF-8?q?=F0=9F=9A=91=EF=B8=8F=20Fix=20no=20GID=20?= =?UTF-8?q?for=20us=20states?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 77 ++++++++++------------------- Database/scr/normalize_locations.py | 34 +++++++------ 2 files changed, 46 insertions(+), 65 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 78de7f0fc..d93610418 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -146,12 +146,12 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): lambda admin_areas: ( [ ( - norm_loc.get_gadm_gid(country=c) - if norm_loc.get_gadm_gid(country=c) - else norm_loc.get_gadm_gid(area=c) + norm_loc.get_gadm_gid(country=area) + if norm_loc.get_gadm_gid(country=area) + else norm_loc.get_gadm_gid(area=area) ) - for c in admin_areas - if c + for area in admin_areas + if area ] if isinstance(admin_areas, list) else [] @@ -200,7 +200,17 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info("Getting GID from GADM for Administrative Areas after purging areas above GID_0 level...") events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].progress_apply( lambda admin_areas: ( - [(norm_loc.get_gadm_gid(country=c)) for c in admin_areas if c] if isinstance(admin_areas, list) else [] + [ + ( + norm_loc.get_gadm_gid(country=area) + if norm_loc.get_gadm_gid(country=area) + else norm_loc.get_gadm_gid(area=area) + ) + for area in admin_areas + if area + ] + if isinstance(admin_areas, list) + else [] ), ) @@ -389,12 +399,12 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): lambda admin_areas: ( [ ( - norm_loc.get_gadm_gid(country=c) - if norm_loc.get_gadm_gid(country=c) - else norm_loc.get_gadm_gid(area=c) + norm_loc.get_gadm_gid(country=area) + if norm_loc.get_gadm_gid(country=area) + else norm_loc.get_gadm_gid(area=area) ) - for c in admin_areas - if c + for area in admin_areas + if area ] if isinstance(admin_areas, list) else [[] for _ in admin_areas] @@ -421,30 +431,9 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ) logger.info(f"Getting GID from GADM for Administrative Areas in subevent {col}") - def get_gid(admin_area: str | None): - if admin_area is None: - return [] - if isinstance(admin_area, str): - try: - res = norm_loc.get_gadm_gid(country=admin_area) - assert res - return res - except BaseException as err: - logger.warning(f"Could not get gadm as country. Admin area: {admin_area} Error: {err}") - res = norm_loc.get_gadm_gid(area=admin_area) - try: - assert res - return res - except BaseException as err: - logger.warning(f"Could not get gadm as area. 
Error: {err}") - return [] - else: - logger.warning(f"admin_area {admin_area} of type {type(admin_area)} is not supported.") - return [] - sub_event[f"{administrative_area_col}_GID"] = sub_event[ f"{administrative_area_col}_Norm" - ].progress_apply(lambda admin_area: (get_gid(admin_area=admin_area))) + ].progress_apply(lambda area: norm_loc.get_gadm_gid(country=area) if area else []) if location_col in sub_event.columns: logger.info(f"Normalizing location names for {level} {col}") sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply( @@ -491,26 +480,14 @@ def get_gid(admin_area: str | None): lambda row: ( [ ( - ( - norm_loc.get_gadm_gid( - area=row[f"{location_col}_Norm"][i], - country=row[f"{administrative_area_col}_Norm"], - ) - if norm_loc.get_gadm_gid( - area=row[f"{location_col}_Norm"][i], - country=row[f"{administrative_area_col}_Norm"], - ) - else norm_loc.get_gadm_gid( - area=row[f"{location_col}_Norm"][i], - ) - ) - if i - else [] + norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) + if norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) + else norm_loc.get_gadm_gid(country=row[f"{location_col}_Norm"][i]) ) + if row[f"{location_col}_Norm"][i] + else [] for i in range(len(row[f"{location_col}_Norm"])) ] - if isinstance(row[f"{location_col}_Norm"], list) - else [] ), axis=1, ) diff --git a/Database/scr/normalize_locations.py b/Database/scr/normalize_locations.py index aaf26113a..fb4c70a13 100644 --- a/Database/scr/normalize_locations.py +++ b/Database/scr/normalize_locations.py @@ -362,24 +362,22 @@ def _get_unsd_region( ) @cache - def _get_american_area(self, area: str, country: str = None) -> list | None: + def _get_american_area(self, area: str) -> list | None: # TODO: slim down areas = [] if not area: return None - if area == self.united_states and (not country or country == self.united_states): + if area == self.united_states: return [self.USA_GID] - address = [x.strip() for x in area.split(",")] if area else [x.strip() for x in country.split(",")] + address = [x.strip() for x in area.split(",")] if area else [x.strip() for x in area.split(",")] # remove postal codes from the address list (common on OSM) address = [i for i in address if not re.match(r"^\d{5}(?:[-\s]\d{4})?$", i)] - if country == self.united_states and address[-1] != self.united_states: - address.append(country) - - assert address[-1] == self.united_states + if address[-1] != self.united_states: + address.append(self.united_states) # county level if len(address) == 3: @@ -433,7 +431,7 @@ def _get_american_area(self, area: str, country: str = None) -> list | None: areas = [f"{i}:{','.join(address[:-3]).strip()}" for i in areas] - return areas + return areas if areas else None @cache def get_gadm_gid( @@ -453,13 +451,19 @@ def get_gadm_gid( if unsd_search_output: return unsd_search_output - # handle American States - us_address_split = area.split(",")[-1].strip() if area else None - us_search_output = ( - self._get_american_area(area, country) - if area and (country == self.united_states or us_address_split == self.united_states) - else None - ) + # find US-areas: country, states, and counties + if country and not area: + us_search = country + elif area and not country: + us_search = area + elif area and country: + us_search = ( + area + if (self.united_states in country and self.united_states in area) + else (country if not area else (f"{area}, {country}" if country and area else None)) + ) + + us_search_output = self._get_american_area(us_search) if us_search_output: return 
us_search_output From 6b478ad0559b983f3741cb2b3d306d2737ba150e Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 19:12:35 +0200 Subject: [PATCH 07/16] =?UTF-8?q?=E2=9C=85=20Update=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_normalize_locations.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/test_normalize_locations.py b/tests/test_normalize_locations.py index 3d088e702..521088d38 100644 --- a/tests/test_normalize_locations.py +++ b/tests/test_normalize_locations.py @@ -13,20 +13,21 @@ def refresh_fixture(): class TestNormalizeLocations: @pytest.mark.parametrize( - "area, country, expected", + "area, expected", [ - ("Arizona", "United States", ["USA.3_1"]), - ("United States", None, ["USA"]), - ("United States", "United States", ["USA"]), - ("Kings, California", "United States", ["USA.5.16_1"]), - ("Amman", "United States", []), - ("Kansas, United States", "United States", ["USA.17_1"]), - ("Kansas", "United States", ["USA.17_1"]), + ("Arizona, United States", ["USA.3_1"]), + ("United States", ["USA"]), + ("Kings, California, United States", ["USA.5.16_1"]), + ("Amman, United States", []), + ("Kansas, United States", ["USA.17_1"]), + ("Kansas, United States", ["USA.17_1"]), + ("Orange County, California, United States", ["USA.5.30_1"]), + ("India", []), ], ) - def test__get_american_area(self, area, country, expected): + def test__get_american_area(self, area, expected): norm = refresh_fixture() - assert norm._get_american_area(area, country) == expected + assert norm._get_american_area(area) == expected @pytest.mark.parametrize( "area, country, expected", @@ -48,6 +49,9 @@ def test__get_american_area(self, area, country, expected): ("Penjab", None, ["PAK.7_1"]), ("Punjab", "India", ["IND.28_1"]), (None, "Pakistan", ["Z06", "PAK"]), + ("Orange County, California, United States", None, ["USA.5.30_1"]), + (None, "Orange County, California, United States", ["USA.5.30_1"]), + (None, "Netherlands", ["NLD"]), ], ) def test_get_gadm_gid(self, area, country, expected): From 34a137e99ab12a6bf5fe14396666798825d1cd51 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Mon, 21 Oct 2024 19:59:17 +0200 Subject: [PATCH 08/16] =?UTF-8?q?=E2=9C=85=20Update=20american=20gid=20fun?= =?UTF-8?q?c=20(return=20None=20if=20no=20us=20area=20found)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_normalize_locations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_normalize_locations.py b/tests/test_normalize_locations.py index 521088d38..79d52507b 100644 --- a/tests/test_normalize_locations.py +++ b/tests/test_normalize_locations.py @@ -18,11 +18,11 @@ class TestNormalizeLocations: ("Arizona, United States", ["USA.3_1"]), ("United States", ["USA"]), ("Kings, California, United States", ["USA.5.16_1"]), - ("Amman, United States", []), + ("Amman, United States", None), ("Kansas, United States", ["USA.17_1"]), ("Kansas, United States", ["USA.17_1"]), ("Orange County, California, United States", ["USA.5.30_1"]), - ("India", []), + ("India", None), ], ) def test__get_american_area(self, area, expected): From 6e6f13cf6290f1a98184d026f161a174964303be Mon Sep 17 00:00:00 2001 From: Shorouq Date: Tue, 22 Oct 2024 11:33:44 +0200 Subject: [PATCH 09/16] =?UTF-8?q?=E2=9C=85=20Update=20tests=20for=20'India?= =?UTF-8?q?'=20case?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit --- tests/test_normalize_locations.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_normalize_locations.py b/tests/test_normalize_locations.py index 79d52507b..c6eff33ed 100644 --- a/tests/test_normalize_locations.py +++ b/tests/test_normalize_locations.py @@ -52,6 +52,8 @@ def test__get_american_area(self, area, expected): ("Orange County, California, United States", None, ["USA.5.30_1"]), (None, "Orange County, California, United States", ["USA.5.30_1"]), (None, "Netherlands", ["NLD"]), + ("India", None, []), + (None, "India", ["Z07", "IND", "Z01", "Z04", "Z05", "Z09"]), ], ) def test_get_gadm_gid(self, area, country, expected): From ff167403719a7d524c7e262b0d8a17d2e2c6337a Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 14:52:31 +0200 Subject: [PATCH 10/16] =?UTF-8?q?=E2=9A=A1Increase=20speed=20with=20parall?= =?UTF-8?q?el=20executions=20in=20pandas?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 63 +++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index d93610418..16964e7fd 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -3,6 +3,7 @@ import re import pandas as pd +from pandarallel import pandarallel from tqdm import tqdm from Database.scr.log_utils import Logging @@ -12,6 +13,8 @@ tqdm.pandas() +pandarallel.initialize(progress_bar=True, nb_workers=5) + def infer_countries( row: dict, @@ -45,7 +48,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info("Unpacking Total_Summary_* columns") total_summary_cols = [col for col in df.columns if col.startswith("Total_Summary_")] for i in total_summary_cols: - df[i] = df[i].progress_apply(utils.eval) + df[i] = df[i].parallel_apply(utils.eval) events = utils.unpack_col(df, columns=total_summary_cols) logger.info(f"Total summary columns: {total_summary_cols}") del df @@ -54,7 +57,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info("STEP: Normalizing start and end dates if present") for d_col in ["Start_Date", "End_Date"]: logger.info(f"Normalizing date column: {d_col}") - dates = events[d_col].progress_apply(utils.normalize_date) + dates = events[d_col].parallel_apply(utils.normalize_date) date_cols = pd.DataFrame( dates.to_list(), columns=[f"{d_col}_Day", f"{d_col}_Month", f"{d_col}_Year"], @@ -68,7 +71,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ) for inflation_adjusted_col in [col for col in events.columns if col.endswith("_Adjusted")]: logger.info(f"Normalizing boolean column {inflation_adjusted_col}") - events[inflation_adjusted_col] = events[inflation_adjusted_col].progress_apply( + events[inflation_adjusted_col] = events[inflation_adjusted_col].parallel_apply( lambda value: ( True if value and not isinstance(value, bool) and re.match(_yes, value) @@ -91,14 +94,14 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info(f"Normalizing ranges in {i}") events[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( events[i] - .progress_apply(lambda x: (norm_num.extract_numbers(x) if isinstance(x, str) else (None, None, None))) - .progress_apply(pd.Series) + .parallel_apply(lambda x: (norm_num.extract_numbers(x) if isinstance(x, str) else (None, None, None))) + .parallel_apply(pd.Series) ) split_by_pipe_cols = ["Hazards"] for str_col in [x for x in events.columns if x in split_by_pipe_cols]: 
logger.info(f"Splitting column {str_col} by pipe") - events[str_col] = events[str_col].progress_apply( + events[str_col] = events[str_col].parallel_apply( lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else None)) ) @@ -106,7 +109,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): if "Administrative_Areas" in events.columns: logger.info(f"Ensuring that all admin area data in Administrative_Areas is of type ") - events["Administrative_Areas"] = events["Administrative_Areas"].progress_apply( + events["Administrative_Areas"] = events["Administrative_Areas"].parallel_apply( lambda x: utils.eval(x) if x is not None else [] ) @@ -126,7 +129,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ] ] = ( events[f"{admin_area_col}_Tmp"] - .progress_apply( + .parallel_apply( lambda x: ( ( [i[0] for i in x], @@ -137,12 +140,12 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): else ([], [], []) ) ) - .progress_apply(pd.Series) + .parallel_apply(pd.Series) ) events.drop(columns=[f"{admin_area_col}_Tmp"], inplace=True) logger.info("Getting GID from GADM for Administrative Areas") - events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].progress_apply( + events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].parallel_apply( lambda admin_areas: ( [ ( @@ -160,7 +163,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info(f"""STEP: Infer country from list of locations""") - events[f"{admin_area_col}_GID_0_Tmp"] = events.progress_apply( + events[f"{admin_area_col}_GID_0_Tmp"] = events.parallel_apply( lambda x: infer_countries(x, admin_area_col=admin_area_col), axis=1 ) @@ -182,7 +185,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ] ] = ( events[f"{admin_area_col}_GID_0_Tmp"] - .progress_apply( + .parallel_apply( lambda x: ( ( [i[0] for i in x], @@ -193,12 +196,12 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): else ([], [], []) ) ) - .progress_apply(pd.Series) + .parallel_apply(pd.Series) ) events.drop(columns=[f"{admin_area_col}_GID_0_Tmp"], inplace=True) logger.info("Getting GID from GADM for Administrative Areas after purging areas above GID_0 level...") - events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].progress_apply( + events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].parallel_apply( lambda admin_areas: ( [ ( @@ -221,7 +224,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): event_name_col = [x for x in events.columns if "Event_Name" in x] if len(event_name_col) == 1: event_name_col = event_name_col[0] - events["Event_Names"] = events[event_name_col].progress_apply( + events["Event_Names"] = events[event_name_col].parallel_apply( lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else None) ) logger.info("Converting annotation columns to strings to store in sqlite3") @@ -273,14 +276,14 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): for col in specific_summary_cols: # evaluate string bytes to python datatype (hopefully dict, str, or list) - df[col] = df[col].progress_apply(utils.eval) + df[col] = df[col].parallel_apply(utils.eval) # unpack subevents sub_event = df[["Event_ID", col]].explode(col) # drop any events that have no subevents (aka [] exploded into NaN) sub_event.dropna(how="all", inplace=True) - sub_event = pd.concat([sub_event.Event_ID, sub_event[col].progress_apply(pd.Series)], axis=1) + sub_event = 
pd.concat([sub_event.Event_ID, sub_event[col].parallel_apply(pd.Series)], axis=1) logger.info( f"Dropping any columns with non-str column names due to None types in the dicts {[c for c in sub_event.columns if not isinstance(c, str)]}" @@ -309,10 +312,10 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): for i in specific_total_cols: sub_event[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( sub_event[i] - .progress_apply( + .parallel_apply( lambda x: (norm_num.extract_numbers(str(x)) if x is not None else (None, None, None)) ) - .progress_apply(pd.Series) + .parallel_apply(pd.Series) ) logger.info(f"Normalizing nulls for {level} {col}") sub_event = utils.replace_nulls(sub_event) @@ -322,7 +325,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ) for inflation_adjusted_col in [col for col in sub_event.columns if col.endswith("_Adjusted")]: logger.info(f"Normalizing boolean column {inflation_adjusted_col} for {level} {col}") - sub_event[inflation_adjusted_col] = sub_event[inflation_adjusted_col].progress_apply( + sub_event[inflation_adjusted_col] = sub_event[inflation_adjusted_col].parallel_apply( lambda value: ( True if value and not isinstance(value, bool) and re.match(_yes, value) @@ -339,8 +342,8 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): if start_date_col and end_date_col: logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}") start_date_col, end_date_col = start_date_col[0], end_date_col[0] - start_dates = sub_event[start_date_col].progress_apply(utils.normalize_date) - end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date) + start_dates = sub_event[start_date_col].parallel_apply(utils.normalize_date) + end_dates = sub_event[end_date_col].parallel_apply(utils.normalize_date) start_date_cols = pd.DataFrame( start_dates.to_list(), columns=[ @@ -377,7 +380,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ] ] = ( sub_event[f"{administrative_area_col}_Tmp"] - .progress_apply( + .parallel_apply( lambda x: ( ( [i[0] for i in x], @@ -388,14 +391,14 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): else ([], [], []) ) ) - .progress_apply(pd.Series) + .parallel_apply(pd.Series) ) sub_event.drop(columns=[f"{administrative_area_col}_Tmp"], inplace=True) logger.info(f"Getting GID from GADM for Administrative Areas in {level} {col}") sub_event[f"{administrative_area_col}_GID"] = sub_event[ f"{administrative_area_col}_Norm" - ].progress_apply( + ].parallel_apply( lambda admin_areas: ( [ ( @@ -433,7 +436,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): sub_event[f"{administrative_area_col}_GID"] = sub_event[ f"{administrative_area_col}_Norm" - ].progress_apply(lambda area: norm_loc.get_gadm_gid(country=area) if area else []) + ].parallel_apply(lambda area: norm_loc.get_gadm_gid(country=area) if area else []) if location_col in sub_event.columns: logger.info(f"Normalizing location names for {level} {col}") sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply( @@ -459,7 +462,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ] ] = ( sub_event[f"{location_col}_Tmp"] - .progress_apply( + .parallel_apply( lambda x: ( ( [i[0] for i in x], @@ -470,13 +473,13 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): else ([], [], []) ) ) - .progress_apply(pd.Series) + .parallel_apply(pd.Series) ) sub_event.drop(columns=[f"{location_col}_Tmp"], inplace=True) 
logger.info(f"Getting GID from GADM for locations in {level} {col}") - sub_event[f"{location_col}_GID"] = sub_event.progress_apply( + sub_event[f"{location_col}_GID"] = sub_event.parallel_apply( lambda row: ( [ ( @@ -496,7 +499,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): rows_before = sub_event.shape[0] null_mask = ( sub_event[[x for x in sub_event.columns if x != "Event_ID"]] - .apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) + .parallel_apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) .all(axis=1) ) sub_event = sub_event[~null_mask] From 1176a0bc6bb70e0c8e7bf237aeb09754da41a3a8 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 15:11:56 +0200 Subject: [PATCH 11/16] =?UTF-8?q?=F0=9F=9A=91=EF=B8=8F=20Fix=20bad=20tab?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 310 +++++++++++++++++++-------------------- 1 file changed, 153 insertions(+), 157 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 16964e7fd..65aec3ee7 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -332,54 +332,134 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): else (False if value and not isinstance(value, bool) and re.match(_no, value) else value) ) ) - logger.info(f"Normalizing dates for subevet {col}") - start_date_col, end_date_col = [c for c in sub_event.columns if col.startswith("Start_Date")], [ - c for c in sub_event.columns if col.startswith("End_Date") - ] - assert len(start_date_col) == len(end_date_col), "Check the start and end date columns" - assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many" - - if start_date_col and end_date_col: - logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}") - start_date_col, end_date_col = start_date_col[0], end_date_col[0] - start_dates = sub_event[start_date_col].parallel_apply(utils.normalize_date) - end_dates = sub_event[end_date_col].parallel_apply(utils.normalize_date) - start_date_cols = pd.DataFrame( - start_dates.to_list(), - columns=[ - f"{start_date_col}_Day", - f"{start_date_col}_Month", - f"{start_date_col}_Year", - ], + logger.info(f"Normalizing dates for subevet {col}") + start_date_col, end_date_col = [c for c in sub_event.columns if col.startswith("Start_Date")], [ + c for c in sub_event.columns if col.startswith("End_Date") + ] + assert len(start_date_col) == len(end_date_col), "Check the start and end date columns" + assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many" + + if start_date_col and end_date_col: + logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}") + start_date_col, end_date_col = start_date_col[0], end_date_col[0] + start_dates = sub_event[start_date_col].parallel_apply(utils.normalize_date) + end_dates = sub_event[end_date_col].parallel_apply(utils.normalize_date) + start_date_cols = pd.DataFrame( + start_dates.to_list(), + columns=[ + f"{start_date_col}_Day", + f"{start_date_col}_Month", + f"{start_date_col}_Year", + ], + ) + end_date_cols = pd.DataFrame( + end_dates.to_list(), + columns=[ + f"{end_date_col}_Day", + f"{end_date_col}_Month", + f"{end_date_col}_Year", + ], + ) + sub_event.reset_index(inplace=True, drop=True) + sub_event = pd.concat([sub_event, start_date_cols, end_date_cols], 
axis=1) + + if level == "l2" and administrative_area_col in sub_event.columns: + logger.info(f"Normalizing administrative area names for {level} {col}") + sub_event[f"{administrative_area_col}_Tmp"] = sub_event[administrative_area_col].progress_apply( + lambda admin_areas: ( + [norm_loc.normalize_locations(c, is_country=True) for c in admin_areas] + if isinstance(admin_areas, list) + else [] ) - end_date_cols = pd.DataFrame( - end_dates.to_list(), - columns=[ - f"{end_date_col}_Day", - f"{end_date_col}_Month", - f"{end_date_col}_Year", - ], + ) + sub_event[ + [ + f"{administrative_area_col}_Norm", + f"{administrative_area_col}_Type", + f"{administrative_area_col}_GeoJson", + ] + ] = ( + sub_event[f"{administrative_area_col}_Tmp"] + .parallel_apply( + lambda x: ( + ( + [i[0] for i in x], + [i[1] for i in x], + [i[2] for i in x], + ) + if isinstance(x, list) + else ([], [], []) + ) ) - sub_event.reset_index(inplace=True, drop=True) - sub_event = pd.concat([sub_event, start_date_cols, end_date_cols], axis=1) - - if level == "l2" and administrative_area_col in sub_event.columns: - logger.info(f"Normalizing administrative area names for {level} {col}") - sub_event[f"{administrative_area_col}_Tmp"] = sub_event[administrative_area_col].progress_apply( - lambda admin_areas: ( - [norm_loc.normalize_locations(c, is_country=True) for c in admin_areas] - if isinstance(admin_areas, list) - else [] + .parallel_apply(pd.Series) + ) + + sub_event.drop(columns=[f"{administrative_area_col}_Tmp"], inplace=True) + logger.info(f"Getting GID from GADM for Administrative Areas in {level} {col}") + sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].parallel_apply( + lambda admin_areas: ( + [ + ( + norm_loc.get_gadm_gid(country=area) + if norm_loc.get_gadm_gid(country=area) + else norm_loc.get_gadm_gid(area=area) + ) + for area in admin_areas + if area + ] + if isinstance(admin_areas, list) + else [[] for _ in admin_areas] + ), + ) + + elif level == "l3" and administrative_area_col in sub_event.columns: + sub_event[ + [ + f"{administrative_area_col}_Norm", + f"{administrative_area_col}_Type", + f"{administrative_area_col}_GeoJson", + ] + ] = ( + sub_event[administrative_area_col] + .progress_apply( + lambda admin_area: ( + norm_loc.normalize_locations(admin_area, is_country=True) + if isinstance(admin_area, str) + else (None, None, None) ) ) + .progress_apply(pd.Series) + ) + logger.info(f"Getting GID from GADM for Administrative Areas in subevent {col}") + + sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].parallel_apply( + lambda area: norm_loc.get_gadm_gid(country=area) if area else [] + ) + if location_col in sub_event.columns: + logger.info(f"Normalizing location names for {level} {col}") + sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply( + lambda row: ( + [ + norm_loc.normalize_locations( + area=row[location_col][i], + in_country=row[f"{administrative_area_col}_Norm"], + ) + for i in range(len(row[location_col])) + ] + if isinstance(row[location_col], list) + else [] + ), + axis=1, + ) + sub_event[ [ - f"{administrative_area_col}_Norm", - f"{administrative_area_col}_Type", - f"{administrative_area_col}_GeoJson", + f"{location_col}_Norm", + f"{location_col}_Type", + f"{location_col}_GeoJson", ] ] = ( - sub_event[f"{administrative_area_col}_Tmp"] + sub_event[f"{location_col}_Tmp"] .parallel_apply( lambda x: ( ( @@ -394,129 +474,45 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): 
.parallel_apply(pd.Series) ) - sub_event.drop(columns=[f"{administrative_area_col}_Tmp"], inplace=True) - logger.info(f"Getting GID from GADM for Administrative Areas in {level} {col}") - sub_event[f"{administrative_area_col}_GID"] = sub_event[ - f"{administrative_area_col}_Norm" - ].parallel_apply( - lambda admin_areas: ( + sub_event.drop(columns=[f"{location_col}_Tmp"], inplace=True) + logger.info(f"Getting GID from GADM for locations in {level} {col}") + + sub_event[f"{location_col}_GID"] = sub_event.parallel_apply( + lambda row: ( [ ( - norm_loc.get_gadm_gid(country=area) - if norm_loc.get_gadm_gid(country=area) - else norm_loc.get_gadm_gid(area=area) + norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) + if norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) + else norm_loc.get_gadm_gid(country=row[f"{location_col}_Norm"][i]) ) - for area in admin_areas - if area + if row[f"{location_col}_Norm"][i] + else [] + for i in range(len(row[f"{location_col}_Norm"])) ] - if isinstance(admin_areas, list) - else [[] for _ in admin_areas] ), + axis=1, ) - - elif level == "l3" and administrative_area_col in sub_event.columns: - sub_event[ - [ - f"{administrative_area_col}_Norm", - f"{administrative_area_col}_Type", - f"{administrative_area_col}_GeoJson", - ] - ] = ( - sub_event[administrative_area_col] - .progress_apply( - lambda admin_area: ( - norm_loc.normalize_locations(admin_area, is_country=True) - if isinstance(admin_area, str) - else (None, None, None) - ) - ) - .progress_apply(pd.Series) - ) - logger.info(f"Getting GID from GADM for Administrative Areas in subevent {col}") - - sub_event[f"{administrative_area_col}_GID"] = sub_event[ - f"{administrative_area_col}_Norm" - ].parallel_apply(lambda area: norm_loc.get_gadm_gid(country=area) if area else []) - if location_col in sub_event.columns: - logger.info(f"Normalizing location names for {level} {col}") - sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply( - lambda row: ( - [ - norm_loc.normalize_locations( - area=row[location_col][i], - in_country=row[f"{administrative_area_col}_Norm"], - ) - for i in range(len(row[location_col])) - ] - if isinstance(row[location_col], list) - else [] - ), - axis=1, - ) - - sub_event[ - [ - f"{location_col}_Norm", - f"{location_col}_Type", - f"{location_col}_GeoJson", - ] - ] = ( - sub_event[f"{location_col}_Tmp"] - .parallel_apply( - lambda x: ( - ( - [i[0] for i in x], - [i[1] for i in x], - [i[2] for i in x], - ) - if isinstance(x, list) - else ([], [], []) - ) - ) - .parallel_apply(pd.Series) - ) - - sub_event.drop(columns=[f"{location_col}_Tmp"], inplace=True) - logger.info(f"Getting GID from GADM for locations in {level} {col}") - - sub_event[f"{location_col}_GID"] = sub_event.parallel_apply( - lambda row: ( - [ - ( - norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) - if norm_loc.get_gadm_gid(area=row[f"{location_col}_Norm"][i]) - else norm_loc.get_gadm_gid(country=row[f"{location_col}_Norm"][i]) - ) - if row[f"{location_col}_Norm"][i] - else [] - for i in range(len(row[f"{location_col}_Norm"])) - ] - ), - axis=1, - ) - - logger.info(f"Dropping empty rows in {col}") - rows_before = sub_event.shape[0] - null_mask = ( - sub_event[[x for x in sub_event.columns if x != "Event_ID"]] - .parallel_apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) - .all(axis=1) - ) - sub_event = sub_event[~null_mask] - rows_after = sub_event.shape[0] - logger.info(f"Dropped {rows_before-rows_after} row(s) in {col}") - del rows_before, rows_after - 
- logger.info(f"Storing parsed results for subevent {col}") - for c in sub_event.columns: - sub_event[c] = sub_event[c].astype(str) - if target_columns: - sub_event = sub_event[[x for x in target_columns if x in sub_event.columns]] - utils.df_to_parquet( - sub_event, - target_dir=f"{args.output_dir}/{level}/{col}", - chunk_size=200, - ) + logger.info(f"Dropping empty rows in {col}") + rows_before = sub_event.shape[0] + null_mask = ( + sub_event[[x for x in sub_event.columns if x != "Event_ID"]] + .parallel_apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) + .all(axis=1) + ) + sub_event = sub_event[~null_mask] + rows_after = sub_event.shape[0] + logger.info(f"Dropped {rows_before-rows_after} row(s) in {col}") + del rows_before, rows_after + logger.info(f"Storing parsed results for subevent {col}") + for c in sub_event.columns: + sub_event[c] = sub_event[c].astype(str) + if target_columns: + sub_event = sub_event[[x for x in target_columns if x in sub_event.columns]] + utils.df_to_parquet( + sub_event, + target_dir=f"{args.output_dir}/{level}/{col}", + chunk_size=200, + ) def get_target_cols() -> tuple[list]: From 49d269930639e198d195a77a75f1c31f5c544b65 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 15:31:34 +0200 Subject: [PATCH 12/16] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Use=2010=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 65aec3ee7..57977dfda 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -13,7 +13,7 @@ tqdm.pandas() -pandarallel.initialize(progress_bar=True, nb_workers=5) +pandarallel.initialize(progress_bar=True, nb_workers=10) def infer_countries( From c67b7d6dba984059e9569bcbf12e623d4ecefab6 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 15:38:21 +0200 Subject: [PATCH 13/16] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Avoid=20parallel=5Fa?= =?UTF-8?q?pply=20when=20converting=20DF=20to=20Series?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 57977dfda..0bb744f47 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -95,7 +95,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): events[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( events[i] .parallel_apply(lambda x: (norm_num.extract_numbers(x) if isinstance(x, str) else (None, None, None))) - .parallel_apply(pd.Series) + .apply(pd.Series) ) split_by_pipe_cols = ["Hazards"] @@ -140,7 +140,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): else ([], [], []) ) ) - .parallel_apply(pd.Series) + .apply(pd.Series) ) events.drop(columns=[f"{admin_area_col}_Tmp"], inplace=True) @@ -196,7 +196,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): else ([], [], []) ) ) - .parallel_apply(pd.Series) + .apply(pd.Series) ) events.drop(columns=[f"{admin_area_col}_GID_0_Tmp"], inplace=True) @@ -283,7 +283,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): # drop any events that have no subevents (aka [] exploded into NaN) sub_event.dropna(how="all", inplace=True) - sub_event = pd.concat([sub_event.Event_ID, sub_event[col].parallel_apply(pd.Series)], axis=1) + sub_event = 
pd.concat([sub_event.Event_ID, sub_event[col].apply(pd.Series)], axis=1) logger.info( f"Dropping any columns with non-str column names due to None types in the dicts {[c for c in sub_event.columns if not isinstance(c, str)]}" @@ -315,7 +315,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): .parallel_apply( lambda x: (norm_num.extract_numbers(str(x)) if x is not None else (None, None, None)) ) - .parallel_apply(pd.Series) + .apply(pd.Series) ) logger.info(f"Normalizing nulls for {level} {col}") sub_event = utils.replace_nulls(sub_event) @@ -391,7 +391,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): else ([], [], []) ) ) - .parallel_apply(pd.Series) + .apply(pd.Series) ) sub_event.drop(columns=[f"{administrative_area_col}_Tmp"], inplace=True) @@ -471,7 +471,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): else ([], [], []) ) ) - .parallel_apply(pd.Series) + .apply(pd.Series) ) sub_event.drop(columns=[f"{location_col}_Tmp"], inplace=True) From 6febeb57df883af605a5d97705a9be60ebd399a0 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 16:24:29 +0200 Subject: [PATCH 14/16] =?UTF-8?q?=E2=9A=A1Initialize=20pandarallel=20with?= =?UTF-8?q?=204=20workers=20+=20verbose=20settings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index 0bb744f47..a3202f62b 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -13,7 +13,7 @@ tqdm.pandas() -pandarallel.initialize(progress_bar=True, nb_workers=10) +pandarallel.initialize(progress_bar=True, nb_workers=4, verbose=2, use_memory_fs=None) def infer_countries( @@ -313,7 +313,11 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): sub_event[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( sub_event[i] .parallel_apply( - lambda x: (norm_num.extract_numbers(str(x)) if x is not None else (None, None, None)) + lambda x: ( + print(str(x)) or norm_num.extract_numbers(str(x)) + if x is not None + else (None, None, None) + ) ) .apply(pd.Series) ) From 4e578cafe0a05d9d4a35c99a134c713805dd8152 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 16:25:36 +0200 Subject: [PATCH 15/16] =?UTF-8?q?=F0=9F=94=A5=20Remove=20debug=20print?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Database/parse_events.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index a3202f62b..bee4d844d 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -313,11 +313,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): sub_event[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( sub_event[i] .parallel_apply( - lambda x: ( - print(str(x)) or norm_num.extract_numbers(str(x)) - if x is not None - else (None, None, None) - ) + lambda x: (norm_num.extract_numbers(str(x)) if x is not None else (None, None, None)) ) .apply(pd.Series) ) From 13cb95e5836c43b3005f9018b41d817510cfe819 Mon Sep 17 00:00:00 2001 From: Shorouq Date: Wed, 23 Oct 2024 17:14:42 +0200 Subject: [PATCH 16/16] =?UTF-8?q?=F0=9F=97=91=EF=B8=8F=20=20Deprecate=20pa?= =?UTF-8?q?ndarallel=20for=20now=20due=20to=20hanging=20in=20l3=20(known?= =?UTF-8?q?=20error=20in=20pandarallel)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 
8bit --- Database/parse_events.py | 49 +++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/Database/parse_events.py b/Database/parse_events.py index bee4d844d..f69d750d8 100644 --- a/Database/parse_events.py +++ b/Database/parse_events.py @@ -3,7 +3,6 @@ import re import pandas as pd -from pandarallel import pandarallel from tqdm import tqdm from Database.scr.log_utils import Logging @@ -13,8 +12,6 @@ tqdm.pandas() -pandarallel.initialize(progress_bar=True, nb_workers=4, verbose=2, use_memory_fs=None) - def infer_countries( row: dict, @@ -48,7 +45,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info("Unpacking Total_Summary_* columns") total_summary_cols = [col for col in df.columns if col.startswith("Total_Summary_")] for i in total_summary_cols: - df[i] = df[i].parallel_apply(utils.eval) + df[i] = df[i].progress_apply(utils.eval) events = utils.unpack_col(df, columns=total_summary_cols) logger.info(f"Total summary columns: {total_summary_cols}") del df @@ -57,7 +54,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info("STEP: Normalizing start and end dates if present") for d_col in ["Start_Date", "End_Date"]: logger.info(f"Normalizing date column: {d_col}") - dates = events[d_col].parallel_apply(utils.normalize_date) + dates = events[d_col].progress_apply(utils.normalize_date) date_cols = pd.DataFrame( dates.to_list(), columns=[f"{d_col}_Day", f"{d_col}_Month", f"{d_col}_Year"], @@ -71,7 +68,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ) for inflation_adjusted_col in [col for col in events.columns if col.endswith("_Adjusted")]: logger.info(f"Normalizing boolean column {inflation_adjusted_col}") - events[inflation_adjusted_col] = events[inflation_adjusted_col].parallel_apply( + events[inflation_adjusted_col] = events[inflation_adjusted_col].progress_apply( lambda value: ( True if value and not isinstance(value, bool) and re.match(_yes, value) @@ -94,14 +91,14 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info(f"Normalizing ranges in {i}") events[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( events[i] - .parallel_apply(lambda x: (norm_num.extract_numbers(x) if isinstance(x, str) else (None, None, None))) + .progress_apply(lambda x: (norm_num.extract_numbers(x) if isinstance(x, str) else (None, None, None))) .apply(pd.Series) ) split_by_pipe_cols = ["Hazards"] for str_col in [x for x in events.columns if x in split_by_pipe_cols]: logger.info(f"Splitting column {str_col} by pipe") - events[str_col] = events[str_col].parallel_apply( + events[str_col] = events[str_col].progress_apply( lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else None)) ) @@ -109,7 +106,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): if "Administrative_Areas" in events.columns: logger.info(f"Ensuring that all admin area data in Administrative_Areas is of type ") - events["Administrative_Areas"] = events["Administrative_Areas"].parallel_apply( + events["Administrative_Areas"] = events["Administrative_Areas"].progress_apply( lambda x: utils.eval(x) if x is not None else [] ) @@ -129,7 +126,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ] ] = ( events[f"{admin_area_col}_Tmp"] - .parallel_apply( + .progress_apply( lambda x: ( ( [i[0] for i in x], @@ -145,7 +142,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): events.drop(columns=[f"{admin_area_col}_Tmp"], inplace=True) 
logger.info("Getting GID from GADM for Administrative Areas") - events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].parallel_apply( + events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].progress_apply( lambda admin_areas: ( [ ( @@ -163,7 +160,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): logger.info(f"""STEP: Infer country from list of locations""") - events[f"{admin_area_col}_GID_0_Tmp"] = events.parallel_apply( + events[f"{admin_area_col}_GID_0_Tmp"] = events.progress_apply( lambda x: infer_countries(x, admin_area_col=admin_area_col), axis=1 ) @@ -185,7 +182,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): ] ] = ( events[f"{admin_area_col}_GID_0_Tmp"] - .parallel_apply( + .progress_apply( lambda x: ( ( [i[0] for i in x], @@ -201,7 +198,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): events.drop(columns=[f"{admin_area_col}_GID_0_Tmp"], inplace=True) logger.info("Getting GID from GADM for Administrative Areas after purging areas above GID_0 level...") - events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].parallel_apply( + events[f"{admin_area_col}_GID"] = events[f"{admin_area_col}_Norm"].progress_apply( lambda admin_areas: ( [ ( @@ -224,7 +221,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list): event_name_col = [x for x in events.columns if "Event_Name" in x] if len(event_name_col) == 1: event_name_col = event_name_col[0] - events["Event_Names"] = events[event_name_col].parallel_apply( + events["Event_Names"] = events[event_name_col].progress_apply( lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else None) ) logger.info("Converting annotation columns to strings to store in sqlite3") @@ -276,7 +273,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): for col in specific_summary_cols: # evaluate string bytes to python datatype (hopefully dict, str, or list) - df[col] = df[col].parallel_apply(utils.eval) + df[col] = df[col].progress_apply(utils.eval) # unpack subevents sub_event = df[["Event_ID", col]].explode(col) @@ -312,7 +309,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): for i in specific_total_cols: sub_event[[f"{i}_Min", f"{i}_Max", f"{i}_Approx"]] = ( sub_event[i] - .parallel_apply( + .progress_apply( lambda x: (norm_num.extract_numbers(str(x)) if x is not None else (None, None, None)) ) .apply(pd.Series) @@ -325,7 +322,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ) for inflation_adjusted_col in [col for col in sub_event.columns if col.endswith("_Adjusted")]: logger.info(f"Normalizing boolean column {inflation_adjusted_col} for {level} {col}") - sub_event[inflation_adjusted_col] = sub_event[inflation_adjusted_col].parallel_apply( + sub_event[inflation_adjusted_col] = sub_event[inflation_adjusted_col].progress_apply( lambda value: ( True if value and not isinstance(value, bool) and re.match(_yes, value) @@ -342,8 +339,8 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): if start_date_col and end_date_col: logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}") start_date_col, end_date_col = start_date_col[0], end_date_col[0] - start_dates = sub_event[start_date_col].parallel_apply(utils.normalize_date) - end_dates = sub_event[end_date_col].parallel_apply(utils.normalize_date) + start_dates = sub_event[start_date_col].progress_apply(utils.normalize_date) + end_dates = 
sub_event[end_date_col].progress_apply(utils.normalize_date) start_date_cols = pd.DataFrame( start_dates.to_list(), columns=[ @@ -380,7 +377,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ] ] = ( sub_event[f"{administrative_area_col}_Tmp"] - .parallel_apply( + .progress_apply( lambda x: ( ( [i[0] for i in x], @@ -396,7 +393,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): sub_event.drop(columns=[f"{administrative_area_col}_Tmp"], inplace=True) logger.info(f"Getting GID from GADM for Administrative Areas in {level} {col}") - sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].parallel_apply( + sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].progress_apply( lambda admin_areas: ( [ ( @@ -432,7 +429,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ) logger.info(f"Getting GID from GADM for Administrative Areas in subevent {col}") - sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].parallel_apply( + sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].progress_apply( lambda area: norm_loc.get_gadm_gid(country=area) if area else [] ) if location_col in sub_event.columns: @@ -460,7 +457,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): ] ] = ( sub_event[f"{location_col}_Tmp"] - .parallel_apply( + .progress_apply( lambda x: ( ( [i[0] for i in x], @@ -477,7 +474,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): sub_event.drop(columns=[f"{location_col}_Tmp"], inplace=True) logger.info(f"Getting GID from GADM for locations in {level} {col}") - sub_event[f"{location_col}_GID"] = sub_event.parallel_apply( + sub_event[f"{location_col}_GID"] = sub_event.progress_apply( lambda row: ( [ ( @@ -496,7 +493,7 @@ def parse_sub_level_event(df, level: str, target_columns: list = []): rows_before = sub_event.shape[0] null_mask = ( sub_event[[x for x in sub_event.columns if x != "Event_ID"]] - .parallel_apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) + .progress_apply(lambda row: [True if v in (None, [], float("nan")) else False for _, v in row.items()]) .all(axis=1) ) sub_event = sub_event[~null_mask]
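The patterns this series converges on are easier to see outside the diffs, so a few of them follow as self-contained sketches. In all of them, wrapper names, flags, and literal values are illustrative rather than the repository's; only the cited signatures and settings come from the patches themselves.

PATCH 01's DataGapUtils.fill_date fills a row's dates only when every *_Date_* field is empty, so partially dated rows are left untouched. A minimal usage sketch, with made-up dict contents:

    from Database.scr.normalize_data import DataGapUtils

    row = {"Event_ID": "E1", "Start_Date_Year": None, "End_Date_Year": None}
    defaults = {"Start_Date_Year": 2024, "End_Date_Year": 2024}

    # Every key containing "_Date_" in `row` is None, so both fields are
    # filled; Event_ID does not match "_Date_" and is left alone.
    row = DataGapUtils.fill_date(row, replace_with_date=defaults)
    assert row == {"Event_ID": "E1", "Start_Date_Year": 2024, "End_Date_Year": 2024}

Had even one date field already been set, fill_date would have returned the row unchanged.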
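PATCH 04 and PATCH 06 settle on a single lookup order for GADM GIDs: try the normalized string as a country first, then as a sub-national area, and treat an empty result as "no match". A compact sketch of that contract, assuming a norm_loc object that exposes the repository's get_gadm_gid(country=..., area=...) keywords and returns a falsy value when nothing matches; the wrapper name lookup_gid is invented:

    def lookup_gid(norm_loc, admin_area) -> list:
        # Guard against the None and non-str values seen upstream.
        if not isinstance(admin_area, str) or not admin_area:
            return []
        # `or` keeps the country-then-area fallback of the patched lambdas
        # while evaluating each lookup at most once.
        return (
            norm_loc.get_gadm_gid(country=admin_area)
            or norm_loc.get_gadm_gid(area=admin_area)
            or []
        )

Per the parametrized cases in PATCH 07, "Kansas, United States" would come back as ["USA.17_1"] this way, and an unresolvable string as [].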
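The _get_american_area rewrite in PATCH 06 keys everything off one comma-separated address string: split it, strip OSM-style postal codes, and guarantee the list ends in "United States" before the county- and state-level matching runs. The ZIP regex below is taken verbatim from that diff; the standalone function around it is a reduction for illustration, not the repository's code:

    import re

    # Five-digit ZIP code with an optional +4 suffix, e.g. "93230" or "93230-1234".
    ZIP_RE = re.compile(r"^\d{5}(?:[-\s]\d{4})?$")

    def split_us_address(area: str) -> list[str]:
        parts = [part.strip() for part in area.split(",")]
        parts = [part for part in parts if not ZIP_RE.match(part)]
        if parts[-1] != "United States":
            parts.append("United States")
        return parts

    # Three parts is the county level in the patched code:
    # split_us_address("Kings, 93230, California, United States")
    # returns ["Kings", "California", "United States"]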
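Finally, PATCHES 10 through 16 trial pandarallel and then back it out: parallel_apply replaces the tqdm-backed progress_apply, the worker count moves from 5 to 10 to 4, DataFrame-to-Series conversions revert to plain apply (PATCH 13), and PATCH 16 removes pandarallel entirely because parallel_apply hangs on the l3 sub-events. A sketch of a toggle that would make such a revert a one-line change; the flag and helper names are invented, while the initialize arguments are the ones PATCH 14 uses:

    import pandas as pd
    from tqdm import tqdm

    tqdm.pandas()  # registers Series.progress_apply / DataFrame.progress_apply

    USE_PANDARALLEL = False  # flipped off, mirroring PATCH 16

    if USE_PANDARALLEL:
        from pandarallel import pandarallel

        # PATCH 14 settings: 4 workers, verbose logging, and use_memory_fs=None
        # so pandarallel chooses between shared memory and pipes itself.
        pandarallel.initialize(progress_bar=True, nb_workers=4, verbose=2, use_memory_fs=None)

    def apply_series(series: pd.Series, fn):
        # Single switch point between the parallel and single-process paths.
        return series.parallel_apply(fn) if USE_PANDARALLEL else series.progress_apply(fn)

    if __name__ == "__main__":
        doubled = apply_series(pd.Series(range(1_000)), lambda x: x * 2)
        print(doubled.sum())  # 999000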