"""FRDR Harvester
Usage:
harvest.py [--openrefine-import | --onlyharvest | --onlyexport | --init] [--only-new-records] [--dump-on-failure] [--export-filepath=<file>] [--export-format=<format>] [--repository-id=<id>] [--openrefine-csv=<file>]
Options:
--openrefine-import Don't harvest or export normally; import data from OpenRefine.
--onlyharvest Just harvest new items, do not export anything.
--onlyexport Just export existing items, do not harvest anything.
--only-new-records Only export records changed since last crawl.
--dump-on-failure If a record ever fails validation, print the whole record.
--export-filepath=<file> The path to export the data to.
--openrefine-csv=<file> The CSV from OpenRefine to import.
--export-format=<format> The export format (currently gmeta or dataverse).
--repository-id=<id> Only export this repository, based on the database table ID
--init Just initialize the database, do not harvest or export.
"""
import configparser
import csv
import json
import os
import sys
import time

from docopt import docopt

from harvester.OAIRepository import OAIRepository
from harvester.CKANRepository import CKANRepository
from harvester.DataverseRepository import DataverseRepository
from harvester.MarkLogicRepository import MarkLogicRepository
from harvester.OpenDataSoftRepository import OpenDataSoftRepository
from harvester.GeoNetworkRepository import GeoNetworkRepository
from harvester.SocrataRepository import SocrataRepository
from harvester.DataStreamRepository import DataStreamRepository
from harvester.ArcGISRepository import ArcGISRepository
from harvester.DataCiteRepository import DataCiteRepository
from harvester.DryadRepository import DryadRepository
from harvester.NexusRepository import NexusRepository
from harvester.DBInterface import DBInterface
from harvester.HarvestLogger import HarvestLogger
from harvester.TimeFormatter import TimeFormatter
from harvester.Lock import Lock
from harvester.ExporterGmeta import ExporterGmeta
from harvester.ExporterDataverse import ExporterDataverse


def get_config_json(repos_json="conf/repos.json"):
    '''
    Read the JSON list of repository configurations from disk
    :param repos_json: Filename of the JSON config file
    :return: dict of repository configurations
    '''
    with open(repos_json, 'r') as jsonfile:
        configdict = json.load(jsonfile)
    return configdict
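
# A minimal sketch of the expected conf/repos.json shape, inferred from how the
# configuration is consumed below ('repos' and 'type' are read directly; the
# optional 'copyerrorstoemail' flag and any repository-specific keys are passed
# through to setRepoParams()):
#
# {
#     "repos": [
#         {"type": "oai", "copyerrorstoemail": false},
#         {"type": "ckan"}
#     ]
# }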


def get_config_ini(config_file="conf/harvester.conf"):
    '''
    Read ini-formatted config file from disk
    :param config_file: Filename of config file
    :return: configparser-style config file
    '''
    config = configparser.ConfigParser()
    config.read(config_file)
    return config
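
# A minimal sketch of conf/harvester.conf, assuming only the sections and keys
# read below ([harvest], [export], [socrata], [ror], plus [logging] and [db],
# whose contents are consumed by HarvestLogger and DBInterface); the values
# shown are this script's fallback defaults, not recommended settings:
#
# [harvest]
# update_log_after_numitems = 1000
# abort_after_numerrors = 5
# record_refresh_days = 30
# repo_refresh_days = 1
# temp_filepath = temp
# geo_files_limit_gb = 1
#
# [export]
# export_filepath = data
# export_file_limit_mb = 10
# export_format = gmeta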
if __name__ == "__main__":
instance_lock = Lock()
tstart = int(time.time())
arguments = docopt(__doc__)
run_export = True
run_harvest = True
if arguments["--onlyexport"] == True:
run_harvest = False
if arguments["--onlyharvest"] == True:
run_export = False
if arguments["--init"] == True:
run_export = False
run_harvest = False
config = get_config_ini()
final_config = {}
final_config['dump_on_failure'] = False
if arguments["--dump-on-failure"] == True:
final_config['dump_on_failure'] = True
final_config['update_log_after_numitems'] = int(config['harvest'].get('update_log_after_numitems', 1000))
final_config['abort_after_numerrors'] = int(config['harvest'].get('abort_after_numerrors', 5))
final_config['record_refresh_days'] = int(config['harvest'].get('record_refresh_days', 30))
final_config['repo_refresh_days'] = int(config['harvest'].get('repo_refresh_days', 1))
final_config['temp_filepath'] = config['harvest'].get('temp_filepath', "temp")
final_config['geo_files_limit_gb'] = int(config['harvest'].get('geo_files_limit_gb', 1))
final_config['geo_files_limit_bytes'] = final_config['geo_files_limit_gb'] * 1000 * 1000 * 1000
final_config['export_filepath'] = config['export'].get('export_filepath', "data")
final_config['export_file_limit_mb'] = int(config['export'].get('export_file_limit_mb', 10))
final_config['export_format'] = config['export'].get('export_format', "gmeta")
final_config['socrata_app_token'] = config['socrata'].get('app_token', None)
final_config['ror_json_url'] = config['ror'].get('ror_json_url', None)
final_config['ror_data_file'] = "data/ror-data.json"
final_config['repository_id'] = None
main_log = HarvestLogger(config['logging'])
main_log.info("Starting... (pid={})".format(os.getpid()))
dbh = DBInterface(config['db'])
dbh.setLogger(main_log)
repo_configs = get_config_json()
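
    # The OpenRefine CSV is expected to contain the reconciliation columns
    # checked below ('Original Keyword', 'Reconciliation', 'Reconciliation -
    # Additional Term', and the yes/no match columns); a hypothetical header
    # row, for illustration only:
    #
    # Original Keyword,Reconciliation,Reconciliation - Additional Term,Correct auto match to FAST,...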
if arguments["--openrefine-import"] == True:
if arguments["--openrefine-csv"] is None:
print("OpenRefine import needs a CSV path specified with --openrefine-csv")
else:
with open(arguments["--openrefine-csv"], encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
con = dbh.getConnection()
cur = dbh.getCursor(con)
for row in reader:
if row['No match (no equivalent or broader term)'] == 'y' or row['No match (need access to dataset for context)'] == 'y':
continue
elif row['Correct auto match to FAST'] == 'y' or row['Manual match to FAST (Within OpenRefine choices)'] == 'y' or row['Manual match to FAST (Need to Look at FAST)'] == 'y' or row['Manual match to FAST (Broader Heading)'] == 'y':
cur.execute("SELECT tag_id FROM tags WHERE tag=?", (row['Original Keyword'],))
tag_id = cur.fetchone()
try:
cur.execute(dbh._prep("""INSERT INTO reconciliations (tag_id, reconciliation, language) VALUES (?,?,?)"""), (tag_id, row['Reconciliation'], 'en'))
if row['Reconciliation - Additional Term'] is not None:
cur.execute(dbh._prep("""INSERT INTO reconciliations (tag_id, reconciliation, language) VALUES (?,?,?)"""), (tag_id, row['Reconciliation - Additional Term'], 'en'))
except dbh.dblayer.IntegrityError as e:
pass
instance_lock.unlock()
sys.exit()
    if run_harvest:
        # Map each repository type from repos.json to its crawler class
        repo_types = {
            "oai": OAIRepository,
            "ckan": CKANRepository,
            "dataverse": DataverseRepository,
            "marklogic": MarkLogicRepository,
            "opendatasoft": OpenDataSoftRepository,
            "geonetwork": GeoNetworkRepository,
            "socrata": SocrataRepository,
            "datastream": DataStreamRepository,
            "arcgis": ArcGISRepository,
            "datacite": DataCiteRepository,
            "dryad": DryadRepository,
            "nexus": NexusRepository,
        }
        # Find any new information in the repositories
        for repoconfig in repo_configs['repos']:
            repo_class = repo_types.get(repoconfig['type'])
            if repo_class is None:
                # Unknown type; skip rather than crawl with a stale repo object
                main_log.info("Unknown repository type: {}".format(repoconfig['type']))
                continue
            repo = repo_class(final_config)
            repo.setLogger(main_log)
            if 'copyerrorstoemail' in repoconfig and not repoconfig['copyerrorstoemail']:
                main_log.setErrorsToEmail(False)
            repo.setRepoParams(repoconfig)
            repo.setDatabase(dbh)
            repo.crawl()
            repo.update_stale_records(config['db'])
            if 'copyerrorstoemail' in repoconfig and not repoconfig['copyerrorstoemail']:
                main_log.restoreErrorsToEmail()
    if run_export:
        # Default output format is gmeta
        export_format = "gmeta"
        exporter = None
        destination = "file"
        if arguments["--export-format"]:
            export_format = arguments["--export-format"]
        if arguments["--export-filepath"]:
            final_config['export_filepath'] = arguments["--export-filepath"]
        if arguments["--repository-id"]:
            final_config['repository_id'] = arguments["--repository-id"]
        if export_format == "gmeta":
            exporter = ExporterGmeta(dbh, main_log, final_config)
        elif export_format == "dataverse":
            exporter = ExporterDataverse(dbh, main_log, final_config)
        if exporter is None:
            main_log.info("Unknown export format: {}".format(export_format))
        else:
            kwargs = {
                "export_filepath": final_config['export_filepath'],
                "only_new_records": bool(arguments["--only-new-records"]),
                "temp_filepath": final_config['temp_filepath'],
                "export_repository_id": final_config['repository_id'],
                "destination": destination
            }
            exporter.export(**kwargs)
    formatter = TimeFormatter()
    dbh.set_setting("last_run_timestamp", tstart)
    main_log.info("Done after {}".format(formatter.humanize(time.time() - tstart)))
    instance_lock.unlock()