# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""A pipeline for crawling an arbitrary S3 bucket"""
from os.path import join as opj
# Import necessary nodes
from ..nodes.misc import switch, assign, sub
from ..nodes.s3 import crawl_s3
from ..nodes.annex import Annexificator
from datalad_crawler.consts import DATALAD_SPECIAL_REMOTE
from datalad.support.strings import get_replacement_dict
from .simple_with_archives import pipeline as swa_pipeline
from datalad.utils import assure_bool
# Possibly instantiate a logger if you would like to log
# during pipeline creation
from logging import getLogger
lgr = getLogger("datalad.crawler.pipelines.simple_s3")
# Since all content of openfmri is openly available anyway, there is no need atm
# to use https, which complicates proxying etc.  Thus provide a node which
# replaces s3:// urls with regular http:// ones.
# TODO: we might want to make it an option for crawl_s3 to yield http urls,
# so that we could generalize this whole shebang into a single helper
# for crawling any S3 bucket.
# Right away think about having an 'incoming' branch and handling of versioned files.
sub_s3_to_http = sub(
    {'url': {'^s3://([^/]*)/': r'http://\1.s3.amazonaws.com/'}},
    ok_missing=True
)
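
# For reference, a hypothetical record passing through the node above
# ('mybucket' and the path are made-up names) would be transformed like:
#
#   {'url': 's3://mybucket/sub/path/file.dat'}
#     -> {'url': 'http://mybucket.s3.amazonaws.com/sub/path/file.dat'}
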
# TODO: make a unittest for all of this on a simple bucket
# TODO: branch option
def pipeline(bucket,
             prefix=None,
             no_annex=False,
             tag=True, skip_problematic=False, to_http=False,
             rename=None,
             directory=None,
             archives=False,
             allow_dirty=False,
             backend='MD5E',
             drop=False,
             drop_force=False,
             drop_immediately=False,
             strategy='commit-versions',
             exclude=None,
             **kwargs):
    """Pipeline to crawl/annex an arbitrary S3 bucket

    Parameters
    ----------
    bucket : str
    prefix : str, optional
      Prefix (path) within the bucket
    tag : bool, optional
      Tag "versions"
    skip_problematic : bool, optional
      Passed to the Annexificator
    to_http : bool, optional
      Convert s3:// URLs to the corresponding generic http:// URLs.  To be
      used for resources which are publicly available via http
    directory : {'subdataset'}, optional
      What to do when encountering a directory.  'subdataset' would initiate
      a new sub-dataset at that directory
    strategy : {'commit-versions', 'naive'}, optional
      What strategy to use when processing "delete" events.  See the
      `crawl_s3` node for more information.
    drop : bool, optional
      Drop all the files when done crawling
    drop_immediately : bool, optional
      Drop each file right after downloading it
    exclude : str, optional
      Regular expression passed to `crawl_s3` to exclude some files
    **kwargs:
      Passed into `simple_with_archives.pipeline`
    """
    lgr.info("Creating a pipeline for the %s bucket", bucket)
    # TODO: see if we could make it more generic!!!
    # drop = assure_bool(drop)
    # drop_force = assure_bool(drop_force)

    annex_kw = {}
    to_http = assure_bool(to_http)
    tag = assure_bool(tag)
    archives = assure_bool(archives)
    no_annex = assure_bool(no_annex)
    allow_dirty = assure_bool(allow_dirty)
    drop = assure_bool(drop)
    drop_force = assure_bool(drop_force)
    drop_immediately = assure_bool(drop_immediately)

    if not to_http:
        annex_kw['special_remotes'] = [DATALAD_SPECIAL_REMOTE]

    annex = Annexificator(
        create=False,  # must be already initialized etc
        backend=backend,
        no_annex=no_annex,
        skip_problematic=skip_problematic,
        allow_dirty=allow_dirty,
        batch_add=not drop_immediately,
        # Primary purpose of this one is registration of all URLs with our
        # upcoming "ultimate DB" so we don't get to git anything
        # largefiles="exclude=CHANGES* and exclude=changelog.txt and exclude=dataset_description.json and exclude=README* and exclude=*.[mc]"
        **annex_kw
    )

    s3_actions = {
        'commit': annex.finalize(tag=tag),
        'annex': [annex]
    }
    if drop_immediately:
        s3_actions['annex'].append(annex.drop(force=drop_force))

    s3_switch_kw = {}
    recursive = True
    if directory:
        if directory == 'subdataset':
            new_prefix = '%(filename)s/'
            if prefix:
                new_prefix = opj(prefix, new_prefix)
            s3_actions['directory'] = [
                # for initiate_dataset we should replicate filename as handle_name, prefix
                assign({'prefix': new_prefix, 'dataset_name': '%(filename)s'}, interpolate=True),
                annex.initiate_dataset(
                    template='simple_s3',
                    data_fields=['prefix'],
                    add_fields={
                        'bucket': bucket,
                        'to_http': to_http,
                        'skip_problematic': skip_problematic,
                    }
                )
            ]
            s3_switch_kw['missing'] = 'skip'  # ok to not remove
            recursive = False
        else:
            raise ValueError("Do not know how to treat %s" % directory)
    else:
        s3_actions['remove'] = annex.remove

    incoming_pipeline = [
        crawl_s3(bucket, prefix=prefix, strategy=strategy, repo=annex.repo,
                 recursive=recursive, exclude=exclude),
    ]

    from ..nodes.misc import debug
    if to_http:
        incoming_pipeline.append(sub_s3_to_http)
    if rename:
        incoming_pipeline += [sub({'filename': get_replacement_dict(rename)},
                                  ok_missing=True)]
    incoming_pipeline.append(switch('datalad_action', s3_actions, **s3_switch_kw))

    if archives:
        pipeline = swa_pipeline(incoming_pipeline=incoming_pipeline, annex=annex,
                                **kwargs)
    else:
        pipeline = incoming_pipeline

    if drop:
        pipeline.append(annex.drop(all=True, force=drop_force))

    return pipeline
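

# A minimal usage sketch, kept as a comment since this module is normally not
# run directly.  The bucket name and prefix below are made up, and the import
# path assumes this file lives at datalad_crawler/pipelines/simple_s3.py:
#
#   from datalad_crawler.pipelines.simple_s3 import pipeline
#   pipe = pipeline('mybucket', prefix='data/', to_http=True)
#   # `pipe` is a list of crawler nodes which datalad-crawler runs in order
#   # over the records produced by crawl_s3.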