Skip to content

Commit

Permalink
Merge pull request #1672 from qlyoung/frr-pthread-improvements
Browse files Browse the repository at this point in the history
FRR pthread improvements
  • Loading branch information
rwestphal authored Jan 26, 2018
2 parents 2d007fe + 096476d commit cbbb31b
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 310 deletions.
138 changes: 25 additions & 113 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,100 +51,12 @@ static bool validate_header(struct peer *);
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error

/* Plumbing & control variables for thread lifecycle
* ------------------------------------------------------------------------ */
bool bgp_io_thread_run;
pthread_mutex_t *running_cond_mtx;
pthread_cond_t *running_cond;

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

/* Poison pill task */
static int bgp_io_finish(struct thread *thread)
{
bgp_io_thread_run = false;
return 0;
}

/* Extern lifecycle control functions. init -> start -> stop
* ------------------------------------------------------------------------ */
void bgp_io_init()
{
bgp_io_thread_run = false;

running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t));

pthread_mutex_init(running_cond_mtx, NULL);
pthread_cond_init(running_cond, NULL);

/* unlocked in bgp_io_wait_running() */
pthread_mutex_lock(running_cond_mtx);
}

void *bgp_io_start(void *arg)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
fpt->master->owner = pthread_self();

// fd so we can sleep in poll()
int sleeper[2];
pipe(sleeper);
thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);

// we definitely don't want to handle signals
fpt->master->handle_signals = false;

struct thread task;

pthread_mutex_lock(running_cond_mtx);
{
bgp_io_thread_run = true;
pthread_cond_signal(running_cond);
}
pthread_mutex_unlock(running_cond_mtx);

while (bgp_io_thread_run) {
if (thread_fetch(fpt->master, &task)) {
thread_call(&task);
}
}

close(sleeper[1]);
close(sleeper[0]);

return NULL;
}

void bgp_io_wait_running()
{
while (!bgp_io_thread_run)
pthread_cond_wait(running_cond, running_cond_mtx);

/* locked in bgp_io_init() */
pthread_mutex_unlock(running_cond_mtx);
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
pthread_join(fpt->thread, result);

pthread_mutex_destroy(running_cond_mtx);
pthread_cond_destroy(running_cond);

XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx);
XFREE(MTYPE_PTHREAD_PRIM, running_cond);

return 0;
}

/* Extern API -------------------------------------------------------------- */
/* Thread external API ----------------------------------------------------- */

void bgp_writes_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->obuf);
Expand All @@ -154,18 +66,15 @@ void bgp_writes_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
&peer->t_write);
SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_write, NULL);
THREAD_OFF(peer->t_generate_updgrp_packets);
Expand All @@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer)

void bgp_reads_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->ibuf);
Expand All @@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);

Expand All @@ -196,19 +104,18 @@ void bgp_reads_on(struct peer *peer)

void bgp_reads_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_read, NULL);
THREAD_OFF(peer->t_process_packet);

UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------- */
/* Thread internal functions ----------------------------------------------- */

/**
/*
* Called from I/O pthread when a file descriptor has become ready for writing.
*/
static int bgp_process_writes(struct thread *thread)
Expand All @@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread)
}
pthread_mutex_unlock(&peer->io_mtx);

if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
/* no problem */
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
}

/* problem */
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
reschedule = false; /* problem */
reschedule = false;
fatal = true;
}

Expand All @@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread)
return 0;
}

/**
/*
* Called from I/O pthread when a file descriptor has become ready for reading,
* or has hung up.
*
Expand Down Expand Up @@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread)
/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
/*
* If we have that much data, chuck it into its own
* stream and append to input queue for processing.
*/
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
Expand Down Expand Up @@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread)
return 0;
}

/**
/*
* Flush peer output buffer.
*
* This function pops packets off of peer->obuf and writes them to peer->fd.
Expand All @@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer)
uint16_t status = 0;
uint32_t wpkt_quanta_old;

// cache current write quanta
wpkt_quanta_old =
atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);

Expand All @@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer)
}

goto done;
} else if (num != writenum) // incomplete write
} else if (num != writenum)
stream_forward_getp(s, num);

} while (num != writenum);
Expand Down Expand Up @@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer)
if (peer->v_start >= (60 * 2))
peer->v_start = (60 * 2);

/* Handle Graceful Restart case where the state changes
* to Connect instead of Idle */
/*
* Handle Graceful Restart case where the state changes
* to Connect instead of Idle.
*/
BGP_EVENT_ADD(peer, BGP_Stop);
goto done;

Expand Down Expand Up @@ -472,7 +384,7 @@ done : {
return status;
}

/**
/*
* Reads a chunk of data from peer->fd into peer->ibuf_work.
*
* @return status flag (see top-of-file)
Expand Down
17 changes: 0 additions & 17 deletions bgpd/bgp_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,13 @@
#include "bgpd/bgpd.h"
#include "frr_pthread.h"

/**
* Initializes data structures and flags for the write thread.
*
* This function must be called from the main thread before
* bgp_writes_start() is invoked.
*/
extern void bgp_io_init(void);

/**
* Start function for write thread.
*
* @param arg - unused
*/
extern void *bgp_io_start(void *arg);

/**
* Wait until the IO thread is ready to accept jobs.
*
* This function must be called immediately after the thread has been created
* for running. Use of other functions before calling this one will result in
* undefined behavior.
*/
extern void bgp_io_wait_running(void);

/**
* Start function for write thread.
*
Expand Down
Loading

0 comments on commit cbbb31b

Please sign in to comment.