-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathgen_dss.py
181 lines (152 loc) · 7.4 KB
/
gen_dss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import re
import textwrap
import json
import sys
from gen import Schema2Doc, codelist_mappings, codelists_paths
# XML namespace prefixes used by the XPath queries run against the schema files
namespaces = dict(xsd='http://www.w3.org/2001/XMLSchema')
def match_codelists(path):
    """
    Collects the codelists whose mapping applies to the given path (xpath).

    Only mappings whose own path starts with '//' are considered. Such a mapping
    applies when the given path ends with the mapping path once its leading and
    trailing slashes have been stripped. Every matched path is also recorded,
    once, under its codelist in codelists_paths.

    Returns a list of (codelist name, condition text) tuples; the condition text
    is '' for a mapping without a condition element. The list is empty when no
    mapping applies.
    """
    matches = []
    for mapping in codelist_mappings:
        mapping_path = mapping.find('path').text
        if not mapping_path.startswith('//'):
            continue
        if not path.endswith(mapping_path.strip('/')):
            continue
        ref = mapping.find('codelist').attrib['ref']
        # Remember which paths use this codelist, without duplicates.
        if path not in codelists_paths[ref]:
            codelists_paths[ref].append(path)
        condition_el = mapping.find('condition')
        condition_text = condition_el.text if condition_el is not None else ''
        matches.append((ref, condition_text))
    return matches
def field_to_label(field):
    """
    Turns an underscore-separated field name into a human readable label.

    Each piece between underscores is capitalized and the pieces are joined
    with single spaces, e.g. 'title_narrative' becomes 'Title Narrative'.
    Empty pieces at the start (leading underscores) are dropped; empty pieces
    further in are kept, so 'a__b' yields 'A  B'.
    """
    words = [piece.capitalize() for piece in field.split('_')]
    # Index of the first non-empty word; everything before it is skipped.
    start = next((i for i, word in enumerate(words) if word), len(words))
    return " ".join(words[start:])
def get_codelist_json(name):
    """
    Loads the English JSON rendering of the named codelist.

    The file is read from 'IATI-Codelists/out/clv3/json/en/<name>.json',
    relative to the current working directory, and its parsed content is
    returned. Raises OSError if the file cannot be opened and
    json.JSONDecodeError if it is not valid JSON.
    """
    codelist_file = 'IATI-Codelists/out/clv3/json/en/' + name + '.json'
    # The context manager closes the handle promptly; the explicit encoding
    # keeps the read independent of the platform default.
    with open(codelist_file, encoding='utf-8') as fp:
        return json.load(fp)
def path_to_solr(path):
    """
    Converts a schema path into a flat field name.

    The first of these substrings found in the path is rewritten (every
    occurrence of it): the activity attribute prefix and the activity element
    prefix are removed, otherwise 'iati-activities' becomes 'dataset'.
    Afterwards '/@', '/', '-' and ':' are all turned into underscores, e.g.
    'iati-activities/iati-activity/title/narrative' becomes 'title_narrative'.
    """
    # Ordered from most to least specific; only the first hit is applied.
    rewrite_rules = (
        ('iati-activities/iati-activity/@', ''),
        ('iati-activities/iati-activity/', ''),
        ('iati-activities', 'dataset'),
    )
    shortened = path
    for needle, substitute in rewrite_rules:
        if needle in path:
            shortened = path.replace(needle, substitute)
            break
    # '/@' must collapse to a single underscore before the lone separators go.
    collapsed = shortened.replace('/@', '_')
    return collapsed.translate(str.maketrans('/-:', '___'))
def xsd_type_to_search(element_name=None, xsd_type=None):
    """
    Picks the search field type for a field.

    A field whose name ends in '_narrative', or that is named
    'location_administrative_level', is always "text". Otherwise the XSD type
    decides; types that are not listed (including None) fall back to "text".
    """
    if element_name is not None and (
        re.search('_narrative$', element_name) is not None
        or element_name == 'location_administrative_level'
    ):
        return "text"

    # Search type -> the XSD types that map onto it.
    xsd_types_by_search_type = {
        'text': ('xsd:string', 'xsd:NMTOKEN', 'xsd:anyURI'),
        'number': ('xsd:decimal',),
        'date': ('xsd:dateTime', 'xsd:date'),
        'boolean': ('xsd:boolean',),
        'integer': ('xsd:nonNegativeInteger', 'xsd:positiveInteger', 'xsd:int'),
    }
    search_type_of = {}
    for search_type, xsd_names in xsd_types_by_search_type.items():
        for xsd_name in xsd_names:
            search_type_of[xsd_name] = search_type
    return search_type_of.get(xsd_type, "text")
def filter_columns(row):
    """
    Tells whether a row should be kept in the filter output.

    Rows whose 'field' is 'dataset' or 'dataset_iati_activity' are rejected
    (False); every other row is kept (True).
    """
    return row['field'] not in ('dataset', 'dataset_iati_activity')
class Schema2Solr(Schema2Doc):
    """Schema walker that describes elements and attributes as flat Solr field rows."""

    def output_solr(self, element_name, path, element=None, output=False,
                    template_path='', filename='', codelist_dest='',
                    collection='', out_type='order', minOccurs='', maxOccurs='',
                    ref_element=None, type_element=None, parent_req=True,
                    parent_multi=False):
        """
        Builds the Solr field rows for the named element and everything below it.

        A row is produced for the element itself when it can carry a value
        (mixed or simple content, or an 'xsd:' type), one row is produced per
        attribute yielded by self.attribute_loop, and the rows of every child
        yielded by self.element_loop are appended through recursion.

        element_name -- name of the schema element to describe.
        path -- slash-terminated path of the parent; its first segment is
            dropped when field paths are built.
        element -- the schema element; looked up with self.get_schema_element
            when None.
        output -- when true, the rows and the codelists they reference are
            also written to disk.
        template_path, collection -- accepted but not used by this method.
        filename -- JSON file that receives the rows when output is true.
        codelist_dest -- JSON file that receives the codelists when output is
            true.
        out_type -- only 'filter' is handled: rows rejected by filter_columns
            are left out of the written file.
        minOccurs, maxOccurs -- occurrence constraints of this element; '1'
            marks it required and 'unbounded' marks it multivalued.
        ref_element, type_element -- passed on to self.schema_documentation;
            type_element also supplies the base type of simple content.
        parent_req -- false once any ancestor is optional.
        parent_multi -- true once any ancestor is multivalued.

        Returns the list of row dicts, which is empty when the element cannot
        be found in the schema.
        """
        if element is None:
            element = self.get_schema_element('element', element_name)
            if element is None:
                # Recursive callers extend their own list with this result,
                # so an unresolved element must still yield a list.
                return []

        # Path of this element without the leading segment of ``path``.
        element_path = '/'.join(path.split('/')[1:]) + element_name
        solr_name = path_to_solr(element_path)

        declared_type = element.get('type')
        xsd_type = declared_type if declared_type and declared_type.startswith('xsd:') else ''
        if type_element is not None:
            complex_base_types = [
                base for base in type_element.xpath(
                    'xsd:simpleContent/xsd:extension/@base', namespaces=namespaces)
                if base.startswith('xsd:')
            ]
            if complex_base_types and xsd_type == '':
                xsd_type = complex_base_types[0]

        required = (minOccurs == '1') and parent_req
        if element_name == 'iati-activity':
            # Treated as single-valued whatever the schema says; presumably
            # because each activity is indexed on its own -- TODO confirm.
            maxOccurs = '1'
        multivalued = (maxOccurs == 'unbounded') or parent_multi
        solr_multivalued = 'true' if multivalued else 'false'

        rows = []
        # elements should only be in solr if they contain something with a type, otherwise they wouldn't have a flattened value
        if element.xpath('xsd:complexType[@mixed="true"] or xsd:complexType/xsd:simpleContent', namespaces=namespaces) or xsd_type != '':
            rows.append({
                "field": solr_name,
                "label": field_to_label(solr_name),
                'type': xsd_type_to_search(solr_name, xsd_type),
                "description": textwrap.dedent(self.schema_documentation(element, ref_element, type_element)),
                "name": element_name,
                'path': element_path,
                'xsd_type': xsd_type,
                'solr_required': 'true' if required else 'false',
                'solr_multivalued': solr_multivalued
            })

        for a_name, a_type, a_description, a_required in self.attribute_loop(element):
            attribute_path = element_path + '/@' + a_name
            attribute_solr_name = path_to_solr(attribute_path)

            # Only the first matching codelist (and its condition) is used.
            codelist_matches = match_codelists(attribute_path)
            if codelist_matches:
                codelist_name, codelist_condition = codelist_matches[0]
            else:
                codelist_name, codelist_condition = '', ''

            # use parent description if attribute description is blank (mainly for @iso-date)
            if a_description == '':
                description = self.schema_documentation(element, ref_element, type_element)
            else:
                description = a_description

            rows.append({
                'field': attribute_solr_name,
                "label": field_to_label(attribute_solr_name),
                'type': 'select' if codelist_name != '' else xsd_type_to_search(attribute_solr_name, xsd_type=a_type),
                'description': textwrap.dedent(description),
                'codelist_name': codelist_name,
                'codelist_condition': codelist_condition,
                'attribute_name': a_name,
                'path': attribute_path,
                'xsd_type': a_type,
                'solr_required': 'true' if required and a_required else 'false',
                'solr_multivalued': solr_multivalued
            })

        child_path = path + element.attrib['name'] + '/'
        for child_name, child_element, child_ref_element, child_type_element, child_min, child_max in self.element_loop(element, path):
            rows += self.output_solr(
                child_name, child_path, child_element,
                minOccurs=child_min, maxOccurs=child_max,
                ref_element=child_ref_element, type_element=child_type_element,
                parent_req=required, parent_multi=multivalued)

        if output:
            # NOTE(review): for any out_type other than 'filter' the rows file
            # receives an empty JSON string and no codelists are collected;
            # kept as-is, confirm whether other output types were intended.
            out = ''
            if out_type == 'filter':
                out = [row for row in rows if filter_columns(row)]
            with open(filename, 'w') as fp:
                json.dump(out, fp, indent=2)

            codelists = {}
            for row in out:
                name = row.get('codelist_name', '')
                # Load each referenced codelist once, however many rows use it.
                if name != '' and name not in codelists:
                    codelists[name] = get_codelist_json(name)
            with open(codelist_dest, 'w') as fp:
                json.dump(codelists, fp, indent=2)

        return rows
if __name__ == '__main__':
    # Command line: <filter_dest> <codelist_dest>, the two JSON files to write.
    # Stop with a usage message instead of an IndexError traceback when either
    # is missing; additional arguments are ignored.
    if len(sys.argv) < 3:
        sys.exit('usage: {} FILTER_DEST CODELIST_DEST'.format(sys.argv[0]))
    filter_dest = sys.argv[1]
    codelist_dest = sys.argv[2]

    # Walk the activities schema from its root element and write both files.
    activities = Schema2Solr('iati-activities-schema.xsd', lang='en')
    activities.output_solr(
        'iati-activities', 'activity-standard/', minOccurs='1', maxOccurs='1', output=True,
        filename=filter_dest, codelist_dest=codelist_dest, out_type='filter'
    )