Skip to content

Commit

Permalink
Merge pull request #5 from cenkai88/master
Browse files Browse the repository at this point in the history
cache
  • Loading branch information
Xiaming committed Sep 21, 2015
2 parents 32cbbf0 + e2a8b17 commit 0ff1564
Showing 1 changed file with 25 additions and 31 deletions.
56 changes: 25 additions & 31 deletions cernet/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,13 @@ def cleanse(record, ipdict, parser, air):

return '%s|%s|%s|%s|%s|%s|%s'%(month, day, hour, school, host, t, keyword)

def stat(x, condition, features, f):
c = condition.split('=')
if len(c)!= 1:
con = f[c[0]]
value = c[1]
if len(features) == 1:
fea = f[features[0]]
result = x.filter(lambda x: x.split('|')[con]==value).map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect()
else:
result = stat1(x.filter(lambda x: x.split('|')[con]==value), features, f).collect()
def stat(x, features, f):
if len(features) == 1:
fea = f[features[0]]
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect()
else:
if len(features) == 1:
fea = f[features[0]]
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect()
else:
result = stat1(x, features, f).collect()
filename = 'result/[%s]%s.csv'%(condition, '_'.join(features))
result = stat1(x, features, f).collect()
filename = 'result/[%s]%s.csv'%('_'.join(features))
output(result, filename, len(features))

def output(result, filename, n):
Expand Down Expand Up @@ -118,7 +108,11 @@ def main(sc, path):
with open('ipdict.txt') as ipdicts:
for line in ipdicts.readlines():
ipdict[line.split('|')[0]] = line.split('|')[1].strip().decode('utf-8', 'ignore')
cernet = sc.textFile(path).persist()
cernet = sc.textFile(path)
cernet2 = cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None).cache()
cernet_trip = cernet.filter(lambda x: x.split('|')[5]==u'旅游').cache()
cernet_shop = cernet.filter(lambda x: x.split('|')[5]==u'购物').cache()
cernet_search = cernet.filter(lambda x: x.split('|')[5]==u'搜索').cache()
parser = [{'regex': '^tieba\.baidu\.com\/+[\w\W]+kw=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('kw=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')},\
{'regex': '^m\.tieba\.com\/+[\w\W]+word=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('word=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')}, \
{'regex': '^(www\.b|b)aidu\.com\/+[\w\W]+wd=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('wd=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')}, \
Expand Down Expand Up @@ -183,26 +177,26 @@ def main(sc, path):
parser[i]['regex'] = re.compile(parser[i]['regex'])
#一维度统计
for dim in f:
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'', [dim], f)
stat(cernet2, [dim], f)

#二维度
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), '', ['month', 'hour'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), '', ['day', 'hour'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=旅游', ['keyword', 'month'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=旅游', ['keyword', 'school'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['keyword', 'hour'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['keyword', 'month'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['host', 'hour'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['host', 'month'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['host', 'day'], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', ['host', 'school'], f)
stat(cernet2, ['month', 'hour'], f)
stat(cernet2, ['day', 'hour'], f)
stat(cernet_trip, ['keyword', 'month'], f)
stat(cernet_trip, ['keyword', 'school'], f)
stat(cernet_shop, ['keyword', 'hour'], f)
stat(cernet_shop, ['keyword', 'month'], f)
stat(cernet_shop, ['host', 'hour'], f)
stat(cernet_shop, ['host', 'month'], f)
stat(cernet_shop, ['host', 'day'], f)
stat(cernet_shop, ['host', 'school'], f)

#分类的keyword统计
for dim in f:
if dim!='type':
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=购物', [dim], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=旅游', [dim], f)
stat(cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None), u'type=搜索', [dim], f)
stat(cernet_shop, [dim], f)
stat(cernet_trip, [dim], f)
stat(cernet_search, [dim], f)

if __name__ == "__main__":
path = sys.argv[1]
Expand Down

0 comments on commit 0ff1564

Please sign in to comment.