Skip to content

Commit

Permalink
Additional ring_array API (twitter#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevyang authored Jul 19, 2018
1 parent 3c994cc commit 95e04b0
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 35 deletions.
29 changes: 25 additions & 4 deletions include/cc_ring_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ extern "C" {

#include <cc_define.h>

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define RING_ARRAY_DEFAULT_CAP 1024

struct ring_array {
size_t elem_size; /* element size */
uint32_t cap; /* total capacity (# items stored + 1) */
uint32_t cap; /* total capacity */
uint32_t rpos; /* read offset */
uint32_t wpos; /* write offset */
union {
Expand All @@ -49,16 +50,36 @@ struct ring_array {
};
};

/***********************
* Producer thread API *
***********************/

/* push an element into the array */
rstatus_i ring_array_push(const void *elem, struct ring_array *arr);

/* check if array is full */
bool ring_array_full(struct ring_array *arr);


/***********************
* Consumer thread API *
***********************/

/* pop an element from the array */
rstatus_i ring_array_pop(void *elem, struct ring_array *arr);

/* creation/destruction */
struct ring_array *ring_array_create(size_t elem_size, uint32_t cap);
/* check if array is empty */
bool ring_array_empty(struct ring_array *arr);

/* flush contents of ring array */
void ring_array_flush(struct ring_array *arr);

void ring_array_destroy(struct ring_array *arr);

/*****************
* Create/Delete *
*****************/
struct ring_array *ring_array_create(size_t elem_size, uint32_t cap);
void ring_array_destroy(struct ring_array **arr);

#ifdef __cplusplus
}
Expand Down
69 changes: 42 additions & 27 deletions src/cc_ring_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
*
*/


static inline uint32_t
ring_array_nelem(uint32_t rpos, uint32_t wpos, uint32_t cap)
{
Expand All @@ -81,31 +80,12 @@ ring_array_nelem(uint32_t rpos, uint32_t wpos, uint32_t cap)
}
}

static inline bool
ring_array_empty(uint32_t rpos, uint32_t wpos)
{
return rpos == wpos;
}

static inline bool
ring_array_full(uint32_t rpos, uint32_t wpos, uint32_t cap)
{
return ring_array_nelem(rpos, wpos, cap) == cap;
}

rstatus_i
ring_array_push(const void *elem, struct ring_array *arr)
{
/**
* Take snapshot of rpos, since another thread might be popping. Note: other
* members of arr do not need to be saved because we assume the other thread
* only pops and does not push; in other words, only one thread updates
* either rpos or wpos.
*/
uint32_t new_wpos;
uint32_t rpos = __atomic_load_n(&(arr->rpos), __ATOMIC_RELAXED);

if (ring_array_full(rpos, arr->wpos, arr->cap)) {
if (ring_array_full(arr)) {
log_debug("Could not push to ring array %p; array is full", arr);
return CC_ERROR;
}
Expand All @@ -119,14 +99,25 @@ ring_array_push(const void *elem, struct ring_array *arr)
return CC_OK;
}

bool
ring_array_full(struct ring_array *arr)
{
/*
* Take snapshot of rpos, since another thread might be popping. Note: other
* members of arr do not need to be saved because we assume the other thread
* only pops and does not push; in other words, only one thread updates
* either rpos or wpos.
*/
uint32_t rpos = __atomic_load_n(&(arr->rpos), __ATOMIC_RELAXED);
return ring_array_nelem(rpos, arr->wpos, arr->cap) == arr->cap;
}

rstatus_i
ring_array_pop(void *elem, struct ring_array *arr)
{
/* take snapshot of wpos, since another thread might be pushing */
uint32_t new_rpos;
uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_RELAXED);

if (ring_array_empty(arr->rpos, wpos)) {
if (ring_array_empty(arr)) {
log_debug("Could not pop from ring array %p; array is empty", arr);
return CC_ERROR;
}
Expand All @@ -142,11 +133,28 @@ ring_array_pop(void *elem, struct ring_array *arr)
return CC_OK;
}

bool
ring_array_empty(struct ring_array *arr)
{
/* take snapshot of wpos, since another thread might be pushing */
uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_RELAXED);
return ring_array_nelem(arr->rpos, wpos, arr->cap) == 0;
}

void
ring_array_flush(struct ring_array *arr)
{
uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_RELAXED);
__atomic_store_n(&(arr->rpos), wpos, __ATOMIC_RELAXED);
}

struct ring_array *
ring_array_create(size_t elem_size, uint32_t cap)
{
struct ring_array *arr;

/* underlying array has # items stored + 1, since full is when wpos is 1
element behind wpos */
arr = cc_alloc(RING_ARRAY_HDR_SIZE + elem_size * (cap + 1));

if (arr == NULL) {
Expand All @@ -162,8 +170,15 @@ ring_array_create(size_t elem_size, uint32_t cap)
}

void
ring_array_destroy(struct ring_array *arr)
ring_array_destroy(struct ring_array **arr)
{
log_verb("destroying ring array %p and freeing memory");
cc_free(arr);
log_verb("destroying ring array %p and freeing memory", *arr);

if ((arr == NULL) || (*arr == NULL)) {
log_warn("destroying NULL ring_array pointer");
return;
}

cc_free(*arr);
*arr = NULL;
}
77 changes: 73 additions & 4 deletions test/ring_array/check_ring_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,57 @@ START_TEST(test_create_push_pop_destroy)

ck_assert_int_eq(*test_elem, ELEM_VALUE);

ring_array_destroy(arr);
ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
#undef ELEM_VALUE
}
END_TEST

START_TEST(test_empty)
{
#define ELEM_SIZE sizeof(uint8_t)
#define CAP 10
struct ring_array *arr;
uint8_t data = 0;

arr = ring_array_create(ELEM_SIZE, CAP);
ck_assert(ring_array_empty(arr));

ring_array_push(&data, arr);
ck_assert(!ring_array_empty(arr));

ring_array_pop(NULL, arr);
ck_assert(ring_array_empty(arr));

ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
END_TEST

START_TEST(test_full)
{
#define ELEM_SIZE sizeof(uint8_t)
#define CAP 1
struct ring_array *arr;
uint8_t data = 0;

arr = ring_array_create(ELEM_SIZE, CAP);
ck_assert(!ring_array_full(arr));

ring_array_push(&data, arr);
ck_assert(ring_array_full(arr));

ring_array_pop(NULL, arr);
ck_assert(!ring_array_full(arr));

ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
END_TEST

START_TEST(test_pop_empty)
{
#define ELEM_SIZE sizeof(uint8_t)
Expand All @@ -41,7 +85,7 @@ START_TEST(test_pop_empty)
arr = ring_array_create(ELEM_SIZE, CAP);
ck_assert_int_eq(ring_array_pop(NULL, arr), CC_ERROR);

ring_array_destroy(arr);
ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
Expand All @@ -60,7 +104,7 @@ START_TEST(test_push_full)
}
ck_assert_int_eq(ring_array_push(&i, arr), CC_ERROR);

ring_array_destroy(arr);
ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
Expand All @@ -83,7 +127,29 @@ START_TEST(test_push_pop_many)
ck_assert_int_eq(ring_array_push(&i, arr), CC_OK);
}

ring_array_destroy(arr);
ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
END_TEST

START_TEST(test_flush)
{
#define ELEM_SIZE sizeof(uint8_t)
#define CAP 10
struct ring_array *arr;
uint8_t i;

arr = ring_array_create(ELEM_SIZE, CAP);
for (i = 0; i < CAP; i++) {
ring_array_push(&i, arr);
}

ck_assert(ring_array_full(arr));
ring_array_flush(arr);
ck_assert(ring_array_empty(arr));

ring_array_destroy(&arr);
#undef ELEM_SIZE
#undef CAP
}
Expand All @@ -101,9 +167,12 @@ ring_array_suite(void)
suite_add_tcase(s, tc_ring_array);

tcase_add_test(tc_ring_array, test_create_push_pop_destroy);
tcase_add_test(tc_ring_array, test_empty);
tcase_add_test(tc_ring_array, test_full);
tcase_add_test(tc_ring_array, test_pop_empty);
tcase_add_test(tc_ring_array, test_push_full);
tcase_add_test(tc_ring_array, test_push_pop_many);
tcase_add_test(tc_ring_array, test_flush);

return s;
}
Expand Down

0 comments on commit 95e04b0

Please sign in to comment.