From 46ae4e2c6a62a8518f03f4c83d349f863e7e3aa2 Mon Sep 17 00:00:00 2001 From: Lorenz Bauer Date: Thu, 14 Mar 2024 15:59:44 +0000 Subject: [PATCH] ringbuf: simplify ringReader The code to correctly access the ring is currently split across two files and multiple functions which makes it hard to grasp what is going on. Strip out the overly generic Reader interface and only expose ringReader.readRecord. Also move discarding of samples into readRecord. This reduces the amount of code and cuts down on the number of places where we call into sync/atomic. Signed-off-by: Lorenz Bauer --- ringbuf/reader.go | 62 ++----------------------- ringbuf/ring.go | 108 ++++++++++++++++++++++--------------------- ringbuf/ring_test.go | 67 --------------------------- 3 files changed, 60 insertions(+), 177 deletions(-) delete mode 100644 ringbuf/ring_test.go diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 72b35f273..82010e27b 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -3,22 +3,19 @@ package ringbuf import ( "errors" "fmt" - "io" "os" "sync" "time" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/epoll" "github.com/cilium/ebpf/internal/unix" ) var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") - errDiscard = errors.New("sample discarded") - errBusy = errors.New("sample not committed yet") + ErrClosed = os.ErrClosed + errEOR = errors.New("end of ring") + errBusy = errors.New("sample not committed yet") ) // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c @@ -46,55 +43,6 @@ type Record struct { Remaining int } -// Read a record from an event ring. 
-func readRecord(rd *ringbufEventRing, rec *Record) error { - rd.loadConsumer() - - header, err := rd.readHeader() - if err == io.EOF { - return errEOR - } else if err != nil { - return fmt.Errorf("read event header: %w", err) - } - - if header.isBusy() { - // the next sample in the ring is not committed yet so we - // exit without storing the reader/consumer position - // and start again from the same position. - return errBusy - } - - /* read up to 8 byte alignment */ - dataLenAligned := uint64(internal.Align(header.dataLen(), 8)) - - if header.isDiscard() { - // when the record header indicates that the data should be - // discarded, we skip it by just updating the consumer position - // to the next record instead of normal Read() to avoid allocating data - // and reading/copying from the ring (which normally keeps track of the - // consumer position). - rd.skipRead(dataLenAligned) - rd.storeConsumer() - - return errDiscard - } - - if cap(rec.RawSample) < int(dataLenAligned) { - rec.RawSample = make([]byte, dataLenAligned) - } else { - rec.RawSample = rec.RawSample[:dataLenAligned] - } - - if _, err := io.ReadFull(rd, rec.RawSample); err != nil { - return fmt.Errorf("read sample: %w", err) - } - - rd.storeConsumer() - rec.RawSample = rec.RawSample[:header.dataLen()] - rec.Remaining = rd.remaining() - return nil -} - // Reader allows reading bpf_ringbuf_output // from user space. 
type Reader struct { @@ -211,10 +159,10 @@ func (r *Reader) ReadInto(rec *Record) error { } for { - err := readRecord(r.ring, rec) + err := r.ring.readRecord(rec) // Not using errors.Is which is quite a bit slower // For a tight loop it might make a difference - if err == errBusy || err == errDiscard { + if err == errBusy { continue } if err == errEOR { diff --git a/ringbuf/ring.go b/ringbuf/ring.go index 4601e2681..35534c51e 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "unsafe" + "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/unix" ) @@ -55,7 +56,6 @@ func (ring *ringbufEventRing) Close() { type ringReader struct { // These point into mmap'ed memory and must be accessed atomically. prod_pos, cons_pos *uint64 - cons uint64 mask uint64 ring []byte } @@ -64,26 +64,12 @@ func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader { return &ringReader{ prod_pos: prod_ptr, cons_pos: cons_ptr, - cons: atomic.LoadUint64(cons_ptr), // cap is always a power of two mask: uint64(cap(ring)/2 - 1), ring: ring, } } -func (rr *ringReader) loadConsumer() { - rr.cons = atomic.LoadUint64(rr.cons_pos) -} - -func (rr *ringReader) storeConsumer() { - atomic.StoreUint64(rr.cons_pos, rr.cons) -} - -func (rr *ringReader) skipRead(skipBytes uint64) { - prod := atomic.LoadUint64(rr.prod_pos) - rr.cons += min(prod-rr.cons, skipBytes) -} - func (rr *ringReader) isEmpty() bool { cons := atomic.LoadUint64(rr.cons_pos) prod := atomic.LoadUint64(rr.prod_pos) @@ -95,45 +81,61 @@ func (rr *ringReader) size() int { return cap(rr.ring) } -func (rr *ringReader) remaining() int { - cons := atomic.LoadUint64(rr.cons_pos) - prod := atomic.LoadUint64(rr.prod_pos) - - return int((prod - cons) & rr.mask) -} - -func (rr *ringReader) readHeader() (ringbufHeader, error) { +// Read a record from an event ring. 
+func (rr *ringReader) readRecord(rec *Record) error { prod := atomic.LoadUint64(rr.prod_pos) + cons := atomic.LoadUint64(rr.cons_pos) - if remaining := prod - rr.cons; remaining == 0 { - return ringbufHeader{}, io.EOF - } else if remaining < unix.BPF_RINGBUF_HDR_SZ { - return ringbufHeader{}, io.ErrUnexpectedEOF - } - - // read the len field of the header atomically to ensure a happens before - // relationship with the xchg in the kernel. Without this we may see len - // without BPF_RINGBUF_BUSY_BIT before the written data is visible. - // See https://github.com/torvalds/linux/blob/v6.8/kernel/bpf/ringbuf.c#L484 - len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[rr.cons&rr.mask]))) - header := ringbufHeader{Len: len} - - rr.cons += unix.BPF_RINGBUF_HDR_SZ - return header, nil -} - -func (rr *ringReader) Read(p []byte) (int, error) { - prod := atomic.LoadUint64(rr.prod_pos) - n := min(prod-rr.cons, uint64(len(p))) - - start := rr.cons & rr.mask - - copy(p, rr.ring[start:start+n]) - rr.cons += n - - if prod == rr.cons { - return int(n), io.EOF + for { + if remaining := prod - cons; remaining == 0 { + return errEOR + } else if remaining < unix.BPF_RINGBUF_HDR_SZ { + return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF) + } + + // read the len field of the header atomically to ensure a happens before + // relationship with the xchg in the kernel. Without this we may see len + // without BPF_RINGBUF_BUSY_BIT before the written data is visible. + // See https://github.com/torvalds/linux/blob/v6.8/kernel/bpf/ringbuf.c#L484 + start := cons & rr.mask + len := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[start]))) + header := ringbufHeader{Len: len} + + if header.isBusy() { + // the next sample in the ring is not committed yet so we + // exit without storing the reader/consumer position + // and start again from the same position. + return errBusy + } + + cons += unix.BPF_RINGBUF_HDR_SZ + + // Data is always padded to 8 byte alignment. 
+ dataLenAligned := uint64(internal.Align(header.dataLen(), 8)) + if remaining := prod - cons; remaining < dataLenAligned { + return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF) + } + + start = cons & rr.mask + cons += dataLenAligned + + if header.isDiscard() { + // when the record header indicates that the data should be + // discarded, we skip it by just updating the consumer position + // to the next record. + atomic.StoreUint64(rr.cons_pos, cons) + continue + } + + if n := header.dataLen(); cap(rec.RawSample) < n { + rec.RawSample = make([]byte, n) + } else { + rec.RawSample = rec.RawSample[:n] + } + + copy(rec.RawSample, rr.ring[start:]) + rec.Remaining = int(prod - cons) + atomic.StoreUint64(rr.cons_pos, cons) + return nil } - - return int(n), nil } diff --git a/ringbuf/ring_test.go b/ringbuf/ring_test.go deleted file mode 100644 index b1b6a46cf..000000000 --- a/ringbuf/ring_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package ringbuf - -import ( - "bytes" - "io" - "testing" -) - -func TestRingBufferReader(t *testing.T) { - buf := make([]byte, 2) - - ring := makeRing(2, 0) - n, err := ring.Read(buf) - if err != io.EOF { - t.Error("Expected io.EOF, got", err) - } - if n != 2 { - t.Errorf("Expected to read 2 bytes, got %d", n) - } - if !bytes.Equal(buf, []byte{0, 1}) { - t.Error("Expected [0, 1], got", buf) - } - n, err = ring.Read(buf) - if err != io.EOF { - t.Error("Expected io.EOF, got", err) - } - if n != 0 { - t.Error("Expected to read 0 bytes, got", n) - } - - buf = make([]byte, 4) - - ring = makeRing(4, 4) - n, err = io.ReadFull(ring, buf) - if err != nil { - t.Error("Expected nil, got", err) - } - if n != 4 { - t.Errorf("Expected to read 4 bytes, got %d", n) - } - if !bytes.Equal(buf, []byte{0, 1, 2, 3}) { - t.Error("Expected [0, 1, 2, 3], got", buf) - } - n, err = ring.Read(buf) - if err != io.EOF { - t.Error("Expected io.EOF, got", err) - } - if n != 0 { - t.Error("Expected to read 0 bytes, got", n) - } -} - -func makeRing(size, offset int) 
*ringReader { - if size != 0 && (size&(size-1)) != 0 { - panic("size must be power of two") - } - - ring := make([]byte, 2*size) - for i := range ring { - ring[i] = byte(i) - } - - consumer := uint64(offset) - producer := uint64(len(ring)/2 + offset) - - return newRingReader(&consumer, &producer, ring) -}