From 5c18600b6065a8f2b66c5d6b8d42dee8639415d5 Mon Sep 17 00:00:00 2001 From: Mayank <33252549+mynktl@users.noreply.github.com> Date: Mon, 19 Feb 2018 12:36:20 +0530 Subject: [PATCH] Use of RTE ring buffer API from DPDK in vdev disk AIO back-end (#42) * Use of RTE ring buffer API from DPDK in vdev disk AIO backend --- Makefile.am | 2 +- include/Makefile.am | 9 +- include/rte_atomic.h | 229 +++++++ include/rte_atomic_64.h | 199 ++++++ include/rte_atomic_generic.h | 973 +++++++++++++++++++++++++++++ include/rte_common.h | 483 ++++++++++++++ include/rte_memory.h | 227 +++++++ include/rte_pause.h | 51 ++ include/rte_ring.h | 1143 ++++++++++++++++++++++++++++++++++ include/sys/zfs_context.h | 1 + lib/libzpool/Makefile.am | 1 + lib/libzpool/rte_ring.c | 199 ++++++ lib/libzpool/vdev_disk_aio.c | 128 ++-- 13 files changed, 3588 insertions(+), 57 deletions(-) create mode 100644 include/rte_atomic.h create mode 100644 include/rte_atomic_64.h create mode 100644 include/rte_atomic_generic.h create mode 100644 include/rte_common.h create mode 100644 include/rte_memory.h create mode 100644 include/rte_pause.h create mode 100644 include/rte_ring.h create mode 100644 lib/libzpool/rte_ring.c diff --git a/Makefile.am b/Makefile.am index 3722530d41d2..d3c7e2031467 100644 --- a/Makefile.am +++ b/Makefile.am @@ -51,7 +51,7 @@ commitcheck: cstyle: @find ${top_srcdir} -name '*.[hc]' ! -name 'zfs_config.*' \ - ! -name '*.mod.c' -type f -exec scripts/cstyle.pl -cpP {} \+ + ! -name '*.mod.c' ! -name 'rte_*.[hc]' -type f -exec scripts/cstyle.pl -cpP {} \+ shellcheck: @if type shellcheck > /dev/null 2>&1; then \ diff --git a/include/Makefile.am b/include/Makefile.am index 26ebc43af94e..ae3befef81f2 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -23,7 +23,14 @@ USER_H = \ $(top_srcdir)/include/libzfs_core.h \ $(top_srcdir)/include/libzfs_impl.h \ $(top_srcdir)/include/uzfs_io.h \ - $(top_srcdir)/include/uzfs_mgmt.h + $(top_srcdir)/include/uzfs_mgmt.h \ + $(top_srcdir)/include/rte_ring.h \ + $(top_srcdir)/include/rte_atomic_64.h \ + $(top_srcdir)/include/rte_atomic_generic.h \ + $(top_srcdir)/include/rte_atomic.h \ + $(top_srcdir)/include/rte_common.h \ + $(top_srcdir)/include/rte_memory.h \ + $(top_srcdir)/include/rte_pause.h EXTRA_DIST = $(COMMON_H) $(KERNEL_H) $(USER_H) diff --git a/include/rte_atomic.h b/include/rte_atomic.h new file mode 100644 index 000000000000..5ecaa6693e70 --- /dev/null +++ b/include/rte_atomic.h @@ -0,0 +1,229 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_ATOMIC_X86_H_ +#define _RTE_ATOMIC_X86_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +#if RTE_MAX_LCORE == 1 +#define MPLOCKED /**< No need to insert MP lock prefix. */ +#else +#define MPLOCKED "lock ; " /**< Insert MP lock prefix. */ +#endif + +#define rte_mb() _mm_mfence() + +#define rte_wmb() _mm_sfence() + +#define rte_rmb() _mm_lfence() + +#define rte_smp_mb() rte_mb() + +#define rte_smp_wmb() rte_compiler_barrier() + +#define rte_smp_rmb() rte_compiler_barrier() + +#define rte_io_mb() rte_mb() + +#define rte_io_wmb() rte_compiler_barrier() + +#define rte_io_rmb() rte_compiler_barrier() + +/*------------------------- 16 bit atomic operations -------------------------*/ + +#ifndef RTE_FORCE_INTRINSICS +static inline int +rte_atomic16_cmpset(volatile uint16_t *dst, uint16_t exp, uint16_t src) +{ + uint8_t res; + + asm volatile( + MPLOCKED + "cmpxchgw %[src], %[dst];" + "sete %[res];" + : [res] "=a" (res), /* output */ + [dst] "=m" (*dst) + : [src] "r" (src), /* input */ + "a" (exp), + "m" (*dst) + : "memory"); /* no-clobber list */ + return res; +} + +static inline int rte_atomic16_test_and_set(rte_atomic16_t *v) +{ + return rte_atomic16_cmpset((volatile uint16_t *)&v->cnt, 0, 1); +} + +static inline void +rte_atomic16_inc(rte_atomic16_t *v) +{ + asm volatile( + MPLOCKED + "incw %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline void +rte_atomic16_dec(rte_atomic16_t *v) +{ + asm volatile( + MPLOCKED + "decw %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline int rte_atomic16_inc_and_test(rte_atomic16_t *v) +{ + uint8_t ret; + + asm volatile( + MPLOCKED + "incw %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + return ret != 0; +} + +static inline int rte_atomic16_dec_and_test(rte_atomic16_t *v) +{ + uint8_t ret; + + asm volatile(MPLOCKED + "decw %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + return ret != 0; +} + +/*------------------------- 32 bit atomic operations -------------------------*/ + +static inline int +rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) +{ + uint8_t res; + + asm volatile( + MPLOCKED + "cmpxchgl %[src], %[dst];" + "sete %[res];" + : [res] "=a" (res), /* output */ + [dst] "=m" (*dst) + : [src] "r" (src), /* input */ + "a" (exp), + "m" (*dst) + : "memory"); /* no-clobber list */ + return res; +} + +static inline int rte_atomic32_test_and_set(rte_atomic32_t *v) +{ + return rte_atomic32_cmpset((volatile uint32_t *)&v->cnt, 0, 1); +} + +static inline void +rte_atomic32_inc(rte_atomic32_t *v) +{ + asm volatile( + MPLOCKED + "incl %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline void +rte_atomic32_dec(rte_atomic32_t *v) +{ + asm volatile( + MPLOCKED + "decl %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline int rte_atomic32_inc_and_test(rte_atomic32_t *v) +{ + uint8_t ret; + + asm volatile( + MPLOCKED + "incl %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + return ret != 0; +} + +static inline int rte_atomic32_dec_and_test(rte_atomic32_t *v) +{ + uint8_t ret; + + asm volatile(MPLOCKED + "decl %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + return ret != 0; +} +#endif + +#ifdef RTE_ARCH_I686 +#include "rte_atomic_32.h" +#else +#include "rte_atomic_64.h" +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_ATOMIC_X86_H_ */ diff --git a/include/rte_atomic_64.h b/include/rte_atomic_64.h new file mode 100644 index 000000000000..1a53a766bd72 --- /dev/null +++ b/include/rte_atomic_64.h @@ -0,0 +1,199 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Inspired from FreeBSD src/sys/amd64/include/atomic.h + * Copyright (c) 1998 Doug Rabson + * All rights reserved. + */ + +#ifndef _RTE_ATOMIC_X86_H_ +#error do not include this file directly, use instead +#endif + +#ifndef _RTE_ATOMIC_X86_64_H_ +#define _RTE_ATOMIC_X86_64_H_ + +#include +#include +#include + +/*------------------------- 64 bit atomic operations -------------------------*/ + +#ifndef RTE_FORCE_INTRINSICS +static inline int +rte_atomic64_cmpset(volatile uint64_t *dst, uint64_t exp, uint64_t src) +{ + uint8_t res; + + + asm volatile( + MPLOCKED + "cmpxchgq %[src], %[dst];" + "sete %[res];" + : [res] "=a" (res), /* output */ + [dst] "=m" (*dst) + : [src] "r" (src), /* input */ + "a" (exp), + "m" (*dst) + : "memory"); /* no-clobber list */ + + return res; +} + +static inline void +rte_atomic64_init(rte_atomic64_t *v) +{ + v->cnt = 0; +} + +static inline int64_t +rte_atomic64_read(rte_atomic64_t *v) +{ + return v->cnt; +} + +static inline void +rte_atomic64_set(rte_atomic64_t *v, int64_t new_value) +{ + v->cnt = new_value; +} + +static inline void +rte_atomic64_add(rte_atomic64_t *v, int64_t inc) +{ + asm volatile( + MPLOCKED + "addq %[inc], %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : [inc] "ir" (inc), /* input */ + "m" (v->cnt) + ); +} + +static inline void +rte_atomic64_sub(rte_atomic64_t *v, int64_t dec) +{ + asm volatile( + MPLOCKED + "subq %[dec], %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : [dec] "ir" (dec), /* input */ + "m" (v->cnt) + ); +} + +static inline void +rte_atomic64_inc(rte_atomic64_t *v) +{ + asm volatile( + MPLOCKED + "incq %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline void +rte_atomic64_dec(rte_atomic64_t *v) +{ + asm volatile( + MPLOCKED + "decq %[cnt]" + : [cnt] "=m" (v->cnt) /* output */ + : "m" (v->cnt) /* input */ + ); +} + +static inline int64_t +rte_atomic64_add_return(rte_atomic64_t *v, int64_t inc) +{ + int64_t prev = inc; + + asm volatile( + MPLOCKED + "xaddq %[prev], %[cnt]" + : [prev] "+r" (prev), /* output */ + [cnt] "=m" (v->cnt) + : "m" (v->cnt) /* input */ + ); + return prev + inc; +} + +static inline int64_t +rte_atomic64_sub_return(rte_atomic64_t *v, int64_t dec) +{ + return rte_atomic64_add_return(v, -dec); +} + +static inline int rte_atomic64_inc_and_test(rte_atomic64_t *v) +{ + uint8_t ret; + + asm volatile( + MPLOCKED + "incq %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + + return ret != 0; +} + +static inline int rte_atomic64_dec_and_test(rte_atomic64_t *v) +{ + uint8_t ret; + + asm volatile( + MPLOCKED + "decq %[cnt] ; " + "sete %[ret]" + : [cnt] "+m" (v->cnt), /* output */ + [ret] "=qm" (ret) + ); + return ret != 0; +} + +static inline int rte_atomic64_test_and_set(rte_atomic64_t *v) +{ + return rte_atomic64_cmpset((volatile uint64_t *)&v->cnt, 0, 1); +} + +static inline void rte_atomic64_clear(rte_atomic64_t *v) +{ + v->cnt = 0; +} +#endif + +#endif /* _RTE_ATOMIC_X86_64_H_ */ diff --git a/include/rte_atomic_generic.h b/include/rte_atomic_generic.h new file mode 100644 index 000000000000..7b81705b3fba --- /dev/null +++ b/include/rte_atomic_generic.h @@ -0,0 +1,973 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_ATOMIC_H_ +#define _RTE_ATOMIC_H_ + +/** + * @file + * Atomic Operations + * + * This file defines a generic API for atomic operations. + */ + +#include +#include + +#ifdef __DOXYGEN__ + +/** + * General memory barrier. + * + * Guarantees that the LOAD and STORE operations generated before the + * barrier occur before the LOAD and STORE operations generated after. + * This function is architecture dependent. + */ +static inline void rte_mb(void); + +/** + * Write memory barrier. + * + * Guarantees that the STORE operations generated before the barrier + * occur before the STORE operations generated after. + * This function is architecture dependent. + */ +static inline void rte_wmb(void); + +/** + * Read memory barrier. + * + * Guarantees that the LOAD operations generated before the barrier + * occur before the LOAD operations generated after. + * This function is architecture dependent. + */ +static inline void rte_rmb(void); + +/** + * General memory barrier between lcores + * + * Guarantees that the LOAD and STORE operations that precede the + * rte_smp_mb() call are globally visible across the lcores + * before the the LOAD and STORE operations that follows it. + */ +static inline void rte_smp_mb(void); + +/** + * Write memory barrier between lcores + * + * Guarantees that the STORE operations that precede the + * rte_smp_wmb() call are globally visible across the lcores + * before the the STORE operations that follows it. + */ +static inline void rte_smp_wmb(void); + +/** + * Read memory barrier between lcores + * + * Guarantees that the LOAD operations that precede the + * rte_smp_rmb() call are globally visible across the lcores + * before the the LOAD operations that follows it. + */ +static inline void rte_smp_rmb(void); + +/** + * General memory barrier for I/O device + * + * Guarantees that the LOAD and STORE operations that precede the + * rte_io_mb() call are visible to I/O device or CPU before the + * LOAD and STORE operations that follow it. + */ +static inline void rte_io_mb(void); + +/** + * Write memory barrier for I/O device + * + * Guarantees that the STORE operations that precede the + * rte_io_wmb() call are visible to I/O device before the STORE + * operations that follow it. + */ +static inline void rte_io_wmb(void); + +/** + * Read memory barrier for IO device + * + * Guarantees that the LOAD operations on I/O device that precede the + * rte_io_rmb() call are visible to CPU before the LOAD + * operations that follow it. + */ +static inline void rte_io_rmb(void); + +#endif /* __DOXYGEN__ */ + +/** + * Compiler barrier. + * + * Guarantees that operation reordering does not occur at compile time + * for operations directly before and after the barrier. + */ +#define rte_compiler_barrier() do { \ + asm volatile ("" : : : "memory"); \ +} while(0) + +/*------------------------- 16 bit atomic operations -------------------------*/ + +/** + * Atomic compare and set. + * + * (atomic) equivalent to: + * if (*dst == exp) + * *dst = src (all 16-bit words) + * + * @param dst + * The destination location into which the value will be written. + * @param exp + * The expected value. + * @param src + * The new value. + * @return + * Non-zero on success; 0 on failure. + */ +static inline int +rte_atomic16_cmpset(volatile uint16_t *dst, uint16_t exp, uint16_t src); + +#ifdef RTE_FORCE_INTRINSICS +static inline int +rte_atomic16_cmpset(volatile uint16_t *dst, uint16_t exp, uint16_t src) +{ + return __sync_bool_compare_and_swap(dst, exp, src); +} +#endif + +/** + * The atomic counter structure. + */ +typedef struct { + volatile int16_t cnt; /**< An internal counter value. */ +} rte_atomic16_t; + +/** + * Static initializer for an atomic counter. + */ +#define RTE_ATOMIC16_INIT(val) { (val) } + +/** + * Initialize an atomic counter. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic16_init(rte_atomic16_t *v) +{ + v->cnt = 0; +} + +/** + * Atomically read a 16-bit value from a counter. + * + * @param v + * A pointer to the atomic counter. + * @return + * The value of the counter. + */ +static inline int16_t +rte_atomic16_read(const rte_atomic16_t *v) +{ + return v->cnt; +} + +/** + * Atomically set a counter to a 16-bit value. + * + * @param v + * A pointer to the atomic counter. + * @param new_value + * The new value for the counter. + */ +static inline void +rte_atomic16_set(rte_atomic16_t *v, int16_t new_value) +{ + v->cnt = new_value; +} + +/** + * Atomically add a 16-bit value to an atomic counter. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + */ +static inline void +rte_atomic16_add(rte_atomic16_t *v, int16_t inc) +{ + __sync_fetch_and_add(&v->cnt, inc); +} + +/** + * Atomically subtract a 16-bit value from an atomic counter. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + */ +static inline void +rte_atomic16_sub(rte_atomic16_t *v, int16_t dec) +{ + __sync_fetch_and_sub(&v->cnt, dec); +} + +/** + * Atomically increment a counter by one. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic16_inc(rte_atomic16_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic16_inc(rte_atomic16_t *v) +{ + rte_atomic16_add(v, 1); +} +#endif + +/** + * Atomically decrement a counter by one. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic16_dec(rte_atomic16_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic16_dec(rte_atomic16_t *v) +{ + rte_atomic16_sub(v, 1); +} +#endif + +/** + * Atomically add a 16-bit value to a counter and return the result. + * + * Atomically adds the 16-bits value (inc) to the atomic counter (v) and + * returns the value of v after addition. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + * @return + * The value of v after the addition. + */ +static inline int16_t +rte_atomic16_add_return(rte_atomic16_t *v, int16_t inc) +{ + return __sync_add_and_fetch(&v->cnt, inc); +} + +/** + * Atomically subtract a 16-bit value from a counter and return + * the result. + * + * Atomically subtracts the 16-bit value (inc) from the atomic counter + * (v) and returns the value of v after the subtraction. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + * @return + * The value of v after the subtraction. + */ +static inline int16_t +rte_atomic16_sub_return(rte_atomic16_t *v, int16_t dec) +{ + return __sync_sub_and_fetch(&v->cnt, dec); +} + +/** + * Atomically increment a 16-bit counter by one and test. + * + * Atomically increments the atomic counter (v) by one and returns true if + * the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after the increment operation is 0; false otherwise. + */ +static inline int rte_atomic16_inc_and_test(rte_atomic16_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic16_inc_and_test(rte_atomic16_t *v) +{ + return __sync_add_and_fetch(&v->cnt, 1) == 0; +} +#endif + +/** + * Atomically decrement a 16-bit counter by one and test. + * + * Atomically decrements the atomic counter (v) by one and returns true if + * the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after the decrement operation is 0; false otherwise. + */ +static inline int rte_atomic16_dec_and_test(rte_atomic16_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic16_dec_and_test(rte_atomic16_t *v) +{ + return __sync_sub_and_fetch(&v->cnt, 1) == 0; +} +#endif + +/** + * Atomically test and set a 16-bit atomic counter. + * + * If the counter value is already set, return 0 (failed). Otherwise, set + * the counter value to 1 and return 1 (success). + * + * @param v + * A pointer to the atomic counter. + * @return + * 0 if failed; else 1, success. + */ +static inline int rte_atomic16_test_and_set(rte_atomic16_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic16_test_and_set(rte_atomic16_t *v) +{ + return rte_atomic16_cmpset((volatile uint16_t *)&v->cnt, 0, 1); +} +#endif + +/** + * Atomically set a 16-bit counter to 0. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void rte_atomic16_clear(rte_atomic16_t *v) +{ + v->cnt = 0; +} + +/*------------------------- 32 bit atomic operations -------------------------*/ + +/** + * Atomic compare and set. + * + * (atomic) equivalent to: + * if (*dst == exp) + * *dst = src (all 32-bit words) + * + * @param dst + * The destination location into which the value will be written. + * @param exp + * The expected value. + * @param src + * The new value. + * @return + * Non-zero on success; 0 on failure. + */ +static inline int +rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src); + +#ifdef RTE_FORCE_INTRINSICS +static inline int +rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) +{ + return __sync_bool_compare_and_swap(dst, exp, src); +} +#endif + +/** + * The atomic counter structure. + */ +typedef struct { + volatile int32_t cnt; /**< An internal counter value. */ +} rte_atomic32_t; + +/** + * Static initializer for an atomic counter. + */ +#define RTE_ATOMIC32_INIT(val) { (val) } + +/** + * Initialize an atomic counter. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic32_init(rte_atomic32_t *v) +{ + v->cnt = 0; +} + +/** + * Atomically read a 32-bit value from a counter. + * + * @param v + * A pointer to the atomic counter. + * @return + * The value of the counter. + */ +static inline int32_t +rte_atomic32_read(const rte_atomic32_t *v) +{ + return v->cnt; +} + +/** + * Atomically set a counter to a 32-bit value. + * + * @param v + * A pointer to the atomic counter. + * @param new_value + * The new value for the counter. + */ +static inline void +rte_atomic32_set(rte_atomic32_t *v, int32_t new_value) +{ + v->cnt = new_value; +} + +/** + * Atomically add a 32-bit value to an atomic counter. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + */ +static inline void +rte_atomic32_add(rte_atomic32_t *v, int32_t inc) +{ + __sync_fetch_and_add(&v->cnt, inc); +} + +/** + * Atomically subtract a 32-bit value from an atomic counter. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + */ +static inline void +rte_atomic32_sub(rte_atomic32_t *v, int32_t dec) +{ + __sync_fetch_and_sub(&v->cnt, dec); +} + +/** + * Atomically increment a counter by one. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic32_inc(rte_atomic32_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic32_inc(rte_atomic32_t *v) +{ + rte_atomic32_add(v, 1); +} +#endif + +/** + * Atomically decrement a counter by one. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic32_dec(rte_atomic32_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic32_dec(rte_atomic32_t *v) +{ + rte_atomic32_sub(v,1); +} +#endif + +/** + * Atomically add a 32-bit value to a counter and return the result. + * + * Atomically adds the 32-bits value (inc) to the atomic counter (v) and + * returns the value of v after addition. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + * @return + * The value of v after the addition. + */ +static inline int32_t +rte_atomic32_add_return(rte_atomic32_t *v, int32_t inc) +{ + return __sync_add_and_fetch(&v->cnt, inc); +} + +/** + * Atomically subtract a 32-bit value from a counter and return + * the result. + * + * Atomically subtracts the 32-bit value (inc) from the atomic counter + * (v) and returns the value of v after the subtraction. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + * @return + * The value of v after the subtraction. + */ +static inline int32_t +rte_atomic32_sub_return(rte_atomic32_t *v, int32_t dec) +{ + return __sync_sub_and_fetch(&v->cnt, dec); +} + +/** + * Atomically increment a 32-bit counter by one and test. + * + * Atomically increments the atomic counter (v) by one and returns true if + * the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after the increment operation is 0; false otherwise. + */ +static inline int rte_atomic32_inc_and_test(rte_atomic32_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic32_inc_and_test(rte_atomic32_t *v) +{ + return __sync_add_and_fetch(&v->cnt, 1) == 0; +} +#endif + +/** + * Atomically decrement a 32-bit counter by one and test. + * + * Atomically decrements the atomic counter (v) by one and returns true if + * the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after the decrement operation is 0; false otherwise. + */ +static inline int rte_atomic32_dec_and_test(rte_atomic32_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic32_dec_and_test(rte_atomic32_t *v) +{ + return __sync_sub_and_fetch(&v->cnt, 1) == 0; +} +#endif + +/** + * Atomically test and set a 32-bit atomic counter. + * + * If the counter value is already set, return 0 (failed). Otherwise, set + * the counter value to 1 and return 1 (success). + * + * @param v + * A pointer to the atomic counter. + * @return + * 0 if failed; else 1, success. + */ +static inline int rte_atomic32_test_and_set(rte_atomic32_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic32_test_and_set(rte_atomic32_t *v) +{ + return rte_atomic32_cmpset((volatile uint32_t *)&v->cnt, 0, 1); +} +#endif + +/** + * Atomically set a 32-bit counter to 0. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void rte_atomic32_clear(rte_atomic32_t *v) +{ + v->cnt = 0; +} + +/*------------------------- 64 bit atomic operations -------------------------*/ + +/** + * An atomic compare and set function used by the mutex functions. + * (atomic) equivalent to: + * if (*dst == exp) + * *dst = src (all 64-bit words) + * + * @param dst + * The destination into which the value will be written. + * @param exp + * The expected value. + * @param src + * The new value. + * @return + * Non-zero on success; 0 on failure. + */ +static inline int +rte_atomic64_cmpset(volatile uint64_t *dst, uint64_t exp, uint64_t src); + +#ifdef RTE_FORCE_INTRINSICS +static inline int +rte_atomic64_cmpset(volatile uint64_t *dst, uint64_t exp, uint64_t src) +{ + return __sync_bool_compare_and_swap(dst, exp, src); +} +#endif + +/** + * The atomic counter structure. + */ +typedef struct { + volatile int64_t cnt; /**< Internal counter value. */ +} rte_atomic64_t; + +/** + * Static initializer for an atomic counter. + */ +#define RTE_ATOMIC64_INIT(val) { (val) } + +/** + * Initialize the atomic counter. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic64_init(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_init(rte_atomic64_t *v) +{ +#ifdef __LP64__ + v->cnt = 0; +#else + int success = 0; + uint64_t tmp; + + while (success == 0) { + tmp = v->cnt; + success = rte_atomic64_cmpset((volatile uint64_t *)&v->cnt, + tmp, 0); + } +#endif +} +#endif + +/** + * Atomically read a 64-bit counter. + * + * @param v + * A pointer to the atomic counter. + * @return + * The value of the counter. + */ +static inline int64_t +rte_atomic64_read(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int64_t +rte_atomic64_read(rte_atomic64_t *v) +{ +#ifdef __LP64__ + return v->cnt; +#else + int success = 0; + uint64_t tmp; + + while (success == 0) { + tmp = v->cnt; + /* replace the value by itself */ + success = rte_atomic64_cmpset((volatile uint64_t *)&v->cnt, + tmp, tmp); + } + return tmp; +#endif +} +#endif + +/** + * Atomically set a 64-bit counter. + * + * @param v + * A pointer to the atomic counter. + * @param new_value + * The new value of the counter. + */ +static inline void +rte_atomic64_set(rte_atomic64_t *v, int64_t new_value); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_set(rte_atomic64_t *v, int64_t new_value) +{ +#ifdef __LP64__ + v->cnt = new_value; +#else + int success = 0; + uint64_t tmp; + + while (success == 0) { + tmp = v->cnt; + success = rte_atomic64_cmpset((volatile uint64_t *)&v->cnt, + tmp, new_value); + } +#endif +} +#endif + +/** + * Atomically add a 64-bit value to a counter. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + */ +static inline void +rte_atomic64_add(rte_atomic64_t *v, int64_t inc); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_add(rte_atomic64_t *v, int64_t inc) +{ + __sync_fetch_and_add(&v->cnt, inc); +} +#endif + +/** + * Atomically subtract a 64-bit value from a counter. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + */ +static inline void +rte_atomic64_sub(rte_atomic64_t *v, int64_t dec); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_sub(rte_atomic64_t *v, int64_t dec) +{ + __sync_fetch_and_sub(&v->cnt, dec); +} +#endif + +/** + * Atomically increment a 64-bit counter by one and test. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic64_inc(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_inc(rte_atomic64_t *v) +{ + rte_atomic64_add(v, 1); +} +#endif + +/** + * Atomically decrement a 64-bit counter by one and test. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void +rte_atomic64_dec(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void +rte_atomic64_dec(rte_atomic64_t *v) +{ + rte_atomic64_sub(v, 1); +} +#endif + +/** + * Add a 64-bit value to an atomic counter and return the result. + * + * Atomically adds the 64-bit value (inc) to the atomic counter (v) and + * returns the value of v after the addition. + * + * @param v + * A pointer to the atomic counter. + * @param inc + * The value to be added to the counter. + * @return + * The value of v after the addition. + */ +static inline int64_t +rte_atomic64_add_return(rte_atomic64_t *v, int64_t inc); + +#ifdef RTE_FORCE_INTRINSICS +static inline int64_t +rte_atomic64_add_return(rte_atomic64_t *v, int64_t inc) +{ + return __sync_add_and_fetch(&v->cnt, inc); +} +#endif + +/** + * Subtract a 64-bit value from an atomic counter and return the result. + * + * Atomically subtracts the 64-bit value (dec) from the atomic counter (v) + * and returns the value of v after the subtraction. + * + * @param v + * A pointer to the atomic counter. + * @param dec + * The value to be subtracted from the counter. + * @return + * The value of v after the subtraction. + */ +static inline int64_t +rte_atomic64_sub_return(rte_atomic64_t *v, int64_t dec); + +#ifdef RTE_FORCE_INTRINSICS +static inline int64_t +rte_atomic64_sub_return(rte_atomic64_t *v, int64_t dec) +{ + return __sync_sub_and_fetch(&v->cnt, dec); +} +#endif + +/** + * Atomically increment a 64-bit counter by one and test. + * + * Atomically increments the atomic counter (v) by one and returns + * true if the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after the addition is 0; false otherwise. + */ +static inline int rte_atomic64_inc_and_test(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic64_inc_and_test(rte_atomic64_t *v) +{ + return rte_atomic64_add_return(v, 1) == 0; +} +#endif + +/** + * Atomically decrement a 64-bit counter by one and test. + * + * Atomically decrements the atomic counter (v) by one and returns true if + * the result is 0, or false in all other cases. + * + * @param v + * A pointer to the atomic counter. + * @return + * True if the result after subtraction is 0; false otherwise. + */ +static inline int rte_atomic64_dec_and_test(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic64_dec_and_test(rte_atomic64_t *v) +{ + return rte_atomic64_sub_return(v, 1) == 0; +} +#endif + +/** + * Atomically test and set a 64-bit atomic counter. + * + * If the counter value is already set, return 0 (failed). Otherwise, set + * the counter value to 1 and return 1 (success). + * + * @param v + * A pointer to the atomic counter. + * @return + * 0 if failed; else 1, success. + */ +static inline int rte_atomic64_test_and_set(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline int rte_atomic64_test_and_set(rte_atomic64_t *v) +{ + return rte_atomic64_cmpset((volatile uint64_t *)&v->cnt, 0, 1); +} +#endif + +/** + * Atomically set a 64-bit counter to 0. + * + * @param v + * A pointer to the atomic counter. + */ +static inline void rte_atomic64_clear(rte_atomic64_t *v); + +#ifdef RTE_FORCE_INTRINSICS +static inline void rte_atomic64_clear(rte_atomic64_t *v) +{ + rte_atomic64_set(v, 0); +} +#endif + +#endif /* _RTE_ATOMIC_H_ */ diff --git a/include/rte_common.h b/include/rte_common.h new file mode 100644 index 000000000000..de853e1644ba --- /dev/null +++ b/include/rte_common.h @@ -0,0 +1,483 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_COMMON_H_ +#define _RTE_COMMON_H_ + +/** + * @file + * + * Generic, commonly-used macro and inline function definitions + * for DPDK. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include + +#ifndef typeof +#define typeof __typeof__ +#endif + +#ifndef asm +#define asm __asm__ +#endif + +/** C extension macro for environments lacking C11 features. */ +#if !defined(__STDC_VERSION__) || __STDC_VERSION__ < 201112L +#define RTE_STD_C11 __extension__ +#else +#define RTE_STD_C11 +#endif + +/** Define GCC_VERSION **/ +#ifdef RTE_TOOLCHAIN_GCC +#define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + \ + __GNUC_PATCHLEVEL__) +#endif + +#ifdef RTE_ARCH_STRICT_ALIGN +typedef uint64_t unaligned_uint64_t __attribute__ ((aligned(1))); +typedef uint32_t unaligned_uint32_t __attribute__ ((aligned(1))); +typedef uint16_t unaligned_uint16_t __attribute__ ((aligned(1))); +#else +typedef uint64_t unaligned_uint64_t; +typedef uint32_t unaligned_uint32_t; +typedef uint16_t unaligned_uint16_t; +#endif + +/** + * Force alignment + */ +#define __rte_aligned(a) __attribute__((__aligned__(a))) + +/** + * Force a structure to be packed + */ +#define __rte_packed __attribute__((__packed__)) + +/******* Macro to mark functions and fields scheduled for removal *****/ +#define __rte_deprecated __attribute__((__deprecated__)) + +/*********** Macros to eliminate unused variable warnings ********/ + +/** + * short definition to mark a function parameter unused + */ +#define __rte_unused __attribute__((__unused__)) + +/** + * definition to mark a variable or function parameter as used so + * as to avoid a compiler warning + */ +#define RTE_SET_USED(x) (void)(x) + +/** + * Run function before main() with low priority. + * + * The constructor will be run after prioritized constructors. + * + * @param func + * Constructor function. + */ +#define RTE_INIT(func) \ +static void __attribute__((constructor, used)) func(void) + +/** + * Run function before main() with high priority. + * + * @param func + * Constructor function. + * @param prio + * Priority number must be above 100. + * Lowest number is the first to run. + */ +#define RTE_INIT_PRIO(func, prio) \ +static void __attribute__((constructor(prio), used)) func(void) + +/** + * Force a function to be inlined + */ +#define __rte_always_inline inline __attribute__((always_inline)) + +/** + * Force a function to be noinlined + */ +#define __rte_noinline __attribute__((noinline)) + +/*********** Macros for pointer arithmetic ********/ + +/** + * add a byte-value offset from a pointer + */ +#define RTE_PTR_ADD(ptr, x) ((void*)((uintptr_t)(ptr) + (x))) + +/** + * subtract a byte-value offset from a pointer + */ +#define RTE_PTR_SUB(ptr, x) ((void*)((uintptr_t)ptr - (x))) + +/** + * get the difference between two pointer values, i.e. how far apart + * in bytes are the locations they point two. It is assumed that + * ptr1 is greater than ptr2. + */ +#define RTE_PTR_DIFF(ptr1, ptr2) ((uintptr_t)(ptr1) - (uintptr_t)(ptr2)) + +/*********** Macros/static functions for doing alignment ********/ + + +/** + * Macro to align a pointer to a given power-of-two. The resultant + * pointer will be a pointer of the same type as the first parameter, and + * point to an address no higher than the first parameter. Second parameter + * must be a power-of-two value. + */ +#define RTE_PTR_ALIGN_FLOOR(ptr, align) \ + ((typeof(ptr))RTE_ALIGN_FLOOR((uintptr_t)ptr, align)) + +/** + * Macro to align a value to a given power-of-two. The resultant value + * will be of the same type as the first parameter, and will be no + * bigger than the first parameter. Second parameter must be a + * power-of-two value. + */ +#define RTE_ALIGN_FLOOR(val, align) \ + (typeof(val))((val) & (~((typeof(val))((align) - 1)))) + +/** + * Macro to align a pointer to a given power-of-two. The resultant + * pointer will be a pointer of the same type as the first parameter, and + * point to an address no lower than the first parameter. Second parameter + * must be a power-of-two value. + */ +#define RTE_PTR_ALIGN_CEIL(ptr, align) \ + RTE_PTR_ALIGN_FLOOR((typeof(ptr))RTE_PTR_ADD(ptr, (align) - 1), align) + +/** + * Macro to align a value to a given power-of-two. The resultant value + * will be of the same type as the first parameter, and will be no lower + * than the first parameter. Second parameter must be a power-of-two + * value. + */ +#define RTE_ALIGN_CEIL(val, align) \ + RTE_ALIGN_FLOOR(((val) + ((typeof(val)) (align) - 1)), align) + +/** + * Macro to align a pointer to a given power-of-two. The resultant + * pointer will be a pointer of the same type as the first parameter, and + * point to an address no lower than the first parameter. Second parameter + * must be a power-of-two value. + * This function is the same as RTE_PTR_ALIGN_CEIL + */ +#define RTE_PTR_ALIGN(ptr, align) RTE_PTR_ALIGN_CEIL(ptr, align) + +/** + * Macro to align a value to a given power-of-two. The resultant + * value will be of the same type as the first parameter, and + * will be no lower than the first parameter. Second parameter + * must be a power-of-two value. + * This function is the same as RTE_ALIGN_CEIL + */ +#define RTE_ALIGN(val, align) RTE_ALIGN_CEIL(val, align) + +/** + * Checks if a pointer is aligned to a given power-of-two value + * + * @param ptr + * The pointer whose alignment is to be checked + * @param align + * The power-of-two value to which the ptr should be aligned + * + * @return + * True(1) where the pointer is correctly aligned, false(0) otherwise + */ +static inline int +rte_is_aligned(void *ptr, unsigned align) +{ + return RTE_PTR_ALIGN(ptr, align) == ptr; +} + +/*********** Macros for compile type checks ********/ + +/** + * Triggers an error at compilation time if the condition is true. + */ +#ifndef __OPTIMIZE__ +#define RTE_BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)])) +#else +extern int RTE_BUILD_BUG_ON_detected_error; +#define RTE_BUILD_BUG_ON(condition) do { \ + ((void)sizeof(char[1 - 2*!!(condition)])); \ + if (condition) \ + RTE_BUILD_BUG_ON_detected_error = 1; \ +} while(0) +#endif + +/*********** Macros to work with powers of 2 ********/ + +/** + * Returns true if n is a power of 2 + * @param n + * Number to check + * @return 1 if true, 0 otherwise + */ +static inline int +rte_is_power_of_2(uint32_t n) +{ + return n && !(n & (n - 1)); +} + +/** + * Aligns input parameter to the next power of 2 + * + * @param x + * The integer value to algin + * + * @return + * Input parameter aligned to the next power of 2 + */ +static inline uint32_t +rte_align32pow2(uint32_t x) +{ + x--; + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + x |= x >> 16; + + return x + 1; +} + +/** + * Aligns 64b input parameter to the next power of 2 + * + * @param v + * The 64b value to align + * + * @return + * Input parameter aligned to the next power of 2 + */ +static inline uint64_t +rte_align64pow2(uint64_t v) +{ + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + + return v + 1; +} + +/*********** Macros for calculating min and max **********/ + +/** + * Macro to return the minimum of two numbers + */ +#define RTE_MIN(a, b) \ + __extension__ ({ \ + typeof (a) _a = (a); \ + typeof (b) _b = (b); \ + _a < _b ? _a : _b; \ + }) + +/** + * Macro to return the maximum of two numbers + */ +#define RTE_MAX(a, b) \ + __extension__ ({ \ + typeof (a) _a = (a); \ + typeof (b) _b = (b); \ + _a > _b ? _a : _b; \ + }) + +/*********** Other general functions / macros ********/ + +/** + * Searches the input parameter for the least significant set bit + * (starting from zero). + * If a least significant 1 bit is found, its bit index is returned. + * If the content of the input parameter is zero, then the content of the return + * value is undefined. + * @param v + * input parameter, should not be zero. + * @return + * least significant set bit in the input parameter. + */ +static inline uint32_t +rte_bsf32(uint32_t v) +{ + return __builtin_ctz(v); +} + +/** + * Return the rounded-up log2 of a integer. + * + * @param v + * The input parameter. + * @return + * The rounded-up log2 of the input, or 0 if the input is 0. + */ +static inline uint32_t +rte_log2_u32(uint32_t v) +{ + if (v == 0) + return 0; + v = rte_align32pow2(v); + return rte_bsf32(v); +} + +#ifndef offsetof +/** Return the offset of a field in a structure. */ +#define offsetof(TYPE, MEMBER) __builtin_offsetof (TYPE, MEMBER) +#endif + +/** + * Return pointer to the wrapping struct instance. + * + * Example: + * + * struct wrapper { + * ... + * struct child c; + * ... + * }; + * + * struct child *x = obtain(...); + * struct wrapper *w = container_of(x, struct wrapper, c); + */ +#ifndef container_of +#define container_of(ptr, type, member) __extension__ ({ \ + const typeof(((type *)0)->member) *_ptr = (ptr); \ + __attribute__((unused)) type *_target_ptr = \ + (type *)(ptr); \ + (type *)(((uintptr_t)_ptr) - offsetof(type, member)); \ + }) +#endif + +#define _RTE_STR(x) #x +/** Take a macro value and get a string version of it */ +#define RTE_STR(x) _RTE_STR(x) + +/** + * ISO C helpers to modify format strings using variadic macros. + * This is a replacement for the ", ## __VA_ARGS__" GNU extension. + * An empty %s argument is appended to avoid a dangling comma. + */ +#define RTE_FMT(fmt, ...) fmt "%.0s", __VA_ARGS__ "" +#define RTE_FMT_HEAD(fmt, ...) fmt +#define RTE_FMT_TAIL(fmt, ...) __VA_ARGS__ + +/** Mask value of type "tp" for the first "ln" bit set. */ +#define RTE_LEN2MASK(ln, tp) \ + ((tp)((uint64_t)-1 >> (sizeof(uint64_t) * CHAR_BIT - (ln)))) + +/** Number of elements in the array. */ +#define RTE_DIM(a) (sizeof (a) / sizeof ((a)[0])) + +/** + * Converts a numeric string to the equivalent uint64_t value. + * As well as straight number conversion, also recognises the suffixes + * k, m and g for kilobytes, megabytes and gigabytes respectively. + * + * If a negative number is passed in i.e. a string with the first non-black + * character being "-", zero is returned. Zero is also returned in the case of + * an error with the strtoull call in the function. + * + * @param str + * String containing number to convert. + * @return + * Number. + */ +static inline uint64_t +rte_str_to_size(const char *str) +{ + char *endptr; + unsigned long long size; + + while (isspace((int)*str)) + str++; + if (*str == '-') + return 0; + + errno = 0; + size = strtoull(str, &endptr, 0); + if (errno) + return 0; + + if (*endptr == ' ') + endptr++; /* allow 1 space gap */ + + switch (*endptr){ + case 'G': case 'g': size *= 1024; /* fall-through */ + case 'M': case 'm': size *= 1024; /* fall-through */ + case 'K': case 'k': size *= 1024; /* fall-through */ + default: + break; + } + return size; +} + +/** + * Function to terminate the application immediately, printing an error + * message and returning the exit_code back to the shell. + * + * This function never returns + * + * @param exit_code + * The exit code to be returned by the application + * @param format + * The format string to be used for printing the message. This can include + * printf format characters which will be expanded using any further parameters + * to the function. + */ +void +rte_exit(int exit_code, const char *format, ...) + __attribute__((noreturn)) + __attribute__((format(printf, 2, 3))); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/rte_memory.h b/include/rte_memory.h new file mode 100644 index 000000000000..8d17eb61fe0b --- /dev/null +++ b/include/rte_memory.h @@ -0,0 +1,227 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_MEMORY_H_ +#define _RTE_MEMORY_H_ + +/** + * @file + * + * Memory-related RTE API. + */ + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +__extension__ +enum rte_page_sizes { + RTE_PGSIZE_4K = 1ULL << 12, + RTE_PGSIZE_64K = 1ULL << 16, + RTE_PGSIZE_256K = 1ULL << 18, + RTE_PGSIZE_2M = 1ULL << 21, + RTE_PGSIZE_16M = 1ULL << 24, + RTE_PGSIZE_256M = 1ULL << 28, + RTE_PGSIZE_512M = 1ULL << 29, + RTE_PGSIZE_1G = 1ULL << 30, + RTE_PGSIZE_4G = 1ULL << 32, + RTE_PGSIZE_16G = 1ULL << 34, +}; + +#define SOCKET_ID_ANY -1 /**< Any NUMA socket. */ +#define RTE_CACHE_LINE_SIZE 64 /** uzfs **/ +#define RTE_CACHE_LINE_MASK (RTE_CACHE_LINE_SIZE-1) /**< Cache line mask. */ + +#define RTE_CACHE_LINE_ROUNDUP(size) \ + (RTE_CACHE_LINE_SIZE * ((size + RTE_CACHE_LINE_SIZE - 1) / RTE_CACHE_LINE_SIZE)) +/**< Return the first cache-aligned value greater or equal to size. */ + +/**< Cache line size in terms of log2 */ +#if RTE_CACHE_LINE_SIZE == 64 +#define RTE_CACHE_LINE_SIZE_LOG2 6 +#elif RTE_CACHE_LINE_SIZE == 128 +#define RTE_CACHE_LINE_SIZE_LOG2 7 +#else +#error "Unsupported cache line size" +#endif + +#define RTE_CACHE_LINE_MIN_SIZE 64 /**< Minimum Cache line size. */ + +/** + * Force alignment to cache line. + */ +#define __rte_cache_aligned __rte_aligned(RTE_CACHE_LINE_SIZE) + +/** + * Force minimum cache line alignment. + */ +#define __rte_cache_min_aligned __rte_aligned(RTE_CACHE_LINE_MIN_SIZE) + +typedef uint64_t phys_addr_t; /**< Physical address. */ +#define RTE_BAD_PHYS_ADDR ((phys_addr_t)-1) +/** + * IO virtual address type. + * When the physical addressing mode (IOVA as PA) is in use, + * the translation from an IO virtual address (IOVA) to a physical address + * is a direct mapping, i.e. the same value. + * Otherwise, in virtual mode (IOVA as VA), an IOMMU may do the translation. + */ +typedef uint64_t rte_iova_t; +#define RTE_BAD_IOVA ((rte_iova_t)-1) + +/** + * Physical memory segment descriptor. + */ +struct rte_memseg { + RTE_STD_C11 + union { + phys_addr_t phys_addr; /**< deprecated - Start physical address. */ + rte_iova_t iova; /**< Start IO address. */ + }; + RTE_STD_C11 + union { + void *addr; /**< Start virtual address. */ + uint64_t addr_64; /**< Makes sure addr is always 64 bits */ + }; + size_t len; /**< Length of the segment. */ + uint64_t hugepage_sz; /**< The pagesize of underlying memory */ + int32_t socket_id; /**< NUMA socket ID. */ + uint32_t nchannel; /**< Number of channels. */ + uint32_t nrank; /**< Number of ranks. */ +} __rte_packed; + +/** + * Lock page in physical memory and prevent from swapping. + * + * @param virt + * The virtual address. + * @return + * 0 on success, negative on error. + */ +int rte_mem_lock_page(const void *virt); + +/** + * Get physical address of any mapped virtual address in the current process. + * It is found by browsing the /proc/self/pagemap special file. + * The page must be locked. + * + * @param virt + * The virtual address. + * @return + * The physical address or RTE_BAD_IOVA on error. + */ +phys_addr_t rte_mem_virt2phy(const void *virt); + +/** + * Get IO virtual address of any mapped virtual address in the current process. + * + * @param virt + * The virtual address. + * @return + * The IO address or RTE_BAD_IOVA on error. + */ +rte_iova_t rte_mem_virt2iova(const void *virt); + +/** + * Get the layout of the available physical memory. + * + * It can be useful for an application to have the full physical + * memory layout to decide the size of a memory zone to reserve. This + * table is stored in rte_config (see rte_eal_get_configuration()). + * + * @return + * - On success, return a pointer to a read-only table of struct + * rte_physmem_desc elements, containing the layout of all + * addressable physical memory. The last element of the table + * contains a NULL address. + * - On error, return NULL. This should not happen since it is a fatal + * error that will probably cause the entire system to panic. + */ +const struct rte_memseg *rte_eal_get_physmem_layout(void); + +/** + * Dump the physical memory layout to a file. + * + * @param f + * A pointer to a file for output + */ +void rte_dump_physmem_layout(FILE *f); + +/** + * Get the total amount of available physical memory. + * + * @return + * The total amount of available physical memory in bytes. + */ +uint64_t rte_eal_get_physmem_size(void); + +/** + * Get the number of memory channels. + * + * @return + * The number of memory channels on the system. The value is 0 if unknown + * or not the same on all devices. + */ +unsigned rte_memory_get_nchannel(void); + +/** + * Get the number of memory ranks. + * + * @return + * The number of memory ranks on the system. The value is 0 if unknown or + * not the same on all devices. + */ +unsigned rte_memory_get_nrank(void); + +/** + * Drivers based on uio will not load unless physical + * addresses are obtainable. It is only possible to get + * physical addresses when running as a privileged user. + * + * @return + * 1 if the system is able to obtain physical addresses. + * 0 if using DMA addresses through an IOMMU. + */ +int rte_eal_using_phys_addrs(void); + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_MEMORY_H_ */ diff --git a/include/rte_pause.h b/include/rte_pause.h new file mode 100644 index 000000000000..7ad647085dc3 --- /dev/null +++ b/include/rte_pause.h @@ -0,0 +1,51 @@ +/*- + * BSD LICENSE + * + * Copyright(c) Cavium, Inc. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Cavium, Inc nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RTE_PAUSE_X86_H_ +#define _RTE_PAUSE_X86_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +static inline void rte_pause(void) +{ + _mm_pause(); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_PAUSE_X86_H_ */ diff --git a/include/rte_ring.h b/include/rte_ring.h new file mode 100644 index 000000000000..32b77d87ed4b --- /dev/null +++ b/include/rte_ring.h @@ -0,0 +1,1143 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2017 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Derived from FreeBSD's bufring.h + * + ************************************************************************** + * + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. The name of Kip Macy nor the names of other + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ***************************************************************************/ + +#ifndef _RTE_RING_H_ +#define _RTE_RING_H_ + +/** + * @file + * RTE Ring + * + * The Ring Manager is a fixed-size queue, implemented as a table of + * pointers. Head and tail pointers are modified atomically, allowing + * concurrent access to it. It has the following features: + * + * - FIFO (First In First Out) + * - Maximum size is fixed; the pointers are stored in a table. + * - Lockless implementation. + * - Multi- or single-consumer dequeue. + * - Multi- or single-producer enqueue. + * - Bulk dequeue. + * - Bulk enqueue. + * + * Note: the ring implementation is not preemptable. A lcore must not + * be interrupted by another task that uses the same ring. + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define RTE_TAILQ_RING_NAME "RTE_RING" + +enum rte_ring_queue_behavior { + RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */ +}; + +#define RTE_MEMZONE_NAMESIZE 32 +#define RTE_RING_MZ_PREFIX "RG_" +/**< The maximum length of a ring name. */ +#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ + sizeof(RTE_RING_MZ_PREFIX) + 1) + +struct rte_memzone; /* forward declaration, so as not to require memzone.h */ + +#if RTE_CACHE_LINE_SIZE < 128 +#define PROD_ALIGN (RTE_CACHE_LINE_SIZE * 2) +#define CONS_ALIGN (RTE_CACHE_LINE_SIZE * 2) +#else +#define PROD_ALIGN RTE_CACHE_LINE_SIZE +#define CONS_ALIGN RTE_CACHE_LINE_SIZE +#endif + +/* structure to hold a pair of head/tail values and other metadata */ +struct rte_ring_headtail { + volatile uint32_t head; /**< Prod/consumer head. */ + volatile uint32_t tail; /**< Prod/consumer tail. */ + uint32_t single; /**< True if single prod/cons */ +}; + +/** + * An RTE ring structure. + * + * The producer and the consumer have a head and a tail index. The particularity + * of these index is that they are not between 0 and size(ring). These indexes + * are between 0 and 2^32, and we mask their value when we access the ring[] + * field. Thanks to this assumption, we can do subtractions between 2 index + * values in a modulo-32bit base: that's why the overflow of the indexes is not + * a problem. + */ +struct rte_ring { + /* + * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI + * compatibility requirements, it could be changed to RTE_RING_NAMESIZE + * next time the ABI changes + */ + char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ + int flags; /**< Flags supplied at creation. */ + const struct rte_memzone *memzone; + /**< Memzone, if any, containing the rte_ring */ + uint32_t size; /**< Size of ring. */ + uint32_t mask; /**< Mask (size-1) of ring. */ + uint32_t capacity; /**< Usable size of ring */ + + /** Ring producer status. */ + struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN); + + /** Ring consumer status. */ + struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN); +}; + +#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ +#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +/** + * Ring is to hold exactly requested number of entries. + * Without this flag set, the ring size requested must be a power of 2, and the + * usable space will be that size - 1. With the flag, the requested size will + * be rounded up to the next power of two, but the usable space will be exactly + * that requested. Worst case, if a power-of-2 size is requested, half the + * ring space will be wasted. + */ +#define RING_F_EXACT_SZ 0x0004 +#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ + +/* @internal defines for passing to the enqueue dequeue worker functions */ +#define __IS_SP 1 +#define __IS_MP 0 +#define __IS_SC 1 +#define __IS_MC 0 + +/** + * Calculate the memory size needed for a ring + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it. This value is the sum of the size of + * the structure rte_ring and the size of the memory needed by the + * objects pointers. The value is aligned to a cache line size. + * + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size needed for the ring on success. + * - -EINVAL if count is not a power of 2. + */ +ssize_t rte_ring_get_memsize(unsigned count); + +/** + * Initialize a ring structure. + * + * Initialize a ring structure in memory pointed by "r". The size of the + * memory area must be large enough to store the ring structure and the + * object table. It is advised to use rte_ring_get_memsize() to get the + * appropriate size. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is + * *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is not added in RTE_TAILQ_RING global list. Indeed, the + * memory given by the caller may not be shareable among dpdk + * processes. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param name + * The name of the ring. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * 0 on success, or a negative value on error. + */ +int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, + unsigned flags); + +/** + * Create a new ring named *name* in memory. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param count + * The size of the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +struct rte_ring *rte_ring_create(const char *name, unsigned count, + int socket_id, unsigned flags); +/** + * De-allocate all memory used by the ring. + * + * @param r + * Ring to free + */ +void rte_ring_free(struct rte_ring *r); + +/** + * Dump the status of the ring to a file. + * + * @param f + * A pointer to a file for output + * @param r + * A pointer to the ring structure. + */ +void rte_ring_dump(FILE *f, const struct rte_ring *r); + +/* the actual enqueue of pointers on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions */ +#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + const uint32_t size = (r)->size; \ + uint32_t idx = prod_head & (r)->mask; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ + ring[idx] = obj_table[i]; \ + ring[idx+1] = obj_table[i+1]; \ + ring[idx+2] = obj_table[i+2]; \ + ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 2: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 1: \ + ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + ring[idx] = obj_table[i]; \ + } \ +} while (0) + +/* the actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both + * single and multi consumer dequeue functions */ +#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + uint32_t idx = cons_head & (r)->mask; \ + const uint32_t size = (r)->size; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ + obj_table[i] = ring[idx]; \ + obj_table[i+1] = ring[idx+1]; \ + obj_table[i+2] = ring[idx+2]; \ + obj_table[i+3] = ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 2: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 1: \ + obj_table[i++] = ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = ring[idx]; \ + } \ +} while (0) + +static __rte_always_inline void +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, + uint32_t single) +{ + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + while (unlikely(ht->tail != old_val)) + rte_pause(); + + ht->tail = new_val; +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *free_entries) +{ + const uint32_t capacity = r->capacity; + unsigned int max = n; + int success; + + do { + /* Reset n to the initial burst count */ + n = max; + + *old_head = r->prod.head; + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + const uint32_t cons_tail = r->cons.tail; + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = (capacity + cons_tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + *new_head = *old_head + n; + if (is_sp) + r->prod.head = *new_head, success = 1; + else + success = rte_atomic32_cmpset(&r->prod.head, + *old_head, *new_head); + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sp, unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + rte_smp_wmb(); + + update_tail(&r->prod, prod_head, prod_next, is_sp); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *entries) +{ + unsigned int max = n; + int success; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + *old_head = r->cons.head; + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + const uint32_t prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + *entries = (prod_tail - *old_head); + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + return 0; + + *new_head = *old_head + n; + if (is_sc) + r->cons.head = *new_head, success = 1; + else + success = rte_atomic32_cmpset(&r->cons.head, *old_head, + *new_head); + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + rte_smp_rmb(); + + update_tail(&r->cons, cons_head, cons_next, is_sc); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + r->prod.single, free_space); +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_mp_enqueue(struct rte_ring *r, void * const *obj) +{ + return rte_ring_mp_enqueue_bulk(r, obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_sp_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring. + * + * This function calls the multi-producer or the single-producer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table, + * must be strictly positive. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + __IS_SC, available); +} + +/** + * Dequeue several objects from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, + unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + r->cons.single, available); +} + +/** + * Dequeue one object from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success, objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Return the number of entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of entries in the ring. + */ +static inline unsigned +rte_ring_count(const struct rte_ring *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + uint32_t count = (prod_tail - cons_tail) & r->mask; + return (count > r->capacity) ? r->capacity : count; +} + +/** + * Return the number of free entries in a ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The number of free entries in the ring. + */ +static inline unsigned +rte_ring_free_count(const struct rte_ring *r) +{ + return r->capacity - rte_ring_count(r); +} + +/** + * Test if a ring is full. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is full. + * - 0: The ring is not full. + */ +static inline int +rte_ring_full(const struct rte_ring *r) +{ + return rte_ring_free_count(r) == 0; +} + +/** + * Test if a ring is empty. + * + * @param r + * A pointer to the ring structure. + * @return + * - 1: The ring is empty. + * - 0: The ring is not empty. + */ +static inline int +rte_ring_empty(const struct rte_ring *r) +{ + return rte_ring_count(r) == 0; +} + +/** + * Return the size of the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The size of the data store used by the ring. + * NOTE: this is not the same as the usable space in the ring. To query that + * use ``rte_ring_get_capacity()``. + */ +static inline unsigned int +rte_ring_get_size(const struct rte_ring *r) +{ + return r->size; +} + +/** + * Return the number of elements which can be stored in the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * The usable size of the ring. + */ +static inline unsigned int +rte_ring_get_capacity(const struct rte_ring *r) +{ + return r->capacity; +} + +/** + * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output + */ +void rte_ring_list_dump(FILE *f); + +/** + * Search a ring from its name + * + * @param name + * The name of the ring. + * @return + * The pointer to the ring matching the name, or NULL if not found, + * with rte_errno set appropriately. Possible rte_errno values include: + * - ENOENT - required entry not available to return. + */ +struct rte_ring *rte_ring_lookup(const char *name); + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, + r->prod.single, free_space); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - Number of objects dequeued + */ +static __rte_always_inline unsigned +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_H_ */ diff --git a/include/sys/zfs_context.h b/include/sys/zfs_context.h index 74ddcfac7b68..175a828a69bc 100644 --- a/include/sys/zfs_context.h +++ b/include/sys/zfs_context.h @@ -129,6 +129,7 @@ #define noinline __attribute__((noinline)) #define likely(x) __builtin_expect((x), 1) +#define unlikely(x) __builtin_expect((x), 0) /* * Debugging diff --git a/lib/libzpool/Makefile.am b/lib/libzpool/Makefile.am index 9e1496c10b4d..51ff6eebb2aa 100644 --- a/lib/libzpool/Makefile.am +++ b/lib/libzpool/Makefile.am @@ -15,6 +15,7 @@ lib_LTLIBRARIES = libzpool.la USER_C = \ kernel.c \ + rte_ring.c \ taskq.c \ util.c \ uzfs_io.c \ diff --git a/lib/libzpool/rte_ring.c b/lib/libzpool/rte_ring.c new file mode 100644 index 000000000000..d1a9f562c985 --- /dev/null +++ b/lib/libzpool/rte_ring.c @@ -0,0 +1,199 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Derived from FreeBSD's bufring.c + * + ************************************************************************** + * + * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. The name of Kip Macy nor the names of other + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ***************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rte_ring.h" + +/* true if x is a power of 2 */ +#define POWEROF2(x) ((((x)-1) & (x)) == 0) + +/* return the size of memory occupied by a ring */ +ssize_t +rte_ring_get_memsize(unsigned count) +{ + ssize_t sz; + + /* count must be a power of 2 */ + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { + printf("Requested size is invalid, must be power of 2, and " + "do not exceed the size limit %u\n", RTE_RING_SZ_MASK); + return -EINVAL; + } + + sz = sizeof(struct rte_ring) + count * sizeof(void *); + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + return sz; +} + +int +rte_ring_init(struct rte_ring *r, const char *name, unsigned count, + unsigned flags) +{ + int ret; + + /* compilation-time checks */ + RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & + RTE_CACHE_LINE_MASK) != 0); + + /* init the ring structure */ + memset(r, 0, sizeof(*r)); + ret = snprintf(r->name, sizeof(r->name), "%s", name); + if (ret < 0 || ret >= (int)sizeof(r->name)) + return -ENAMETOOLONG; + r->flags = flags; + r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; + r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; + + if (flags & RING_F_EXACT_SZ) { + r->size = rte_align32pow2(count + 1); + r->mask = r->size - 1; + r->capacity = count; + } else { + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) { + printf("Requested size is invalid, must be power of 2, and not exceed the size limit %u\n", + RTE_RING_SZ_MASK); + return -EINVAL; + } + r->size = count; + r->mask = count - 1; + r->capacity = r->mask; + } + r->prod.head = r->cons.head = 0; + r->prod.tail = r->cons.tail = 0; + + return 0; +} + +/* create the ring */ +struct rte_ring * +rte_ring_create(const char *name, unsigned count, int socket_id, + unsigned flags) +{ + char mz_name[RTE_MEMZONE_NAMESIZE]; + struct rte_ring *r; + ssize_t ring_size; + const unsigned int requested_count = count; + int ret; + + /* for an exact size ring, round up from count to a power of two */ + if (flags & RING_F_EXACT_SZ) + count = rte_align32pow2(count + 1); + + ring_size = rte_ring_get_memsize(count); + if (ring_size < 0) { + return NULL; + } + + ret = snprintf(mz_name, sizeof(mz_name), "%s%s", + RTE_RING_MZ_PREFIX, name); + if (ret < 0 || ret >= (int)sizeof(mz_name)) { + return NULL; + } + + r = malloc(ring_size); + if (r == NULL) { + printf("Cannot reserve memory for tailq\n"); + return NULL; + } + + rte_ring_init(r, name, requested_count, flags); + return r; +} + +/* free the ring */ +void +rte_ring_free(struct rte_ring *r) +{ + if (r == NULL) + return; + + /* + * Ring was not created with rte_ring_create, + * therefore, there is no memzone to free. + */ +/* + if (r->memzone == NULL) { + printf("Cannot free ring (not created with rte_ring_create()"); + return; + } + if (rte_memzone_free(r->memzone) != 0) { + printf("Cannot free memory\n"); + return; + } +*/ + free(r); +} diff --git a/lib/libzpool/vdev_disk_aio.c b/lib/libzpool/vdev_disk_aio.c index a4ed058c503a..ff6a241c04d6 100644 --- a/lib/libzpool/vdev_disk_aio.c +++ b/lib/libzpool/vdev_disk_aio.c @@ -29,6 +29,7 @@ #include #include +#include /* * The value is taken from SPDK. It does not scale (and perhaps should) with @@ -57,12 +58,14 @@ typedef struct vdev_disk_aio { boolean_t vda_stop_polling; uintptr_t vda_poller_tid; /* Support for submitting multiple IOs in one syscall */ + /* list of zios to enqueue/dequeue from ring buffer */ zio_t *vda_zio_queue[MAX_ZIOS]; uint32_t vda_zio_next; /* next zio to be submitted to kernel */ /* read & written only from poller thread */ uint32_t vda_zio_top; /* latest incoming zio from uzfs */ /* Preallocated array of iocbs for use in poller to run faster */ struct iocb *vda_iocbs[MAX_ZIOS]; + struct rte_ring *vda_ring; /* ring buffer to enqueue/dequeue zio */ } vdev_disk_aio_t; typedef struct aio_task { @@ -185,52 +188,56 @@ vdev_disk_aio_submit(vdev_disk_aio_t *vda) { struct iocb **iocbs = vda->vda_iocbs; int n = 0; - int nr; + int nr = 0; /* - * create aio tasks for the available ZIOs - * - * We don't want locks in IO path in aio backend, hence this ad-hoc - * lockfree algorithm for obtaining zio entries. We check for NULL - * because that signifies incomplete update of vda_zio_top pointer. + * Dequeue ZIOs from ring buffer as many as possible. + * We have used single consumer dequeue operation since polling + * thread is only dequeuing from aio_submit ring buffer. */ - while (vda->vda_zio_queue[vda->vda_zio_next] != NULL && - vda->vda_zio_next != vda->vda_zio_top) { - aio_task_t *task; - zio_t *zio = vda->vda_zio_queue[vda->vda_zio_next]; - ASSERT3P(zio->io_vd->vdev_tsd, ==, vda); - ASSERT3P(n, <, MAX_ZIOS); - - vda->vda_zio_queue[vda->vda_zio_next] = NULL; - vda->vda_zio_next = (vda->vda_zio_next + 1) % MAX_ZIOS; + nr = rte_ring_sc_dequeue_burst(vda->vda_ring, + (void **) &vda->vda_zio_queue, MAX_ZIOS, NULL); + + if (nr > 0) { + for (n = 0; n < nr; n++) { + aio_task_t *task; + zio_t *zio = vda->vda_zio_queue[n]; + ASSERT3P(zio->io_vd->vdev_tsd, ==, vda); + ASSERT3P(n, <, MAX_ZIOS); + + /* + * Prepare AIO command control block. + */ + task = kmem_alloc(sizeof (aio_task_t), KM_SLEEP); + task->zio = zio; + task->buf = NULL; + iocbs[n] = &task->iocb; + + switch (zio->io_type) { + case ZIO_TYPE_WRITE: + task->buf = abd_borrow_buf_copy(zio->io_abd, + zio->io_size); + io_prep_pwrite(iocbs[n], vda->vda_fd, task->buf, + zio->io_size, zio->io_offset); + break; + case ZIO_TYPE_READ: + task->buf = abd_borrow_buf(zio->io_abd, + zio->io_size); + io_prep_pread(iocbs[n], vda->vda_fd, task->buf, + zio->io_size, zio->io_offset); + break; + default: + ASSERT(0); + } - /* - * Prepare AIO command control block. - */ - task = kmem_alloc(sizeof (aio_task_t), KM_SLEEP); - task->zio = zio; - task->buf = NULL; - iocbs[n] = &task->iocb; - - switch (zio->io_type) { - case ZIO_TYPE_WRITE: - task->buf = abd_borrow_buf_copy(zio->io_abd, - zio->io_size); - io_prep_pwrite(iocbs[n], vda->vda_fd, task->buf, - zio->io_size, zio->io_offset); - break; - case ZIO_TYPE_READ: - task->buf = abd_borrow_buf(zio->io_abd, zio->io_size); - io_prep_pread(iocbs[n], vda->vda_fd, task->buf, - zio->io_size, zio->io_offset); - break; - default: - ASSERT(0); + /* + * prep functions above reset data pointer + * set it again + */ + iocbs[n]->data = task; } - - /* prep functions above reset data pointer - set it again */ - iocbs[n]->data = task; - n++; + } else { + return; } /* @@ -393,6 +400,16 @@ vdev_disk_aio_open(vdev_t *vd, uint64_t *psize, uint64_t *max_psize, return (SET_ERROR(-err)); } + /* Create RTE RING to enqueue/dequeue ZIOs */ + vda->vda_ring = rte_ring_create("aio_submit_ring", MAX_ZIOS, + -1, RING_F_EXACT_SZ); + if (!vda->vda_ring) { + fprintf(stderr, "Failed to create aio_submit ring\n"); + (void) io_destroy(vda->vda_io_ctx); + vda->vda_io_ctx = NULL; + return (SET_ERROR(ENOMEM)); + } + vda->vda_stop_polling = B_FALSE; vda->vda_poller_tid = (uintptr_t)thread_create(NULL, 0, vdev_disk_aio_poll, vda, 0, &p0, TS_RUN, 0); @@ -446,12 +463,20 @@ vdev_disk_aio_close(vdev_t *vd) while (vda->vda_poller_tid != 0) { nanosleep(&ts, NULL); } - (void) io_destroy(vda->vda_io_ctx); - vda->vda_io_ctx = NULL; + + if (vda->vda_io_ctx) { + (void) io_destroy(vda->vda_io_ctx); + vda->vda_io_ctx = NULL; + } (void) close(vda->vda_fd); vd->vdev_delayed_close = B_FALSE; + + if (vda->vda_ring) { + rte_ring_free(vda->vda_ring); + vda->vda_ring = NULL; + } kmem_free(vda, sizeof (vdev_disk_aio_t)); vd->vdev_tsd = NULL; } @@ -464,9 +489,6 @@ vdev_disk_aio_start(zio_t *zio) { vdev_t *vd = zio->io_vd; vdev_disk_aio_t *vda = vd->vdev_tsd; - uint32_t idx; - uint32_t new_idx; - int success = 0; /* * Check operation type. @@ -502,17 +524,13 @@ vdev_disk_aio_start(zio_t *zio) /* * Enqueue zio and poller thread will take care of it. - * - * We don't want locks in IO path in aio backend, hence this ad-hoc - * lockfree algorithm for enqueuing new entries. */ - while (!success) { - idx = vda->vda_zio_top; - new_idx = (idx + 1) % MAX_ZIOS; - success = __sync_bool_compare_and_swap(&vda->vda_zio_top, - idx, new_idx); + + if (rte_ring_mp_enqueue(vda->vda_ring, (void **) &zio)) { + fprintf(stderr, "Failed to enqueue zio in ring\n"); + zio->io_error = (SET_ERROR(EBUSY)); + zio_interrupt(zio); } - vda->vda_zio_queue[idx] = zio; } /* ARGSUSED */