Skip to content

Commit

Permalink
Merge pull request #1591 from qlyoung/bgpd-ringbuf
Browse files Browse the repository at this point in the history
bgpd: use ring buffer for network input
  • Loading branch information
riw777 authored Jan 11, 2018
2 parents bb46988 + 74ffbfe commit 2ed7e4c
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 65 deletions.
8 changes: 5 additions & 3 deletions bgpd/bgp_fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thread.h"
#include "log.h"
#include "stream.h"
#include "ringbuf.h"
#include "memory.h"
#include "plist.h"
#include "workqueue.h"
Expand Down Expand Up @@ -155,7 +156,6 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)

stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
stream_reset(peer->ibuf_work);

/*
* this should never happen, since bgp_process_packet() is the
Expand Down Expand Up @@ -183,7 +183,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
stream_fifo_push(peer->ibuf,
stream_fifo_pop(from_peer->ibuf));

stream_copy(peer->ibuf_work, from_peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
ringbuf_copy(peer->ibuf_work, from_peer->ibuf_work,
ringbuf_remain(from_peer->ibuf_work));
}
pthread_mutex_unlock(&from_peer->io_mtx);
pthread_mutex_unlock(&peer->io_mtx);
Expand Down Expand Up @@ -1099,7 +1101,7 @@ int bgp_stop(struct peer *peer)
stream_fifo_clean(peer->obuf);

if (peer->ibuf_work)
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
if (peer->obuf_work)
stream_reset(peer->obuf_work);

Expand Down
90 changes: 39 additions & 51 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
#include "network.h" // for ERRNO_IO_RETRY
#include "stream.h" // for stream_get_endp, stream_getw_from, str...
#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_...
#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h" // for assert

Expand Down Expand Up @@ -263,14 +264,12 @@ static int bgp_process_reads(struct thread *thread)
/* static buffer for transferring packets */
static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
/* shorter alias to peer's input buffer */
struct stream *ibw = peer->ibuf_work;
/* offset of start of current packet */
size_t offset = stream_get_getp(ibw);
struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
u_int16_t pktsize = 0;
uint16_t pktsize = 0;

/* check that we have enough data for a header */
if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
break;

/* validate header */
Expand All @@ -282,16 +281,18 @@ static int bgp_process_reads(struct thread *thread)
}

/* header is valid; retrieve packet size */
pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));

pktsize = ntohs(pktsize);

/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
if (STREAM_READABLE(ibw) >= pktsize) {
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
stream_get(pktbuf, ibw, pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
stream_put(pkt, pktbuf, pktsize);

pthread_mutex_lock(&peer->io_mtx);
Expand All @@ -305,28 +306,12 @@ static int bgp_process_reads(struct thread *thread)
break;
}

/*
* After reading:
* 1. Move unread data to stream start to make room for more.
* 2. Reschedule and return when we have additional data.
*
* XXX: Heavy abuse of stream API. This needs a ring buffer.
*/
if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
void *from = stream_pnt(peer->ibuf_work);
void *to = peer->ibuf_work->data;
size_t siz = STREAM_READABLE(peer->ibuf_work);
memmove(to, from, siz);
stream_set_getp(peer->ibuf_work, 0);
stream_set_endp(peer->ibuf_work, siz);
}

assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

/* handle invalid header */
if (fatal) {
/* wipe buffer just in case someone screwed up */
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
} else {
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
Expand Down Expand Up @@ -464,14 +449,16 @@ static uint16_t bgp_read(struct peer *peer)
size_t readsize; // how many bytes we want to read
ssize_t nbytes; // how many bytes we actually read
uint16_t status = 0;
static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];

readsize = STREAM_WRITEABLE(peer->ibuf_work);

nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw));
nbytes = read(peer->fd, ibw, readsize);

switch (nbytes) {
/* EAGAIN or EWOULDBLOCK; come back later */
if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
SET_FLAG(status, BGP_IO_TRANS_ERR);
/* Fatal error; tear down session */
case -1:
} else if (nbytes < 0) {
zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno));

Expand All @@ -485,10 +472,8 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* Received EOF / TCP session closed */
case 0:
} else if (nbytes == 0) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd);
Expand All @@ -503,14 +488,9 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_connection_closed);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* EAGAIN or EWOULDBLOCK; come back later */
case -2:
SET_FLAG(status, BGP_IO_TRANS_ERR);
break;
default:
break;
} else {
assert(ringbuf_put(peer->ibuf_work, ibw, nbytes)
== (size_t)nbytes);
}

return status;
Expand All @@ -519,27 +499,35 @@ static uint16_t bgp_read(struct peer *peer)
/*
* Called after we have read a BGP packet header. Validates marker, message
* type and packet length. If any of these aren't correct, sends a notify.
*
* Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
* buffer.
*/
static bool validate_header(struct peer *peer)
{
uint16_t size;
uint8_t type;
struct stream *pkt = peer->ibuf_work;
size_t getp = stream_get_getp(pkt);
struct ringbuf *pkt = peer->ibuf_work;

static uint8_t marker[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
static uint8_t m_correct[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};

if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
return false;

if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
return false;
}

/* Get size and type in host byte order. */
size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
/* Get size and type in network byte order. */
ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));

size = ntohs(size);

/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
Expand Down
7 changes: 5 additions & 2 deletions bgpd/bgpd.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "thread.h"
#include "buffer.h"
#include "stream.h"
#include "ringbuf.h"
#include "command.h"
#include "sockunion.h"
#include "sockopt.h"
Expand Down Expand Up @@ -1162,7 +1163,9 @@ struct peer *peer_new(struct bgp *bgp)
*/
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
peer->ibuf_work =
ringbuf_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);

peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);

bgp_sync_init(peer);
Expand Down Expand Up @@ -2179,7 +2182,7 @@ int peer_delete(struct peer *peer)
}

if (peer->ibuf_work) {
stream_free(peer->ibuf_work);
ringbuf_del(peer->ibuf_work);
peer->ibuf_work = NULL;
}

Expand Down
4 changes: 2 additions & 2 deletions bgpd/bgpd.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ struct peer {
struct stream_fifo *ibuf; // packets waiting to be processed
struct stream_fifo *obuf; // packets waiting to be written

struct stream *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets
struct ringbuf *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets

struct stream *curr; // the current packet being parsed

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/rfapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "lib/linklist.h"
#include "lib/command.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"

#include "bgpd/bgpd.h"
#include "bgpd/bgp_ecommunity.h"
Expand Down Expand Up @@ -1310,7 +1311,7 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
stream_fifo_free(rfd->peer->obuf);

if (rfd->peer->ibuf_work)
stream_free(rfd->peer->ibuf_work);
ringbuf_del(rfd->peer->ibuf_work);
if (rfd->peer->obuf_work)
stream_free(rfd->peer->obuf_work);

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/vnc_zebra.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "lib/command.h"
#include "lib/zclient.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"
#include "lib/memory.h"

#include "bgpd/bgpd.h"
Expand Down Expand Up @@ -198,7 +199,7 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
stream_fifo_free(vncHD1VR.peer->obuf);

if (vncHD1VR.peer->ibuf_work)
stream_free(vncHD1VR.peer->ibuf_work);
ringbuf_del(vncHD1VR.peer->ibuf_work);
if (vncHD1VR.peer->obuf_work)
stream_free(vncHD1VR.peer->obuf_work);

Expand Down
15 changes: 12 additions & 3 deletions lib/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ size_t ringbuf_put(struct ringbuf *buf, const void *data, size_t size)
size_t space = ringbuf_space(buf);
size_t copysize = MIN(size, space);
size_t tocopy = copysize;
if (tocopy > buf->size - buf->end) {
if (tocopy >= buf->size - buf->end) {
size_t ts = buf->size - buf->end;
memcpy(buf->data + buf->end, dp, ts);
buf->end = 0;
Expand Down Expand Up @@ -102,14 +102,24 @@ size_t ringbuf_peek(struct ringbuf *buf, size_t offset, void *data, size_t size)
if (tocopy >= buf->size - cstart) {
size_t ts = buf->size - cstart;
memcpy(dp, buf->data + cstart, ts);
buf->start = cstart = 0;
cstart = 0;
tocopy -= ts;
dp += ts;
}
memcpy(dp, buf->data + cstart, tocopy);
return copysize;
}

size_t ringbuf_copy(struct ringbuf *to, struct ringbuf *from, size_t size)
{
size_t tocopy = MIN(ringbuf_space(to), size);
uint8_t *cbuf = XCALLOC(MTYPE_TMP, tocopy);
tocopy = ringbuf_peek(from, 0, cbuf, tocopy);
size_t put = ringbuf_put(to, cbuf, tocopy);
XFREE(MTYPE_TMP, cbuf);
return put;
}

void ringbuf_reset(struct ringbuf *buf)
{
buf->start = buf->end = 0;
Expand All @@ -120,5 +130,4 @@ void ringbuf_wipe(struct ringbuf *buf)
{
memset(buf->data, 0x00, buf->size);
ringbuf_reset(buf);
buf->empty = true;
}
10 changes: 10 additions & 0 deletions lib/ringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ size_t ringbuf_get(struct ringbuf *buf, void *data, size_t size);
size_t ringbuf_peek(struct ringbuf *buf, size_t offset, void *data,
size_t size);

/*
* Copy data from one ringbuf to another.
*
* @param to destination ringbuf
* @param from source ringbuf
* @param size how much data to copy
* @return amount of data copied
*/
size_t ringbuf_copy(struct ringbuf *to, struct ringbuf *from, size_t size);

/*
* Reset buffer. Does not wipe.
*
Expand Down
4 changes: 2 additions & 2 deletions tests/lib/test_ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ int main(int argc, char **argv)

validate_state(soil, BUFSIZ, BUFSIZ);
assert(soil->start == 0);
assert(soil->end == BUFSIZ);
assert(soil->end == 0);

/* read 15 bytes of garbage */
printf("Validating read...\n");
assert(ringbuf_get(soil, &compost, 15) == 15);

validate_state(soil, BUFSIZ, BUFSIZ - 15);
assert(soil->start == 15);
assert(soil->end == BUFSIZ);
assert(soil->end == 0);

/* put another 10 bytes and validate wraparound */
printf("Validating wraparound...\n");
Expand Down

0 comments on commit 2ed7e4c

Please sign in to comment.