
Commit

Use parquet to speed up dataset building. (#290)
Use Parquet files to build datasets.
---------

Co-authored-by: alexglasertpx <alex.glaser@tpximpact.com>
Co-authored-by: alexiglaser <alexglaser1973@gmail.com>
3 people authored Dec 3, 2024
1 parent 1be654b commit 93a97bd
Showing 11 changed files with 1,848 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -15,11 +15,11 @@ demodata/
*.gfs
.venv
.direnv
-var/cache
+/var/cache
/collection
/specification
ssl.pem
-var
+/var
.junitxml
pyrightconfig.json
.idea
6 changes: 6 additions & 0 deletions digital_land/cli.py
@@ -140,6 +140,8 @@ def convert_cmd(input_path, output_path):
@column_field_dir
@dataset_resource_dir
@issue_dir
+@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
+@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@click.pass_context
def dataset_create_cmd(
@@ -150,6 +152,8 @@ def dataset_create_cmd(
column_field_dir,
dataset_resource_dir,
issue_dir,
+    cache_dir,
+    resource_path,
):
return dataset_create(
input_paths=input_paths,
@@ -161,6 +165,8 @@ def dataset_create_cmd(
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
issue_dir=issue_dir,
+        cache_dir=cache_dir,
+        resource_path=resource_path,
)
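The two new options default to var/cache/parquet and collection/resource.csv and are passed straight through to dataset_create in commands.py (next file). A minimal sketch, not part of the commit, of how the --cache-dir value is used downstream; "ancient-woodland" is a hypothetical dataset name used only for illustration:

import os

cache_dir = "var/cache/parquet"   # --cache-dir default from the new option above
dataset = "ancient-woodland"      # hypothetical dataset name

# dataset_create() joins the dataset name onto the cache directory and creates it
# on demand, so each dataset gets its own parquet cache.
dataset_cache_dir = os.path.join(cache_dir, dataset)
os.makedirs(dataset_cache_dir, exist_ok=True)
print(dataset_cache_dir)  # var/cache/parquet/ancient-woodland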


41 changes: 37 additions & 4 deletions digital_land/commands.py
@@ -12,6 +12,8 @@
import geojson
import shapely

+import subprocess

from digital_land.package.organisation import OrganisationPackage
from digital_land.check import duplicate_reference_check
from digital_land.specification import Specification
@@ -26,6 +28,7 @@
)
from digital_land.organisation import Organisation
from digital_land.package.dataset import DatasetPackage
+from digital_land.package.datasetparquet import DatasetParquetPackage
from digital_land.phase.combine import FactCombinePhase
from digital_land.phase.concat import ConcatFieldPhase
from digital_land.phase.convert import ConvertPhase, execute
@@ -357,7 +360,11 @@ def dataset_create(
issue_dir="issue",
column_field_dir="var/column-field",
dataset_resource_dir="var/dataset-resource",
+    cache_dir="var/cache/parquet",
+    resource_path="collection/resource.csv",
):
+    cache_dir = os.path.join(cache_dir, dataset)

if not output_path:
print("missing output path", file=sys.stderr)
sys.exit(2)
@@ -377,10 +384,8 @@ def dataset_create(
package.create()
for path in input_paths:
path_obj = Path(path)
-        package.load_transformed(path)
package.load_column_fields(column_field_dir / dataset / path_obj.name)
package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
-    package.load_entities()

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
if os.path.exists(old_entity_path):
@@ -395,9 +400

package.add_counts()

+    # Repeat for parquet.
+    # Set up a cache directory to store the parquet files; the sqlite tables built from them are saved in the dataset package.
+    if not os.path.exists(cache_dir):
+        os.makedirs(cache_dir)
+
+    pqpackage = DatasetParquetPackage(
+        dataset,
+        organisation=organisation,
+        path=output_path,
+        cache_dir=cache_dir,
+        resource_path=resource_path,
+        specification_dir=None,  # TBD: package should use this specification object
+    )
+    pqpackage.create_temp_table(input_paths)
+    pqpackage.load_facts()
+    pqpackage.load_fact_resource()
+    pqpackage.load_entities()
+    pqpackage.pq_to_sqlite()
+    pqpackage.close_conn()


def dataset_dump(input_path, output_path):
-    cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
+    cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}"
logging.info(cmd)
os.system(cmd)
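The DatasetParquetPackage calls above write fact, fact-resource and entity data as parquet files under cache_dir, and pq_to_sqlite() then loads them into the SQLite dataset package at output_path. The package itself is not shown in this diff; the snippet below is only a rough sketch of that final parquet-to-SQLite step, assuming pandas with a parquet engine such as pyarrow is available, and using hypothetical file and table names:

import sqlite3

import pandas as pd  # needs a parquet engine such as pyarrow installed


def parquet_to_sqlite(parquet_path, sqlite_path, table):
    """Append the rows of one parquet file to a table in the dataset sqlite file."""
    df = pd.read_parquet(parquet_path)
    with sqlite3.connect(sqlite_path) as conn:
        df.to_sql(table, conn, if_exists="append", index=False)


# Hypothetical usage mirroring the load_facts() -> pq_to_sqlite() flow:
# parquet_to_sqlite("var/cache/parquet/ancient-woodland/fact.parquet",
#                   "dataset/ancient-woodland.sqlite3", "fact")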

Expand All @@ -409,7 +434,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
elif isinstance(csv_path, Path):
dataset_name = csv_path.stem
else:
-        logging.error(f"Can't extract datapackage name from {csv_path}")
+        logging.error(f"Can't extract datapackage name from {csv_path}")
sys.exit(-1)

flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv")
@@ -456,6 +481,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
batch_size = 100000
temp_geojson_files = []
geography_entities = [e for e in entities if e["typology"] == "geography"]

for i in range(0, len(geography_entities), batch_size):
batch = geography_entities[i : i + batch_size]
feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name)
@@ -470,6 +496,13 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):

if all(os.path.isfile(path) for path in temp_geojson_files):
rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson")
+        env = os.environ.copy()
+
+        out, _ = subprocess.Popen(
+            ["ogr2ogr", "--version"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+        ).communicate()
env = (
dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0")
if get_gdal_version() >= Version("3.5.2")
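The version gate above only lifts the GeoJSON object size limit (OGR_GEOJSON_MAX_OBJ_SIZE=0) when the installed GDAL is new enough to support it. get_gdal_version() is defined elsewhere in digital_land and is not part of this diff; a plausible sketch of such a helper, parsing ogr2ogr --version output much like the Popen call above, might look like this (the output format and regular expression are assumptions):

import re
import subprocess

from packaging.version import Version


def get_gdal_version() -> Version:
    # ogr2ogr --version prints a line like "GDAL 3.5.2, released 2022/09/02"
    out = subprocess.run(
        ["ogr2ogr", "--version"], capture_output=True, text=True, check=True
    ).stdout
    match = re.search(r"GDAL\s+(\d+(?:\.\d+)+)", out)
    if not match:
        raise RuntimeError(f"could not parse GDAL version from {out!r}")
    return Version(match.group(1))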
(The rest of this file and the diffs for the remaining changed files are not shown.)
