Skip to content

Commit

Permalink
saveAsTestFile
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkai88 committed Sep 27, 2015
1 parent 96f1f74 commit 8c00def
Showing 1 changed file with 17 additions and 33 deletions.
50 changes: 17 additions & 33 deletions cernet/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import sys
from operator import add
import re
import csv
import urllib
import os

Expand Down Expand Up @@ -50,34 +49,13 @@ def cleanse(record, ipdict, parser, air):
def stat(x, prefix, features, f):
if len(features) == 1:
fea = f[features[0]]
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect()
result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add)
else:
result = stat1(x, features, f).collect()
filename = 'result/[%s]%s.csv'%(prefix, '_'.join(features))
output(result, filename, len(features))

def output(result, filename, n):
if len(result) == 0:
result = stat1(x, features, f)
filename = 'result/[%s]%s'%(prefix, '_'.join(features))
if result.count() == 0:
return
with open(filename, 'ab+') as final:
spamwriter = csv.writer(final, dialect='excel')
if n-1:
col= result[0][1]
for line in result[1:]:
if len(line[1]) > len(col):
col = line[1]
spamwriter.writerow(['']+list(col))
for line in result:
row = [line[0]]
for item in col:
try:
row.append(line[1][item])
except KeyError:
row.append(0)
spamwriter.writerow(row)
else:
for line in result:
spamwriter.writerow(list(line))
result.repartition(1).saveAsTextFile(filename)

def stat1(x, features, f):
feas = []
Expand All @@ -93,7 +71,11 @@ def stat2(x, n):
d2[item.split('|')[n]] = d2[item.split('|')[n]] + 1
except KeyError:
d2[item.split('|')[n]] = 1
return (d1, d2)
l2 = list(sorted(list(d2)))
tmp = ''
for item in l2:
tmp = tmp + '%s$%s|'%(item, d2[item])
return (d1, tmp)

def main(sc, path):
os.system('mkdir result')
Expand Down Expand Up @@ -177,11 +159,11 @@ def main(sc, path):
for dim in f:
stat(cernet2, 'total', [dim], f)
stat(cernet2, 'total', ['month', 'hour'], f)
stat(cernet2, 'total', ['day', 'hour'], f)
stat(cernet_trip, 'trip', ['keyword', 'month'], f)
stat(cernet_trip, 'trip', ['keyword', 'school'], f)
stat(cernet_shop, 'shop', ['keyword', 'hour'], f)
stat(cernet_shop, 'shop', ['keyword', 'month'], f)
stat(cernet2, 'total', ['hour', 'day'], f)
stat(cernet_trip, 'trip', ['month', 'keyword'], f)
stat(cernet_shop, 'shop', ['month', 'keyword'], f)
stat(cernet_shop, 'shop', ['hour', 'keyword'], f)
stat(cernet_shop, 'shop', ['day', 'keyword'], f)
stat(cernet_shop, 'shop', ['host', 'hour'], f)
stat(cernet_shop, 'shop', ['host', 'month'], f)
stat(cernet_shop, 'shop', ['host', 'day'], f)
Expand All @@ -195,5 +177,7 @@ def main(sc, path):
if __name__ == "__main__":
    # Script entry point: build a SparkContext and run the statistics job
    # over the input path given as the first command-line argument.
    #
    # Exit with a usage message rather than an IndexError traceback when the
    # path argument is missing. Extra arguments are ignored, as before.
    if len(sys.argv) < 2:
        sys.exit("usage: %s <input_path>" % sys.argv[0])
    path = sys.argv[1]

    conf = SparkConf().setAppName(APP_NAME)
    # NOTE(review): both keys below are Spark 1.x-era settings. Presumably
    # frameSize raises the driver/executor message size limit (in MB) and
    # consolidateFiles reduces the number of shuffle files -- confirm against
    # the configuration docs of the deployed Spark version.
    conf = conf.set("spark.akka.frameSize", "1000")
    conf = conf.set("spark.shuffle.consolidateFiles", "True")
    sc = SparkContext(conf=conf)
    try:
        main(sc, path)
    finally:
        # Always release the context, even if the job raises.
        sc.stop()

0 comments on commit 8c00def

Please sign in to comment.