Skip to content

Commit

Permalink
fix(floodsub): convert data to Bytes.
Browse files Browse the repository at this point in the history
This should help avoid potentially costly clones
  • Loading branch information
joshuef committed Oct 28, 2023
1 parent 459c9d4 commit f188e87
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion protocols/floodsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## 0.44.0 - unreleased

- Change publish to require `data: impl Into<Bytes>` to internally avoid any costly cloning / allocation

## 0.43.0
## 0.43.0

- Raise MSRV to 1.65.
See [PR 3715].
Expand Down
1 change: 1 addition & 0 deletions protocols/floodsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"]
asynchronous-codec = "0.6"
cuckoofilter = "0.5.0"
fnv = "1.0"
bytes = "1.5"
futures = "0.3.28"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions protocols/floodsub/src/generated/floodsub/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![cfg_attr(rustfmt, rustfmt_skip)]


use bytes::Bytes;
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;
Expand Down
11 changes: 6 additions & 5 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::protocol::{
};
use crate::topic::Topic;
use crate::FloodsubConfig;
use bytes::Bytes;
use cuckoofilter::{CuckooError, CuckooFilter};
use fnv::FnvHashSet;
use libp2p_core::{Endpoint, Multiaddr};
Expand Down Expand Up @@ -171,12 +172,12 @@ impl Floodsub {
}

/// Publishes a message to the network, if we're subscribed to the topic only.
pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
self.publish_many(iter::once(topic), data)
}

/// Publishes a message to the network, even if we're not subscribed to the topic.
pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
self.publish_many_any(iter::once(topic), data)
}

Expand All @@ -187,7 +188,7 @@ impl Floodsub {
pub fn publish_many(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) {
self.publish_many_inner(topic, data, true)
}
Expand All @@ -196,15 +197,15 @@ impl Floodsub {
pub fn publish_many_any(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) {
self.publish_many_inner(topic, data, false)
}

fn publish_many_inner(
&mut self,
topic: impl IntoIterator<Item = impl Into<Topic>>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
check_self_subscriptions: bool,
) {
let message = FloodsubMessage {
Expand Down
7 changes: 4 additions & 3 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::proto;
use crate::topic::Topic;
use asynchronous_codec::Framed;
use bytes::Bytes;
use futures::{
io::{AsyncRead, AsyncWrite},
Future,
Expand Down Expand Up @@ -81,7 +82,7 @@ where
messages.push(FloodsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| FloodsubError::InvalidPeerId)?,
data: publish.data.unwrap_or_default(),
data: publish.data.unwrap_or_default().into(),
sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
});
Expand Down Expand Up @@ -172,7 +173,7 @@ impl FloodsubRpc {
.into_iter()
.map(|msg| proto::Message {
from: Some(msg.source.to_bytes()),
data: Some(msg.data),
data: Some(msg.data.to_vec()),
seqno: Some(msg.sequence_number),
topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(),
})
Expand All @@ -197,7 +198,7 @@ pub struct FloodsubMessage {
pub source: PeerId,

/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
pub data: Bytes,

/// An incrementing sequence number.
pub sequence_number: Vec<u8>,
Expand Down

0 comments on commit f188e87

Please sign in to comment.