Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove quite a few array access code #574

Merged
merged 7 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/checks/safety.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ while getopts hauiprebldxcft opt; do
exit 0
;;
a)
exec "$0" -uirpeldxcft
exec "$0" -uirpeldxcftb
;;
u)
for file in $files
Expand Down Expand Up @@ -151,6 +151,7 @@ while getopts hauiprebldxcft opt; do
if sed -e '/mod test.*/,$d' -e '/ALLOW: /{N;d;}' "$file" | grep '[a-z]\[' > /dev/null
then
echo "##[error] array access ([...]) found in $file that could go wrong, array access can panic."
grep -nH '[a-z]\[' "$file"
count=$((count + 1))
fi
done
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
with:
version: "0.15.0"
args: " --exclude-files target* tremor-cli deprecated --all"
- uses: codecov/codecov-action@v1.0.2
- uses: codecov/codecov-action@v1
with:
token: ${{secrets.CODECOV_TOKEN}} #required
file: ./cobertura.xml
78 changes: 77 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1 +1,77 @@
123,125
{
mfelsche marked this conversation as resolved.
Show resolved Hide resolved
"cSpell.words": [
"Aggr",
"Aggrs",
"Arity",
"Bitshift",
"Defn",
"Imut",
"Metas",
"Nonexhaustive",
"Stdev",
"Stmts",
"Upable",
"backoff",
"bools",
"boop",
"borp",
"cfile",
"chash",
"codepoint",
"damerau",
"ddsketch",
"dedup",
"dflt",
"eval",
"florp",
"fqon",
"fqsn",
"gelf",
"hahaha",
"havbad",
"havf",
"havt",
"hdrhistogram",
"histo",
"ival",
"jumphash",
"lalrpop",
"libflate",
"mult",
"multiline",
"mutd",
"nanotime",
"nogrp",
"nohav",
"nowhr",
"nowin",
"pcount",
"petgraph",
"pluggable",
"powi",
"preprocess",
"preprocessors",
"quantile",
"recordpattern",
"rentwrapped",
"rollup",
"roundrobin",
"rposition",
"signalflow",
"stry",
"subrange",
"subslice",
"substr",
"tagsx",
"testcase",
"tiltframe",
"trunc",
"unpublish",
"upsert",
"usefn",
"vals",
"whrbad",
"whrf",
"whrt"
]
}
50 changes: 36 additions & 14 deletions src/codec/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use super::prelude::*;
use simd_json::value::borrowed::{Object, Value};
use std::str;
use std::{slice::SliceIndex, str};

#[derive(Clone)]
pub struct StatsD {}
Expand Down Expand Up @@ -103,13 +103,14 @@ fn decode<'input>(data: &'input [u8], _ingest_ns: u64) -> Result<Value<'input>>
loop {
match d.next() {
Some((idx, b':')) => {
let v = str::from_utf8(&data[0..idx])?;
let raw = data.get(0..idx).ok_or_else(invalid)?;
let v = str::from_utf8(raw)?;
value_start = idx + 1;
m.insert("metric".into(), Value::from(v));
break;
}
Some(_) => (),
None => return Err(ErrorKind::InvalidStatsD.into()),
None => return Err(invalid()),
}
}
let sign = match d.peek() {
Expand All @@ -122,7 +123,7 @@ fn decode<'input>(data: &'input [u8], _ingest_ns: u64) -> Result<Value<'input>>
match d.next() {
Some((_, b'.')) => is_float = true,
Some((idx, b'|')) => {
let s = str::from_utf8(&data[value_start..idx])?;
let s = substr(data, value_start..idx)?;
if is_float {
let v: f64 = s.parse()?;
value = Value::from(v);
Expand All @@ -133,18 +134,18 @@ fn decode<'input>(data: &'input [u8], _ingest_ns: u64) -> Result<Value<'input>>
break;
}
Some(_) => (),
None => return Err(ErrorKind::InvalidStatsD.into()),
None => return Err(invalid()),
}
}
match d.next() {
Some((i, b'c')) | Some((i, b'h')) | Some((i, b's')) => {
m.insert("type".into(), str::from_utf8(&data[i..=i])?.into())
m.insert("type".into(), substr(&data, i..=i)?.into())
}
Some((i, b'm')) => {
if let Some((j, b's')) = d.next() {
m.insert("type".into(), str::from_utf8(&data[i..=j])?.into())
m.insert("type".into(), substr(data, i..=j)?.into())
} else {
return Err(ErrorKind::InvalidStatsD.into());
return Err(invalid());
}
}
Some((i, b'g')) => {
Expand All @@ -159,37 +160,58 @@ fn decode<'input>(data: &'input [u8], _ingest_ns: u64) -> Result<Value<'input>>
} else if let Some(v) = value.as_f64() {
Value::from(-v)
} else {
return Err(ErrorKind::InvalidStatsD.into());
return Err(invalid());
};
m.insert("action".into(), "sub".into());
}
Sign::None => (),
};
m.insert("type".into(), str::from_utf8(&data[i..=i])?.into())
m.insert("type".into(), substr(data, i..=i)?.into())
}
_ => return Err(ErrorKind::InvalidStatsD.into()),
_ => return Err(invalid()),
};
match d.next() {
Some((_, b'|')) => {
if let Some((sample_start, b'@')) = d.next() {
let s = str::from_utf8(&data[sample_start + 1..])?;
let s = substr(data, sample_start + 1..)?;
let v: f64 = s.parse()?;
m.insert("sample_rate".into(), Value::from(v));
} else {
return Err(ErrorKind::InvalidStatsD.into());
return Err(invalid());
}
}
Some(_) => return Err(ErrorKind::InvalidStatsD.into()),
Some(_) => return Err(invalid()),
None => (),
};
m.insert("value".into(), value);
Ok(Value::from(m))
}

/// Builds the error used throughout this codec for input that is not
/// well-formed StatsD.
fn invalid() -> Error {
    ErrorKind::InvalidStatsD.into()
}

fn substr<I: SliceIndex<[u8], Output = [u8]>>(data: &[u8], r: I) -> Result<&str> {
let raw = data.get(r).ok_or_else(invalid)?;
let s = str::from_utf8(raw)?;
Ok(s)
}

#[cfg(test)]
mod test {
use super::*;
use simd_json::json;
#[test]
fn test_subslice() {
    // Six ASCII digits, so every in-range slice is valid UTF-8.
    let digits = b"012345";

    // Ranges that lie inside the buffer yield the matching text.
    assert_eq!(substr(digits, 1..), Ok("12345"));
    assert_eq!(substr(digits, ..4), Ok("0123"));
    assert_eq!(substr(digits, 1..4), Ok("123"));

    // Ranges reaching past the end are reported as errors, not panics.
    assert!(substr(digits, 99..).is_err());
    assert!(substr(digits, ..99).is_err());
}

// gorets:1|c
#[test]
fn gorets() {
Expand Down
20 changes: 12 additions & 8 deletions src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod gelf;
pub(crate) use gelf::GELF;
pub(crate) mod lines;

use crate::errors::Result;
use crate::errors::{Error, Result};
use crate::url::TremorURL;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::buf::Buf;
Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn preprocess(
match pp.process(ingest_ns, d) {
Ok(mut r) => data1.append(&mut r),
Err(e) => {
error!("[{}] Preprocessor[{}] error {}", instance_id, i, e);
error!("[{}] Preprocessor [{}] error {}", instance_id, i, e);
return Err(e);
}
}
Expand All @@ -125,9 +125,9 @@ impl SliceTrim for [u8] {

if let Some(first) = self.iter().position(|v| is_not_whitespace(*v)) {
if let Some(last) = self.iter().rposition(|v| is_not_whitespace(*v)) {
&self[first..=last]
self.get(first..=last).unwrap_or_default()
} else {
&self[first..]
self.get(first..).unwrap_or_default()
}
} else {
&[]
Expand Down Expand Up @@ -177,8 +177,12 @@ impl Preprocessor for ExtractIngresTs {

fn process(&mut self, ingest_ns: &mut u64, data: &[u8]) -> Result<Vec<Vec<u8>>> {
use std::io::Cursor;
*ingest_ns = Cursor::new(data).read_u64::<BigEndian>()?;
Ok(vec![data[8..].to_vec()])
if let Some(d) = data.get(8..) {
*ingest_ns = Cursor::new(data).read_u64::<BigEndian>()?;
Ok(vec![d.to_vec()])
} else {
Err(Error::from("Extract Ingest Ts Preprocessor: < 8 byte"))
}
}
}

Expand All @@ -205,7 +209,7 @@ impl Preprocessor for Gzip {

fn process(&mut self, _ingest_ns: &mut u64, data: &[u8]) -> Result<Vec<Vec<u8>>> {
use libflate::gzip::MultiDecoder;
let mut decoder = MultiDecoder::new(&data[..])?;
let mut decoder = MultiDecoder::new(data)?;
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(vec![decompressed])
Expand All @@ -222,7 +226,7 @@ impl Preprocessor for Zlib {

fn process(&mut self, _ingest_ns: &mut u64, data: &[u8]) -> Result<Vec<Vec<u8>>> {
use libflate::zlib::Decoder;
let mut decoder = Decoder::new(&data[..])?;
let mut decoder = Decoder::new(data)?;
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(vec![decompressed])
Expand Down
68 changes: 31 additions & 37 deletions src/preprocessor/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ struct GELFSegment {
fn decode_gelf(bin: &[u8]) -> Result<GELFSegment> {
// We got to do that for badly compressed / non standard conform
// gelf messages
match bin.get(0..2) {
// WELF magic header - wayfair uncompressed gelf
Some(&[0x1f, 0x3c]) => {
match *bin {
// WELF magic header - Wayfair uncompressed GELF
[0x1f, 0x3c, ref rest @ ..] => {
// If we are less then 2 byte we can not be a proper Package
if bin.len() < 2 {
Err(ErrorKind::InvalidGELFHeader(bin.len(), None).into())
Expand All @@ -80,50 +80,39 @@ fn decode_gelf(bin: &[u8]) -> Result<GELFSegment> {
id: rand::rngs::OsRng.next_u64(),
seq: 0,
count: 1,
data: bin[2..].to_vec(),
data: rest.to_vec(),
})
}
}

// GELF magic header
Some(&[0x1e, 0x0f]) => {
[0x1e, 0x0f, b2, b3, b4, b5, b6, b7, b8, b9, seq, count, ref rest @ ..] => {
// If we are less then 12 byte we can not be a proper Package
if bin.len() < 12 {
if bin.len() >= 2 {
Err(ErrorKind::InvalidGELFHeader(bin.len(), Some([0x1e, 0x0f])).into())
} else {
Err(ErrorKind::InvalidGELFHeader(bin.len(), None).into())
}
} else {
// we would allow up to 255 chunks
Ok(GELFSegment {
id: (u64::from(bin[2]) << 56)
+ (u64::from(bin[3]) << 48)
+ (u64::from(bin[4]) << 40)
+ (u64::from(bin[5]) << 32)
+ (u64::from(bin[6]) << 24)
+ (u64::from(bin[7]) << 16)
+ (u64::from(bin[8]) << 8)
+ u64::from(bin[9]),
seq: bin[10],
count: bin[11],
data: bin[12..].to_vec(),
})
}
// we would allow up to 255 chunks
let id = (u64::from(b2) << 56)
+ (u64::from(b3) << 48)
+ (u64::from(b4) << 40)
+ (u64::from(b5) << 32)
+ (u64::from(b6) << 24)
+ (u64::from(b7) << 16)
+ (u64::from(b8) << 8)
+ u64::from(b9);
Ok(GELFSegment {
id,
seq,
count,
data: rest.to_vec(),
})
}
Some(&[b'{', _]) => Ok(GELFSegment {
[b'{', _] => Ok(GELFSegment {
id: 0,
seq: 0,
count: 1,
data: bin.to_vec(),
}),
Some(&[a, b]) => Err(ErrorKind::InvalidGELFHeader(bin.len(), Some([a, b])).into()),
_ => {
if bin.len() == 1 {
Err(ErrorKind::InvalidGELFHeader(1, bin.get(0).map(|v| [*v, 0])).into())
} else {
Err(ErrorKind::InvalidGELFHeader(0, None).into())
}
}
[a, b, ..] => Err(ErrorKind::InvalidGELFHeader(bin.len(), Some([a, b])).into()),
[v] => Err(ErrorKind::InvalidGELFHeader(1, Some([v, 0])).into()),
[] => Err(ErrorKind::InvalidGELFHeader(0, None).into()),
}
}

Expand All @@ -136,12 +125,17 @@ impl Preprocessor for GELF {
fn process(&mut self, ingest_ns: &mut u64, data: &[u8]) -> Result<Vec<Vec<u8>>> {
let msg = decode_gelf(data)?;
if let Some(data) = self.enqueue(*ingest_ns, msg) {
// TODO: WHY :sob:
let len = if self.is_tcp {
data.len() - 1
} else {
data.len()
};
Ok(vec![data[0..len].to_vec()])
if let Some(d) = data.get(0..len) {
Ok(vec![d.to_vec()])
} else {
Ok(vec![])
}
} else {
Ok(vec![])
}
Expand Down
Loading