Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bgpd: use ring buffer for network input #1591

Merged
merged 3 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions bgpd/bgp_fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thread.h"
#include "log.h"
#include "stream.h"
#include "ringbuf.h"
#include "memory.h"
#include "plist.h"
#include "workqueue.h"
Expand Down Expand Up @@ -155,7 +156,6 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)

stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
stream_reset(peer->ibuf_work);

/*
* this should never happen, since bgp_process_packet() is the
Expand Down Expand Up @@ -183,7 +183,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
stream_fifo_push(peer->ibuf,
stream_fifo_pop(from_peer->ibuf));

stream_copy(peer->ibuf_work, from_peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
ringbuf_copy(peer->ibuf_work, from_peer->ibuf_work,
ringbuf_remain(from_peer->ibuf_work));
}
pthread_mutex_unlock(&from_peer->io_mtx);
pthread_mutex_unlock(&peer->io_mtx);
Expand Down Expand Up @@ -1097,7 +1099,7 @@ int bgp_stop(struct peer *peer)
stream_fifo_clean(peer->obuf);

if (peer->ibuf_work)
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
if (peer->obuf_work)
stream_reset(peer->obuf_work);

Expand Down
90 changes: 39 additions & 51 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
#include "network.h" // for ERRNO_IO_RETRY
#include "stream.h" // for stream_get_endp, stream_getw_from, str...
#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_...
#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h" // for assert

Expand Down Expand Up @@ -273,14 +274,12 @@ static int bgp_process_reads(struct thread *thread)
/* static buffer for transferring packets */
static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
/* shorter alias to peer's input buffer */
struct stream *ibw = peer->ibuf_work;
/* offset of start of current packet */
size_t offset = stream_get_getp(ibw);
struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
u_int16_t pktsize = 0;
uint16_t pktsize = 0;

/* check that we have enough data for a header */
if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
break;

/* validate header */
Expand All @@ -292,16 +291,18 @@ static int bgp_process_reads(struct thread *thread)
}

/* header is valid; retrieve packet size */
pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));

pktsize = ntohs(pktsize);

/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
if (STREAM_READABLE(ibw) >= pktsize) {
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
stream_get(pktbuf, ibw, pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
stream_put(pkt, pktbuf, pktsize);

pthread_mutex_lock(&peer->io_mtx);
Expand All @@ -315,28 +316,12 @@ static int bgp_process_reads(struct thread *thread)
break;
}

/*
* After reading:
* 1. Move unread data to stream start to make room for more.
* 2. Reschedule and return when we have additional data.
*
* XXX: Heavy abuse of stream API. This needs a ring buffer.
*/
if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
void *from = stream_pnt(peer->ibuf_work);
void *to = peer->ibuf_work->data;
size_t siz = STREAM_READABLE(peer->ibuf_work);
memmove(to, from, siz);
stream_set_getp(peer->ibuf_work, 0);
stream_set_endp(peer->ibuf_work, siz);
}

assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

/* handle invalid header */
if (fatal) {
/* wipe buffer just in case someone screwed up */
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
} else {
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
Expand Down Expand Up @@ -474,14 +459,16 @@ static uint16_t bgp_read(struct peer *peer)
size_t readsize; // how many bytes we want to read
ssize_t nbytes; // how many bytes we actually read
uint16_t status = 0;
static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];

readsize = STREAM_WRITEABLE(peer->ibuf_work);

nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw));
nbytes = read(peer->fd, ibw, readsize);

switch (nbytes) {
/* EAGAIN or EWOULDBLOCK; come back later */
if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
SET_FLAG(status, BGP_IO_TRANS_ERR);
/* Fatal error; tear down session */
case -1:
} else if (nbytes < 0) {
zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno));

Expand All @@ -495,10 +482,8 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* Received EOF / TCP session closed */
case 0:
} else if (nbytes == 0) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd);
Expand All @@ -513,14 +498,9 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_connection_closed);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* EAGAIN or EWOULDBLOCK; come back later */
case -2:
SET_FLAG(status, BGP_IO_TRANS_ERR);
break;
default:
break;
} else {
assert(ringbuf_put(peer->ibuf_work, ibw, nbytes)
== (size_t)nbytes);
}

return status;
Expand All @@ -529,27 +509,35 @@ static uint16_t bgp_read(struct peer *peer)
/*
* Called after we have read a BGP packet header. Validates marker, message
* type and packet length. If any of these aren't correct, sends a notify.
*
* Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
* buffer.
*/
static bool validate_header(struct peer *peer)
{
uint16_t size;
uint8_t type;
struct stream *pkt = peer->ibuf_work;
size_t getp = stream_get_getp(pkt);
struct ringbuf *pkt = peer->ibuf_work;

static uint8_t marker[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
static uint8_t m_correct[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};

if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
return false;

if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
return false;
}

/* Get size and type in host byte order. */
size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
/* Get size and type in network byte order. */
ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));

size = ntohs(size);

/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
Expand Down
7 changes: 5 additions & 2 deletions bgpd/bgpd.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "thread.h"
#include "buffer.h"
#include "stream.h"
#include "ringbuf.h"
#include "command.h"
#include "sockunion.h"
#include "sockopt.h"
Expand Down Expand Up @@ -1162,7 +1163,9 @@ struct peer *peer_new(struct bgp *bgp)
*/
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
peer->ibuf_work =
ringbuf_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);

peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);

bgp_sync_init(peer);
Expand Down Expand Up @@ -2179,7 +2182,7 @@ int peer_delete(struct peer *peer)
}

if (peer->ibuf_work) {
stream_free(peer->ibuf_work);
ringbuf_del(peer->ibuf_work);
peer->ibuf_work = NULL;
}

Expand Down
4 changes: 2 additions & 2 deletions bgpd/bgpd.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ struct peer {
struct stream_fifo *ibuf; // packets waiting to be processed
struct stream_fifo *obuf; // packets waiting to be written

struct stream *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets
struct ringbuf *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets

struct stream *curr; // the current packet being parsed

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/rfapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "lib/linklist.h"
#include "lib/command.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"

#include "bgpd/bgpd.h"
#include "bgpd/bgp_ecommunity.h"
Expand Down Expand Up @@ -1310,7 +1311,7 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
stream_fifo_free(rfd->peer->obuf);

if (rfd->peer->ibuf_work)
stream_free(rfd->peer->ibuf_work);
ringbuf_del(rfd->peer->ibuf_work);
if (rfd->peer->obuf_work)
stream_free(rfd->peer->obuf_work);

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/vnc_zebra.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "lib/command.h"
#include "lib/zclient.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"
#include "lib/memory.h"

#include "bgpd/bgpd.h"
Expand Down Expand Up @@ -198,7 +199,7 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
stream_fifo_free(vncHD1VR.peer->obuf);

if (vncHD1VR.peer->ibuf_work)
stream_free(vncHD1VR.peer->ibuf_work);
ringbuf_del(vncHD1VR.peer->ibuf_work);
if (vncHD1VR.peer->obuf_work)
stream_free(vncHD1VR.peer->obuf_work);

Expand Down
15 changes: 12 additions & 3 deletions lib/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ size_t ringbuf_put(struct ringbuf *buf, const void *data, size_t size)
size_t space = ringbuf_space(buf);
size_t copysize = MIN(size, space);
size_t tocopy = copysize;
if (tocopy > buf->size - buf->end) {
if (tocopy >= buf->size - buf->end) {
size_t ts = buf->size - buf->end;
memcpy(buf->data + buf->end, dp, ts);
buf->end = 0;
Expand Down Expand Up @@ -102,14 +102,24 @@ size_t ringbuf_peek(struct ringbuf *buf, size_t offset, void *data, size_t size)
if (tocopy >= buf->size - cstart) {
size_t ts = buf->size - cstart;
memcpy(dp, buf->data + cstart, ts);
buf->start = cstart = 0;
cstart = 0;
tocopy -= ts;
dp += ts;
}
memcpy(dp, buf->data + cstart, tocopy);
return copysize;
}

size_t ringbuf_copy(struct ringbuf *to, struct ringbuf *from, size_t size)
{
size_t tocopy = MIN(ringbuf_space(to), size);
uint8_t *cbuf = XCALLOC(MTYPE_TMP, tocopy);
tocopy = ringbuf_peek(from, 0, cbuf, tocopy);
size_t put = ringbuf_put(to, cbuf, tocopy);
XFREE(MTYPE_TMP, cbuf);
return put;
}

void ringbuf_reset(struct ringbuf *buf)
{
buf->start = buf->end = 0;
Expand All @@ -120,5 +130,4 @@ void ringbuf_wipe(struct ringbuf *buf)
{
memset(buf->data, 0x00, buf->size);
ringbuf_reset(buf);
buf->empty = true;
}
10 changes: 10 additions & 0 deletions lib/ringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ size_t ringbuf_get(struct ringbuf *buf, void *data, size_t size);
size_t ringbuf_peek(struct ringbuf *buf, size_t offset, void *data,
size_t size);

/*
* Copy data from one ringbuf to another.
*
* @param to destination ringbuf
* @param from source ringbuf
* @param size how much data to copy
* @return amount of data copied
*/
size_t ringbuf_copy(struct ringbuf *to, struct ringbuf *from, size_t size);

/*
* Reset buffer. Does not wipe.
*
Expand Down
4 changes: 2 additions & 2 deletions tests/lib/test_ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ int main(int argc, char **argv)

validate_state(soil, BUFSIZ, BUFSIZ);
assert(soil->start == 0);
assert(soil->end == BUFSIZ);
assert(soil->end == 0);

/* read 15 bytes of garbage */
printf("Validating read...\n");
assert(ringbuf_get(soil, &compost, 15) == 15);

validate_state(soil, BUFSIZ, BUFSIZ - 15);
assert(soil->start == 15);
assert(soil->end == BUFSIZ);
assert(soil->end == 0);

/* put another 10 bytes and validate wraparound */
printf("Validating wraparound...\n");
Expand Down