Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] system/socket: Monitor all online CPUs #22827

Merged
merged 6 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- file_integrity: stop monitoring excluded paths {issue}21278[21278] {pull}21282[21282]
- auditd: Fix an error condition causing a lot of `audit_send_reply` kernel threads being created. {pull}22673[22673]
- system/socket: Fixed start failure when run under config reloader. {issue}20851[20851] {pull}21693[21693]
- system/socket: Having some CPUs unavailable to Auditbeat could cause startup errors or event loss. {pull}22827[22827]

*Filebeat*

Expand Down
139 changes: 139 additions & 0 deletions x-pack/auditbeat/tracing/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package tracing

import (
"bytes"
"io/ioutil"
"strconv"
"strings"

"github.com/pkg/errors"
)

const (
// OnlineCPUsPath is the path to the system file listing the online CPUs.
OnlineCPUsPath = "/sys/devices/system/cpu/online"

// OfflineCPUsPath is the path to the system file listing the offline CPUs.
OfflineCPUsPath = "/sys/devices/system/cpu/offline"

// PossibleCPUsPath is the path to the system file listing the CPUs that can be brought online.
PossibleCPUsPath = "/sys/devices/system/cpu/possible"

// PresentCPUsPath is the path to the system file listing the CPUs that are identified as present.
PresentCPUsPath = "/sys/devices/system/cpu/present"

// See `Documentation/admin-guide/cputopology.rst` in the Linux kernel docs for more information
// on the above files.

// IsolatedCPUsPath is only present when CPU isolation is active, for example using the `isolcpus`
// kernel argument.
IsolatedCPUsPath = "/sys/devices/system/cpu/isolated"
)

// CPUSet represents a group of CPUs.
type CPUSet struct {
mask []bool
count int
}

// Mask returns a bitmask where each bit is set if the given CPU is present in the set.
func (s CPUSet) Mask() []bool {
return s.mask
}

// NumCPU returns the number of CPUs in the set.
func (s CPUSet) NumCPU() int {
return s.count
}

// Contains allows to query if a given CPU exists in the set.
func (s CPUSet) Contains(cpu int) bool {
if cpu < 0 || cpu >= len(s.mask) {
return false
}
return s.mask[cpu]
}

// AsList returns the list of CPUs in the set.
func (s CPUSet) AsList() []int {
list := make([]int, 0, s.count)
for num, bit := range s.mask {
if bit {
list = append(list, num)
}
}
return list
}

// NewCPUSetFromFile creates a new CPUSet from the contents of a file.
func NewCPUSetFromFile(path string) (cpus CPUSet, err error) {
contents, err := ioutil.ReadFile(path)
if err != nil {
return cpus, err
}
return NewCPUSetFromExpression(string(bytes.TrimRight(contents, "\n\r")))
}

// NewCPUSetFromExpression creates a new CPUSet from a range expression.
// Expression: RANGE ( ',' RANGE )*
// Where:
// RANGE := <NUMBER> | <NUMBER>-<NUMBER>
func NewCPUSetFromExpression(contents string) (CPUSet, error) {
var ranges [][]int
var max, count int
for _, expr := range strings.Split(contents, ",") {
if len(expr) == 0 {
continue
}
parts := strings.Split(expr, "-")
r := make([]int, 0, len(parts))
for _, numStr := range parts {
num16, err := strconv.ParseInt(numStr, 10, 16)
if err != nil || num16 < 0 {
return CPUSet{}, errors.Errorf("failed to parse integer '%s' from range '%s' at '%s'", numStr, expr, contents)
}
num := int(num16)
r = append(r, num)
if num+1 > max {
andrewstucki marked this conversation as resolved.
Show resolved Hide resolved
max = num + 1
}
}
ranges = append(ranges, r)
}
if max == 0 {
return CPUSet{}, nil
}
mask := make([]bool, max)
for _, r := range ranges {
from, to := -1, -1
switch len(r) {
case 0:
continue // Ignore empty range.
case 1:
from = r[0]
to = r[0]
case 2:
from = r[0]
to = r[1]
}
if from == -1 || to < from {
return CPUSet{}, errors.Errorf("invalid cpu range %v in '%s'", r, contents)
}
for i := from; i <= to; i++ {
if !mask[i] {
count++
mask[i] = true
}
}
}
return CPUSet{
mask: mask,
count: count,
}, nil
}
152 changes: 152 additions & 0 deletions x-pack/auditbeat/tracing/cpu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package tracing

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewCPUSetFromExpression(t *testing.T) {
for _, testCase := range []struct {
content string
result CPUSet
fail bool
}{
{
content: "0",
result: CPUSet{
mask: []bool{true},
count: 1,
},
},
{
content: "0-3",
result: CPUSet{
mask: []bool{true, true, true, true},
count: 4,
},
},
{
content: "5-0",
fail: true,
},
{
content: "5-2147483648",
fail: true,
},
{
content: "0,2-2",
result: CPUSet{
mask: []bool{true, false, true},
count: 2,
},
},
{
content: "7",
result: CPUSet{
mask: []bool{false, false, false, false, false, false, false, true},
count: 1,
},
},
{
content: "-1",
fail: true,
},
{
content: "",
},
{
content: ",",
},
{
content: "-",
fail: true,
},
{
content: "3,-",
fail: true,
},
{
content: "3-4-5",
fail: true,
},
{
content: "0-4,5,6-6,,,,15",
result: CPUSet{
mask: []bool{
true, true, true, true, true, true, true, false,
false, false, false, false, false, false, false, true,
},
count: 8,
},
},
} {
mask, err := NewCPUSetFromExpression(testCase.content)
if !assert.Equal(t, testCase.fail, err != nil, testCase.content) {
t.Fatal(err)
}
assert.Equal(t, testCase.result, mask, testCase.content)
}
}

func TestCPUSet(t *testing.T) {
for _, test := range []struct {
expr string
num int
isSet func(int) bool
list []int
}{
{
expr: "0-2,5",
num: 4,
isSet: func(i int) bool { return i == 5 || (i >= 0 && i < 3) },
list: []int{0, 1, 2, 5},
},
{
expr: "0",
num: 1,
isSet: func(i int) bool { return i == 0 },
list: []int{0},
},
{
expr: "2",
num: 1,
isSet: func(i int) bool { return i == 2 },
list: []int{2},
},
{
expr: "0-7",
num: 8,
isSet: func(i int) bool { return i >= 0 && i < 8 },
list: []int{0, 1, 2, 3, 4, 5, 6, 7},
},
{
expr: "",
num: 0,
isSet: func(i int) bool { return false },
list: []int{},
},
{
expr: "1-2,0,2,0-0,0-1",
num: 3,
isSet: func(i int) bool { return i >= 0 && i < 3 },
list: []int{0, 1, 2},
},
} {
set, err := NewCPUSetFromExpression(test.expr)
if !assert.NoError(t, err, test.expr) {
t.Fatal(err)
}
assert.Equal(t, test.num, set.NumCPU(), test.expr)
for i := -1; i < 10; i++ {
assert.Equal(t, test.isSet(i), set.Contains(i), test.expr)
}
assert.Equal(t, test.list, set.AsList(), test.expr)
}
}
24 changes: 18 additions & 6 deletions x-pack/auditbeat/tracing/perfevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"fmt"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -55,7 +54,7 @@ type PerfChannel struct {

running uintptr
wg sync.WaitGroup
numCPUs int
cpus CPUSet

// Settings
attr perf.Attr
Expand Down Expand Up @@ -98,7 +97,6 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) {
done: make(chan struct{}, 0),
streams: make(map[uint64]stream),
pid: perf.AllThreads,
numCPUs: runtime.NumCPU(),
attr: perf.Attr{
Type: perf.TracepointEvent,
SampleFormat: perf.SampleFormat{
Expand All @@ -112,6 +110,19 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) {
channel.attr.SetSamplePeriod(1)
channel.attr.SetWakeupEvents(1)

// Load the list of online CPUs from /sys/devices/system/cpu/online.
// This is necessary in order to to install each kprobe on all online CPUs.
//
// Note:
// There's currently no mechanism to adapt to CPUs being added or removed
// at runtime (CPU hotplug).
channel.cpus, err = NewCPUSetFromFile(OnlineCPUsPath)
if err != nil {
return nil, errors.Wrap(err, "error listing online CPUs")
}
if channel.cpus.NumCPU() < 1 {
return nil, errors.New("couldn't list online CPUs")
}
// Set configuration
for _, fun := range cfg {
if err = fun(channel); err != nil {
Expand Down Expand Up @@ -210,14 +221,15 @@ func WithPollTimeout(timeout time.Duration) PerfChannelConf {
func (c *PerfChannel) MonitorProbe(format ProbeFormat, decoder Decoder) error {
c.attr.Config = uint64(format.ID)
doGroup := len(c.events) > 0
for idx := 0; idx < c.numCPUs; idx++ {
cpuList := c.cpus.AsList()
for idx, cpu := range cpuList {
var group *perf.Event
var flags int
if doGroup {
group = c.events[idx]
flags = unix.PERF_FLAG_FD_NO_GROUP | unix.PERF_FLAG_FD_OUTPUT
}
ev, err := perf.OpenWithFlags(&c.attr, c.pid, idx, group, flags)
ev, err := perf.OpenWithFlags(&c.attr, c.pid, cpu, group, flags)
if err != nil {
return err
}
Expand Down Expand Up @@ -352,7 +364,7 @@ func makeMetadata(eventID int, record *perf.SampleRecord) Metadata {
func (c *PerfChannel) channelLoop() {
defer c.wg.Done()
ctx := doneWrapperContext(c.done)
merger := newRecordMerger(c.events[:c.numCPUs], c, c.pollTimeout)
merger := newRecordMerger(c.events[:c.cpus.NumCPU()], c, c.pollTimeout)
for {
// Read the available event from all the monitored ring-buffers that
// has the smallest timestamp.
Expand Down