Skip to content

Commit

Permalink
Merge pull request #28 from Antti/check_buffer_len_before_reading_frame
Browse files Browse the repository at this point in the history
Check buffer len before reading frame. Return explicit VHostError
  • Loading branch information
Antti committed Mar 1, 2016
2 parents 1c1dd7c + 10bf4b0 commit ef4a70d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 34 deletions.
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
language: rust
rust: nightly
rust:
- stable
- beta
- nightly
sudo: false
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]

name = "amqp"
version = "0.0.14"
version = "0.0.15"
authors = ["Andrii Dmytrenko <andrey@reevoo.com>"]
description = "AMQP/RabbitMQ protocol client"
repository = "https://github.com/Antti/rust-amqp"
Expand Down
12 changes: 8 additions & 4 deletions src/amqp_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub enum AMQPError {
ByteOrderError,
QueueEmpty,
SyncError,
FramingError(String),
VHostError,
}

impl fmt::Display for AMQPError {
Expand All @@ -25,16 +27,18 @@ impl fmt::Display for AMQPError {
}

impl error::Error for AMQPError {
fn description(&self) -> &str {
fn description<'a>(&'a self) -> &'a str {
match *self {
AMQPError::IoError(_) => "IoError",
AMQPError::DecodeError(_) => "Protocol decoding error",
AMQPError::Protocol(_) => "Protocol level error",
AMQPError::SchemeError(_) => "Invalid scheme",
AMQPError::DecodeError(err) => err,
AMQPError::Protocol(ref err) => err,
AMQPError::SchemeError(ref err) => err,
AMQPError::UrlParseError(_) => "URL parsing error",
AMQPError::ByteOrderError => "ByteOrderError",
AMQPError::QueueEmpty => "Queue is empty",
AMQPError::SyncError => "Synchronisation error",
AMQPError::FramingError(ref err) => err,
AMQPError::VHostError => "Access to vhost is denied for a current user",
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub trait Basic <'a> {
immediate: bool,
properties: BasicProperties,
content: Vec<u8>)
-> AMQPResult<()> where S: Into<String>;
-> AMQPResult<()>
where S: Into<String>;
fn basic_ack(&mut self, delivery_tag: u64, multiple: bool) -> AMQPResult<()>;
fn basic_nack(&mut self, delivery_tag: u64, multiple: bool, requeue: bool) -> AMQPResult<()>;
fn basic_reject(&mut self, delivery_tag: u64, requeue: bool) -> AMQPResult<()>;
Expand Down
27 changes: 15 additions & 12 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ pub trait Consumer : Send {
pub type ConsumerCallBackFn = fn(channel: &mut Channel,
method: basic::Deliver,
headers: BasicProperties,
body: Vec<u8>);
body: Vec<u8>)
;

impl Consumer for ConsumerCallBackFn {
fn handle_delivery(&mut self,
Expand All @@ -38,14 +39,16 @@ impl Consumer for ConsumerCallBackFn {
}
}

impl <T> Consumer for Box<T> where T: FnMut(&mut Channel, basic::Deliver, BasicProperties, Vec<u8>) + Send {
fn handle_delivery(&mut self,
channel: &mut Channel,
method: basic::Deliver,
headers: BasicProperties,
body: Vec<u8>) {
self(channel, method, headers, body);
}
impl<T> Consumer for Box<T>
where T: FnMut(&mut Channel, basic::Deliver, BasicProperties, Vec<u8>) + Send
{
fn handle_delivery(&mut self,
channel: &mut Channel,
method: basic::Deliver,
headers: BasicProperties,
body: Vec<u8>) {
self(channel, method, headers, body);
}
}

pub struct Channel {
Expand Down Expand Up @@ -117,9 +120,9 @@ impl Channel {

// Send method frame, receive method frame, try to return expected method frame
// or return error.
pub fn rpc<T, U>(&mut self, method: &U, expected_reply: &str) -> AMQPResult<T>
where T: protocol::Method,
U: protocol::Method
pub fn rpc<I, O>(&mut self, method: &I, expected_reply: &str) -> AMQPResult<O>
where I: protocol::Method,
O: protocol::Method
{
let method_frame = try!(self.raw_rpc(method));
match method_frame.method_name() {
Expand Down
23 changes: 15 additions & 8 deletions src/framing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,30 @@ pub struct Frame {

impl Frame {
pub fn decode<T: Read>(reader: &mut T) -> AMQPResult<Frame> {
// cast sized [u8] to unsized
let mut header = &mut [0u8; 7] as &mut [u8];
try!(reader.read(&mut header));
let mut header = [0u8; 7];
let read_len = try!(reader.read(&mut header));
if read_len != 7 {
return Err(AMQPError::FramingError(format!("Error reading frame header. Expected \
to read 7 bytes, but read {}",
read_len)));
}
// Make a &mut to &[u8]. &mut &[u8] implements `Read` trait.
// `Read` works by changing a mutable reference to immutable slice,
// as in with a basic pointer manipulation.
let header = &mut (header as &[u8]) as &mut &[u8];
let header = &mut &header[..];
let frame_type_id = try!(header.read_u8());
let channel = try!(header.read_u16::<BigEndian>());
let size = try!(header.read_u32::<BigEndian>()) as usize;
// We need to use Vec because the size is not know in compile time.
let mut payload: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut payload));
let frame_end = try!(reader.read_u8());
if payload.len() != size {
return Err(AMQPError::DecodeError("Cannot read a full frame payload"));
let read_len = try!(reader.read(&mut payload));
if read_len != size {
return Err(AMQPError::FramingError(format!("Error reading frame body. Expected to \
read {} bytes, but read {}",
size,
read_len)));
}
let frame_end = try!(reader.read_u8());
if frame_end != 0xCE {
return Err(AMQPError::DecodeError("Frame didn't end with 0xCE"));
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod basic;
mod amqp_error;
mod table;

pub const VERSION: &'static str = "0.1.13";
pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");
pub mod protocol;

pub use session::{Session, Options};
Expand Down
18 changes: 12 additions & 6 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Session {
if &string.chars().next() == &Some('/') {
String::from(decode(&string[1..]))
} else {
String::from(decode(&string[..]))
String::from(decode(&string))
}
}

Expand Down Expand Up @@ -240,11 +240,17 @@ impl Session {
insist: false,
};
debug!("Sending connection.open: {:?}", open);
let _: protocol::connection::OpenOk = try!(self.channel_zero
.rpc(&open, "connection.open-ok"));
debug!("Connection initialized. conneciton.open-ok recieved");
info!("Session initialized");
Ok(())
let open_ok = self.channel_zero
.rpc::<_, protocol::connection::OpenOk>(&open, "connection.open-ok");
match open_ok {
Ok(_) => {
debug!("Connection initialized. conneciton.open-ok recieved");
info!("Session initialized");
Ok(())
}
Err(AMQPError::FramingError(_)) => Err(AMQPError::VHostError),
Err(other_error) => Err(other_error),
}
}

/// `open_channel` will open a new amqp channel:
Expand Down

0 comments on commit ef4a70d

Please sign in to comment.