Revert "Use parquet to speed up dataset building." #303

Merged
merged 1 commit on Dec 4, 2024
4 changes: 2 additions & 2 deletions .gitignore
@@ -15,11 +15,11 @@ demodata/
*.gfs
.venv
.direnv
/var/cache
var/cache
/collection
/specification
ssl.pem
/var
var
.junitxml
pyrightconfig.json
.idea
6 changes: 0 additions & 6 deletions digital_land/cli.py
@@ -140,8 +140,6 @@ def convert_cmd(input_path, output_path):
@column_field_dir
@dataset_resource_dir
@issue_dir
@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@click.pass_context
def dataset_create_cmd(
@@ -152,8 +150,6 @@ def dataset_create_cmd(
column_field_dir,
dataset_resource_dir,
issue_dir,
cache_dir,
resource_path,
):
return dataset_create(
input_paths=input_paths,
@@ -165,8 +161,6 @@ def dataset_create_cmd(
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
issue_dir=issue_dir,
cache_dir=cache_dir,
resource_path=resource_path,
)


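As context for the cli.py hunk above: a Click option only reaches the command body when both the @click.option decorator and the matching function parameter are declared, which is why the revert removes the two decorators and the two parameters together. A minimal standalone sketch of that pairing, reusing the option names from the diff (not the project's actual command):

import click


@click.command()
@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
def build(cache_dir, resource_path):
    # Each decorated option arrives as the keyword argument of the same
    # (underscored) name; dropping one without the other breaks the command.
    click.echo(f"cache_dir={cache_dir} resource_path={resource_path}")


if __name__ == "__main__":
    build()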
41 changes: 4 additions & 37 deletions digital_land/commands.py
@@ -12,8 +12,6 @@
import geojson
import shapely

import subprocess

from digital_land.package.organisation import OrganisationPackage
from digital_land.check import duplicate_reference_check
from digital_land.specification import Specification
@@ -28,7 +26,6 @@
)
from digital_land.organisation import Organisation
from digital_land.package.dataset import DatasetPackage
from digital_land.package.datasetparquet import DatasetParquetPackage
from digital_land.phase.combine import FactCombinePhase
from digital_land.phase.concat import ConcatFieldPhase
from digital_land.phase.convert import ConvertPhase, execute
@@ -360,11 +357,7 @@ def dataset_create(
issue_dir="issue",
column_field_dir="var/column-field",
dataset_resource_dir="var/dataset-resource",
cache_dir="var/cache/parquet",
resource_path="collection/resource.csv",
):
cache_dir = os.path.join(cache_dir, dataset)

if not output_path:
print("missing output path", file=sys.stderr)
sys.exit(2)
@@ -384,8 +377,10 @@
package.create()
for path in input_paths:
path_obj = Path(path)
package.load_transformed(path)
package.load_column_fields(column_field_dir / dataset / path_obj.name)
package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
package.load_entities()

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
if os.path.exists(old_entity_path):
@@ -400,29 +395,9 @@

package.add_counts()

# Repeat for parquet
# Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

pqpackage = DatasetParquetPackage(
dataset,
organisation=organisation,
path=output_path,
cache_dir=cache_dir,
resource_path=resource_path,
specification_dir=None, # TBD: package should use this specification object
)
pqpackage.create_temp_table(input_paths)
pqpackage.load_facts()
pqpackage.load_fact_resource()
pqpackage.load_entities()
pqpackage.pq_to_sqlite()
pqpackage.close_conn()


def dataset_dump(input_path, output_path):
cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}"
cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
logging.info(cmd)
os.system(cmd)
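For reference, dataset_dump above shells out to the sqlite3 command-line tool to export the entity table as CSV. A hedged sketch of an equivalent export using only Python's built-in sqlite3 and csv modules (not the project's implementation; the ORDER BY mirrors the variant shown in the hunk):

import csv
import sqlite3


def dump_entity_table(input_path, output_path):
    # Sketch only: write the entity table to CSV with a header row,
    # matching what "sqlite3 -header -csv" produces.
    conn = sqlite3.connect(input_path)
    try:
        cursor = conn.execute("SELECT * FROM entity ORDER BY entity")
        with open(output_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([column[0] for column in cursor.description])
            writer.writerows(cursor)
    finally:
        conn.close()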

@@ -434,7 +409,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
elif isinstance(csv_path, Path):
dataset_name = csv_path.stem
else:
logging.error(f"Can't extract datapackage name from {csv_path}")
logging.error(f"Can't extract datapackage name from {csv_path}")
sys.exit(-1)

flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv")
@@ -481,7 +456,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
batch_size = 100000
temp_geojson_files = []
geography_entities = [e for e in entities if e["typology"] == "geography"]

for i in range(0, len(geography_entities), batch_size):
batch = geography_entities[i : i + batch_size]
feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name)
@@ -496,13 +470,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):

if all(os.path.isfile(path) for path in temp_geojson_files):
rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson")
env = os.environ.copy()

out, _ = subprocess.Popen(
["ogr2ogr", "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate()
env = (
dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0")
if get_gdal_version() >= Version("3.5.2")
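On the final hunk: GDAL 3.5.2 and later enforces a maximum size for a single GeoJSON object, and the code lifts that limit by passing OGR_GEOJSON_MAX_OBJ_SIZE=0 in the environment given to ogr2ogr. A rough sketch of the pattern; the exact ogr2ogr arguments used by the pipeline are not shown in this diff, so the command below is illustrative only:

import os
import subprocess

from packaging.version import Version


def to_rfc7946(src_path, dst_path, gdal_version):
    # Sketch only: disable the GeoJSON object size cap on newer GDAL,
    # then convert the merged GeoJSON to RFC 7946 output with ogr2ogr.
    env = (
        dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0")
        if Version(gdal_version) >= Version("3.5.2")
        else os.environ.copy()
    )
    subprocess.run(
        ["ogr2ogr", "-f", "GeoJSON", "-lco", "RFC7946=YES", dst_path, src_path],
        env=env,
        check=True,
    )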