Skip to content

Commit

Permalink
Merge pull request #127 from dathere/copy_readbuffer_size_setting
Browse files Browse the repository at this point in the history
make COPY_READBUFFER_SIZE a configurable parameter
  • Loading branch information
jqnatividad authored Jan 23, 2024
2 parents 07e3170 + 691247a commit 936b753
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions datapusher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class DataPusherPlusConfig(MutableMapping):
# ckan_service_provider settings
SQLALCHEMY_DATABASE_URI: str = _DATABASE_URI
WRITE_ENGINE_URL: str = _WRITE_ENGINE_URL
COPY_READBUFFER_SIZE: int = 1048576
DEBUG: bool = False
TESTING: bool = False
SECRET_KEY: str = str(uuid.uuid4())
Expand Down
4 changes: 4 additions & 0 deletions datapusher/dot-env.template
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ WRITE_ENGINE_URL = 'postgresql://datapusher:YOURPASSWORD@localhost/datastore_def
# The connect string of the Datapusher+ Job database
SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs'

# Read buffer size in bytes used when reading the CSV file for Postgres COPY
# default: 1 MiB = 1048576 bytes
COPY_READBUFFER_SIZE = 1048576

# =============== DOWNLOAD SETTINGS ==============
# 25mb, this is ignored if either PREVIEW_ROWS > 0
MAX_CONTENT_LENGTH = 25600000
Expand Down
4 changes: 3 additions & 1 deletion datapusher/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
except psycopg2.Error as e:
raise util.JobError("Could not connect to the Datastore: {}".format(e))
else:
copy_readbuffer_size = config.get("COPY_READBUFFER_SIZE")
cur = raw_connection.cursor()
"""
truncate table to use copy freeze option and further increase
Expand All @@ -1441,7 +1442,8 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
sql.Identifier(resource_id),
column_names,
)
with open(tmp, "rb", 8192) as f:
                # use the configurable COPY_READBUFFER_SIZE (default 1 MiB) for COPY reads from disk
with open(tmp, "rb", copy_readbuffer_size) as f:
try:
cur.copy_expert(copy_sql, f)
except psycopg2.Error as e:
Expand Down

0 comments on commit 936b753

Please sign in to comment.