data_eng/optimize_dataset_create (#147)
* Modified versions of two classes to reduce the time it takes to process files.

* Modified the sequence of calls in dataset_create to support other modifications that form part of this branch.

* Added tests to increase coverage.

* Added tests to increase coverage (revision).

* Revision I - removed commented code.

* Revision II - amended create_cursor to optionally allow transactions, and batched the table inserts (see the batching sketch below).

* Revision III - each primary function in dataset manages its own connection to the database.
A new transaction is created after each commit of a batch insert.

* Revision IV - a host of changes addressing the latest review comments.

* Revision V - insert_many in sqlite.py now commits after the inserts; max_batch_size made an optional parameter in commands.

* Revision VI - new e2e test checking that the new max-batch-size parameter is optional.

* Revision VIa - new e2e test checking that the new max-batch-size parameter is optional.
morgan-sl authored Nov 9, 2023
1 parent 173b61c commit b6ad1e5
Showing 7 changed files with 501 additions and 88 deletions.
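
The recurring change in dataset.py below is a batching pattern: rows are accumulated in a list, flushed to SQLite in one multi-row insert once the list reaches max_batch_size, and the remainder is flushed after the loop. A minimal standalone sketch of that pattern, using plain sqlite3 rather than the package's SqlitePackage wrapper (the table, columns and row counts here are illustrative only):

import sqlite3

MAX_BATCH_SIZE = 10000  # mirrors the new max_batch_size default


def flush(conn, rows):
    # One executemany per batch, committed straight away (as Revision V describes).
    if rows:
        conn.executemany("INSERT INTO entity (entity, name) VALUES (?, ?)", rows)
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entity (entity INTEGER, name TEXT)")

insert_rows = []
for i in range(25000):  # stand-in for iterating a CSV with csv.DictReader
    insert_rows.append((i, f"entity-{i}"))
    if len(insert_rows) >= MAX_BATCH_SIZE:
        flush(conn, insert_rows)
        insert_rows = []

flush(conn, insert_rows)  # flush whatever is left after the loop
print(conn.execute("select count(*) from entity").fetchone()[0])  # 25000

Fewer round trips to the database and fewer commits are what the commit messages above are optimising for; the per-table loops in dataset.py follow this same shape.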
6 changes: 5 additions & 1 deletion digital_land/cli.py
@@ -108,17 +108,21 @@ def convert_cmd(input_path, output_path):

@cli.command("dataset-create", short_help="create a dataset from processed resources")
@click.option("--output-path", type=click.Path(), default=None, help="sqlite3 path")
@click.option("--max-batch-size", type=click.INT, default=10000)
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@organisation_path
@click.pass_context
def dataset_create_cmd(ctx, input_paths, output_path, organisation_path):
def dataset_create_cmd(
ctx, input_paths, output_path, max_batch_size, organisation_path
):
return dataset_create(
input_paths=input_paths,
output_path=output_path,
organisation_path=organisation_path,
pipeline=ctx.obj["PIPELINE"],
dataset=ctx.obj["DATASET"],
specification=ctx.obj["SPECIFICATION"],
max_batch_size=max_batch_size,
)


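The new --max-batch-size option is optional: when the flag is omitted, click supplies the default of 10000, which matches the default declared on dataset_create itself. A minimal sketch of that behaviour with a throwaway command (demo_cmd is illustrative, not the repository's actual e2e test):

import click
from click.testing import CliRunner


@click.command()
@click.option("--max-batch-size", type=click.INT, default=10000)
def demo_cmd(max_batch_size):
    # Echo the resolved value so the caller can assert on it.
    click.echo(str(max_batch_size))


runner = CliRunner()
assert runner.invoke(demo_cmd, []).output.strip() == "10000"  # default applied
assert runner.invoke(demo_cmd, ["--max-batch-size", "500"]).output.strip() == "500"
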
4 changes: 4 additions & 0 deletions digital_land/commands.py
@@ -237,6 +237,7 @@ def dataset_create(
dataset,
specification,
issue_dir="issue",
max_batch_size=10000,
):
if not output_path:
print("missing output path", file=sys.stderr)
@@ -246,11 +247,14 @@
dataset,
organisation=organisation,
path=output_path,
max_batch_size=max_batch_size,
specification_dir=None, # TBD: package should use this specification object
)
package.create()

for path in input_paths:
package.load_transformed(path)

package.load_entities()

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
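The dataset.py changes below lean on add_table_values and insert_many from the package's SQLite layer, which are not shown in this diff. A hedged sketch of the shape those helpers appear to have, written here against a bare sqlite3 connection (the real methods live on SqlitePackage and their exact signatures may differ):

import sqlite3


def add_table_values(fields, row, insert_rows):
    # Append one row's values, in field order, to the pending batch.
    insert_rows.append(tuple(row.get(field) for field in fields))


def insert_many(conn, table, fields, insert_rows):
    # Single multi-row INSERT via executemany, committed immediately
    # (per Revision V: insert_many now commits after the inserts).
    if not insert_rows:
        return
    columns = ", ".join(f'"{field}"' for field in fields)
    placeholders = ", ".join("?" for _ in fields)
    conn.executemany(
        f'INSERT INTO "{table}" ({columns}) VALUES ({placeholders})', insert_rows
    )
    conn.commit()

Quoting the table and column names matters here because the specification's field names contain hyphens (for example "old-entity" and "entry-date").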
94 changes: 70 additions & 24 deletions digital_land/package/dataset.py
@@ -1,8 +1,8 @@
import csv
import json
import logging
import re
from decimal import Decimal
from pathlib import Path

import shapely.wkt

@@ -33,11 +33,12 @@


class DatasetPackage(SqlitePackage):
def __init__(self, dataset, organisation, **kwargs):
def __init__(self, dataset, organisation, max_batch_size=10000, **kwargs):
super().__init__(dataset, tables=tables, indexes=indexes, **kwargs)
self.dataset = dataset
self.entity_fields = self.specification.schema["entity"]["fields"]
self.organisations = organisation.organisation
self.max_batch_size = max_batch_size

def migrate_entity(self, row):
dataset = self.dataset
@@ -119,15 +120,22 @@ def entity_row(self, facts):
row[fact[1]] = fact[2]
return row

def insert_entity(self, facts):
def insert_entity(self, facts, insert_rows):
row = self.entity_row(facts)
row = self.migrate_entity(row)
if row:
self.insert("entity", self.entity_fields, row)
entity_fields = [
field for field in self.entity_fields if not field.endswith("-geom")
]
self.add_table_values(entity_fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many("entity", self.entity_fields, insert_rows)
insert_rows = []
self.create_cursor()

def load_old_entities(self, path):
"""load the old-entity table"""

self.connect(optimised=True)
fields = self.specification.schema["old-entity"]["fields"]
entity_min = self.specification.schema[self.dataset].get("entity-minimum")
entity_max = self.specification.schema[self.dataset].get("entity-maximum")
@@ -139,18 +147,24 @@ def load_old_entities(self, path):
entity_min = int(entity_min)
entity_max = int(entity_max)
logging.info(f"loading old-entity from {path}")
self.connect()

self.create_cursor()
insert_rows = []
for row in csv.DictReader(open(path, newline="")):
entity_id = int(row.get("old-entity"))
if entity_min <= entity_id <= entity_max:
self.insert("old-entity", fields, row)
self.commit()
self.add_table_values(fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many("old-entity", fields, insert_rows)
insert_rows = []
self.create_cursor()

self.insert_many("old-entity", fields, insert_rows)
self.disconnect()

def load_entities(self):
"""load the entity table from the fact table"""
self.connect()
self.connect(optimised=True)
self.create_cursor()
self.execute(
"select entity, field, value from fact"
@@ -160,24 +174,25 @@ def load_entities(self):
results = self.cursor.fetchall()

facts = []
insert_rows = []
for fact in results:
# If facts and fact does not point to same entity as first fact
if facts and fact[0] != facts[0][0]:
# Insert existing facts
self.insert_entity(facts)
self.insert_entity(facts, insert_rows)
# Reset facts list for new entity
facts = []
facts.append(fact)

if facts:
self.insert_entity(facts)
self.insert_entity(facts, insert_rows)

self.commit()
self.insert_many("entity", self.entity_fields, insert_rows)
self.disconnect()

def add_counts(self):
"""count the number of entities by resource"""
self.connect()
self.connect(optimised=True)
self.create_cursor()
self.execute(
"select resource, count(*)"
@@ -189,6 +204,7 @@ def add_counts(self):
" ) group by resource"
)
results = self.cursor.fetchall()

for result in results:
resource = result[0]
count = result[1]
@@ -236,56 +252,86 @@ def load_facts(self, path):
fact_update_fields = [
field for field in fact_fields if field not in fact_conflict_fields
]

insert_rows = []
for row in csv.DictReader(open(path, newline="")):
self.entry_date_upsert(
"fact", fact_fields, row, fact_conflict_fields, fact_update_fields
)
self.insert("fact-resource", fact_resource_fields, row, upsert=True)
self.add_table_values(fact_resource_fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many(
"fact-resource", fact_resource_fields, insert_rows, upsert=True
)
insert_rows = []
self.create_cursor()

self.insert_many(
"fact-resource", fact_resource_fields, insert_rows, upsert=True
)

def load_column_fields(self, path, resource):
fields = self.specification.schema["column-field"]["fields"]

logging.info(f"loading column_fields from {path}")

insert_rows = []
for row in csv.DictReader(open(path, newline="")):
row["resource"] = resource
row["dataset"] = self.dataset
self.insert("column-field", fields, row)
self.add_table_values(fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many("column-field", fields, insert_rows)
insert_rows = []
self.create_cursor()

self.insert_many("column-field", fields, insert_rows)

def load_issues(self, path):
self.connect()
self.connect(optimised=True)
self.create_cursor()
fields = self.specification.schema["issue"]["fields"]
logging.info(f"loading issues from {path}")

insert_rows = []
for row in csv.DictReader(open(path, newline="")):
self.insert("issue", fields, row)
self.add_table_values(fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many("issue", fields, insert_rows)
insert_rows = []
self.create_cursor()

self.commit()
self.insert_many("issue", fields, insert_rows)
self.disconnect()

def load_dataset_resource(self, path, resource):
fields = self.specification.schema["dataset-resource"]["fields"]

logging.info(f"loading dataset-resource from {path}")

insert_rows = []
for row in csv.DictReader(open(path, newline="")):
self.insert("dataset-resource", fields, row)
self.add_table_values(fields, row, insert_rows)
if len(insert_rows) >= self.max_batch_size:
self.insert_many("dataset-resource", fields, insert_rows)
insert_rows = []
self.create_cursor()

self.insert_many("dataset-resource", fields, insert_rows)

def load_transformed(self, path):
m = re.search(r"/([a-f0-9]+).csv$", path)
resource = m.group(1)
file_part = Path(path).parts[-1]
resource = file_part.split(".")[0]

self.connect()
self.connect(optimised=True)
self.create_cursor()
self.load_facts(path)
# self.load_issues(path.replace("transformed/", "issue/"), resource)
self.load_column_fields(
path.replace("transformed/", "var/column-field/"), resource
)
self.load_dataset_resource(
path.replace("transformed/", "var/dataset-resource/"), resource
)
self.commit()
self.disconnect()

def load(self):
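load_transformed now derives the resource identifier from the file name with pathlib instead of a regular expression over the whole path. For the usual transformed/<hash>.csv layout the two approaches agree, but the new form no longer insists that the file name is a hex string; a small standalone comparison (the path is illustrative):

import re
from pathlib import Path

path = "transformed/dataset-name/0706ac8d3c2b8d660a857ff51d48e016.csv"

# Old approach: regex anchored on a hex file name at the end of the path.
m = re.search(r"/([a-f0-9]+).csv$", path)
old_resource = m.group(1) if m else None

# New approach: last path component, everything before the first dot.
new_resource = Path(path).parts[-1].split(".")[0]

assert old_resource == new_resource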