"""
Given an audio file this module is capable of :
- using asr.audio_to_asr_text() to transcode speech to text (using google api)
- extracting aggregates of text features (text_features()) using
models.test_text.predict() for all available segment text models
- extracting text reference features if available
- merging the above in a recording-level text representation
"""
import asr
import text_scoring as ts
import numpy as np
from models.test_text import predict
import argparse
import re
import os
from models.utils import load_classifiers
from pathlib import Path
import pickle5 as pickle


def load_reference_data(path):
    """Read and return the reference transcription text from path."""
    with open(path) as f:
        text = f.read()
    return text


def text_preprocess(document):
    """
    Basic text preprocessing
    :param document: string containing input text
    :return: updated text
    """
    # Remove all the special characters
    document = re.sub(r'\W', ' ', str(document))
    # Substitute multiple spaces with single space
    document = re.sub(r'\s+', ' ', document, flags=re.I)
    # Convert to lowercase
    document = document.lower()
    return document
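
# For example (illustrative): text_preprocess("Hello, World!") returns
# "hello world " (punctuation replaced by spaces, whitespace collapsed,
# text lowercased; note the trailing space left by the substitutions).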


def text_segmentation(text, segmentation_threshold=None,
                      method=None, asr_timestamps=None):
    """
    Break text into segments according to the chosen method
    :param text: the text to be segmented
    :param segmentation_threshold: the duration or size of every segment
        (for example: a 2-sec window or 2 words per segment)
    :param method:
        - None: the text is segmented into sentences based on the punctuation
          that the ASR has found
        - "fixed_size_text": split text into fixed-size segments
          (fixed number of words)
        - "fixed_window": split text into fixed time windows (fixed seconds)
    :param asr_timestamps: the word timestamps that the ASR has produced
    :return:
        - text_segmented: a list of text segments
          (every element of the list is a string)
    """
    if method is None or method == 'None':
        text_segmented = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s',
                                  text)
    elif method == "fixed_size_text":
        text = text_preprocess(text)
        words = text.split()
        text_segmented = []
        for i in range(0, len(words), segmentation_threshold):
            text_segmented.append(" ".join(words[i:i + segmentation_threshold]))
    elif method == "fixed_window":
        first_word = asr_timestamps[0]
        start_time = first_word['st']
        last_word = asr_timestamps[-1]
        end_time = last_word['et']
        start_time_of_window = start_time
        cur_segment = ""
        text_segmented = []
        iter_of_words = 0
        word = asr_timestamps[iter_of_words]
        # iterate through time windows
        while start_time_of_window < end_time:
            # iterate through timestamps: the inner loop consumes every word
            # whose start time falls inside the current time window
            while word['st'] >= start_time_of_window and word['st'] <= \
                    (start_time_of_window + segmentation_threshold):
                # append the word to the current segment's string
                if cur_segment == "":
                    cur_segment = word['word']
                else:
                    cur_segment = cur_segment + " " + word['word']
                # if we haven't reached the last word, continue; else break
                if iter_of_words < (len(asr_timestamps) - 1):
                    iter_of_words += 1
                    word = asr_timestamps[iter_of_words]
                else:
                    break
            # update list of segments
            text_segmented.append(cur_segment)
            cur_segment = ""
            start_time_of_window += segmentation_threshold
    return text_segmented
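
# Illustrative example of the "fixed_window" method, using hypothetical ASR
# timestamps (times in seconds) and a 1-second window:
#   stamps = [{'word': 'hello', 'st': 0.1, 'et': 0.4},
#             {'word': 'world', 'st': 0.6, 'et': 0.9},
#             {'word': 'bye', 'st': 1.2, 'et': 1.5}]
#   text_segmentation("hello world bye", 1, "fixed_window", stamps)
#   -> ['hello world', 'bye']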


def text_features(text, classifiers_attributes, segmentation_threshold=None,
                  method=None, asr_results=None):
    """
    Features extracted from the segment-level models (classifiers)
    :param text: the text we want to extract features from (string)
    :param classifiers_attributes: a list of dictionaries with keys:
        classifier, classes, pretrained_path, pretrained, embeddings_limit,
        fasttext_model_path.
        Every dictionary refers to a classifier that has already been loaded.
    :param segmentation_threshold: the duration or size of every segment
        (for example: a 2-sec window or 2 words per segment)
    :param method:
        - None: the text is segmented into sentences based on the
          punctuation that the ASR has found
        - "fixed_size_text": split text into fixed-size segments
          (fixed number of words)
        - "fixed_window": split text into fixed time windows (fixed seconds)
    :param asr_results: the word timestamps that the ASR has produced
    :return:
        - features: list of extracted text features
        - features_names: list of respective feature names
    """
    features = []
    features_names = []
    # TODO: load all segment-level models that have been trained in
    #       a predefined path such as segment_models/text
    # TODO: add pretrained model posteriors, e.g. P(y=negative|x) etc
    dictionaries = []
    text_segmented = text_segmentation(text, segmentation_threshold, method,
                                       asr_results)
    # for every text classifier (with embeddings already loaded), predict the
    # per-label aggregates over the text segments
    for classifier_dictionary in classifiers_attributes:
        pretrained_path, classifier, classes, pretrained, embeddings_limit, max_len = \
            classifier_dictionary['pretrained_path'], \
            classifier_dictionary['classifier'], \
            classifier_dictionary['classes'], \
            classifier_dictionary['pretrained'], \
            classifier_dictionary['embeddings_limit'], \
            classifier_dictionary['max_len']
        dictionary, _ = predict(text_segmented, pretrained_path, classifier,
                                classes, pretrained, embeddings_limit, max_len)
        dictionaries.append(dictionary)
    # flatten every classifier's {label: value} dictionary into parallel
    # lists of feature values and feature names
    for dictionary in dictionaries:
        for label in dictionary:
            feature_string = label + "(%)"
            feature_value = dictionary[label]
            features_names.append(feature_string)
            features.append(feature_value)
    return features, features_names
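
# Illustrative output shape (hypothetical classifier and labels): for a single
# sentiment classifier with classes ["negative", "positive"], text_features()
# might return something like ([30.0, 70.0], ["negative(%)", "positive(%)"]);
# the exact values depend on what models.test_text.predict() returns for the
# segmented text.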


def basic_text_features(data, dur):
    """
    Extract basic text features (high-level text features)
    :param data: string of the whole text
    :param dur: duration of the recording in seconds
    :return: basic text features (word rate, unique word rate and
        histogram of unique word frequencies)
    """
    # A. word rate (words per minute)
    words_list = re.findall(r'\w+', data.lower())
    len_of_wordslist = len(words_list)
    word_rate = len_of_wordslist / ((dur / 60.0) + np.finfo(np.float64).eps)
    basic_feature_names = ["Word rate (words/min)"]
    # B. number of unique words / duration
    unique = set(words_list)
    num_of_unique = len(unique)
    unique_rate = num_of_unique / (dur + np.finfo(np.float64).eps)
    basic_features = [word_rate, unique_rate]
    basic_feature_names.append("Unique words rate (num_of_unique_words/sec)")
    # C. 10-bin histogram of word frequencies
    wordfreq = []
    for w in words_list:
        wordfreq.append(words_list.count(w))
    normalized_wordfreq = [freq / (len_of_wordslist + np.finfo(np.float64).eps)
                           for freq in wordfreq]
    histogram_of_wordfreq, hist_range = np.histogram(normalized_wordfreq,
                                                     bins=10, range=(0, 0.1))
    # normalize the histogram so that the bins sum to 1
    histogram_of_wordfreq = histogram_of_wordfreq / (
        histogram_of_wordfreq.sum() + np.finfo(np.float64).eps)
    # convert to list
    histogram_of_wordfreq = [prob for prob in histogram_of_wordfreq]
    # generate histogram feature names from the bin centers
    for i, k in enumerate(hist_range):
        if k != 0:
            freq_center = str(round((hist_range[i] + hist_range[i - 1]) / 2, 3))
            basic_feature_names.append('hist_center_word_freq_' + freq_center)
    basic_features += histogram_of_wordfreq
    return basic_features, basic_feature_names
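
# Worked example (hypothetical numbers): a 30-second recording with 60
# recognized words, 45 of them unique, gives
#   word_rate   = 60 / (30 / 60) = 120 words/min
#   unique_rate = 45 / 30 = 1.5 unique words/sec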


def get_asr_features(input_file, google_credentials,
                     classifiers_attributes, reference_text=None,
                     segmentation_threshold=None, method=None):
    """
    Extract text features from the ASR results of a speech audio file
    :param input_file: path to the audio file
    :param google_credentials: path to the Google ASR credentials file
    :param classifiers_attributes: a list of dictionaries with keys:
        classifier, classes, pretrained_path, pretrained,
        embeddings_limit, fasttext_model_path.
        Every dictionary refers to a classifier that has already been loaded.
    :param reference_text: path to the reference text
    :param segmentation_threshold: the duration or size of every segment
        (for example: a 2-sec window or 2 words per segment)
    :param method:
        - None: the text is segmented into sentences based on the punctuation
          that the ASR has found
        - "fixed_size_text": split text into fixed-size segments (fixed number
          of words)
        - "fixed_window": split text into fixed time windows (fixed seconds)
    :return:
        - features: list of extracted text features
        - feature_names: list of respective feature names
        - metadata: dictionary of metadata
    """
    feature_names = []
    features = []
    # Step 1: speech recognition using the Google Speech API:
    # check if a cached asr file already exists next to the audio file
    folder = os.path.dirname(input_file)
    file_name = os.path.basename(input_file)
    file_name = os.path.splitext(file_name)[0]
    file_name = file_name + '.asr'
    full_path = os.path.join(folder, file_name)
    full_path = Path(full_path)
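    # (Illustrative, hypothetical path: for input_file "recordings/session1.wav"
    #  the cached ASR results are read from / written to
    #  "recordings/session1.asr" in the same folder.)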
    if full_path.is_file():
        # loading asr from cache
        print("--> Loading saved asr")
        asr_dict = pickle.load(open(full_path, 'rb'))
        asr_results = asr_dict['timestamps']
        data = asr_dict['text']
        n_words = asr_dict['n_words']
        dur = asr_dict['dur']
    else:
        print("--> Audio to ASR text via the Google Speech API")
        asr_results, data, n_words, dur = \
            asr.audio_to_asr_text(input_file, google_credentials)
        asr_dict = {}
        asr_dict['timestamps'] = asr_results
        asr_dict['text'] = data
        asr_dict['n_words'] = n_words
        asr_dict['dur'] = dur
        # caching asr results
        with open(full_path, 'wb') as handle:
            pickle.dump(asr_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
    metadata = {"asr timestamps": asr_results,
                "Number of words": n_words,
                "Total duration (sec)": dur}
    if n_words != 0:
        # Step 2: compute basic text features:
        basic_features, basic_feature_names = basic_text_features(data, dur)
        feature_names += basic_feature_names
        features += basic_features
        # Step 3: compute reference-text related features
        # (if a reference text is available)
        if reference_text:
            # get the reference text and align it with the predicted text
            # (word-to-word alignment):
            ref_text = load_reference_data(reference_text)
            alignment, rec, pre = ts.text_to_text_alignment_and_score(ref_text,
                                                                      data)
            # compute the f1 (recall / precision are computed between the
            # reference text and the predicted text)
            f1 = 2 * rec * pre / (rec + pre + np.finfo(np.float32).eps)
            rec = float("{:.2f}".format(rec))
            pre = float("{:.2f}".format(pre))
            f1 = float("{:.2f}".format(f1))
            # append the alignment scores to the already extracted features
            feature_names += ["Recall score (%)",
                              "Precision score (%)",
                              "F1 score (%)"]
            features += [rec, pre, f1]
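            # Worked example (assuming recall / precision are expressed as
            # percentages, as the feature names suggest):
            # rec = 80.0, pre = 90.0 gives f1 = 2 * 80 * 90 / (80 + 90) ~= 84.71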
            # temporal score calculation:
            # (this info is used ONLY for plotting, so it is returned
            # as metadata)
            if alignment != []:
                adjusted_results = ts.adjust_asr_results(
                    asr_results, alignment.second.elements, dur)
                length = 0.5
                step = 0.1
                recalls, precisions, f1s, ref, asr_r = \
                    ts.windows(alignment.first.elements,
                               alignment.second.elements,
                               adjusted_results, length, step, dur)
            else:
                length = 0.5
                step = 0.1
                i = length
                recalls = []
                precisions = []
                f1s = []
                total_number_of_windows = 0
                while (i + length) < dur:
                    total_number_of_windows += 1
                    recalls.append({"x": i, "y": 0})
                    precisions.append({"x": i, "y": 0})
                    f1s.append({"x": i, "y": 0})
                    i += step
                ref, asr_r = ["-"] * total_number_of_windows, \
                             ["-"] * total_number_of_windows
            metadata["temporal_recall"] = recalls
            metadata["temporal_precision"] = precisions
            metadata["temporal_f1"] = f1s
            metadata["temporal_ref"] = ref
            metadata["temporal_asr"] = asr_r
        # Step 4: compute segment-level-classifier based features:
        features_text, features_names_text = text_features(data,
                                                            classifiers_attributes,
                                                            segmentation_threshold,
                                                            method,
                                                            asr_results)
        features += features_text
        feature_names += features_names_text
    return features, feature_names, metadata


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", required=True,
                        help="path of wav file")
    parser.add_argument("-g", "--google_credentials", required=True,
                        help=".json file with google credentials")
    parser.add_argument("-c", "--classifiers_path", required=True,
                        help="the directory that contains "
                             "all trained text classifiers")
    parser.add_argument('-r', '--reference_text', required=False, default=None,
                        help='path of .txt file of reference text')
    parser.add_argument('-s', '--segmentation_threshold', required=False,
                        default=None, type=int,
                        help='number of words or seconds of every text segment')
    parser.add_argument('-m', '--method_of_segmentation', required=False,
                        default=None,
                        help='Choice between "fixed_size_text" and '
                             '"fixed_window"')
    args = parser.parse_args()
    classifiers_attributes = load_classifiers(args.classifiers_path)
    features, feature_names, metadata = \
        get_asr_features(args.input, args.google_credentials,
                         classifiers_attributes, args.reference_text,
                         args.segmentation_threshold,
                         args.method_of_segmentation)
    print("Feature names:\n {}".format(feature_names))
    print("Features:\n {}".format(features))
    print("Metadata:\n {}".format(metadata))