Skip to content

Commit

Permalink
Move SendmmsgPacketBatchWriter to sendmsg with iovecs
Browse files Browse the repository at this point in the history
Summary: I'm switching the usages of `writem` to use iovecs instead of IOBufs.

Reviewed By: kvtsoy

Differential Revision: D68857701

fbshipit-source-id: 96461aac85f39da9b85e16f2927b4539e712e5cb
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Jan 30, 2025
1 parent 128904b commit 69d9467
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 20 deletions.
42 changes: 40 additions & 2 deletions quic/api/QuicBatchWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,31 @@ ssize_t SendmmsgPacketBatchWriter::write(
return sock.write(address, vec, iovec_len);
}

int ret = sock.writem(
folly::range(&address, &address + 1), bufs_.data(), bufs_.size());
size_t numChainedBuffers = 0;
for (auto& buf : bufs_) {
numChainedBuffers += buf->countChainElements();
}

int ret = 0;
if (numChainedBuffers <= kNumIovecBufferChains &&
bufs_.size() < kNumIovecBufferChains) {
// We don't allocate arrays on the heap
iovec vec[kNumIovecBufferChains];
size_t messageSizes[kNumIovecBufferChains];
fillIovecAndMessageSizes(vec, messageSizes, kNumIovecBufferChains);
sock.writem(
folly::range(&address, &address + 1), vec, messageSizes, bufs_.size());
} else {
// We allocate the arrays on the heap
std::unique_ptr<iovec[]> vec(new iovec[numChainedBuffers]);
std::unique_ptr<size_t[]> messageSizes(new size_t[bufs_.size()]);
fillIovecAndMessageSizes(vec.get(), messageSizes.get(), numChainedBuffers);
sock.writem(
folly::range(&address, &address + 1),
vec.get(),
messageSizes.get(),
bufs_.size());
}

if (ret <= 0) {
return ret;
Expand All @@ -177,6 +200,21 @@ ssize_t SendmmsgPacketBatchWriter::write(
return 0;
}

void SendmmsgPacketBatchWriter::fillIovecAndMessageSizes(
iovec* vec,
size_t* messageSizes,
size_t iovecLen) {
size_t currentIovecIndex = 0;
for (uint32_t i = 0; i < bufs_.size(); i++) {
size_t numIovecs =
bufs_.at(i)
->fillIov(vec + currentIovecIndex, iovecLen - currentIovecIndex)
.numIovecs;
messageSizes[i] = numIovecs;
currentIovecIndex += numIovecs;
}
}

bool useSinglePacketInplaceBatchWriter(
uint32_t maxBatchSize,
quic::DataPathType dataPathType) {
Expand Down
3 changes: 3 additions & 0 deletions quic/api/QuicBatchWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class SendmmsgPacketBatchWriter : public BatchWriter {
override;

private:
void
fillIovecAndMessageSizes(iovec* vec, size_t* messageSizes, size_t iovecLen);

// max number of buffer chains we can accumulate before we need to flush
size_t maxBufs_{1};
// size of data in all the buffers
Expand Down
141 changes: 141 additions & 0 deletions quic/api/test/QuicBatchWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,147 @@ TEST_F(QuicBatchWriterTest, TestBatchingSendmmsg) {
}
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgInplaceIovecMatches) {
// In this test case, we don't surpass the kNumIovecBufferChains limit
// (i.e. the number of contiguous buffers we are sending)
folly::EventBase evb;
std::shared_ptr<FollyQuicEventBase> qEvb =
std::make_shared<FollyQuicEventBase>(&evb);
quic::test::MockAsyncUDPSocket sock(qEvb);

auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
kBatchNum,
false, /* enable backpressure */
DataPathType::ChainedMemory,
conn_,
gsoSupported_);
CHECK(batchWriter);

std::vector<std::string> messages{"It", "is", "sunny!"};

CHECK(batchWriter->empty());
CHECK_EQ(batchWriter->size(), 0);
size_t size = 0;
for (auto& message : messages) {
auto buf = folly::IOBuf::copyBuffer(
folly::ByteRange((unsigned char*)message.data(), message.size()));
batchWriter->append(
std::move(buf), message.size(), folly::SocketAddress(), nullptr);
size += message.size();
CHECK_EQ(batchWriter->size(), size);
}

EXPECT_CALL(sock, writem(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
iovec* iovecs,
size_t* messageSizes,
size_t count) {
EXPECT_EQ(addrs.size(), 1);
EXPECT_EQ(count, messages.size());

size_t currentIovIndex = 0;
for (size_t i = 0; i < count; i++) {
auto wrappedIovBuffer =
folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]);
currentIovIndex += messageSizes[i];

folly::IOBufEqualTo eq;
EXPECT_TRUE(
eq(wrappedIovBuffer,
folly::IOBuf::copyBuffer(folly::ByteRange(
(unsigned char*)messages[i].data(), messages[i].size()))));
}

return 0;
}));

batchWriter->write(sock, folly::SocketAddress());
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgNewlyAllocatedIovecMatches) {
// In this test case, we surpass the kNumIovecBufferChains limit
// (i.e. the number of contiguous buffers we are sending)
folly::EventBase evb;
std::shared_ptr<FollyQuicEventBase> qEvb =
std::make_shared<FollyQuicEventBase>(&evb);
quic::test::MockAsyncUDPSocket sock(qEvb);

auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
kBatchNum,
false, /* enable backpressure */
DataPathType::ChainedMemory,
conn_,
gsoSupported_);
CHECK(batchWriter);

std::vector<std::vector<std::string>> messages{
{"It", "is", "sunny!"},
{"but", "it", "is", "so", "cold"},
{"my",
"jacket",
"isn't",
"warm",
"enough",
"and",
"my",
"hands",
"are",
"freezing"}};

CHECK(batchWriter->empty());
CHECK_EQ(batchWriter->size(), 0);

std::vector<Buf> buffers;

size_t size = 0;
for (auto& message : messages) {
auto buf = std::make_unique<folly::IOBuf>();
for (size_t j = 0; j < message.size(); j++) {
auto partBuf = folly::IOBuf::copyBuffer(folly::ByteRange(
(unsigned char*)message[j].data(), message[j].size()));
buf->appendToChain(std::move(partBuf));
}
buffers.emplace_back(std::move(buf));
}

for (size_t i = 0; i < messages.size(); i++) {
batchWriter->append(
buffers[i]->clone(),
buffers[i]->computeChainDataLength(),
folly::SocketAddress(),
nullptr);
size += buffers[i]->computeChainDataLength();
CHECK_EQ(batchWriter->size(), size);
}

EXPECT_CALL(sock, writem(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
iovec* iovecs,
size_t* messageSizes,
size_t count) {
EXPECT_EQ(addrs.size(), 1);
EXPECT_EQ(count, messages.size());

size_t currentIovIndex = 0;
for (size_t i = 0; i < count; i++) {
auto wrappedIovBuffer =
folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]);
currentIovIndex += messageSizes[i];

folly::IOBufEqualTo eq;
EXPECT_TRUE(eq(wrappedIovBuffer, buffers[i]));
}

return 0;
}));

batchWriter->write(sock, folly::SocketAddress());
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) {
folly::EventBase evb;
std::shared_ptr<FollyQuicEventBase> qEvb =
Expand Down
4 changes: 1 addition & 3 deletions quic/common/testutil/MockAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ struct MockAsyncUDPSocket : public FollyQuicAsyncUDPSocket {
MOCK_METHOD(
int,
writem,
(folly::Range<folly::SocketAddress const*>,
const std::unique_ptr<folly::IOBuf>*,
size_t));
(folly::Range<folly::SocketAddress const*>, iovec*, size_t*, size_t));
MOCK_METHOD(
ssize_t,
writeGSO,
Expand Down
5 changes: 3 additions & 2 deletions quic/common/udpsocket/FollyQuicAsyncUDPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ ssize_t FollyQuicAsyncUDPSocket::write(

int FollyQuicAsyncUDPSocket::writem(
folly::Range<folly::SocketAddress const*> addrs,
const std::unique_ptr<folly::IOBuf>* bufs,
iovec* iov,
size_t* numIovecsInBuffer,
size_t count) {
return follySocket_.writem(addrs, bufs, count);
return follySocket_.writemv(addrs, iov, numIovecsInBuffer, count);
}

ssize_t FollyQuicAsyncUDPSocket::writeGSO(
Expand Down
3 changes: 2 additions & 1 deletion quic/common/udpsocket/FollyQuicAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class FollyQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl {

int writem(
folly::Range<folly::SocketAddress const*> addrs,
const std::unique_ptr<folly::IOBuf>* bufs,
iovec* iov,
size_t* numIovecsInBuffer,
size_t count) override;

ssize_t writeGSO(
Expand Down
7 changes: 4 additions & 3 deletions quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ int LibevQuicAsyncUDPSocket::getGSO() {
}

int LibevQuicAsyncUDPSocket::writem(
folly::Range<folly::SocketAddress const*> /* addrs */,
const std::unique_ptr<folly::IOBuf>* /* bufs */,
size_t /* count */) {
folly::Range<folly::SocketAddress const*>,
iovec*,
size_t*,
size_t) {
LOG(FATAL) << __func__ << "is not implemented in LibevQuicAsyncUDPSocket";
return -1;
}
Expand Down
3 changes: 2 additions & 1 deletion quic/common/udpsocket/LibevQuicAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class LibevQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl {

int writem(
folly::Range<folly::SocketAddress const*> addrs,
const std::unique_ptr<folly::IOBuf>* bufs,
iovec* iov,
size_t* numIovecsInBuffer,
size_t count) override;

ssize_t writeGSO(
Expand Down
12 changes: 7 additions & 5 deletions quic/common/udpsocket/QuicAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,15 @@ class QuicAsyncUDPSocket {
/**
* Send the data in buffers to destination. Returns the return code from
* ::sendmmsg.
* bufs is an array of std::unique_ptr<folly::IOBuf>
* of size num
* iov is an array of iovecs, which is composed of "count" messages that
* need to be sent. Each message can have multiple iovecs. The number of
* iovecs per message is specified in numIovecsInBuffer.
*/
virtual int writem(
folly::Range<folly::SocketAddress const*> /* addrs */,
const std::unique_ptr<folly::IOBuf>* /* bufs */,
size_t /* count */) = 0;
folly::Range<folly::SocketAddress const*> addrs,
iovec* iov,
size_t* numIovecsInBuffer,
size_t count) = 0;

struct WriteOptions {
WriteOptions() = default;
Expand Down
4 changes: 1 addition & 3 deletions quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class QuicAsyncUDPSocketMock : public QuicAsyncUDPSocket {
MOCK_METHOD(
(int),
writem,
(folly::Range<folly::SocketAddress const*>,
const std::unique_ptr<folly::IOBuf>*,
size_t));
(folly::Range<folly::SocketAddress const*>, iovec*, size_t*, size_t));
MOCK_METHOD(
ssize_t,
writeGSO,
Expand Down

0 comments on commit 69d9467

Please sign in to comment.