Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FRR pthread improvements #1672

Merged
merged 4 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 25 additions & 113 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,100 +51,12 @@ static bool validate_header(struct peer *);
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error

/* Plumbing & control variables for thread lifecycle
* ------------------------------------------------------------------------ */
bool bgp_io_thread_run;
pthread_mutex_t *running_cond_mtx;
pthread_cond_t *running_cond;

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

/* Poison pill task */
static int bgp_io_finish(struct thread *thread)
{
bgp_io_thread_run = false;
return 0;
}

/* Extern lifecycle control functions. init -> start -> stop
* ------------------------------------------------------------------------ */
void bgp_io_init()
{
bgp_io_thread_run = false;

running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t));

pthread_mutex_init(running_cond_mtx, NULL);
pthread_cond_init(running_cond, NULL);

/* unlocked in bgp_io_wait_running() */
pthread_mutex_lock(running_cond_mtx);
}

void *bgp_io_start(void *arg)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
fpt->master->owner = pthread_self();

// fd so we can sleep in poll()
int sleeper[2];
pipe(sleeper);
thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);

// we definitely don't want to handle signals
fpt->master->handle_signals = false;

struct thread task;

pthread_mutex_lock(running_cond_mtx);
{
bgp_io_thread_run = true;
pthread_cond_signal(running_cond);
}
pthread_mutex_unlock(running_cond_mtx);

while (bgp_io_thread_run) {
if (thread_fetch(fpt->master, &task)) {
thread_call(&task);
}
}

close(sleeper[1]);
close(sleeper[0]);

return NULL;
}

void bgp_io_wait_running()
{
while (!bgp_io_thread_run)
pthread_cond_wait(running_cond, running_cond_mtx);

/* locked in bgp_io_init() */
pthread_mutex_unlock(running_cond_mtx);
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
pthread_join(fpt->thread, result);

pthread_mutex_destroy(running_cond_mtx);
pthread_cond_destroy(running_cond);

XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx);
XFREE(MTYPE_PTHREAD_PRIM, running_cond);

return 0;
}

/* Extern API -------------------------------------------------------------- */
/* Thread external API ----------------------------------------------------- */

void bgp_writes_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->obuf);
Expand All @@ -154,18 +66,15 @@ void bgp_writes_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
&peer->t_write);
SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_write, NULL);
THREAD_OFF(peer->t_generate_updgrp_packets);
Expand All @@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer)

void bgp_reads_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->ibuf);
Expand All @@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);

Expand All @@ -196,19 +104,18 @@ void bgp_reads_on(struct peer *peer)

void bgp_reads_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_read, NULL);
THREAD_OFF(peer->t_process_packet);

UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------- */
/* Thread internal functions ----------------------------------------------- */

/**
/*
* Called from I/O pthread when a file descriptor has become ready for writing.
*/
static int bgp_process_writes(struct thread *thread)
Expand All @@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread)
}
pthread_mutex_unlock(&peer->io_mtx);

if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
/* no problem */
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
}

/* problem */
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
reschedule = false; /* problem */
reschedule = false;
fatal = true;
}

Expand All @@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread)
return 0;
}

/**
/*
* Called from I/O pthread when a file descriptor has become ready for reading,
* or has hung up.
*
Expand Down Expand Up @@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread)
/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
/*
* If we have that much data, chuck it into its own
* stream and append to input queue for processing.
*/
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
Expand Down Expand Up @@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread)
return 0;
}

/**
/*
* Flush peer output buffer.
*
* This function pops packets off of peer->obuf and writes them to peer->fd.
Expand All @@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer)
uint16_t status = 0;
uint32_t wpkt_quanta_old;

// cache current write quanta
wpkt_quanta_old =
atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);

Expand All @@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer)
}

goto done;
} else if (num != writenum) // incomplete write
} else if (num != writenum)
stream_forward_getp(s, num);

} while (num != writenum);
Expand Down Expand Up @@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer)
if (peer->v_start >= (60 * 2))
peer->v_start = (60 * 2);

/* Handle Graceful Restart case where the state changes
* to Connect instead of Idle */
/*
* Handle Graceful Restart case where the state changes
* to Connect instead of Idle.
*/
BGP_EVENT_ADD(peer, BGP_Stop);
goto done;

Expand Down Expand Up @@ -472,7 +384,7 @@ done : {
return status;
}

/**
/*
* Reads a chunk of data from peer->fd into peer->ibuf_work.
*
* @return status flag (see top-of-file)
Expand Down
17 changes: 0 additions & 17 deletions bgpd/bgp_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,13 @@
#include "bgpd/bgpd.h"
#include "frr_pthread.h"

/**
* Initializes data structures and flags for the write thread.
*
* This function must be called from the main thread before
* bgp_writes_start() is invoked.
*/
extern void bgp_io_init(void);

/**
* Start function for write thread.
*
* @param arg - unused
*/
extern void *bgp_io_start(void *arg);

/**
* Wait until the IO thread is ready to accept jobs.
*
* This function must be called immediately after the thread has been created
* for running. Use of other functions before calling this one will result in
* undefined behavior.
*/
extern void bgp_io_wait_running(void);

/**
* Start function for write thread.
*
Expand Down
Loading