Skip to content

Commit

Permalink
Remove quite a few array access code
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <heinz@licenser.net>
  • Loading branch information
Licenser committed Nov 6, 2020
1 parent 8b8c1b5 commit 8cada0e
Show file tree
Hide file tree
Showing 42 changed files with 409 additions and 221 deletions.
3 changes: 2 additions & 1 deletion .github/checks/safety.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ while getopts hauiprebldxcft opt; do
exit 0
;;
a)
exec "$0" -uirpeldxcft
exec "$0" -uirpeldxcftb
;;
u)
for file in $files
Expand Down Expand Up @@ -151,6 +151,7 @@ while getopts hauiprebldxcft opt; do
if sed -e '/mod test.*/,$d' -e '/ALLOW: /{N;d;}' "$file" | grep '[a-z]\[' > /dev/null
then
echo "##[error] array access ([...]) found in $file that could go wrong, array access can panic."
grep -nH '[a-z]\[' "$file"
count=$((count + 1))
fi
done
Expand Down
69 changes: 68 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1 +1,68 @@
123,125
{
"cSpell.words": [
"Aggr",
"Aggrs",
"Arity",
"Bitshift",
"Defn",
"Imut",
"Metas",
"Nonexhaustive",
"Stdev",
"Stmts",
"Upable",
"backoff",
"bools",
"boop",
"borp",
"cfile",
"chash",
"codepoint",
"damerau",
"ddsketch",
"dedup",
"dflt",
"eval",
"florp",
"fqon",
"fqsn",
"hahaha",
"havbad",
"havf",
"havt",
"hdrhistogram",
"histo",
"ival",
"jumphash",
"lalrpop",
"mult",
"multiline",
"mutd",
"nogrp",
"nohav",
"nowhr",
"nowin",
"pcount",
"petgraph",
"powi",
"preprocess",
"preprocessors",
"quantile",
"recordpattern",
"rentwrapped",
"rollup",
"roundrobin",
"stry",
"subrange",
"subslice",
"tiltframe",
"trunc",
"unpublish",
"upsert",
"usefn",
"vals",
"whrbad",
"whrf",
"whrt"
]
}
6 changes: 4 additions & 2 deletions src/preprocessor/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Lines {
"Invalid line of length {} since it exceeds maximum allowed length of {}: {:?}",
v.len(),
self.max_length,
String::from_utf8_lossy(&v[0..min(v.len(), 256)]),
String::from_utf8_lossy(v.get(0..min(v.len(), 256)).unwrap_or_default()),
);
false
}
Expand Down Expand Up @@ -146,12 +146,14 @@ impl Preprocessor for Lines {
// if incoming data had at least one line separator boundary (anywhere)
// AND if the preprocessor has memory of line fragment from earlier,
// reconstruct the first event fully (by adding the buffer contents to it)

// FIXME: what is going on here?
if (last_event.is_empty() || !events.is_empty()) && !self.buffer.is_empty() {
self.complete_fragment(&mut events[0])?;
}

// if the incoming data did not end in a line boundary, last event is actually
// a fragment so we need to remmeber it for later (when more data arrives)
// a fragment so we need to remember it for later (when more data arrives)
if !last_event.is_empty() {
self.save_fragment(&last_event)?;
}
Expand Down
9 changes: 6 additions & 3 deletions src/ramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,20 @@ pub struct MmapAnon {

impl MmapFile {
fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.store[..self.end]
// We read until its end, so we know it's OK
unsafe { self.store.get_unchecked_mut(..self.end) }
}
}

impl MmapAnon {
fn as_slice(&self) -> &[u8] {
&self.store[..self.end]
// We read until its end, so we know it's OK
unsafe { self.store.get_unchecked(..self.end) }
}

fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.store[..self.end]
// We read until its end, so we know it's OK
unsafe { self.store.get_unchecked_mut(..self.end) }
}

fn flush(&mut self) -> io::Result<()> {
Expand Down
5 changes: 4 additions & 1 deletion src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ impl<A: Artefact> Registry<A> {
id.trim_to_instance();
match self.map.insert(id.clone(), servant) {
Some(_old) => Err(ErrorKind::UnpublishFailedDoesNotExist(id.to_string()).into()),
None => Ok(&self.map[&id]),
None => self
.map
.get(&id)
.ok_or_else(|| ErrorKind::UnpublishFailedDoesNotExist(id.to_string()).into()),
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl Sink for Postgres {
"INSERT INTO {} ({}) VALUES ({});",
self.config.table, fields, params
);
let s_slice: &str = &q[..];

if self.client.is_none() {
self.client = match init_cli(&self.config) {
Expand All @@ -125,7 +124,7 @@ impl Sink for Postgres {
};

match client.query_raw(
s_slice,
q.as_str(),
records
.iter()
.map(|p| p as &dyn postgres::types::ToSql)
Expand Down
1 change: 1 addition & 0 deletions src/sink/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ async fn build_response_events(
for pp in preprocessed {
events.push(
LineValue::try_new(vec![pp], |mutd| {
// ALLOW: we define mutd as a vector of one element above
let mut_data = mutd[0].as_mut_slice();
let body = the_chosen_one
.decode(mut_data, nanotime())?
Expand Down
2 changes: 1 addition & 1 deletion src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ where
// this is safe, because we get the vec we created in the previous argument and we know it has 1 element
// so it will never panic.
// take this, rustc!
let mut_data = mutd[0].as_mut_slice();
let mut_data = unsafe { mutd.get_unchecked_mut(0).as_mut_slice() };
let decoded = if let Some(doh) = &codec_override {
if let Some(c) = self.codec_map.get_mut(doh) {
c.decode(mut_data, *ingest_ns)
Expand Down
2 changes: 1 addition & 1 deletion src/source/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ fn make_response(
if let Some(header_values) = values.as_array() {
if name.eq_ignore_ascii_case("content-type") {
// pick first value in case of multiple content-type headers
header_content_type = header_values[0].as_str();
header_content_type = header_values.first().and_then(Value::as_str);
}
let mut v = Vec::with_capacity(header_values.len());
for value in header_values {
Expand Down
1 change: 1 addition & 0 deletions src/source/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl Source for Int {
if let Err(e) = tx
.send(SourceReply::Data {
origin_uri: origin_uri.clone(),
// ALLOW: we define n as part of the read
data: buffer[0..n].to_vec(),
meta: None, // TODO: add peer address etc. to meta
codec_override: None,
Expand Down
1 change: 1 addition & 0 deletions src/source/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl Source for Int {
origin_uri.port = Some(peer.port());
Ok(SourceReply::Data {
origin_uri,
// ALLOW: we get n from recv
data: buf[0..n].to_vec(),
meta: None,
codec_override: None,
Expand Down
2 changes: 1 addition & 1 deletion tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ macro_rules! test_cases {
for (_, result) in results {
for value in result.value_iter() {
if let Some(expected) = out_json.pop() {
assert_eq!(sorsorted_serialize(value)?, sorsorted_serialize(&expected)?);
assert_eq!(sorted_serialize(value)?, sorted_serialize(&expected)?);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ macro_rules! test_cases {
assert_eq!(results.len(), out_json.len());
for (i, value) in results.iter().enumerate() {
if let Some(expected) = out_json.pop() {
assert_eq!(sorsorted_serialize(&value)?, sorsorted_serialize(&expected)?, "Input event #{} Expected `{}`, but got `{}`", i, sorsorted_serialize(&expected)?, sorsorted_serialize(&value)?);
assert_eq!(sorted_serialize(&value)?, sorted_serialize(&expected)?, "Input event #{} Expected `{}`, but got `{}`", i, sorted_serialize(&expected)?, sorted_serialize(&value)?);
}
}
Ok(())
Expand All @@ -87,7 +87,7 @@ test_cases!(
array_paths,
array_pattern,
assign_move,
assing_and_path_match,
assign_and_path_match,
base64,
binary_float,
binary_int,
Expand Down
File renamed without changes.
File renamed without changes.
5 changes: 4 additions & 1 deletion tremor-cli/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl Ingress {
}
Ok(n) => {
let mut at = nanotime();
let x = self.preprocessor.process(&mut at, &self.buf[0..n])?;
// We access the entire read buffer; the len is provided by read
let x = self
.preprocessor
.process(&mut at, unsafe { self.buf.get_unchecked(0..n) })?;
for mut data in x {
let event = match self.codec.decode(data.as_mut_slice(), at) {
Ok(Some(data)) => data,
Expand Down
1 change: 1 addition & 0 deletions tremor-pipeline/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ error_chain! {
description("Bad output pipeline id.")
display("Bad output pipeline id {}", i - 1)
}

}
}

Expand Down
20 changes: 14 additions & 6 deletions tremor-pipeline/src/executable_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{borrow::Cow, fmt, fmt::Display, sync::Arc};

use crate::{
common_cow,
errors::Error,
errors::Result,
influx_value,
op::{prelude::IN, trickle::select::WindowImpl},
Expand Down Expand Up @@ -432,7 +433,11 @@ impl ExecutableGraph {
self.last_metrics = event.ingest_ns;
}
}
self.stack.push((self.inputs[stream_name], IN, event));
let input = *self
.inputs
.get(stream_name)
.ok_or_else(|| Error::from(format!("invalid stream name: `{}`", stream_name)))?;
self.stack.push((input, IN, event));
self.run(returns)
}

Expand All @@ -458,8 +463,10 @@ impl ExecutableGraph {
if node.kind == NodeKind::Output {
returns.push((node.id.clone(), event));
} else {
// ALLOW: We know the state was initiated
let state = unsafe { self.state.ops.get_unchecked_mut(idx) };
let EventAndInsights { events, insights } =
node.on_event(0, &port, &mut self.state.ops[idx], event)?;
node.on_event(0, &port, state, event)?;

for (out_port, _) in &events {
unsafe { self.metrics.get_unchecked_mut(idx) }.inc_output(out_port);
Expand Down Expand Up @@ -548,7 +555,7 @@ impl ExecutableGraph {
}
insight
}
/// Enque a signal
/// Enqueue a signal
pub fn enqueue_signal(&mut self, signal: Event, returns: &mut Returns) -> Result<()> {
if self.signalflow(signal)? {
self.run(returns)?;
Expand All @@ -558,8 +565,11 @@ impl ExecutableGraph {

fn signalflow(&mut self, mut signal: Event) -> Result<bool> {
let mut has_events = false;
// We can't use an iterator over signalflow here
// rust refuses to let us use enqueue_events if we do
for idx in 0..self.signalflow.len() {
let i = self.signalflow[idx];
// ALLOW: we guarantee that idx exists above
let i = unsafe { *self.signalflow.get_unchecked(idx) };
let EventAndInsights { events, insights } = {
let op = unsafe { self.graph.get_unchecked_mut(i) }; // We know this exists
op.on_signal(op.uid, &mut signal)?
Expand All @@ -569,8 +579,6 @@ impl ExecutableGraph {
}
has_events = has_events || !events.is_empty();
self.enqueue_events(i, events);
// We shouldn't call run in signal flow it should just enqueue
// self.run(returns)?
}
Ok(has_events)
}
Expand Down
4 changes: 2 additions & 2 deletions tremor-pipeline/src/op/grouper/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
//! if let window bucket.windows(dimension) {
//! if window.inc() { pass } else { drop }
//! } else {
//! bucket.windows[dimension] = Window::new(bucket.limit)
//! if bucket.windows[dimension].inc() { pass } else { drop }
//! bucket.windows(dimension) = Window::new(bucket.limit)
//! if bucket.windows(dimension).inc() { pass } else { drop }
//! }
//! } else {
//! return drop
Expand Down
13 changes: 7 additions & 6 deletions tremor-pipeline/src/op/qos/rr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ impl Operator for RoundRobin {
let mut output = None;
for n in 0..self.outputs.len() {
let id = (self.next + n) % self.outputs.len();
let o = &mut self.outputs[id];
// ALLOW: we calculate the id above it's modulo the output
let o = unsafe { self.outputs.get_unchecked_mut(id) };
if o.open {
// :/ need pipeline lifetime to fix
output = Some((o.output.clone(), id));
Expand Down Expand Up @@ -181,8 +182,8 @@ mod test {

let mut state = Value::null();

// Sent a first event, as all is initited clean
// we syould see this pass
// Sent a first event, as all is initiated clean
// we should see this pass
let event1 = Event {
id: 1.into(),
ingest_ns: 1_000_000,
Expand All @@ -196,8 +197,8 @@ mod test {
let (out, _event) = r.pop().expect("no results");
assert_eq!("out", out);

// Sent a first event, as all is initited clean
// we syould see this pass
// Sent a first event, as all is initiated clean
// we should see this pass
let event2 = Event {
id: 2.into(),
ingest_ns: 1_000_001,
Expand Down Expand Up @@ -242,7 +243,7 @@ mod test {
let (out, _event) = r.pop().expect("no results");
assert_eq!("out2", out);

// Even for multiople events
// Even for multiple events
let event3 = Event {
id: 3.into(),
ingest_ns: 2_000_000,
Expand Down
13 changes: 9 additions & 4 deletions tremor-pipeline/src/op/trickle/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Trickle {
}
};

let script = rentals::Script::new(defn_rentwrapped.stmt.clone(), move |_| unsafe {
let script = rentals::Script::try_new(defn_rentwrapped.stmt.clone(), move |_| unsafe {
use tremor_script::ast::ScriptDecl;
// This is sound since defn_rentwrapped.stmt is an arc by cloning
// it we ensure that the referenced data remains available until
Expand All @@ -115,9 +115,14 @@ impl Trickle {
let args: Value<'static> = mem::transmute(args);

decl.script.consts = vec![Value::null(), Value::null(), Value::null()];
decl.script.consts[ARGS_CONST_ID] = args;
decl
});
*decl
.script
.consts
.get_mut(ARGS_CONST_ID)
.ok_or_else(|| Error::from("Can't access ARGS_CONST_ID!"))? = args;
Ok(decl)
})
.map_err(|e: rental::RentalError<Error, _>| e.0)?;

Ok(Self {
id,
Expand Down
Loading

0 comments on commit 8cada0e

Please sign in to comment.