Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build changes to get conjure working on rockypika #74

Merged
merged 6 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions INSTALL
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# go version 1.15.2
# rustc version 1.47.0

go get -d -u -t github.com/refraction-networking/conjure
cd ~/go/src/github.com/refraction-networking/conjure

# Fix for redis having wrong package for us:
go get -u github.com/go-redis/redis || true && cd ~/go/src/github.com/go-redis/redis && git checkout tags/v7.4.0 -b v7-master && cd -


# Build app
make app


# Build PF_RING
cd PF_RING && make && cd ../

# Build libtapdance
cd libtapdance && make && cd ..

# Build detector
make dark-decoy


# Make install directory
sudo mkdir -p /opt/conjure

# Install to /opt/
sudo cp -r ./* /opt/conjure/

# Install systemd stuff
sudo cp sysconfig/conjure-{app,det}.service /etc/systemd/system/
sudo cp sysconfig/zbalance.service /etc/systemd/system/

# Install zbalance stuff
sudo cp ./sysconfig/start_zbalance_ipc.sh /opt/conjure/

#sudo cp ./PF_RING/userland/examples_zc/zbalance_ipc /opt/conjure/
13 changes: 13 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub struct PerCoreGlobal
// testing from other stations in a conjure cluster from clogging up the logs with connection
// notifications.
filter_list: Vec<String>,

// If we're reading from a GRE tap, we can provide an optional offset that we read
// into the packet (skipping the GRE header).
gre_offset: usize,
}

// Tracking of some pretty straightforward quantities
Expand Down Expand Up @@ -147,6 +151,14 @@ impl PerCoreGlobal
&_ => Flow::set_log_client(false), // default disable
};

let gre_offset = match env::var("PARSE_GRE_OFFSET") {
Ok(val) => val.parse::<usize>().unwrap(),
Err(env::VarError::NotPresent) => 0,
Err(_) => { println!("Error, can't parse PARSE_GRE_OFFSET"); 0},
};

debug!("gre_offset: {}", gre_offset);

PerCoreGlobal {
priv_key: priv_key,
lcore: the_lcore,
Expand All @@ -157,6 +169,7 @@ impl PerCoreGlobal
ip_tree: PrefixTree::new(),
zmq_sock: zmq_sock,
filter_list: value.detector_filter_list,
gre_offset: gre_offset,
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/process_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,18 @@ pub extern "C" fn rust_process_packet(ptr: *mut PerCoreGlobal,
#[allow(unused_mut)]
let mut global = unsafe { &mut *ptr };

let rust_view_len = frame_len as usize;
let mut rust_view_len = frame_len as usize;
let rust_view = unsafe {
slice::from_raw_parts_mut(raw_ethframe as *mut u8, frame_len as usize)
};

// If this is a GRE, we want to ignore the GRE overhead in our packets
rust_view_len -= global.gre_offset;

global.stats.packets_this_period += 1;
global.stats.bytes_this_period += rust_view_len as u64;

let eth_pkt = match EthernetPacket::new(rust_view) {
let eth_pkt = match EthernetPacket::new(&rust_view[global.gre_offset..]) {
Some(pkt) => pkt,
None => return,
};
Expand Down
132 changes: 132 additions & 0 deletions stations/rockypika/erspan-pfring-hash.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
diff --git a/userland/examples_zc/zbalance_ipc.c b/userland/examples_zc/zbalance_ipc.c
index a51cf231..8cc6f6c5 100644
--- a/userland/examples_zc/zbalance_ipc.c
+++ b/userland/examples_zc/zbalance_ipc.c
@@ -35,6 +35,7 @@
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
+#include <netinet/ip.h>

#include "pfring.h"
#include "pfring_zc.h"
@@ -403,7 +404,8 @@ void printHelp(void) {
" 3 - Fan-out (1st) + Round-Robin (2nd, 3rd, ..)\n"
" 4 - GTP hash (Inner IP/Port or Seq-Num or Outer IP/Port)\n"
" 5 - GRE hash (Inner or Outer IP)\n"
- " 6 - Interface X to queue X\n");
+ " 6 - Interface X to queue X\n"
+ " 7 - ERSPAN (Inner IP)\n");
printf("-r <queue>:<dev> Replace egress queue <queue> with device <dev> (multiple -r can be specified)\n");
printf("-S <core id> Enable Time Pulse thread and bind it to a core\n");
printf("-R <nsec> Time resolution (nsec) when using Time Pulse thread\n"
@@ -495,6 +497,43 @@ int64_t ip_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in
return pfring_zc_builtin_ip_hash(pkt_handle, in_queue) % num_out_queues;
}

+/* *************************************** */
+
+int npkts = 0;
+int64_t erspan_hack_ip_hash(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue)
+{
+ // Skip the first 0x2a bytes, which is the erspan header. There lies an etherheader; parse that
+ u_char *pkt = pfring_zc_pkt_buff_data(pkt_handle, in_queue);
+ u_char *ether = &pkt[0x2a];
+
+ uint16_t ethertype = (ether[12] << 8) | ether[13];
+ if (ethertype == 0x0800) {
+ // IPv4
+ if (pkt_handle->len < (sizeof(struct ether_header)+sizeof(struct iphdr))) {
+ return 0;
+ }
+
+ struct iphdr *ip = (struct iphdr*)&ether[sizeof(struct ether_header)];
+ return ntohl(ip->saddr) + ntohl(ip->daddr);
+ } else if (ethertype == 0x86dd) {
+ // IPv6
+ // TODO
+ }
+ return 0;
+}
+
+int64_t erspan_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
+ long num_out_queues = (long) user;
+#ifdef HAVE_PACKET_FILTER
+ if (!packet_filter(pkt_handle, in_queue))
+ return -1;
+#endif
+ if (time_pulse) SET_TS_FROM_PULSE(pkt_handle, *pulse_timestamp_ns);
+
+ return erspan_hack_ip_hash(pkt_handle, in_queue) % num_out_queues;
+}
+
+
/* *************************************** */

int64_t gtp_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
@@ -609,6 +648,32 @@ int64_t fo_multiapp_ip_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_

/* *************************************** */

+int64_t fo_multiapp_erspan_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
+ int32_t i, offset = 0, app_instance, hash;
+ int64_t consumers_mask = 0;
+
+#ifdef HAVE_PACKET_FILTER
+ if (!packet_filter(pkt_handle, in_queue))
+ return 0x0;
+#endif
+
+ if (time_pulse) SET_TS_FROM_PULSE(pkt_handle, *pulse_timestamp_ns);
+
+ hash = erspan_hack_ip_hash(pkt_handle, in_queue);
+
+ for (i = 0; i < num_apps; i++) {
+ app_instance = hash % instances_per_app[i];
+ consumers_mask |= ((int64_t) 1 << (offset + app_instance));
+ offset += instances_per_app[i];
+ }
+
+ return consumers_mask;
+}
+
+/* *************************************** */
+
+
+
int64_t fo_multiapp_gtp_distribution_func(pfring_zc_pkt_buff *pkt_handle, pfring_zc_queue *in_queue, void *user) {
int32_t i, offset = 0, app_instance, hash;
int64_t consumers_mask = 0;
@@ -864,6 +929,7 @@ int main(int argc, char* argv[]) {
case 4:
case 5:
case 6:
+ case 7:
num_consumer_queues_limit = 64; /* egress mask is 64 bit */
break;
default:
@@ -874,6 +940,7 @@ int main(int argc, char* argv[]) {
switch (hash_mode) {
case 1:
case 3:
+ case 7:
num_consumer_queues_limit = 64; /* egress mask is 64 bit */
break;
default:
@@ -1149,6 +1216,8 @@ int main(int argc, char* argv[]) {
break;
case 6: func = direct_distribution_func;
break;
+ case 7: if (strcmp(device, "sysdig") == 0) func = sysdig_distribution_func; else if (time_pulse) func = erspan_distribution_func; /* else built-in IP-based */
+ break;
}

zw = pfring_zc_run_balancer(
@@ -1189,6 +1258,7 @@ int main(int argc, char* argv[]) {
break;
case 6: func = fo_multiapp_direct_distribution_func;
break;
+ case 7: func = fo_multiapp_erspan_distribution_func;
}

zw = pfring_zc_run_fanout(
19 changes: 17 additions & 2 deletions sysconfig/conjure.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CJ_CLUSTER_ID=98
CJ_COREBASE=0

# number of cores to launch detector threads on (increments from $CJ_COREBASE)
CJ_CORECOUNT=1
CJ_CORECOUNT=4

# Specify a core numerically to avoid.
CJ_SKIP_CORE=
Expand All @@ -33,4 +33,19 @@ CJ_QUEUE_OFFSET=0
CJ_STATION_CONFIG=/opt/conjure/application/config.toml

# Allow the station to log client IPs (default disabled)
LOG_CLIENT_IP=false
LOG_CLIENT_IP=false

# TODO add to per-station configs
CJ_IFACE="zc:enp179s0f0,zc:enp179s0f1"


# 1 - IP hash
# 7 - ERSPAN hash (custom; for rockypika)
ZBALANCE_HASH_MODE=1
ZBALANCE_CORE=1

# This is a hack to allow us to parse GRE/ERSPAN on some stations.
# Normal tap stations will set this to 0, but GRE-tap stations will
# set this to the number of bytes the Ether+IP+GRE headers are for
# that instance (e.g. ERSPAN on rockypika should be 42)
PARSE_GRE_OFFSET=0
27 changes: 27 additions & 0 deletions sysconfig/start_zbalance_ipc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash

# run zbalance (Conjure only)

#. config
. /opt/conjure/sysconfig/conjure.conf


# TD_IFACE could be a CSV list of interfaces.
# Pull them apart to ensure each gets zc: prefix
ifcarg=""
IFS=',' read -r -a ifcarray <<< "${CJ_IFACE}"
didfirst=0
for ifc in "${ifcarray[@]}"
do
ifcelem="zc:${ifc}"
if [[ $ifc = "zc:"* ]]; then
ifcelem=${ifc}
fi
if [ $didfirst -ne 0 ]; then
ifcarg="$ifcarg,$ifcelem"
else
ifcarg=$ifcelem
didfirst=1
fi
done
sudo ./PF_RING/userland/examples_zc/zbalance_ipc -i $ifcarg -c ${CJ_CLUSTER_ID} -n ${CJ_CORECOUNT} -m ${ZBALANCE_HASH_MODE} -g ${ZBALANCE_CORE}
2 changes: 1 addition & 1 deletion sysconfig/zbalance.service
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SyslogIdentifier=zbalance
EnvironmentFile=/opt/conjure/sysconfig/conjure.conf

# makes if binary doesn't exist
ExecStartPre=/usr/bin/make zbalance
#ExecStartPre=/usr/bin/make zbalance

ExecStart=/bin/bash /opt/conjure/start_zbalance_ipc.sh

Expand Down