diff --git a/extracturls/README.md b/extracturls/README.md
new file mode 100644
index 0000000..04cd7f3
--- /dev/null
+++ b/extracturls/README.md
@@ -0,0 +1,28 @@
+ExtractUrlFromHttp
+==================
+
+Prepare
+-------
+
+Install and configure Apache Spark for local testing.
+
+
+Build
+-----
+
+    sbt assembly
+
+Run
+---
+
+    spark-submit --master local --class cn.edu.sjtu.omnilab.cobac.ExtractUrlFromHttp \
+    target/scala-2.10/cobac-extracturls-assembly-1.0.jar \
+    input output
+
+Test script
+-----------
+
+    chmod +x bin/test_extract_urls.sh
+    ./bin/test_extract_urls.sh input output
+
+Enjoy!
\ No newline at end of file
diff --git a/extracturls/bin/test_extract_urls.sh b/extracturls/bin/test_extract_urls.sh
new file mode 100755
index 0000000..0f5bf39
--- /dev/null
+++ b/extracturls/bin/test_extract_urls.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+set -e
+
+if [ $# -lt 2 ]; then
+    echo "Usage: $0 <httplog> <output>"
+    exit 1
+fi
+
+httplog=$1
+output=$2-$(date +%y%m%d-%H%M%S)
+echo "Output file: $output"
+
+# build with java 6 to be compatible with our Spark cluster
+sbt -java-home $(/usr/libexec/java_home -v '1.6*') assembly
+
+APP_NAME="cn.edu.sjtu.omnilab.cobac.ExtractUrlFromHttp"
+BINJAR="target/scala-2.10/cobac-extracturls-assembly-1.0.jar"
+
+spark-submit --master local --class $APP_NAME $BINJAR "$httplog" "$output"
+
+if [ -d "$output" ]; then
+    echo "Output: "
+    head -n 20 "$output"/part-00000
+    echo "..."
+fi
\ No newline at end of file
diff --git a/extracturls/build.sbt b/extracturls/build.sbt
new file mode 100644
index 0000000..f04bf97
--- /dev/null
+++ b/extracturls/build.sbt
@@ -0,0 +1,8 @@
+name := "cobac-extracturls"
+
+version := "1.0"
+
+scalaVersion := "2.10.5"
+
+libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
+
\ No newline at end of file
diff --git a/extracturls/project/plugins.sbt b/extracturls/project/plugins.sbt
new file mode 100644
index 0000000..58b2aae
--- /dev/null
+++ b/extracturls/project/plugins.sbt
@@ -0,0 +1,3 @@
+logLevel := Level.Warn
+
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
\ No newline at end of file
diff --git a/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/DataSchema.scala b/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/DataSchema.scala
new file mode 100644
index 0000000..eaad97e
--- /dev/null
+++ b/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/DataSchema.scala
@@ -0,0 +1,47 @@
+package cn.edu.sjtu.omnilab.cobac
+
+/**
+ * Data schema (field indexes) of the SJTU HTTP logs
+ */
+object DataSchema {
+
+  val source_ip = 0
+  val source_port = 1
+  val dest_ip = 2
+  val dest_port = 3
+  val conn = 4
+  val conn_ts = 5
+  val close_ts = 6
+  val conn_dur = 7
+  val idle_time0 = 8
+  val request_ts = 9
+  val request_dur = 10
+  val response_ts = 11
+  val response_dur_b = 12
+  val response_dur_e = 13
+  val idle_time1 = 14
+  val request_size = 15
+  val response_size = 16
+  val request_method = 17
+  val request_url = 18
+  val request_protocol = 19
+  val request_host = 20
+  val request_user_agent = 21
+  val request_referrer = 22
+  val request_conn = 23
+  val request_keep_alive = 24
+  val response_protocol = 25
+  val response_code = 26
+  val response_server = 27
+  val response_clen = 28
+  val response_ctype = 29
+  val response_cenc = 30
+  val response_etag = 31
+  val response_cache_ctl = 32
+  val response_last_mod = 33
+  val response_age = 34
+  val response_expire = 35
+  val response_connval = 36
+  val response_keep_alive = 37
+
+}
diff --git a/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/ExtractUrlFromHttp.scala b/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/ExtractUrlFromHttp.scala
new file mode 100644
index 0000000..57ac265
--- /dev/null
+++ b/extracturls/src/main/scala/cn/edu/sjtu/omnilab/cobac/ExtractUrlFromHttp.scala
@@ -0,0 +1,114 @@
+package cn.edu.sjtu.omnilab.cobac
+
+import java.util.regex.{Matcher, Pattern}
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext._
+
+object ExtractUrlFromHttp {
+
+  final val httpProtoPattern = Pattern.compile("^(\\w+:?//)?(.*)$", Pattern.CASE_INSENSITIVE)
+  final val httpParamPattern = Pattern.compile("^((\\w+://)?([^\\?&]+))\\??", Pattern.CASE_INSENSITIVE)
+
+  def main(args: Array[String]): Unit = {
+
+    // parse command options
+    if (args.length < 2) {
+      println("Usage: ExtractUrlFromHttp <httplog> <output>")
+      sys.exit(1)
+    }
+
+    val httplog = args(0)
+    val output = args(1)
+
+    // configure spark
+    val conf = new SparkConf()
+      .setAppName("Extracting unique urls from http logs")
+    val spark = new SparkContext(conf)
+
+    // extract fields from raw logs and validate input data
+    spark.textFile(httplog).map(cleanseLog(_))
+      .filter( line => line != null &&
+        line(DataSchema.request_host) != null &&
+        line(DataSchema.request_url) != null)
+      .map( line => {
+        val url = combineHostUri(line(DataSchema.request_host), line(DataSchema.request_url))
+        val curl = stripUrlParam(stripUrlProto(url))
+        (curl, url)
+      }).groupBy(_._1)
+      .map { case (curl, pairs) => {
+        val tnum = pairs.size
+        if (tnum > 500) { // about TOP-60K in one month
+          val oneurl = pairs.toList(1)._2
+          (tnum, oneurl)
+        } else {
+          null
+        }
+      }}.filter(m => m != null)
+      .sortByKey(false)
+      .zipWithIndex
+      .map { case ((tnum, url), index) => "%d|%d|%s".format(index, tnum, url) }
+      .saveAsTextFile(output)
+
+    spark.stop()
+
+  }
+
+  def cleanseLog(line: String): Array[String] = {
+    // get HTTP header fields
+    val chops = line.split("""\"\s\"""")
+    if (chops.length != 21)
+      return null
+
+    // get timestamps
+    val timestamps = chops(0).split(" ")
+    if (timestamps.length != 18)
+      return null
+
+    val results = timestamps ++ chops.slice(1, 21)
+
+    // remove N/A values and strip quotes
+    results.transform( field => {
+      var new_field = field.replaceAll("\"", "")
+      if (new_field == "N/A")
+        new_field = null
+      new_field
+    })
+
+    results
+  }
+
+  def hasProtoPrefix(uri: String): Boolean = {
+    if (uri.matches("^(\\w+:?//).*"))
+      return true
+    return false
+  }
+
+  def combineHostUri(host: String, uri: String): String = {
+    if (hasProtoPrefix(uri)) {
+      return uri
+    } else {
+      return host + uri
+    }
+  }
+
+  def stripUrlProto(url: String): String = {
+    val matcher = httpProtoPattern.matcher(url)
+    if (matcher.find()) {
+      return matcher.group(2)
+    } else {
+      return url
+    }
+  }
+
+  def stripUrlParam(url: String): String = {
+    val matcher = httpParamPattern.matcher(url)
+    if (matcher.find()) {
+      return matcher.group(1)
+    } else {
+      return url
+    }
+  }
+
+
+}
\ No newline at end of file
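
Note on the URL normalization in `ExtractUrlFromHttp`: `stripUrlProto` drops an optional protocol prefix and `stripUrlParam` drops the query string, so requests that differ only in protocol or parameters are grouped and counted under one key. A minimal sketch of the intended behaviour (the sample host and URI are illustrative, not taken from the logs):

    // illustrative only; uses the helpers defined in ExtractUrlFromHttp
    val url  = ExtractUrlFromHttp.combineHostUri("www.example.com", "/index.html?id=1")
    // url  == "www.example.com/index.html?id=1"
    val curl = ExtractUrlFromHttp.stripUrlParam(ExtractUrlFromHttp.stripUrlProto(url))
    // curl == "www.example.com/index.html"

Each output line is formatted as `rank|count|url`, one record per normalized URL that appears more than 500 times in the input.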