exported_corpus_reader.py
# -*- coding: utf-8 -*-
#
# Developed and tested under Python version 3.3.2
#
# Script for reading and displaying Estonian TimeML corpus annotations.
#
import sys, os, re
baseAnnotationFile = "base-segmentation-morph-syntax"
eventAnnotationFile = "event-annotation"
timexAnnotationFile = "timex-annotation"
timexAnnotationDCTFile = "timex-annotation-dct"
tlinkEventTimexFile = "tlink-event-timex"
tlinkEventDCTFile = "tlink-event-dct"
tlinkMainEventsFile = "tlink-main-events"
tlinkSubEventsFile = "tlink-subordinate-events"
# =========================================================================
# Loading corpus files
# =========================================================================
def load_base_segmentation(inputFile):
    base_segmentation = dict()
    last_sentenceID = ""
    f = open(inputFile, mode='r', encoding="utf-8")
    for line in f:
        # Skip the comment line
        if ( re.match("^#.+$", line) ):
            continue
        items = (line.rstrip()).split("\t")
        if (len(items) != 7):
            raise Exception(" Unexpected number of items on line: '"+str(line)+"'")
        file = items[0]
        if (file not in base_segmentation):
            base_segmentation[file] = []
        sentenceID = items[1]
        if (sentenceID != last_sentenceID):
            base_segmentation[file].append([])
        wordID = items[2]
        # fileName  sentence_ID  word_ID_in_sentence  token  morphological_and_syntactic_annotations  syntactic_ID  syntactic_ID_of_head
        token = items[3]
        morphSyntactic = items[4]
        syntacticID = items[5]
        syntacticHeadID = items[6]
        base_segmentation[file][-1].append( [sentenceID, wordID, token, morphSyntactic, syntacticID, syntacticHeadID] )
        last_sentenceID = sentenceID
    f.close()
    return base_segmentation
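
# A minimal usage sketch for the structure returned above (the directory name
# "corpus" is an illustrative assumption, not part of the corpus format):
#
#     base = load_base_segmentation(os.path.join("corpus", baseAnnotationFile))
#     for file in sorted(base):           # file name -> list of sentences
#         for sentence in base[file]:     # sentence  -> list of word records
#             for [sentenceID, wordID, token, morphSyntactic, syntacticID, syntacticHeadID] in sentence:
#                 pass  # e.g. collect tokens or inspect the syntactic head IDs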
def load_entity_annotation(inputFile):
    annotationsByLoc = dict()
    annotationsByID = dict()
    f = open(inputFile, mode='r', encoding="utf-8")
    for line in f:
        # Skip the comment line
        if ( re.match("^#.+$", line) ):
            continue
        items = (line.rstrip()).split("\t")
        # fileName  sentence_ID  word_ID_in_sentence  expression  event_annotation  event_ID
        # fileName  sentence_ID  word_ID_in_sentence  expression  timex_annotation  timex_ID
        if (len(items) != 6):
            raise Exception(" Unexpected number of items on line: '"+str(line)+"'")
        file = items[0]
        sentenceID = items[1]
        wordID = items[2]
        expression = items[3]
        annotation = items[4]
        entityID = items[5]
        if (file not in annotationsByLoc):
            annotationsByLoc[file] = dict()
        if (file not in annotationsByID):
            annotationsByID[file] = dict()
        # Record annotation by its location in text
        locKey = (sentenceID, wordID)
        if (locKey not in annotationsByLoc[file]):
            annotationsByLoc[file][locKey] = []
        annotationsByLoc[file][locKey].append( [entityID, expression, annotation] )
        # Record annotation by its unique ID in text
        if (entityID not in annotationsByID[file]):
            annotationsByID[file][entityID] = []
        annotationsByID[file][entityID].append( [sentenceID, wordID, expression, annotation] )
    f.close()
    return (annotationsByLoc, annotationsByID)
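
# A minimal lookup sketch for the two indices returned above (the directory
# name "corpus" is an illustrative assumption):
#
#     (eventsByLoc, eventsByID) = load_entity_annotation(os.path.join("corpus", eventAnnotationFile))
#     # by location: (sentenceID, wordID) -> [ [entityID, expression, annotation], ... ]
#     # by ID:       entityID             -> [ [sentenceID, wordID, expression, annotation], ... ]
#     for file in eventsByID:
#         for entityID in eventsByID[file]:
#             mentions = eventsByID[file][entityID]   # a multiword entity has several mentions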
def load_dct_annotation(inputFile):
    DCTsByFile = dict()
    f = open(inputFile, mode='r', encoding="utf-8")
    for line in f:
        # Skip the comment line
        if ( re.match("^#.+$", line) ):
            continue
        items = (line.rstrip()).split("\t")
        # fileName  document_creation_time
        if (len(items) != 2):
            raise Exception(" Unexpected number of items on line: '"+str(line)+"'")
        file = items[0]
        dct = items[1]
        DCTsByFile[ file ] = dct
    f.close()
    return DCTsByFile
def load_relation_annotation(inputFile):
    annotationsByID = dict()
    f = open(inputFile, mode='r', encoding="utf-8")
    for line in f:
        # Skip the comment line
        if ( re.match("^#.+$", line) ):
            continue
        items = line.split("\t")
        # old format: fileName  entityID_A  relation  entityID_B  comment  expression_A  expression_B
        # new format: fileName  entityID_A  relation  entityID_B  comment
        if (len(items) != 5):
            print (len(items))
            raise Exception(" Unexpected number of items on line: '"+str(line)+"'")
        file = items[0]
        entityA = items[1]
        relation = items[2]
        entityB = items[3]
        comment = items[4].rstrip()
        if (file not in annotationsByID):
            annotationsByID[file] = dict()
        annotation = [entityA, relation, entityB, comment]
        if (entityA not in annotationsByID[file]):
            annotationsByID[file][entityA] = []
        annotationsByID[file][entityA].append( annotation )
        if (entityB not in annotationsByID[file]):
            annotationsByID[file][entityB] = []
        annotationsByID[file][entityB].append( annotation )
    f.close()
    return annotationsByID
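
# A minimal sketch of querying the relation index returned above (the file name
# and entity ID are hypothetical placeholders):
#
#     eventTimexLinks = load_relation_annotation(os.path.join("corpus", tlinkEventTimexFile))
#     # Each TLINK [entityA, relation, entityB, comment] is indexed under both
#     # entityA and entityB, so it can be reached from either participant:
#     for annotation in eventTimexLinks.get("some_file", {}).get("e1", []):
#         [entityA, relation, entityB, comment] = annotation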
def load_relation_to_dct_annotations(inputFile):
    annotationsByID = dict()
    f = open(inputFile, mode='r', encoding="utf-8")
    for line in f:
        # Skip the comment line
        if ( re.match("^#.+$", line) ):
            continue
        items = line.split("\t")
        # old format: fileName  entityID_A  relation_to_DCT  comment  expression_A
        # new format: fileName  entityID_A  relation_to_DCT  comment
        if (len(items) != 4):
            raise Exception(" Unexpected number of items on line: '"+str(line)+"'")
        file = items[0]
        entityA = items[1]
        relationToDCT = items[2]
        comment = items[3].rstrip()
        if (file not in annotationsByID):
            annotationsByID[file] = dict()
        annotation = [entityA, relationToDCT, "t0", comment]
        if (entityA not in annotationsByID[file]):
            annotationsByID[file][entityA] = []
        annotationsByID[file][entityA].append( annotation )
    f.close()
    return annotationsByID
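
# Note: the document creation time is substituted as entityB using the
# conventional TimeML ID "t0", so event-to-DCT links have the same shape as the
# links produced by load_relation_annotation() and can be traversed the same
# way. A minimal sketch (the file name is a hypothetical placeholder):
#
#     eventDCTLinks = load_relation_to_dct_annotations(os.path.join("corpus", tlinkEventDCTFile))
#     for eventID, links in eventDCTLinks.get("some_file", {}).items():
#         for [entityA, relation, entityB, comment] in links:   # entityB is always "t0"
#             pass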
# =========================================================================
# Displaying annotations on corpus files
# =========================================================================
def getEntityIDsOfTheSentence(file, sentID, base, eventsByLoc, timexesByLoc):
    events = []
    timexes = []
    seenIDs = dict()
    for wordID in range(len(base[file][sentID])):
        [sID, wID, token, morphSyntactic, syntacticID, syntacticHeadID] = base[file][sentID][wordID]
        key = (sID, wID)
        if (file in eventsByLoc and key in eventsByLoc[file]):
            for [entityID, expression, annotation] in eventsByLoc[file][key]:
                if ( entityID not in seenIDs ):
                    events.append(entityID)
                    seenIDs[entityID] = 1
        if (file in timexesByLoc and key in timexesByLoc[file]):
            for [entityID, expression, annotation] in timexesByLoc[file][key]:
                if ( entityID not in seenIDs ):
                    timexes.append(entityID)
                    seenIDs[entityID] = 1
    return ( events, timexes )
def getSentenceWithEntityAnnotations(file, sentID, base, eventsByLoc, timexesByLoc):
    sentAnnotation = " s"+str(sentID)+" "
    for wordID in range(len(base[file][sentID])):
        [sID, wID, token, morphSyntactic, syntacticID, syntacticHeadID] = base[file][sentID][wordID]
        key = (sID, wID)
        # Start of tag
        if (file in timexesByLoc and key in timexesByLoc[file]):
            for [entityID, expression, annotation] in timexesByLoc[file][key]:
                expressionMatcher = re.match("^\"(.+)\"$", expression)
                expressionClean = expressionMatcher.group(1)
                multiWord = ("multiword=\"true\"" in annotation)
                if (not multiWord or (multiWord and expressionClean.startswith(token))):
                    sentAnnotation += " ["+entityID+""
        if (file in eventsByLoc and key in eventsByLoc[file]):
            for [entityID, expression, annotation] in eventsByLoc[file][key]:
                expressionMatcher = re.match("^\"(.+)\"$", expression)
                expressionClean = expressionMatcher.group(1)
                multiWord = ("multiword=\"true\"" in annotation)
                sentAnnotation += " ["+entityID+""
        # Token
        sentAnnotation += " "+token
        # End of tag
        if (file in timexesByLoc and key in timexesByLoc[file]):
            for [entityID, expression, annotation] in timexesByLoc[file][key]:
                expressionMatcher = re.match("^\"(.+)\"$", expression)
                expressionClean = expressionMatcher.group(1)
                multiWord = ("multiword=\"true\"" in annotation)
                if (not multiWord or (multiWord and expressionClean.endswith(token))):
                    sentAnnotation += " ]"
        if (file in eventsByLoc and key in eventsByLoc[file]):
            for [entityID, expression, annotation] in eventsByLoc[file][key]:
                expressionMatcher = re.match("^\"(.+)\"$", expression)
                expressionClean = expressionMatcher.group(1)
                multiWord = ("multiword=\"true\"" in annotation)
                sentAnnotation += " ]"
    return sentAnnotation
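
# Illustrative rendering only (the tokens and IDs below are hypothetical): a
# sentence containing a single-word event e3 and a two-word timex t2 would be
# rendered roughly as
#
#      s4  word1 [e3 word2 ] word3 [t2 word4 word5 ] word6 .
#
# i.e. the opening bracket of a multiword timex is attached to its first token
# and the closing bracket to its last token.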
# Retrieves the expression corresponding to the given entity
def getExpr(file, entityID, entitiesByIDs):
    if (entityID in entitiesByIDs[file]):
        # Collect entity expressions
        expressions = set()
        for item in entitiesByIDs[file][entityID]:
            # [sentenceID, wordID, expression, annotation]
            expressions.add( item[2] )
        if (len(expressions) == 1):
            return expressions.pop()
        else:
            raise Exception(" Unexpected number of expressions for "+entityID+": "+str(expressions))
    else:
        raise Exception(" Unable to retrieve the expression for the entity "+entityID)
def getTLINKAnnotations(file, eventIDs, timexIDs, eventsByID, timexesByID,
                        eventTimexLinks, eventDCTLinks, mainEventLinks, subEventLinks):
    linkAnnotations = []
    for eventID in eventIDs:
        if (file in eventTimexLinks and eventID in eventTimexLinks[file]):
            for annotation in eventTimexLinks[file][eventID]:
                [entityA, relation, entityB, comment] = annotation
                if (eventID == entityA):
                    exprA = getExpr(file, entityA, eventsByID)
                    exprB = getExpr(file, entityB, timexesByID)
                    linkAnnotations.append( " "*5+entityA+" "+exprA+" "+relation+" "+entityB+" "+exprB+" "+comment )
        if (file in eventDCTLinks and eventID in eventDCTLinks[file]):
            for annotation in eventDCTLinks[file][eventID]:
                [entityA, relation, entityB, comment] = annotation
                if (eventID == entityA):
                    exprA = getExpr(file, entityA, eventsByID)
                    exprB = "DCT"
                    linkAnnotations.append( " "*5+entityA+" "+exprA+" "+relation+" "+exprB+" "+comment )
        if (file in subEventLinks and eventID in subEventLinks[file]):
            for annotation in subEventLinks[file][eventID]:
                [entityA, relation, entityB, comment] = annotation
                if (eventID == entityA):
                    exprA = getExpr(file, entityA, eventsByID)
                    exprB = getExpr(file, entityB, eventsByID)
                    linkAnnotations.append( " "*5+entityA+" "+exprA+" "+relation+" "+entityB+" "+exprB+" "+comment )
        if (file in mainEventLinks and eventID in mainEventLinks[file]):
            for annotation in mainEventLinks[file][eventID]:
                [entityA, relation, entityB, comment] = annotation
                if (eventID == entityA):
                    exprA = getExpr(file, entityA, eventsByID)
                    exprB = getExpr(file, entityB, eventsByID)
                    linkAnnotations.append( " "*5+entityA+" "+exprA+" "+relation+" "+entityB+" "+exprB+" "+comment )
    return "\n".join(linkAnnotations)
def display(base, eventsByLoc, timexesByLoc, eventsByID, timexesByID,
            DCTsByFile, eventTimexLinks, eventDCTLinks, mainEventLinks, subEventLinks):
    for file in sorted(base):
        print ("="*50)
        print (" "*5 + file)
        print (" "*5 + " DCT: "+DCTsByFile[file])
        print ("="*50)
        for sentID in range(len(base[file])):
            # Display sentence annotation
            sentAnnotation = getSentenceWithEntityAnnotations(file, sentID, base, eventsByLoc, timexesByLoc)
            try:
                print ( sentAnnotation )
            except UnicodeEncodeError:
                # Fall back to raw UTF-8 bytes if the console encoding cannot handle the text
                print ( sentAnnotation.encode("utf-8") )
            # Display relation annotations
            ( eventIDs, timexIDs ) = getEntityIDsOfTheSentence(file, sentID, base, eventsByLoc, timexesByLoc)
            linkAnnotations = getTLINKAnnotations(file, eventIDs, timexIDs, eventsByID, timexesByID,
                                                  eventTimexLinks, eventDCTLinks, mainEventLinks, subEventLinks)
            if (len(linkAnnotations) > 0):
                try:
                    print ( linkAnnotations+"\n" )
                except UnicodeEncodeError:
                    # Encode the whole string: bytes and str cannot be concatenated in Python 3
                    print ( (linkAnnotations+"\n").encode("utf-8") )
            print ()
# =========================================================================
# Main program: loading the corpus from files and displaying the content
# =========================================================================
if len(sys.argv) > 1 and os.path.isdir(sys.argv[1]):
    corpusDir = sys.argv[1]
    # Load base segmentation, morphological and syntactic annotations
    baseSegmentationFile = os.path.join(corpusDir, baseAnnotationFile)
    baseAnnotations = load_base_segmentation(baseSegmentationFile)
    # Load EVENT, TIMEX annotations
    (eventsByLoc, eventsByID) = load_entity_annotation( os.path.join(corpusDir, eventAnnotationFile) )
    (timexesByLoc, timexesByID) = load_entity_annotation( os.path.join(corpusDir, timexAnnotationFile) )
    DCTsByFile = load_dct_annotation( os.path.join(corpusDir, timexAnnotationDCTFile) )
    # Load TLINK annotations
    eventTimexLinks = load_relation_annotation( os.path.join(corpusDir, tlinkEventTimexFile) )
    eventDCTLinks = load_relation_to_dct_annotations( os.path.join(corpusDir, tlinkEventDCTFile) )
    mainEventLinks = load_relation_annotation( os.path.join(corpusDir, tlinkMainEventsFile) )
    subEventLinks = load_relation_annotation( os.path.join(corpusDir, tlinkSubEventsFile) )
    # Display annotations
    display(baseAnnotations, eventsByLoc, timexesByLoc, eventsByID, timexesByID, DCTsByFile, eventTimexLinks, eventDCTLinks, mainEventLinks, subEventLinks)
else:
    print(" Please give argument: <annotated_corpus_dir> ")
    print(" Example:\n python "+sys.argv[0]+" corpus")