diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 724e92da..76b091fa 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -2,7 +2,7 @@ import logging import duckdb from .package import Package -import resource +import pandas as pd logger = logging.getLogger(__name__) @@ -71,35 +71,44 @@ def create_temp_table(self, input_paths): # max_limit = 200000000 # Maximum allowable line size to attempt # increment = False - while True: - try: - self.conn.execute("DROP TABLE IF EXISTS temp_table") - query = f""" - CREATE TEMPORARY TABLE temp_table AS - SELECT * - FROM read_csv( - [{input_paths_str}], - columns = {self.schema}, - header = true, - force_not_null = {[field for field in self.schema.keys()]}, - max_line_size={max_size} - ) - """ - self.conn.execute(query) - break - except duckdb.Error as e: # Catch specific DuckDB error - if "Value with unterminated quote" in str(e): - hard_limit = int(resource.getrlimit(resource.RLIMIT_AS)[1]) - if max_size < hard_limit / 3: - logging.info( - f"Initial max_size did not work, setting it to {hard_limit / 2}" - ) - max_size = hard_limit / 2 - else: - raise - else: - logging.info(f"Failed to read in when max_size = {max_size}") - raise + try: + self.conn.execute("DROP TABLE IF EXISTS temp_table") + query = f""" + CREATE TEMPORARY TABLE temp_table AS + SELECT * + FROM read_csv( + [{input_paths_str}], + columns = {self.schema}, + header = true, + force_not_null = {[field for field in self.schema.keys()]}, + max_line_size={max_size} + ) + """ + self.conn.execute(query) + except duckdb.Error as e: # Catch specific DuckDB error when running border collection + if "Value with unterminated quote" in str(e): + dataframes = [] + for file_path in input_paths_str: + df = pd.read_csv(file_path, lineterminator="\n") + dataframes.append(df) + combined_df = pd.concat(dataframes, ignore_index=True) + + # Register the combined DataFrame + self.conn.register("temp_table", combined_df) + + del combined_df + + # hard_limit = resource.getrlimit(resource.RLIMIT_AS)[1] + # if max_size < hard_limit / 3: + # logging.info( + # f"Initial max_size did not work, setting it to {hard_limit / 2}" + # ) + # max_size = hard_limit / 2 + # else: + # raise + else: + logging.info(f"Failed to read in when max_size = {max_size}") + raise def load_facts(self): logging.info("loading facts from temp table")