Skip to content

Commit

Permalink
Build changes to get conjure working on rockypika (#74)
Browse files Browse the repository at this point in the history
* add basic install instructions

* add/update zbalance sysconfig for conjure-only

* Add neeeded zbalance configs to conjure.conf

* Add erspan pfring patch to a station-specific repo

* Read/use PARSE_GRE_OFFSET env variable in detector

* Add default (0) PARSE_GRE_OFFSET to conf file
  • Loading branch information
ewust authored Mar 12, 2021
1 parent 08eb9b5 commit 44540ea
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 5 deletions.
38 changes: 38 additions & 0 deletions INSTALL
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# go version 1.15.2
# rustc version 1.47.0

go get -d -u -t github.com/refraction-networking/conjure
cd ~/go/src/github.com/refraction-networking/conjure

# Fix for redis having wrong package for us:
go get -u github.com/go-redis/redis || true && cd ~/go/src/github.com/go-redis/redis && git checkout tags/v7.4.0 -b v7-master && cd -


# Build app
make app


# Build PF_RING
cd PF_RING && make && cd ../

# Build libtapdance
cd libtapdance && make && cd ..

# Build detector
make dark-decoy


# Make install directory
sudo mkdir -p /opt/conjure

# Install to /opt/
sudo cp -r ./* /opt/conjure/

# Install systemd stuff
sudo cp sysconfig/conjure-{app,det}.service /etc/systemd/system/
sudo cp sysconfig/zbalance.service /etc/systemd/system/

# Install zbalance stuff
sudo cp ./sysconfig/start_zbalance_ipc.sh /opt/conjure/

#sudo cp ./PF_RING/userland/examples_zc/zbalance_ipc /opt/conjure/
13 changes: 13 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub struct PerCoreGlobal
// testing from other stations in a conjure cluster from clogging up the logs with connection
// notifications.
filter_list: Vec<String>,

// If we're reading from a GRE tap, we can provide an optional offset that we read
// into the packet (skipping the GRE header).
gre_offset: usize,
}

// Tracking of some pretty straightforward quantities
Expand Down Expand Up @@ -147,6 +151,14 @@ impl PerCoreGlobal
&_ => Flow::set_log_client(false), // default disable
};

let gre_offset = match env::var("PARSE_GRE_OFFSET") {
Ok(val) => val.parse::<usize>().unwrap(),
Err(env::VarError::NotPresent) => 0,
Err(_) => { println!("Error, can't parse PARSE_GRE_OFFSET"); 0},
};

debug!("gre_offset: {}", gre_offset);

PerCoreGlobal {
priv_key: priv_key,
lcore: the_lcore,
Expand All @@ -157,6 +169,7 @@ impl PerCoreGlobal
ip_tree: PrefixTree::new(),
zmq_sock: zmq_sock,
filter_list: value.detector_filter_list,
gre_offset: gre_offset,
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/process_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,18 @@ pub extern "C" fn rust_process_packet(ptr: *mut PerCoreGlobal,
#[allow(unused_mut)]
let mut global = unsafe { &mut *ptr };

let rust_view_len = frame_len as usize;
let mut rust_view_len = frame_len as usize;
let rust_view = unsafe {
slice::from_raw_parts_mut(raw_ethframe as *mut u8, frame_len as usize)
};

// If this is a GRE, we want to ignore the GRE overhead in our packets
rust_view_len -= global.gre_offset;

global.stats.packets_this_period += 1;
global.stats.bytes_this_period += rust_view_len as u64;

let eth_pkt = match EthernetPacket::new(rust_view) {
let eth_pkt = match EthernetPacket::new(&rust_view[global.gre_offset..]) {
Some(pkt) => pkt,
None => return,
};
Expand Down
132 changes: 132 additions & 0 deletions stations/rockypika/erspan-pfring-hash.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
diff --git a/userland/examples_zc/zbalance_ipc.c b/userland/examples_zc/zbalance_ipc.c
index a51cf231..8cc6f6c5 100644
--- a/userland/examples_zc/zbalance_ipc.c
+++ b/userland/examples_zc/zbalance_ipc.c
@@ -35,6 +35,7 @@
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
+#include <netinet/ip.h>

#include "pfring.h"
#include "pfring_zc.h"
@@ -403,7 +404,8 @@ void printHelp(void) {
" 3 - Fan-out (1st) + Round-Robin (2nd, 3rd, ..)\n"
" 4 - GTP hash (Inner IP/Port or Seq-Num or Outer IP/Port)\n"
" 5 - GRE hash (Inner or Outer IP)\n"
- " 6 - Interface X to queue X\n");
+ " 6 - Interface X to queue X\n"
+ " 7 - ERSPAN (Inner IP)\n");
printf("-r <queue>:<dev> Replace egress queue <queue> with device <dev> (multiple -r can be specified)\n");
printf("-S <core id> Enable Time Pulse thread and bind it to a core\n");
printf("-R <nsec> Time resolution (nsec) when using Time Pulse thread\n"
@@ -495,6 +497,43 @@ int64_t ip_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in
return pfring_zc_builtin_ip_hash(pkt_handle, in_queue) % num_out_queues;
}

+/* *************************************** */
+
+int npkts = 0;
+int64_t erspan_hack_ip_hash(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue)
+{
+ // Skip the first 0x2a bytes, which is the erspan header. There lies an etherheader; parse that
+ u_char *pkt = pfring_zc_pkt_buff_data(pkt_handle, in_queue);
+ u_char *ether = &pkt[0x2a];
+
+ uint16_t ethertype = (ether[12] << 8) | ether[13];
+ if (ethertype == 0x0800) {
+ // IPv4
+ if (pkt_handle->len < (sizeof(struct ether_header)+sizeof(struct iphdr))) {
+ return 0;
+ }
+
+ struct iphdr *ip = (struct iphdr*)&ether[sizeof(struct ether_header)];
+ return ntohl(ip->saddr) + ntohl(ip->daddr);
+ } else if (ethertype == 0x86dd) {
+ // IPv6
+ // TODO
+ }
+ return 0;
+}
+
+int64_t erspan_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
+ long num_out_queues = (long) user;
+#ifdef HAVE_PACKET_FILTER
+ if (!packet_filter(pkt_handle, in_queue))
+ return -1;
+#endif
+ if (time_pulse) SET_TS_FROM_PULSE(pkt_handle, *pulse_timestamp_ns);
+
+ return erspan_hack_ip_hash(pkt_handle, in_queue) % num_out_queues;
+}
+
+
/* *************************************** */

int64_t gtp_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
@@ -609,6 +648,32 @@ int64_t fo_multiapp_ip_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_

/* *************************************** */

+int64_t fo_multiapp_erspan_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
+ int32_t i, offset = 0, app_instance, hash;
+ int64_t consumers_mask = 0;
+
+#ifdef HAVE_PACKET_FILTER
+ if (!packet_filter(pkt_handle, in_queue))
+ return 0x0;
+#endif
+
+ if (time_pulse) SET_TS_FROM_PULSE(pkt_handle, *pulse_timestamp_ns);
+
+ hash = erspan_hack_ip_hash(pkt_handle, in_queue);
+
+ for (i = 0; i < num_apps; i++) {
+ app_instance = hash % instances_per_app[i];
+ consumers_mask |= ((int64_t) 1 << (offset + app_instance));
+ offset += instances_per_app[i];
+ }
+
+ return consumers_mask;
+}
+
+/* *************************************** */
+
+
+
int64_t fo_multiapp_gtp_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
int32_t i, offset = 0, app_instance, hash;
int64_t consumers_mask = 0;
@@ -864,6 +929,7 @@ int main(int argc, char* argv[]) {
case 4:
case 5:
case 6:
+ case 7:
num_consumer_queues_limit = 64; /* egress mask is 64 bit */
break;
default:
@@ -874,6 +940,7 @@ int main(int argc, char* argv[]) {
switch (hash_mode) {
case 1:
case 3:
+ case 7:
num_consumer_queues_limit = 64; /* egress mask is 64 bit */
break;
default:
@@ -1149,6 +1216,8 @@ int main(int argc, char* argv[]) {
break;
case 6: func = direct_distribution_func;
break;
+ case 7: if (strcmp(device, "sysdig") == 0) func = sysdig_distribution_func; else if (time_pulse) func = erspan_distribution_func; /* else built-in IP-based */
+ break;
}

zw = pfring_zc_run_balancer(
@@ -1189,6 +1258,7 @@ int main(int argc, char* argv[]) {
break;
case 6: func = fo_multiapp_direct_distribution_func;
break;
+ case 7: func = fo_multiapp_erspan_distribution_func;
}

zw = pfring_zc_run_fanout(
19 changes: 17 additions & 2 deletions sysconfig/conjure.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CJ_CLUSTER_ID=98
CJ_COREBASE=0

# number of cores to launch detector threads on (increments from $CJ_COREBASE)
CJ_CORECOUNT=1
CJ_CORECOUNT=4

# Specify a core numerically to avoid.
CJ_SKIP_CORE=
Expand All @@ -33,4 +33,19 @@ CJ_QUEUE_OFFSET=0
CJ_STATION_CONFIG=/opt/conjure/application/config.toml

# Allow the station to log client IPs (default disabled)
LOG_CLIENT_IP=false
LOG_CLIENT_IP=false

# TODO add to per-station configs
CJ_IFACE="zc:enp179s0f0,zc:enp179s0f1"


# 1 - IP hash
# 7 - ERSPAN hash (custom; for rockypika)
ZBALANCE_HASH_MODE=1
ZBALANCE_CORE=1

# This is a hack to allow us to parse GRE/ERSPAN on some stations.
# Normal tap stations will set this to 0, but GRE-tap stations will
# set this to the number of bytes the Ether+IP+GRE headers are for
# that instance (e.g. ERSPAN on rockypika should be 42)
PARSE_GRE_OFFSET=0
27 changes: 27 additions & 0 deletions sysconfig/start_zbalance_ipc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash

# run zbalance (Conjure only)

#. config
. /opt/conjure/sysconfig/conjure.conf


# TD_IFACE could be a CSV list of interfaces.
# Pull them apart to ensure each gets zc: prefix
ifcarg=""
IFS=',' read -r -a ifcarray <<< "${CJ_IFACE}"
didfirst=0
for ifc in "${ifcarray[@]}"
do
ifcelem="zc:${ifc}"
if [[ $ifc = "zc:"* ]]; then
ifcelem=${ifc}
fi
if [ $didfirst -ne 0 ]; then
ifcarg="$ifcarg,$ifcelem"
else
ifcarg=$ifcelem
didfirst=1
fi
done
sudo ./PF_RING/userland/examples_zc/zbalance_ipc -i $ifcarg -c ${CJ_CLUSTER_ID} -n ${CJ_CORECOUNT} -m ${ZBALANCE_HASH_MODE} -g ${ZBALANCE_CORE}
2 changes: 1 addition & 1 deletion sysconfig/zbalance.service
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SyslogIdentifier=zbalance
EnvironmentFile=/opt/conjure/sysconfig/conjure.conf

# makes if binary doesn't exist
ExecStartPre=/usr/bin/make zbalance
#ExecStartPre=/usr/bin/make zbalance

ExecStart=/bin/bash /opt/conjure/start_zbalance_ipc.sh

Expand Down

0 comments on commit 44540ea

Please sign in to comment.