Skip to content

Commit

Permalink
Swith to writev() for DiskPacketWriter.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaycedowell committed Nov 26, 2024
1 parent 84cc3f9 commit 42dfbb3
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions src/packet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

#include <sys/mman.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fstream>
#include <chrono>
Expand Down Expand Up @@ -116,31 +117,61 @@ class PacketWriterMethod: public BoundThread {
};

class DiskPacketWriter : public PacketWriterMethod {
int _last_count;
iovec* _iovs;
public:
DiskPacketWriter(int fd, size_t max_burst_size=BF_SEND_NPKTBURST, int core=-1)
: PacketWriterMethod(fd, max_burst_size, core) {}
: PacketWriterMethod(fd, max_burst_size, core), _last_count(0), _iovs(NULL) {}
~DiskPacketWriter() {
if( _iovs ) {
free(_iovs);
}
}
ssize_t send_packets(char* hdrs,
int hdr_size,
char* data,
int data_size,
int npackets,
int flags=0) {
if( npackets > _last_count ) {
if( _max_burst_size > 0 ) {
_max_burst_size = std::min(_max_burst_size, (size_t) IOV_MAX/2);
} else {
// Divide by two since there is a header and a payload
_max_burst_size = IOV_MAX/2;
}

if( _iovs ) {
::munlock(_iovs, sizeof(struct iovec)*2*_last_count);
free(_iovs);
}

_last_count = npackets;
_iovs = (struct iovec *) malloc(sizeof(struct iovec)*2*npackets);
::mlock(_iovs, sizeof(struct iovec)*2*npackets);
}

for(int i=0; i<npackets; i++) {
_iovs[2*i+0].iov_base = (hdrs + i*hdr_size);
_iovs[2*i+0].iov_len = hdr_size;
_iovs[2*i+1].iov_base = (data + i*data_size);
_iovs[2*i+1].iov_len = data_size;
}

int i = 0;
ssize_t status, nsend, nsent = 0;
ssize_t status, nsend, nsent_batch, nsent = 0;
while(npackets > 0) {
_limiter.begin();
if( _max_burst_size > 0 ) {
nsend = std::min(_max_burst_size, (size_t) npackets);
} else {
nsend = npackets;
nsend = std::min(_max_burst_size, (size_t) npackets);
nsent_batch = ::writev(_fd, _iovs+2*i, 2*nsend);
if( nsent_batch > 0 ) {
nsent += nsent_batch / (hdr_size + data_size);
}
for(int j=0; j<nsend; j++) {
status = ::write(_fd, hdrs+hdr_size*(i+j), hdr_size);
if( status != hdr_size ) continue;
status = ::write(_fd, data+data_size*(i+j), data_size);
if( status != data_size ) continue;
nsent += 1;
/*
if( nsent_batch == -1 ) {
std::cout << "writev failed: " << std::strerror(errno) << " with " << hdr_size << " and " << data_size << std::endl;
}
*/
i += nsend;
npackets -= nsend;
_limiter.end_and_wait(nsend);
Expand Down

0 comments on commit 42dfbb3

Please sign in to comment.