Skip to content

Commit

Permalink
bgpd: use ring buffer for network input
Browse files Browse the repository at this point in the history
The multithreading code has a comment that reads:
"XXX: Heavy abuse of stream API. This needs a ring buffer."

This patch makes the relevant code use a ring buffer.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
  • Loading branch information
qlyoung committed Jan 3, 2018
1 parent 74e4a32 commit 74ffbfe
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 60 deletions.
8 changes: 5 additions & 3 deletions bgpd/bgp_fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thread.h"
#include "log.h"
#include "stream.h"
#include "ringbuf.h"
#include "memory.h"
#include "plist.h"
#include "workqueue.h"
Expand Down Expand Up @@ -155,7 +156,6 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)

stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
stream_reset(peer->ibuf_work);

/*
* this should never happen, since bgp_process_packet() is the
Expand Down Expand Up @@ -183,7 +183,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
stream_fifo_push(peer->ibuf,
stream_fifo_pop(from_peer->ibuf));

stream_copy(peer->ibuf_work, from_peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
ringbuf_copy(peer->ibuf_work, from_peer->ibuf_work,
ringbuf_remain(from_peer->ibuf_work));
}
pthread_mutex_unlock(&from_peer->io_mtx);
pthread_mutex_unlock(&peer->io_mtx);
Expand Down Expand Up @@ -1097,7 +1099,7 @@ int bgp_stop(struct peer *peer)
stream_fifo_clean(peer->obuf);

if (peer->ibuf_work)
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
if (peer->obuf_work)
stream_reset(peer->obuf_work);

Expand Down
90 changes: 39 additions & 51 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
#include "network.h" // for ERRNO_IO_RETRY
#include "stream.h" // for stream_get_endp, stream_getw_from, str...
#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_...
#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h" // for assert

Expand Down Expand Up @@ -273,14 +274,12 @@ static int bgp_process_reads(struct thread *thread)
/* static buffer for transferring packets */
static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
/* shorter alias to peer's input buffer */
struct stream *ibw = peer->ibuf_work;
/* offset of start of current packet */
size_t offset = stream_get_getp(ibw);
struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
u_int16_t pktsize = 0;
uint16_t pktsize = 0;

/* check that we have enough data for a header */
if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
break;

/* validate header */
Expand All @@ -292,16 +291,18 @@ static int bgp_process_reads(struct thread *thread)
}

/* header is valid; retrieve packet size */
pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));

pktsize = ntohs(pktsize);

/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
if (STREAM_READABLE(ibw) >= pktsize) {
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
stream_get(pktbuf, ibw, pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
stream_put(pkt, pktbuf, pktsize);

pthread_mutex_lock(&peer->io_mtx);
Expand All @@ -315,28 +316,12 @@ static int bgp_process_reads(struct thread *thread)
break;
}

/*
* After reading:
* 1. Move unread data to stream start to make room for more.
* 2. Reschedule and return when we have additional data.
*
* XXX: Heavy abuse of stream API. This needs a ring buffer.
*/
if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
void *from = stream_pnt(peer->ibuf_work);
void *to = peer->ibuf_work->data;
size_t siz = STREAM_READABLE(peer->ibuf_work);
memmove(to, from, siz);
stream_set_getp(peer->ibuf_work, 0);
stream_set_endp(peer->ibuf_work, siz);
}

assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);

/* handle invalid header */
if (fatal) {
/* wipe buffer just in case someone screwed up */
stream_reset(peer->ibuf_work);
ringbuf_wipe(peer->ibuf_work);
} else {
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
Expand Down Expand Up @@ -474,14 +459,16 @@ static uint16_t bgp_read(struct peer *peer)
size_t readsize; // how many bytes we want to read
ssize_t nbytes; // how many bytes we actually read
uint16_t status = 0;
static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];

readsize = STREAM_WRITEABLE(peer->ibuf_work);

nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw));
nbytes = read(peer->fd, ibw, readsize);

switch (nbytes) {
/* EAGAIN or EWOULDBLOCK; come back later */
if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
SET_FLAG(status, BGP_IO_TRANS_ERR);
/* Fatal error; tear down session */
case -1:
} else if (nbytes < 0) {
zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno));

Expand All @@ -495,10 +482,8 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* Received EOF / TCP session closed */
case 0:
} else if (nbytes == 0) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd);
Expand All @@ -513,14 +498,9 @@ static uint16_t bgp_read(struct peer *peer)

BGP_EVENT_ADD(peer, TCP_connection_closed);
SET_FLAG(status, BGP_IO_FATAL_ERR);
break;

/* EAGAIN or EWOULDBLOCK; come back later */
case -2:
SET_FLAG(status, BGP_IO_TRANS_ERR);
break;
default:
break;
} else {
assert(ringbuf_put(peer->ibuf_work, ibw, nbytes)
== (size_t)nbytes);
}

return status;
Expand All @@ -529,27 +509,35 @@ static uint16_t bgp_read(struct peer *peer)
/*
* Called after we have read a BGP packet header. Validates marker, message
* type and packet length. If any of these aren't correct, sends a notify.
*
* Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
* buffer.
*/
static bool validate_header(struct peer *peer)
{
uint16_t size;
uint8_t type;
struct stream *pkt = peer->ibuf_work;
size_t getp = stream_get_getp(pkt);
struct ringbuf *pkt = peer->ibuf_work;

static uint8_t marker[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
static uint8_t m_correct[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};

if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
return false;

if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
return false;
}

/* Get size and type in host byte order. */
size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
/* Get size and type in network byte order. */
ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));

size = ntohs(size);

/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
Expand Down
7 changes: 5 additions & 2 deletions bgpd/bgpd.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "thread.h"
#include "buffer.h"
#include "stream.h"
#include "ringbuf.h"
#include "command.h"
#include "sockunion.h"
#include "sockopt.h"
Expand Down Expand Up @@ -1162,7 +1163,9 @@ struct peer *peer_new(struct bgp *bgp)
*/
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
peer->ibuf_work =
ringbuf_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);

peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);

bgp_sync_init(peer);
Expand Down Expand Up @@ -2179,7 +2182,7 @@ int peer_delete(struct peer *peer)
}

if (peer->ibuf_work) {
stream_free(peer->ibuf_work);
ringbuf_del(peer->ibuf_work);
peer->ibuf_work = NULL;
}

Expand Down
4 changes: 2 additions & 2 deletions bgpd/bgpd.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ struct peer {
struct stream_fifo *ibuf; // packets waiting to be processed
struct stream_fifo *obuf; // packets waiting to be written

struct stream *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets
struct ringbuf *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets

struct stream *curr; // the current packet being parsed

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/rfapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "lib/linklist.h"
#include "lib/command.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"

#include "bgpd/bgpd.h"
#include "bgpd/bgp_ecommunity.h"
Expand Down Expand Up @@ -1310,7 +1311,7 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
stream_fifo_free(rfd->peer->obuf);

if (rfd->peer->ibuf_work)
stream_free(rfd->peer->ibuf_work);
ringbuf_del(rfd->peer->ibuf_work);
if (rfd->peer->obuf_work)
stream_free(rfd->peer->obuf_work);

Expand Down
3 changes: 2 additions & 1 deletion bgpd/rfapi/vnc_zebra.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "lib/command.h"
#include "lib/zclient.h"
#include "lib/stream.h"
#include "lib/ringbuf.h"
#include "lib/memory.h"

#include "bgpd/bgpd.h"
Expand Down Expand Up @@ -198,7 +199,7 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
stream_fifo_free(vncHD1VR.peer->obuf);

if (vncHD1VR.peer->ibuf_work)
stream_free(vncHD1VR.peer->ibuf_work);
ringbuf_del(vncHD1VR.peer->ibuf_work);
if (vncHD1VR.peer->obuf_work)
stream_free(vncHD1VR.peer->obuf_work);

Expand Down

0 comments on commit 74ffbfe

Please sign in to comment.