Skip to content

Commit

Permalink
Add extracturls module
Browse files Browse the repository at this point in the history
  • Loading branch information
caesar0301 committed Aug 5, 2015
1 parent 2021a01 commit e2a81ab
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 0 deletions.
28 changes: 28 additions & 0 deletions extracturls/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
ExtractUrlFromHttp
===================

Prepare
-------

Install and configure Apache Spark for local test.


Build
-----

sbt assembly

Run
---

spark-submit --master local --class cn.edu.sjtu.omnilab.cobac.ExtractUrlFromHttp\
target/scala-2.10/cobac-extracturls-assembly-1.0.jar \
input output

Test script
-----------

chmod +x bin/test_extract_urls.sh
./bin/test_extract_urls.sh

Enjoy!
25 changes: 25 additions & 0 deletions extracturls/bin/test_extract_urls.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
set -e

if [ $# -lt 2 ]; then
echo "Usage: $0 <httplog.in> <out>"
exit -1
fi

httplog=$1
output=$2-`date +%y%m%d-%H%M%S`
echo "Output file:" $output

# build with java 6 to be compatible with our Spark cluster
sbt -java-home $(/usr/libexec/java_home -v '1.6*') assembly

APP_NAME="cn.edu.sjtu.omnilab.cobac.ExtractUrlFromHttp"
BINJAR="target/scala-2.10/cobac-extracturls-assembly-1.0.jar"

spark-submit --master local --class $APP_NAME $BINJAR $httplog $output

if [ -d $output ]; then
echo "Output: "
head -n 20 $output/part-00000
echo "..."
fi
8 changes: 8 additions & 0 deletions extracturls/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name := "cobac-extracturls"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"

3 changes: 3 additions & 0 deletions extracturls/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cn.edu.sjtu.omnilab.cobac

/**
* Data Schema of SJTU HTTP logs
*/
object DataSchema {

val source_ip = 0
val source_port = 1
val dest_ip = 2
val dest_port = 3
val conn = 4
val conn_ts = 5
val close_ts = 6
val conn_dur = 7
val idle_time0 = 8
val request_ts = 9
val request_dur = 10
val response_ts = 11
val response_dur_b = 12
val response_dur_e = 13
val idle_time1 = 14
val request_size = 15
val response_size = 16
val request_method = 17
val request_url = 18
val request_protocol = 19
val request_host = 20
val request_user_agent = 21
val request_referrer = 22
val request_conn = 23
val request_keep_alive = 24
val response_protocol = 25
val response_code = 26
val response_server = 27
val response_clen = 28
val response_ctype = 29
val response_cenc = 30
val response_etag = 31
val response_cache_ctl = 32
val response_last_mod = 33
val response_age = 34
val response_expire = 35
val response_connval = 36
val response_keep_alive = 37

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package cn.edu.sjtu.omnilab.cobac

import java.util.regex.{Matcher, Pattern}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.omg.CORBA.PERSIST_STORE

object ExtractUrlFromHttp {

final val httpProtoPattern = Pattern.compile("^(\\w+:?//)?(.*)$", Pattern.CASE_INSENSITIVE)
final val httpParamPattern = Pattern.compile("^((\\w+://)?([^\\?&]+))\\??", Pattern.CASE_INSENSITIVE)

def main( args: Array[String] ): Unit = {

// parse command options
if (args.length < 2){
println("Usage: ExtractUrlFromHttp <HTTPLOG> <OUTPUT>")
sys.exit(0)
}

val httplog = args(0)
val output = args(1)

// configure spark
val conf = new SparkConf()
.setAppName("Extracting unique urls from http logs")
val spark = new SparkContext(conf)

// extract fields from raw logs and validate input data
val allUrlRDD = spark.textFile(httplog).map(cleanseLog(_))
.filter( line => line != null &&
line(DataSchema.request_host) != null &&
line(DataSchema.request_url) != null)
.map( line => {
val url = combineHostUri(line(DataSchema.request_host), line(DataSchema.request_url))
val curl = stripUrlParam(stripUrlProto(url))
(curl, url)
}).groupBy(_._1)
.map { case (curl, pairs) => {
val tnum = pairs.size
if (tnum > 500 ) { // about TOP-60K in one month
val oneurl = pairs.toList(1)._2
(tnum, oneurl)
} else {
null
}
}}.filter(m => m != null)
.sortByKey(false)
.zipWithIndex
.map { case ((tnum, url), index) => "%d|%d|%s".format(index, tnum, url) }
.saveAsTextFile(output)

spark.stop()

}

def cleanseLog(line: String): Array[String] = {
// get HTTP header fields
val chops = line.split("""\"\s\"""");
if ( chops.length != 21 )
return null

// get timestamps
val timestamps = chops(0).split(" ");
if (timestamps.length != 18 )
return null

val results = timestamps ++ chops.slice(1, 21)

// remove N/A values and extrat quote
results.transform( field => {
var new_field = field.replaceAll("\"", "")
if (new_field == "N/A")
new_field = null
new_field
})

results
}

def hasProtoPrefix(uri: String): Boolean = {
if ( uri.matches("^(\\w+:?//).*"))
return true
return false
}

def combineHostUri(host: String, uri: String): String = {
if ( hasProtoPrefix(uri) ){
return uri;
} else {
return host+uri;
}
}

def stripUrlProto(url: String): String = {
val matcher = httpProtoPattern.matcher(url);
if ( matcher.find() ){
return matcher.group(2);
} else {
return url;
}
}

def stripUrlParam(url: String): String = {
val matcher = httpParamPattern.matcher(url);
if ( matcher.find() ){
return matcher.group(1);
} else {
return url;
}
}


}

0 comments on commit e2a81ab

Please sign in to comment.