diff --git a/cernet/stat.py b/cernet/stat.py index 66fcc41..6843ce9 100644 --- a/cernet/stat.py +++ b/cernet/stat.py @@ -9,7 +9,6 @@ import sys from operator import add import re -import csv import urllib import os @@ -50,34 +49,13 @@ def cleanse(record, ipdict, parser, air): def stat(x, prefix, features, f): if len(features) == 1: fea = f[features[0]] - result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add).collect() + result = x.map(lambda x: (x.split('|')[fea], 1)).reduceByKey(add) else: - result = stat1(x, features, f).collect() - filename = 'result/[%s]%s.csv'%(prefix, '_'.join(features)) - output(result, filename, len(features)) - -def output(result, filename, n): - if len(result) == 0: + result = stat1(x, features, f) + filename = 'result/[%s]%s'%(prefix, '_'.join(features)) + if result.count() == 0: return - with open(filename, 'ab+') as final: - spamwriter = csv.writer(final, dialect='excel') - if n-1: - col= result[0][1] - for line in result[1:]: - if len(line[1]) > len(col): - col = line[1] - spamwriter.writerow(['']+list(col)) - for line in result: - row = [line[0]] - for item in col: - try: - row.append(line[1][item]) - except KeyError: - row.append(0) - spamwriter.writerow(row) - else: - for line in result: - spamwriter.writerow(list(line)) + result.repartition(1).saveAsTextFile(filename) def stat1(x, features, f): feas = [] @@ -93,7 +71,11 @@ def stat2(x, n): d2[item.split('|')[n]] = d2[item.split('|')[n]] + 1 except KeyError: d2[item.split('|')[n]] = 1 - return (d1, d2) + l2 = list(sorted(list(d2))) + tmp = '' + for item in l2: + tmp = tmp + '%s$%s|'%(item, d2[item]) + return (d1, tmp) def main(sc, path): os.system('mkdir result') @@ -177,11 +159,11 @@ def main(sc, path): for dim in f: stat(cernet2, 'total', [dim], f) stat(cernet2, 'total', ['month', 'hour'], f) - stat(cernet2, 'total', ['day', 'hour'], f) - stat(cernet_trip, 'trip', ['keyword', 'month'], f) - stat(cernet_trip, 'trip', ['keyword', 'school'], f) - stat(cernet_shop, 'shop', ['keyword', 'hour'], f) - stat(cernet_shop, 'shop', ['keyword', 'month'], f) + stat(cernet2, 'total', ['hour', 'day'], f) + stat(cernet_trip, 'trip', ['month', 'keyword'], f) + stat(cernet_shop, 'shop', ['month', 'keyword'], f) + stat(cernet_shop, 'shop', ['hour', 'keyword'], f) + stat(cernet_shop, 'shop', ['day', 'keyword'], f) stat(cernet_shop, 'shop', ['host', 'hour'], f) stat(cernet_shop, 'shop', ['host', 'month'], f) stat(cernet_shop, 'shop', ['host', 'day'], f) @@ -195,5 +177,7 @@ def main(sc, path): if __name__ == "__main__": path = sys.argv[1] conf = SparkConf().setAppName(APP_NAME) + conf = conf.set("spark.akka.frameSize", "1000") + conf = conf.set("spark.shuffle.consolidateFiles", "True") sc = SparkContext(conf=conf) main(sc, path) \ No newline at end of file