Skip to content

Commit

Permalink
Merge pull request #6 from cenkai88/master
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
Xiaming committed Oct 13, 2015
2 parents 0ff1564 + 8c3300a commit d1c5b9b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 55 deletions.
85 changes: 31 additions & 54 deletions cernet/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import sys
from operator import add
import re
import csv
import urllib
import os

Expand Down Expand Up @@ -45,40 +44,18 @@ def cleanse(record, ipdict, parser, air):
if not k:
keyword = 'unknown'
t = 'unknown'

return '%s|%s|%s|%s|%s|%s|%s'%(month, day, hour, school, host, t, keyword)

def stat(x, features, f):
def stat(x, prefix, features, f):
if len(features) == 1:
fea = f[features[0]]
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect()
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add)
else:
result = stat1(x, features, f).collect()
filename = 'result/[%s]%s.csv'%('_'.join(features))
output(result, filename, len(features))

def output(result, filename, n):
if len(result) == 0:
result = stat1(x, features, f)
filename = 'result/[%s]%s'%(prefix, '_'.join(features))
if result.count() == 0:
return
with open(filename, 'ab+') as final:
spamwriter = csv.writer(final, dialect='excel')
if n-1:
col= result[0][1]
for line in result[1:]:
if len(line[1]) > len(col):
col = line[1]
spamwriter.writerow(['']+list(col))
for line in result:
row = [line[0]]
for item in col:
try:
row.append(line[1][item])
except KeyError:
row.append(0)
spamwriter.writerow(row)
else:
for line in result:
spamwriter.writerow(list(line))
result.repartition(1).saveAsTextFile(filename)

def stat1(x, features, f):
feas = []
Expand All @@ -94,7 +71,11 @@ def stat2(x, n):
d2[item.split('|')[n]] = d2[item.split('|')[n]] + 1
except KeyError:
d2[item.split('|')[n]] = 1
return (d1, d2)
l2 = list(sorted(list(d2)))
tmp = ''
for item in l2:
tmp = tmp + '%s$%s|'%(item, d2[item])
return (d1, tmp)

def main(sc, path):
os.system('mkdir result')
Expand All @@ -109,10 +90,6 @@ def main(sc, path):
for line in ipdicts.readlines():
ipdict[line.split('|')[0]] = line.split('|')[1].strip().decode('utf-8', 'ignore')
cernet = sc.textFile(path)
cernet2 = cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None).cache()
cernet_trip = cernet.filter(lambda x: x.split('|')[5]==u'旅游').cache()
cernet_shop = cernet.filter(lambda x: x.split('|')[5]==u'购物').cache()
cernet_search = cernet.filter(lambda x: x.split('|')[5]==u'搜索').cache()
parser = [{'regex': '^tieba\.baidu\.com\/+[\w\W]+kw=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('kw=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')},\
{'regex': '^m\.tieba\.com\/+[\w\W]+word=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('word=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')}, \
{'regex': '^(www\.b|b)aidu\.com\/+[\w\W]+wd=', 'type': u'搜索', 'dest' : lambda x: urllib.unquote(x.split('wd=')[1].split('&')[0]).encode('raw_unicode_escape').decode('utf-8', 'ignore')}, \
Expand Down Expand Up @@ -175,29 +152,29 @@ def main(sc, path):

for i in xrange(len(parser)):
parser[i]['regex'] = re.compile(parser[i]['regex'])
#一维度统计
cernet2 = cernet.map(lambda x : cleanse(x, ipdict, parser, air)).filter(lambda x : x!=None).cache()
cernet_trip = cernet2.filter(lambda x: x.split('|')[5]==u'旅游').cache()
cernet_shop = cernet2.filter(lambda x: x.split('|')[5]==u'购物').cache()
cernet_search = cernet2.filter(lambda x: x.split('|')[5]==u'搜索').cache()
for dim in f:
stat(cernet2, [dim], f)

#二维度
stat(cernet2, ['month', 'hour'], f)
stat(cernet2, ['day', 'hour'], f)
stat(cernet_trip, ['keyword', 'month'], f)
stat(cernet_trip, ['keyword', 'school'], f)
stat(cernet_shop, ['keyword', 'hour'], f)
stat(cernet_shop, ['keyword', 'month'], f)
stat(cernet_shop, ['host', 'hour'], f)
stat(cernet_shop, ['host', 'month'], f)
stat(cernet_shop, ['host', 'day'], f)
stat(cernet_shop, ['host', 'school'], f)

#分类的keyword统计
stat(cernet2, 'total', [dim], f)
stat(cernet2, 'total', ['month', 'hour'], f)
stat(cernet2, 'total', ['hour', 'day'], f)
stat(cernet2, 'total', ['month', 'day'], f)
stat(cernet_trip, 'trip', ['month', 'keyword'], f)
stat(cernet_shop, 'shop', ['month', 'keyword'], f)
stat(cernet_shop, 'shop', ['hour', 'keyword'], f)
stat(cernet_shop, 'shop', ['day', 'keyword'], f)
stat(cernet_shop, 'shop', ['host', 'hour'], f)
stat(cernet_shop, 'shop', ['host', 'month'], f)
stat(cernet_shop, 'shop', ['host', 'day'], f)
stat(cernet_shop, 'shop', ['host', 'school'], f)
for dim in f:
if dim!='type':
stat(cernet_shop, [dim], f)
stat(cernet_trip, [dim], f)
stat(cernet_search, [dim], f)
stat(cernet_shop, 'shop', [dim], f)
stat(cernet_trip, 'trip', [dim], f)
stat(cernet_search, 'search', [dim], f)

if __name__ == "__main__":
path = sys.argv[1]
conf = SparkConf().setAppName(APP_NAME)
Expand Down
2 changes: 1 addition & 1 deletion cernet/stat.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
spark-submit2 stat.py $1
spark-submit2 --conf spark.akka.frameSize=100 spark.driver.maxResultSize=10g stat.py $1

0 comments on commit d1c5b9b

Please sign in to comment.