# freq_utils.py
'''
Word frequency counting for `tubelex` and `wikipedia-word-frequency-clean`.
'''
from typing import Optional, Union, TextIO
from unicodedata import normalize as unicode_normalize
from collections import Counter, defaultdict
from collections.abc import Iterable, Sequence
from enum import Enum
from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
import gzip
import bz2
import lzma
import argparse
import sys
NORMALIZED_SUFFIX_FNS = (
(False, '_no-normalization', None),
(True, '_lower', lambda w: w.lower()),
(True, '_nfkc', lambda w: unicode_normalize('NFKC', w)),
(True, '', lambda w: unicode_normalize('NFKC', w).lower())
)
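# Illustrative sketch (not part of the original module; the helper name and
# the full-width sample word are hypothetical): applying each variant in
# NORMALIZED_SUFFIX_FNS to one token shows how the file-name suffixes
# correspond to the normalizations.
def _normalization_demo(word: str = 'Ｗord') -> dict[str, str]:
    '''
    >>> demo = _normalization_demo()
    >>> demo['_no-normalization'], demo['_lower'], demo['_nfkc'], demo['']
    ('Ｗord', 'ｗord', 'Word', 'word')
    '''
    return {
        suffix: (fn(word) if fn is not None else word)
        for __, suffix, fn in NORMALIZED_SUFFIX_FNS
    }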
TOTAL_LABEL = '[TOTAL]'
DEFAULT_MARKUP = (
    # GT (lowercased: gt) is actually quite common in the Japanese Wikipedia :-)
    # BR (lowercased: br): "BR Deutschland" in German
'lt', 'br', 'ref', 'onlyinclude', 'colspan', 'align', 'ruby',
# 'del' is very common in Italian and Spanish => all European wikipedias
# 'font' is a common word in French
'https'
)
DEFAULT_TOP_N = 6000
class Storage(Enum):
PLAIN = (None, open, '')
DEFLATE = (ZIP_DEFLATED, gzip.open, '.gz')
BZIP2 = (ZIP_BZIP2, bz2.open, '.bz2')
LZMA = (ZIP_LZMA, lzma.open, '.xz')
def __init__(self, zip_compression, open_fn, suffix):
self.zip_compression = zip_compression
self.open = open_fn
self.suffix = suffix
@staticmethod
def from_args(args: argparse.Namespace) -> 'Storage':
return (
Storage.DEFLATE if args.deflate else
Storage.BZIP2 if args.bzip2 else
Storage.LZMA if args.lzma else
Storage.PLAIN
)
@staticmethod
def add_arg_group(
parser: argparse.ArgumentParser,
title: Optional[str] = None,
zip_suffix: bool = False
):
titled_group = parser.add_argument_group(title=title)
arg_group = titled_group.add_mutually_exclusive_group()
opt_zip_s = '.zip/' if zip_suffix else ''
arg_group.add_argument(
'--deflate', '--zip', '-z', action='store_true',
help=f'Store data deflated ({opt_zip_s}.gz)'
)
arg_group.add_argument(
'--bzip2', '-j', action='store_true',
help=f'Store data using Bzip2 ({opt_zip_s}.bz2)'
)
arg_group.add_argument(
'--lzma', '--xz', '-x', action='store_true',
help=f'Store data using LZMA ({opt_zip_s}.xz)'
)
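# Minimal usage sketch for Storage (an assumption about typical CLI wiring,
# not code from the original callers; the helper name is hypothetical): add
# the mutually exclusive compression flags to a parser, then map the parsed
# arguments back to a Storage member.
def _storage_demo() -> None:
    '''
    >>> parser = argparse.ArgumentParser()
    >>> Storage.add_arg_group(parser, title='Compression', zip_suffix=True)
    >>> Storage.from_args(parser.parse_args(['--xz'])).suffix
    '.xz'
    >>> Storage.from_args(parser.parse_args([])) is Storage.PLAIN
    True
    '''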
class WordCounter:
'''
The lifecycle of a WordCounter object:
1. For each document:
- add words with add() (possibly calling it several times)
- close document with close_doc()
2. Merge/adjust and issue output:
- merge()
- remove_less_than_min_docs()
- remove_less_than_min_channels()
- warnings_for_markup()
- dump()
>>> c = WordCounter()
>>> c.add('abcdefgh')
>>> c.close_doc()
>>> c.add('abcd')
>>> c.close_doc()
>>> sum(c.word_count.values())
12
>>> d = WordCounter()
>>> d.add('abcdabcdijklijkl')
>>> d.close_doc()
>>> sum(d.word_count.values())
16
>>> m = c.merge(d)
>>> c is m
True
>>> sum(m.word_count.values())
28
>>> m.word_count['a']
4
>>> m.word_count['e']
1
>>> m.word_count['i']
2
>>> m.word_docn['a']
3
>>> m.word_docn['e']
1
>>> m.word_docn['i']
1
>>> m.remove_less_than_min_docs(3)
>>> ''.join(sorted(m.word_count.keys()))
'abcd'
Can be pickled (and implements __eq__):
>>> import pickle
>>> p = pickle.loads(pickle.dumps(m))
>>> p == m
True
'''
__slots__ = ('word_count', 'cat2word_count', 'word_docn', 'word_channels',
'word_pos', 'doc_words')
word_count: Counter[str]
cat2word_count: Optional[dict[str, Counter[str]]]
word_docn: Counter[str] # documents or videos
word_channels: Optional[dict[str, set[Union[int, str]]]] # for tubelex (YouTube)
word_pos: Optional[dict[str, Counter[str]]] # optional, for tubelex (YouTube)
doc_words: set[str] # words in current doc
def __init__(self,
channels: bool = False, pos: bool = False, categories: bool = False
):
super().__init__()
self.word_count = Counter()
self.cat2word_count = defaultdict(Counter) if categories else None
self.word_docn = Counter()
self.word_channels = defaultdict(set) if channels else None
self.word_pos = defaultdict(Counter) if pos else None
self.doc_words = set()
def __eq__(self, other):
return (
self.word_count == other.word_count and
self.cat2word_count == other.cat2word_count and
self.word_docn == other.word_docn and
self.word_channels == other.word_channels and
self.word_pos == other.word_pos and
            self.doc_words == other.doc_words
)
def add(
self,
words: Sequence[str],
channel_id: Optional[Union[int, str]] = None,
category: Optional[str] = None
):
assert (channel_id is None) == (self.word_channels is None), (
channel_id, self.word_channels
)
assert (category is None) == (self.cat2word_count is None), (
category, self.cat2word_count
)
cat_word_count = (
self.cat2word_count[category] if (category is not None) else
None
)
for w in words:
self.word_count[w] += 1
self.doc_words.add(w)
if self.word_channels is not None:
self.word_channels[w].add(channel_id) # type: ignore
if cat_word_count is not None:
cat_word_count[w] += 1
def add_pos(
self,
words_pos: Sequence[tuple[str, str]],
channel_id: Optional[Union[int, str]] = None,
category: Optional[str] = None
):
assert (channel_id is None) == (self.word_channels is None), (
channel_id, self.word_channels
)
assert (category is None) == (self.cat2word_count is None), (
category, self.cat2word_count
)
cat_word_count = (
self.cat2word_count[category] if (category is not None) else
None
)
        assert self.word_pos is not None, '`add_pos()` requires `pos=True`'
        for w, p in words_pos:
self.word_count[w] += 1
self.word_pos[w][p] += 1
self.doc_words.add(w)
if self.word_channels is not None:
self.word_channels[w].add(channel_id) # type: ignore
if cat_word_count is not None:
cat_word_count[w] += 1
def close_doc(self):
self.word_docn.update(self.doc_words)
self.doc_words = set()
def remove_less_than_min_docs(self, min_docs: int):
assert not self.doc_words, 'Missing `close_doc()`?'
for word, docn in self.word_docn.items():
if docn < min_docs:
del self.word_count[word]
def remove_less_than_min_channels(self, min_channels: int):
assert self.word_channels is not None
for word, channels in self.word_channels.items():
if len(channels) < min_channels:
del self.word_count[word]
def warnings_for_markup(
self,
top_n: int = DEFAULT_TOP_N,
markup: Iterable[str] = DEFAULT_MARKUP,
suffix: str = ''
):
top_words = set(w for w, __ in self.word_count.most_common(top_n))
suffix_str = f', in *{suffix}' if suffix else ''
for w in top_words.intersection(markup):
sys.stdout.write(
f'Warning: Possible markup "{w}" found among top {top_n} words with '
f'frequency {self.word_count[w]}{suffix_str}.\n'
)
def merge(self, other: 'WordCounter') -> 'WordCounter':
assert not self.doc_words, 'Missing `self.close_doc()`?'
assert not other.doc_words, 'Missing `other.close_doc()`?'
assert self.word_pos is None, 'Merge does not support POS (self).'
assert other.word_pos is None, 'Merge does not support POS (other).'
assert self.cat2word_count is None, 'Merge does not support categories (self).'
assert other.cat2word_count is None, (
'Merge does not support categories (other).'
)
self.word_count.update(other.word_count)
        # Documents have unique IDs, so we just add the counts:
wdn = self.word_docn
owd = other.word_docn
for w, od in owd.items():
wdn[w] += od
# Merge sets of channels:
wc = self.word_channels
owc = other.word_channels
if wc is not None:
assert owc is not None
for w, oc in owc.items():
c = wc.get(w)
if c is None:
wc[w] = oc
else:
c.update(oc)
else:
assert owc is None
return self
def dump(
self,
f: TextIO,
cols: Sequence[str],
totals: Sequence[int],
sep: str = '\t'
):
'''
>>> c = WordCounter()
>>> c.add('deabcdabcaba')
>>> c.close_doc()
>>> c.add('abc')
>>> c.close_doc()
>>> c.dump(sys.stdout, ('word', 'count', 'documents'), (100, 200), sep=' ')
word count documents
a 5 2
b 4 2
c 3 2
d 2 1
e 1 1
        [TOTAL] 100 200
'''
assert not self.doc_words, 'Missing `close_doc()`?'
w_count = self.word_count
w_docn = self.word_docn
w_channels = self.word_channels
w_pos = self.word_pos
n_numbers = 2 if w_channels is None else 3 # Not including n_cats
n_cols = 1 + n_numbers # 1 is for word/TOTAL_LABEL
if w_pos is not None:
n_cols += 1
n_cats = 0
cat_w_counts = ()
if self.cat2word_count is not None:
n_cats = len(self.cat2word_count)
n_cols += n_cats
wc_col = cols[1]
# Do not modify original variable:
            cols = list(cols) + [f'{wc_col}:{cat}' for cat in self.cat2word_count]
cat_w_counts = self.cat2word_count.values()
assert len(cols) == n_cols, (cols, len(cols), n_cols)
assert len(totals) == n_numbers, (totals, n_numbers)
line_format = (
'%s' + (f'{sep}%d' * n_numbers) +
(f'{sep}%s' if (w_pos is not None) else '') +
(f'{sep}%d' * n_cats) +
'\n'
)
words = sorted(w_count, key=w_count.__getitem__, reverse=True)
f.write(sep.join(cols) + '\n')
if w_pos is not None:
if w_channels is None:
for word in words:
f.write(line_format % (
word, w_count[word], w_docn[word],
w_pos[word].most_common(1)[0][0],
*(wc[word] for wc in cat_w_counts)
))
else:
for word in words:
f.write(line_format % (
word, w_count[word], w_docn[word],
len(w_channels[word]),
w_pos[word].most_common(1)[0][0],
*(wc[word] for wc in cat_w_counts)
))
else:
if w_channels is None:
for word in words:
f.write(line_format % (
word, w_count[word], w_docn[word],
*(wc[word] for wc in cat_w_counts)
))
else:
for word in words:
f.write(line_format % (
word, w_count[word], w_docn[word], len(w_channels[word]),
*(wc[word] for wc in cat_w_counts)
))
f.write(line_format % (
TOTAL_LABEL, *totals, *(('',) if (w_pos is not None) else ()),
*(wc.total() for wc in cat_w_counts)
))
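# Additional sketches of WordCounter output on hypothetical data (not part of
# the original module; the helper name is an invention): dump() with channel
# tracking adds a channel-count column, and warnings_for_markup() flags
# likely markup tokens among the most frequent words.
def _word_counter_output_demo() -> None:
    '''
    >>> c = WordCounter(channels=True)
    >>> c.add(['a', 'a', 'b'], channel_id='ch1')
    >>> c.close_doc()
    >>> c.add(['a'], channel_id='ch2')
    >>> c.close_doc()
    >>> c.dump(sys.stdout, ('word', 'count', 'docs', 'channels'), (4, 2, 2),
    ...        sep=' ')
    word count docs channels
    a 3 2 2
    b 1 1 1
    [TOTAL] 4 2 2

    >>> w = WordCounter()
    >>> w.add(['br', 'br', 'word'])
    >>> w.close_doc()
    >>> w.warnings_for_markup(top_n=2)
    Warning: Possible markup "br" found among top 2 words with frequency 2.
    '''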
class WordCounterGroup(dict[str, WordCounter]):
__slots__ = ('n_words', 'n_docs')
n_words: int
n_docs: int
def __init__(self, normalize: bool, channels: bool = False, pos: bool = False,
categories: bool = False):
super().__init__((
(suffix, WordCounter(channels=channels, pos=pos, categories=categories))
for normalized, suffix, __ in NORMALIZED_SUFFIX_FNS
if normalize or not normalized
))
self.n_words = 0
self.n_docs = 0
def add(
self,
words: Sequence[str],
channel_id: Optional[Union[int, str]] = None,
category: Optional[str] = None
):
for __, suffix, norm_fn in NORMALIZED_SUFFIX_FNS:
c = self.get(suffix)
if c is not None:
c.add(
map(norm_fn, words) if (norm_fn is not None) else words,
channel_id=channel_id, category=category
)
self.n_words += len(words)
def add_pos(
self,
words_pos: Sequence[tuple[str, str]],
channel_id: Optional[Union[int, str]] = None,
category: Optional[str] = None
):
for __, suffix, norm_fn in NORMALIZED_SUFFIX_FNS:
c = self.get(suffix)
if c is not None:
c.add_pos(
map(lambda wp: (norm_fn(wp[0]), wp[1]), words_pos)
if (norm_fn is not None) else words_pos,
channel_id=channel_id, category=category
)
self.n_words += len(words_pos)
def close_doc(self):
for c in self.values():
c.close_doc()
def remove_less_than_min_docs(self, min_docs: int):
for c in self.values():
c.remove_less_than_min_docs(min_docs)
def remove_less_than_min_channels(self, min_channels: int):
for c in self.values():
            c.remove_less_than_min_channels(min_channels)
def warnings_for_markup(
self,
top_n: int = DEFAULT_TOP_N,
markup: Iterable[str] = DEFAULT_MARKUP
):
for suffix, c in self.items():
c.warnings_for_markup(top_n, markup, suffix)
def merge(self, other: 'WordCounterGroup') -> 'WordCounterGroup':
for suffix, c in self.items():
c.merge(other[suffix])
self.n_words += other.n_words
self.n_docs += other.n_docs
return self
def dump(
self,
path_pattern: str,
storage: Storage,
cols: Sequence[str],
n_docs: Optional[int] = None,
n_channels: Optional[int] = None
):
if n_docs is None:
n_docs = self.n_docs
totals = [self.n_words, n_docs]
if n_channels is not None:
totals.append(n_channels)
for suffix, c in self.items():
with storage.open(
                path_pattern.replace('%', suffix),  # no-op if the pattern has no '%' (normalize=False)
'wt'
) as f:
c.dump(f, cols, totals)
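# End-to-end sketch for WordCounterGroup on hypothetical tokens (not part of
# the original module; the helper name is an invention, and leaving n_docs to
# the caller is an assumption about how the counting scripts use this class):
# with normalize=True, one counter is kept per suffix in
# NORMALIZED_SUFFIX_FNS, and add() feeds each counter its normalized tokens.
def _group_demo() -> None:
    '''
    >>> group = WordCounterGroup(normalize=True)
    >>> group.add(['Word', 'word', 'WORD'])
    >>> group.close_doc()
    >>> sorted(group.keys())
    ['', '_lower', '_nfkc', '_no-normalization']
    >>> group[''].word_count['word']            # NFKC + lowercase variant
    3
    >>> group['_no-normalization'].word_count['Word']
    1
    >>> group.n_words
    3
    '''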