# gettag.py
#
# Recovered from git patch da37f1236a19beb3ee9a9da4dbe8734139ab883a
#   From:    cenkai
#   Date:    Thu, 6 Aug 2015 13:44:45 +0800
#   Subject: [PATCH] get_tags_from_SiXian
# The patch creates two new files: gettag.py (49 lines, blob dde9854) and
# taghttp.py (70 lines).  This is gettag.py.
"""Join URL ids with their tags and write ``url|tags`` lines with Spark.

Usage: spark-submit gettag.py <path>
"""
import re
import sys

if sys.version_info[0] == 2:
    # Python 2 only: force utf-8 as the implicit str<->unicode codec.
    # ``reload`` is a builtin there; on Python 3 this branch never runs.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf-8')

APP_NAME = "TAGS"

# Input root; ``path`` is appended directly (no separator is inserted).
URLSTAT_ROOT = ("hdfs://namenode.omnilab.sjtu.edu.cn/user/omnilab/warehouse/"
                "sjtu-wifi-urls-tags/urlstat")

# Compiled once at import time so the helpers below work even when main()
# has not been called (they used to be created inside main()).
protopattern = re.compile("^(\\w+:?//)?(.*)$", re.I)
parampattern = re.compile("^((\\w+://)?([^\\?&]+))\\??", re.I)

# id -> tag-string lookup table; replaced by main() with the real data.
cleantag = {}


def cleanse(line):
    """Parse one ``*.result`` line into a ``(key, tags)`` pair.

    The line must consist of exactly three space-separated fields; the first
    and the third are returned.  Malformed lines yield ``None``.
    """
    fields = line.split(" ")
    if len(fields) != 3:
        return None
    return (fields[0], fields[2])


def stripUrlProto(url):
    """Return *url* without a leading ``scheme://`` (or ``scheme//``) prefix."""
    match = protopattern.match(url)
    if match:
        return match.group(2)
    return url


def stripUrlParam(url):
    """Return *url* truncated at the first ``?`` or ``&``.

    If nothing precedes the first ``?``/``&`` the input is returned unchanged.
    """
    match = parampattern.match(url)
    if match:
        return match.group(1)
    return url


def cleanse2(line):
    """Parse one ``*.txt`` line into ``[normalized_url, tags]``.

    The line must have exactly three ``|``-separated fields.  Field 0 is
    looked up in ``cleantag``; field 2 is treated as a URL and stripped of its
    scheme and query string.  ``|`` inside the tag string is replaced by ``,``
    so the output can itself be ``|``-delimited.

    Returns ``None`` for malformed lines and for keys missing from
    ``cleantag``.
    """
    fields = line.split("|")
    if len(fields) != 3:
        return None
    try:
        tags = cleantag[fields[0]]
    except KeyError:
        return None
    return [stripUrlParam(stripUrlProto(fields[2])), tags.replace("|", ",")]


def main(sc, path):
    """Build the tag table from ``*.result`` and tag every URL in ``*.txt``.

    sc   -- an active SparkContext.
    path -- suffix appended to the urlstat input root and to the ``tag/``
            output directory.

    Writes ``url|tags`` text lines to ``tag/<path>``.
    """
    global cleantag
    base = URLSTAT_ROOT + path
    # cleanse() returns None for malformed lines; they must be dropped before
    # collectAsMap(), which builds a dict from the collected pairs and would
    # raise TypeError on a None element.
    cleantag = (sc.textFile(base + "/*.result")
                .map(cleanse)
                .filter(lambda pair: pair is not None)
                .collectAsMap())
    urltag = sc.textFile(base + "/*.txt").map(cleanse2)
    final = (urltag
             .filter(lambda rec: rec is not None)
             .map(lambda rec: "%s|%s" % (rec[0], rec[1])))
    final.saveAsTextFile('tag/' + path)


if __name__ == "__main__":
    # Imported here so the pure parsing helpers above can be imported and
    # tested on a machine without Spark installed.
    from pyspark import SparkConf, SparkContext

    if len(sys.argv) < 2:
        sys.exit("usage: gettag.py <path>")
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    path = str(sys.argv[1])
    main(sc, path)
# taghttp.py
#
# New file (mode 100644, blob ccc8e7d, 70 lines) added by the git patch
# "get_tags_from_SiXian".
"""Attach URL tags to parsed HTTP log records with Spark.

Usage: spark-submit taghttp.py <path>
"""
import re
import sys

if sys.version_info[0] == 2:
    # Python 2 only: force utf-8 as the implicit str<->unicode codec.
    # ``reload`` is a builtin there; on Python 3 this branch never runs.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf-8')

APP_NAME = "FEATURES_FROM_HTTP"

# Compiled once at import time so the helpers below work even when main()
# has not been called (they used to be created inside main()).
protopattern = re.compile("^(\\w+:?//)?(.*)$", re.I)
parampattern = re.compile("^((\\w+://)?([^\\?&]+))\\??", re.I)
protoprefixpattern = re.compile(r"^\w+:?//")

# normalized-url -> tags lookup table; replaced by main() with the real data.
tag = {}


def cleanse0(text):
    """Split a ``url|tags`` line into a ``(url, tags)`` pair.

    Raises IndexError if the line contains no ``|``.
    """
    fields = text.split("|")
    return (fields[0], fields[1])


def cleanse(text):
    """Parse one raw HTTP log line into a flat list of 38 fields.

    The line is first split on ``" "`` (quote, space, quote) and must yield
    exactly 21 chunks; the first chunk is then split on single spaces and must
    yield exactly 18 tokens.  The result is those 18 tokens followed by the
    remaining 20 chunks, with surrounding double quotes stripped and ``N/A``
    replaced by ``None``.

    Returns ``None`` for lines that do not have this shape.
    """
    chops = text.split("\" \"")
    if len(chops) != 21:
        return None
    timestamps = chops[0].split(" ")
    if len(timestamps) != 18:
        return None
    results = []
    for raw in timestamps + chops[1:21]:
        value = raw.strip("\"")
        # Splitting on '" "' has already removed the quotes from the inner
        # fields, so comparing against the quoted token '"N/A"' (as the code
        # used to) could never match them; compare the unquoted value instead.
        # NOTE(review): assumes a bare N/A always means "missing" -- confirm
        # against the log format.
        results.append(None if value == "N/A" else value)
    # A real list (not a lazy ``map`` object) so callers can index it on
    # Python 3 as well.
    return results


def hasProtoPrefix(url):
    """Return True if *url* starts with ``scheme://`` (or ``scheme//``).

    The previous pattern was written as r"^(\\\\w+...)" inside a raw string,
    which matches a literal backslash and therefore never recognised real
    URLs such as ``http://...``.
    """
    return protoprefixpattern.match(url) is not None


def combineHostUri(text):
    """Make field 18 an absolute URL by prefixing the host (field 20).

    URLs that already carry a scheme are left untouched.  The record is
    modified in place and returned.
    """
    host = text[20]
    url = text[18]
    if not hasProtoPrefix(url):
        text[18] = host + url
    return text


def stripUrlProto(text):
    """Store field 18 without its scheme prefix into field 20.

    The record is modified in place and returned.
    """
    url = text[18]
    match = protopattern.match(url)
    text[20] = match.group(2) if match else url
    return text


def stripUrlParam(text):
    """Truncate field 20 at the first ``?`` or ``&``.

    The record is modified in place and returned; field 20 is left unchanged
    when nothing precedes the first ``?``/``&``.
    """
    match = parampattern.match(text[20])
    if match:
        text[20] = match.group(1)
    return text


def tagging(text):
    """Replace field 20 (normalized URL) by its tags, or None if untagged.

    The record is modified in place and returned.
    """
    text[20] = tag.get(text[20])
    return text


def main(sc, path):
    """Tag every HTTP record under *path* and save the selected columns.

    sc   -- an active SparkContext.
    path -- sub-directory name used for the tag table, the HTTP logs and the
            ``tagged/`` output directory.

    Writes ``|``-joined lines (fields 0, 5, 7, int(15)+int(16), 18, 21, 20,
    29) to ``tagged/<path>``.
    """
    global tag
    tag = (sc.textFile("hdfs://namenode.omnilab.sjtu.edu.cn/user/cenkai/tag/" + path)
           .map(cleanse0)
           .collectAsMap())
    cleanhttp = sc.textFile(
        "hdfs://namenode.omnilab.sjtu.edu.cn/user/omnilab/warehouse/sjtu_wifi/"
        + path + "/http/").map(cleanse)
    # Keep only well-formed records that have both a URL (18) and a host (20).
    exthttp = (cleanhttp
               .filter(lambda x: x is not None
                       and x[18] is not None
                       and x[20] is not None)
               .map(combineHostUri)
               .map(lambda x: stripUrlParam(stripUrlProto(x)))
               .map(tagging))
    # NOTE(review): int() raises if field 15 or 16 is missing/non-numeric;
    # this was already the case before and is left unchanged.
    final = exthttp.map(
        lambda text: "%s|%s|%s|%d|%s|%s|%s|%s" % (
            text[0], text[5], text[7],
            int(text[15]) + int(text[16]),
            text[18], text[21], text[20], text[29]))
    final.saveAsTextFile('tagged/' + path)


if __name__ == "__main__":
    # Imported here so the pure parsing helpers above can be imported and
    # tested on a machine without Spark installed.
    from pyspark import SparkConf, SparkContext

    if len(sys.argv) < 2:
        sys.exit("usage: taghttp.py <path>")
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    path = sys.argv[1]
    main(sc, path)