Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringbuf: simplify ringReader #1379

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 5 additions & 57 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ package ringbuf
import (
"errors"
"fmt"
"io"
"os"
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/epoll"
"github.com/cilium/ebpf/internal/unix"
)

var (
ErrClosed = os.ErrClosed
errEOR = errors.New("end of ring")
errDiscard = errors.New("sample discarded")
errBusy = errors.New("sample not committed yet")
ErrClosed = os.ErrClosed
errEOR = errors.New("end of ring")
errBusy = errors.New("sample not committed yet")
)

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
Expand Down Expand Up @@ -46,55 +43,6 @@ type Record struct {
Remaining int
}

// readRecord fetches the next sample from rd and stores it in rec.
//
// It returns errEOR when the header read reports io.EOF, errBusy when the
// next sample has not been committed yet, and errDiscard after skipping a
// sample that is flagged as discarded. On success rec.RawSample holds the
// sample payload and rec.Remaining is set from rd.remaining().
func readRecord(rd *ringbufEventRing, rec *Record) error {
	rd.loadConsumer()

	hdr, err := rd.readHeader()
	switch {
	case err == io.EOF:
		return errEOR
	case err != nil:
		return fmt.Errorf("read event header: %w", err)
	}

	// An uncommitted sample is left in place: the consumer position is not
	// stored, so the next attempt starts again from the same position.
	if hdr.isBusy() {
		return errBusy
	}

	// Round the payload length up to 8 byte alignment.
	padded := uint64(internal.Align(hdr.dataLen(), 8))

	if hdr.isDiscard() {
		// Discarded samples are stepped over by advancing the consumer
		// position only, which avoids allocating a buffer and copying
		// bytes out of the ring.
		rd.skipRead(padded)
		rd.storeConsumer()

		return errDiscard
	}

	// Reuse the caller's buffer when it is large enough.
	if int(padded) <= cap(rec.RawSample) {
		rec.RawSample = rec.RawSample[:padded]
	} else {
		rec.RawSample = make([]byte, padded)
	}

	if _, err := io.ReadFull(rd, rec.RawSample); err != nil {
		return fmt.Errorf("read sample: %w", err)
	}

	rd.storeConsumer()

	// Trim the alignment padding off the returned sample.
	rec.RawSample = rec.RawSample[:hdr.dataLen()]
	rec.Remaining = rd.remaining()
	return nil
}

// Reader allows reading bpf_ringbuf_output
// from user space.
type Reader struct {
Expand Down Expand Up @@ -211,10 +159,10 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := readRecord(r.ring, rec)
err := r.ring.readRecord(rec)
// Not using errors.Is, which is quite a bit slower.
// In a tight loop it might make a difference.
if err == errBusy || err == errDiscard {
if err == errBusy {
continue
}
if err == errEOR {
Expand Down
108 changes: 55 additions & 53 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync/atomic"
"unsafe"

"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/unix"
)

Expand Down Expand Up @@ -55,7 +56,6 @@ func (ring *ringbufEventRing) Close() {
// ringReader holds the producer/consumer position pointers, a cached
// consumer position, and the byte slice that samples are read from.
type ringReader struct {
// These point into mmap'ed memory and must be accessed atomically.
prod_pos, cons_pos *uint64
// cons is a local copy of the consumer position; loadConsumer refreshes it
// from cons_pos and storeConsumer writes it back.
cons uint64
// mask is set by newRingReader to cap(ring)/2 - 1 and is used to wrap
// positions into an index within ring.
mask uint64
// ring is the data area that headers and samples are read from.
ring []byte
}
Expand All @@ -64,26 +64,12 @@ func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader {
return &ringReader{
prod_pos: prod_ptr,
cons_pos: cons_ptr,
cons: atomic.LoadUint64(cons_ptr),
// cap is always a power of two
mask: uint64(cap(ring)/2 - 1),
ring: ring,
}
}

// loadConsumer refreshes the cached consumer position (rr.cons) with an
// atomic load of the value behind cons_pos.
func (rr *ringReader) loadConsumer() {
	current := atomic.LoadUint64(rr.cons_pos)
	rr.cons = current
}

// storeConsumer publishes the cached consumer position (rr.cons) by
// atomically storing it to the word behind cons_pos.
func (rr *ringReader) storeConsumer() {
	next := rr.cons
	atomic.StoreUint64(rr.cons_pos, next)
}

// skipRead advances the cached consumer position by skipBytes without
// copying any data, clamped so it never moves past the producer position.
// The new position is not published; cons_pos is left untouched.
func (rr *ringReader) skipRead(skipBytes uint64) {
	available := atomic.LoadUint64(rr.prod_pos) - rr.cons
	rr.cons += min(available, skipBytes)
}

func (rr *ringReader) isEmpty() bool {
cons := atomic.LoadUint64(rr.cons_pos)
prod := atomic.LoadUint64(rr.prod_pos)
Expand All @@ -95,45 +81,61 @@ func (rr *ringReader) size() int {
return cap(rr.ring)
}

// remaining reports the distance between the producer and consumer
// positions, both loaded atomically, masked with rr.mask.
func (rr *ringReader) remaining() int {
	consumer := atomic.LoadUint64(rr.cons_pos)
	producer := atomic.LoadUint64(rr.prod_pos)

	unread := (producer - consumer) & rr.mask
	return int(unread)
}

func (rr *ringReader) readHeader() (ringbufHeader, error) {
// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
prod := atomic.LoadUint64(rr.prod_pos)
cons := atomic.LoadUint64(rr.cons_pos)

if remaining := prod - rr.cons; remaining == 0 {
return ringbufHeader{}, io.EOF
} else if remaining < unix.BPF_RINGBUF_HDR_SZ {
return ringbufHeader{}, io.ErrUnexpectedEOF
}

// read the len field of the header atomically to ensure a happens before
// relationship with the xchg in the kernel. Without this we may see len
// without BPF_RINGBUF_BUSY_BIT before the written data is visible.
// See https://github.com/torvalds/linux/blob/v6.8/kernel/bpf/ringbuf.c#L484
len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[rr.cons&rr.mask])))
header := ringbufHeader{Len: len}

rr.cons += unix.BPF_RINGBUF_HDR_SZ
return header, nil
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)
n := min(prod-rr.cons, uint64(len(p)))

start := rr.cons & rr.mask

copy(p, rr.ring[start:start+n])
rr.cons += n

if prod == rr.cons {
return int(n), io.EOF
for {
if remaining := prod - cons; remaining == 0 {
return errEOR
} else if remaining < unix.BPF_RINGBUF_HDR_SZ {
return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
}

// read the len field of the header atomically to ensure a happens before
// relationship with the xchg in the kernel. Without this we may see len
// without BPF_RINGBUF_BUSY_BIT before the written data is visible.
// See https://github.com/torvalds/linux/blob/v6.8/kernel/bpf/ringbuf.c#L484
start := cons & rr.mask
len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[start])))
header := ringbufHeader{Len: len}

if header.isBusy() {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
// and start again from the same position.
return errBusy
}

cons += unix.BPF_RINGBUF_HDR_SZ

// Data is always padded to 8 byte alignment.
dataLenAligned := uint64(internal.Align(header.dataLen(), 8))
if remaining := prod - cons; remaining < dataLenAligned {
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
}

start = cons & rr.mask
cons += dataLenAligned

if header.isDiscard() {
// when the record header indicates that the data should be
// discarded, we skip it by just updating the consumer position
// to the next record.
atomic.StoreUint64(rr.cons_pos, cons)
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
atomic.StoreUint64(rr.cons_pos, cons)
return nil
}

return int(n), nil
}
67 changes: 0 additions & 67 deletions ringbuf/ring_test.go

This file was deleted.

Loading