Skip to content

Commit

Permalink
Add mergetags utility
Browse files Browse the repository at this point in the history
  • Loading branch information
caesar0301 committed Aug 10, 2015
1 parent feb333d commit deadd32
Show file tree
Hide file tree
Showing 15 changed files with 22,684 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# idea
.idea/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
32 changes: 32 additions & 0 deletions mergetags/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Merge tagged httplog and wifilog
================================

Build
-----

$ sbt assembly

The binary jar is located in `target/scala-2.10/cobac-mergetags-assembly-1.0.jar`

Spark Jobs
----------

There are two jobs to generate the merged HTTP data from multiple sources:

* `cn.edu.sjtu.omnilab.cobac.CleanseWifilogJob`: this job mangles the raw wifi syslog and generates user sessions.
* `cn.edu.sjtu.omnilab.cobac.MergeHttpAndWifi`: this job merge the cleansed http data and wifi syslog to generate a unified dataset.

How to Use
----------

* `CleanseWifilogJob`:

spark-submit --master yarn --class cn.edu.sjtu.omnilab.cobac.CleanseWifilogJob \
cobac-mergetags-assembly-1.0.jar \
<rawwifisyslog> <output>

* `MergeHttpAndWifi`:

spark-submit --master yarn --class cn.edu.sjtu.omnilab.cobac.MergeHttpAndWifi \
cobac-mergetags-assembly-1.0.jar \
<httplog> <wifilog> <output>
25 changes: 25 additions & 0 deletions mergetags/bin/test_cleanse_wifilog.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
set -e

if [ $# -lt 2 ]; then
echo "Usage: $0 <wifilog> <cleanlog>"
exit -1
fi

httplog=$1
output=$2
echo "Output file:" $output

# build with java 6 to be compatible with our Spark cluster
sbt -java-home $(/usr/libexec/java_home -v '1.6*') assembly

APP_NAME="cn.edu.sjtu.omnilab.cobac.CleanseWifilogJob"
BINJAR="target/scala-2.10/cobac-mergetags-assembly-1.0.jar"

spark-submit --master local --class $APP_NAME $BINJAR $httplog $output

if [ -d $output ]; then
echo "Output: "
head -n 20 $output/part-00000
echo "..."
fi
26 changes: 26 additions & 0 deletions mergetags/bin/test_merge_http_wifi.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
set -e

if [ $# -lt 3 ]; then
echo "Usage: $0 <httplog> <wifilog> <cleanlog>"
exit -1
fi

httplog=$1
wifilog=$2
output=$3
echo "Output file:" $output

# build with java 6 to be compatible with our Spark cluster
sbt -java-home $(/usr/libexec/java_home -v '1.6*') assembly

APP_NAME="cn.edu.sjtu.omnilab.cobac.MergeHttpAndWifi"
BINJAR="target/scala-2.10/cobac-mergetags-assembly-1.0.jar"

spark-submit --master local --class $APP_NAME $BINJAR $httplog $wifilog $output

if [ -d $output ]; then
echo "Output: "
head -n 20 $output/part-00000
echo "..."
fi
13 changes: 13 additions & 0 deletions mergetags/bin/ungzip_wifilog.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

for i in {2014,2015}; do
for j in {1..12}; do
logname=`printf "wifilog%d-%02d" $i $j`
input=/user/omnilab/warehouse/sjtu_wifi_syslog/$logname*
output=sjtu_wifi_syslog/$logname
echo $logname
echo $input
echo $output
pig -e "dat = load '$input'; store dat into '$output';"
done
done
13 changes: 13 additions & 0 deletions mergetags/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name := "cobac-mergetags"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"

libraryDependencies += "com.google.guava" % "guava" % "18.0"

libraryDependencies += "org.yaml" % "snakeyaml" % "1.15"

libraryDependencies += "joda-time" % "joda-time" % "2.7"
3 changes: 3 additions & 0 deletions mergetags/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package cn.edu.sjtu.omnilab.cobac;

import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;

import java.io.InputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* Given a AP name string, this function return the description of the building.
* Specifically for AP name, both full name string (the default mode, e.g. BYGTSG-4F-01) and
* building name (e.g. BYGTSG) can be used.
* If only building name is given, you can save processing time to declare this method with @full_apname param.
* But for accuracy, the full AP name is preferred.
* @author chenxm
*/
public class APToBuilding {
private static final String AP_BUILDING_DATABASE = "/apnames-utf8-0.3.2.yaml";
private Map<String, Map<String, String>> APNameDB;
private boolean full_apname = true;
private Map<String, String> APBN_RealBN_Cache = new HashMap<String, String>();

public APToBuilding(){
this(APToBuilding.class.getResourceAsStream(AP_BUILDING_DATABASE), true);
}

public APToBuilding(boolean full_apname){
this(APToBuilding.class.getResourceAsStream(AP_BUILDING_DATABASE), full_apname);
}

@SuppressWarnings("unchecked")
public APToBuilding(InputStream APDBYAML, boolean full_apname) {
this.full_apname = full_apname;
// Load yaml database
Yaml yaml = new Yaml(new SafeConstructor());
Map<String,Object> regexConfig = (Map<String,Object>) yaml.load(APDBYAML);
APNameDB = (Map<String, Map<String, String>>) regexConfig.get("apprefix_sjtu");
}

public List<String> parse(String APName){
List<String> result = null;
if ( APName == null )
return result;

if ( full_apname ) { // Given full AP name string
String[] parts = APName.split("-\\d+F-", 2);
String buildName = parts[0];

// Remove MH- prefix
if (buildName.startsWith("MH-"))
buildName = buildName.substring(3, buildName.length());

// Check cache first
if ( APBN_RealBN_Cache.containsKey(buildName) ) { // Cache hit
String cacheRealBN = APBN_RealBN_Cache.get(buildName);
result = getBuildInfo(cacheRealBN);
} else { // Cache miss
if ( APNameDB.containsKey(buildName)) {
result = getBuildInfo(buildName);
APBN_RealBN_Cache.put(buildName, buildName);
} else {
// Worst case; try to find its longest matched building name
String realBuildName = null;
for ( String BN : APNameDB.keySet())
if ( buildName.contains(BN) )
if ( realBuildName == null )
realBuildName = BN;
else if ( BN.length() > realBuildName.length() )
realBuildName = BN; // Get the longest match
if ( realBuildName != null ){
result = getBuildInfo(realBuildName);
// Cache the real building name
APBN_RealBN_Cache.put(buildName, realBuildName);
}
}
}
} else { // Given build name, skip cache actions
if ( APNameDB.containsKey(APName) ) // Have item
result = getBuildInfo(APName);
}

return result;
}

private List<String> getBuildInfo(String realBuildName){
List info = new LinkedList<String>();
Map<String, String> buildInfo = APNameDB.get(realBuildName);
info.add(buildInfo.get("name"));
info.add(buildInfo.get("type"));
info.add(buildInfo.get("user"));
return info;
}
}
Loading

0 comments on commit deadd32

Please sign in to comment.