Skip to content

Commit

Permalink
Merge pull request #138 from dathere/sync-with-master
Browse files Browse the repository at this point in the history
Sync with master
  • Loading branch information
tino097 authored May 6, 2024
2 parents ba18d25 + 6041af4 commit 90a4868
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 4 deletions.
1 change: 1 addition & 0 deletions ckanext/datapusher_plus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class DataPusherPlusConfig(MutableMapping):
# ckan_service_provider settings
SQLALCHEMY_DATABASE_URI: str = _DATABASE_URI
WRITE_ENGINE_URL: str = _WRITE_ENGINE_URL
COPY_READBUFFER_SIZE: int = 1048576
DEBUG: bool = False
TESTING: bool = False
SECRET_KEY: str = str(uuid.uuid4())
Expand Down
58 changes: 54 additions & 4 deletions ckanext/datapusher_plus/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,55 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
if resource_format.upper() == "CSV":
logger.info("Normalizing/UTF-8 transcoding {}...".format(resource_format))
else:
logger.info("Normalizing/UTF-8 transcoding {} to CSV...".format(format))
# if not CSV (e.g. TSV, TAB, etc.) we need to normalize to CSV
logger.info(
"Normalizing/UTF-8 transcoding {} to CSV...".format(resource_format)
)

qsv_input_utf_8_encoded_csv = os.path.join(temp_dir, 'qsv_input_utf_8_encoded.csv')

# using uchardet to determine encoding
file_encoding = subprocess.run(
[
"uchardet",
tmp
],
check=True,
capture_output=True,
text=True,
)
logger.info("Identified encoding of the file: {}".format(file_encoding.stdout))

# trim the encoding string
file_encoding.stdout = file_encoding.stdout.strip()

# using iconv to re-encode in UTF-8
if file_encoding.stdout != "UTF-8":
logger.info("File is not UTF-8 encoded. Re-encoding from {} to UTF-8".format(
file_encoding.stdout)
)
try:
subprocess.run(
[
"iconv",
"-f",
file_encoding.stdout,
"-t",
"UTF-8",
tmp,
"--output",
qsv_input_utf_8_encoded_csv,
],
check=True,
)
except subprocess.CalledProcessError as e:
# return as we can't push a non UTF-8 CSV
logger.error(
"Job aborted as the file cannot be re-encoded to UTF-8: {}.".format(e)
)
return
else:
qsv_input_utf_8_encoded_csv = tmp
try:
qsv_input = subprocess.run(
[
Expand Down Expand Up @@ -693,7 +741,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
)
except subprocess.CalledProcessError as e:
# return as we can't push an invalid CSV file
validate_error_msg = qsv_validate.stderr
validate_error_msg = e.stderr
logger.error("Invalid CSV! Job aborted: {}.".format(validate_error_msg))
return
logger.info("Well-formed, valid CSV file confirmed...")
Expand Down Expand Up @@ -1359,6 +1407,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
except psycopg2.Error as e:
raise utils.JobError("Could not connect to the Datastore: {}".format(e))
else:
copy_readbuffer_size = config.get("COPY_READBUFFER_SIZE")
cur = raw_connection.cursor()
"""
truncate table to use copy freeze option and further increase
Expand All @@ -1383,9 +1432,10 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
sql.Identifier(resource_id),
column_names,
)
with open(tmp, "rb") as f:
# specify a 1MB buffer size for COPY read from disk
with open(tmp, "rb", copy_readbuffer_size) as f:
try:
cur.copy_expert(copy_sql, f)
cur.copy_expert(copy_sql, f, size=copy_readbuffer_size)
except psycopg2.Error as e:
raise utils.JobError("Postgres COPY failed: {}".format(e))
else:
Expand Down
4 changes: 4 additions & 0 deletions dot-env.template
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ WRITE_ENGINE_URL = 'postgresql://datapusher:YOURPASSWORD@localhost/datastore_def
# The connect string of the Datapusher+ Job database
SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs'

# READ BUFFER SIZE IN BYTES WHEN READING CSV FILE WHEN USING POSTGRES COPY
# default 64k = 65536
COPY_READBUFFER_SIZE = 65536

# =============== DOWNLOAD SETTINGS ==============
# 25mb, this is ignored if either PREVIEW_ROWS > 0
MAX_CONTENT_LENGTH = 1256000000
Expand Down

0 comments on commit 90a4868

Please sign in to comment.