Skip to content

Commit

Permalink
Optimize hunt_dispatcher GetFlows() API (#3561)
Browse files Browse the repository at this point in the history
In many cases we do not need the full enriched and current flow
information (e.g. when deleting a hunt) so we need a way to specify only
retrieving basic information.
  • Loading branch information
scudette committed Jun 30, 2024
1 parent b3beb29 commit 474af2f
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 45 deletions.
6 changes: 5 additions & 1 deletion api/hunts.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func (self *ApiServer) GetHuntFlows(

scope := vql_subsystem.MakeScope()
flow_chan, total_rows, err := hunt_dispatcher.GetFlows(
ctx, org_config_obj, options, scope, in.HuntId, int(in.StartRow))
ctx, org_config_obj,
services.FlowSearchOptions{
ResultSetOptions: options,
},
scope, in.HuntId, int(in.StartRow))
if err != nil {
return nil, Status(self.verbose, err)
}
Expand Down
2 changes: 2 additions & 0 deletions artifacts/definitions/Generic/Client/Profile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ parameters:
- name: Allocs
description: A sampling of all past memory allocations
type: bool
default: Y
- name: Block
description: Stack traces that led to blocking on synchronization primitives
type: bool
- name: Goroutine
description: Stack traces of all current goroutines
type: bool
default: Y
- name: Heap
description: A sampling of memory allocations of live objects
type: bool
Expand Down
2 changes: 2 additions & 0 deletions artifacts/definitions/Server/Monitor/Profile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ parameters:
- name: Allocs
description: A sampling of all past memory allocations
type: bool
default: Y
- name: Block
description: Stack traces that led to blocking on synchronization primitives
type: bool
- name: Goroutine
description: Stack traces of all current goroutines
type: bool
default: Y
- name: Heap
description: A sampling of memory allocations of live objects
type: bool
Expand Down
9 changes: 9 additions & 0 deletions file_store/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ type MemoryWriter struct {
}

func (self *MemoryWriter) Size() (int64, error) {
self.memory_file_store.mu.Lock()
defer self.memory_file_store.mu.Unlock()

return int64(len(self.buf)), nil
}

Expand Down Expand Up @@ -175,6 +178,9 @@ func (self *MemoryWriter) Update(data []byte, offset int64) error {
func (self *MemoryWriter) Write(data []byte) (int, error) {
defer api.InstrumentWithDelay("write", "MemoryWriter", nil)()

self.memory_file_store.mu.Lock()
defer self.memory_file_store.mu.Unlock()

self.buf = append(self.buf, data...)
return len(data), nil
}
Expand Down Expand Up @@ -215,6 +221,9 @@ func (self *MemoryWriter) Close() error {
func (self *MemoryWriter) Truncate() error {
defer api.InstrumentWithDelay("truncate", "MemoryWriter", nil)()

self.memory_file_store.mu.Lock()
defer self.memory_file_store.mu.Unlock()

self.buf = nil
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion services/client_info/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (self *Store) StartHouseKeep(
return

case <-utils.GetTime().After(utils.Jitter(utils.Jitter(delay))):
if utils.GetTime().Now().Sub(last_run) < time.Second {
if utils.GetTime().Now().Sub(last_run) < 10*time.Second {
utils.SleepWithCtx(ctx, time.Minute)
continue
}
Expand Down
11 changes: 10 additions & 1 deletion services/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ const (
OnlyRunningHunts
)

type FlowSearchOptions struct {
result_sets.ResultSetOptions

// Additional Options for efficient search.

// BasicInformation includes only client id and flow id.
BasicInformation bool
}

type IHuntDispatcher interface {
// Applies the function on all the hunts. Functions may not
// modify the hunt but will have read only access to the hunt
Expand Down Expand Up @@ -106,7 +115,7 @@ type IHuntDispatcher interface {

// Paged view into the flows in the hunt
GetFlows(ctx context.Context, config_obj *config_proto.Config,
options result_sets.ResultSetOptions, scope vfilter.Scope,
options FlowSearchOptions, scope vfilter.Scope,
hunt_id string, start int) (
output chan *api_proto.FlowDetails, total_rows int64, err error)

Expand Down
29 changes: 23 additions & 6 deletions services/hunt_dispatcher/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/file_store"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (self *HuntDispatcher) syncFlowTables(
func (self *HuntDispatcher) GetFlows(
ctx context.Context,
config_obj *config_proto.Config,
options result_sets.ResultSetOptions, scope vfilter.Scope,
options services.FlowSearchOptions, scope vfilter.Scope,
hunt_id string, start int) (chan *api_proto.FlowDetails, int64, error) {

output_chan := make(chan *api_proto.FlowDetails)
Expand All @@ -124,7 +125,7 @@ func (self *HuntDispatcher) GetFlows(
file_store_factory := file_store.GetFileStore(config_obj)
rs_reader, err := result_sets.NewResultSetReaderWithOptions(
ctx, self.config_obj, file_store_factory,
table_to_query, options)
table_to_query, options.ResultSetOptions)
if err != nil {
close(output_chan)
return output_chan, 0, err
Expand Down Expand Up @@ -166,10 +167,26 @@ func (self *HuntDispatcher) GetFlows(
}
}

collection_context, err := launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
if err != nil {
continue
var collection_context *api_proto.FlowDetails

if options.BasicInformation {
collection_context = &api_proto.FlowDetails{
Context: &flows_proto.ArtifactCollectorContext{
ClientId: client_id,
SessionId: flow_id,
},
}

// If the user wants detailed flow information we need
// to fetch this now. For many uses this is not
// necessary so we can get away with very basic
// information.
} else {
collection_context, err = launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
if err != nil {
continue
}
}

select {
Expand Down
17 changes: 13 additions & 4 deletions services/hunt_dispatcher/hunt_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,22 @@ type: INTERNAL
func (self *HuntDispatcherTestSuite) TestLoadingFromDisk() {
// All hunts are now running.
hunts := self.getAllHunts()
assert.Equal(self.T(), len(hunts), 5)
for _, h := range hunts {
assert.Equal(self.T(), h.State, api_proto.Hunt_RUNNING)
}

vtesting.WaitUntil(5*time.Second, self.T(), func() bool {
if len(hunts) != 5 {
return false
}
for _, h := range hunts {
if h.State != api_proto.Hunt_RUNNING {
return false
}
}
return true
})
}

func (self *HuntDispatcherTestSuite) TearDownTest() {
self.TestSuite.TearDownTest()
if self.time_closer != nil {
self.time_closer()
}
Expand Down
5 changes: 5 additions & 0 deletions services/hunt_dispatcher/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hunt_dispatcher
import (
"context"
"sort"
"sync/atomic"
"time"

"github.com/Velocidex/ordereddict"
Expand Down Expand Up @@ -32,6 +33,10 @@ func (self *HuntStorageManagerImpl) FlushIndex(
return nil
}

if atomic.LoadInt64(&self.closed) > 0 {
return nil
}

// Debounce the flushing a bit so we dont overload the system for
// fast events. Note that flushes occur periodically anyway so if
// we skip a flush we will get it later.
Expand Down
10 changes: 9 additions & 1 deletion services/hunt_dispatcher/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ type HuntStorageManagerImpl struct {
I_am_master bool

// If any of the hunt objects are dirty this will be set.
dirty bool
dirty bool
closed int64

last_flush_time time.Time
}
Expand Down Expand Up @@ -122,6 +123,7 @@ func (self *HuntStorageManagerImpl) GetLastTimestamp() uint64 {

func (self *HuntStorageManagerImpl) Close(ctx context.Context) {
atomic.SwapUint64(&self.last_timestamp, 0)
atomic.SwapInt64(&self.closed, 1)
self.FlushIndex(ctx)
}

Expand Down Expand Up @@ -211,6 +213,7 @@ func (self *HuntStorageManagerImpl) SetHunt(

if hunt.State == api_proto.Hunt_ARCHIVED {
delete(self.hunts, hunt.HuntId)
self.dirty = true
return db.DeleteSubject(self.config_obj, hunt_path_manager.Path())
}

Expand Down Expand Up @@ -267,6 +270,11 @@ func (self *HuntStorageManagerImpl) ListHunts(
// Get the full record from memory cache
hunt_obj, err := self.GetHunt(ctx, summary.HuntId)
if err != nil {
// Something is wrong! The index is referring to a hunt we
// dont know about - we should re-flush to sync the index.
self.mu.Lock()
self.dirty = true
self.mu.Unlock()
continue
}

Expand Down
2 changes: 1 addition & 1 deletion services/launcher/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (self *reporter) emit_ds(
}
self.seen[client_path] = true

id := self.id
self.id++
id := self.id

self.pool.Submit(func() {
self.mu.Lock()
Expand Down
11 changes: 4 additions & 7 deletions vql/readers/paged_reader_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build !windows
// +build !windows

package readers

import (
Expand Down Expand Up @@ -80,7 +77,7 @@ func (self *TestSuite) TestPagedReader() {
buff := make([]byte, 4)

for i := 0; i < 10; i++ {
reader, err := NewPagedReader(
reader, err := NewAccessorReader(
self.scope, "file", self.filenames[i], 100)
assert.NoError(self.T(), err)
_, err = reader.ReadAt(buff, 0)
Expand All @@ -90,7 +87,7 @@ func (self *TestSuite) TestPagedReader() {
}

for i := 0; i < 10; i++ {
reader, err := NewPagedReader(self.scope, "file", self.filenames[i], 100)
reader, err := NewAccessorReader(self.scope, "file", self.filenames[i], 100)
assert.NoError(self.T(), err)
_, err = reader.ReadAt(buff, 0)
assert.NoError(self.T(), err)
Expand All @@ -99,7 +96,7 @@ func (self *TestSuite) TestPagedReader() {

// Open the same reader 10 time returns from the cache.
for i := 0; i < 10; i++ {
reader, err := NewPagedReader(self.scope, "file", self.filenames[1], 100)
reader, err := NewAccessorReader(self.scope, "file", self.filenames[1], 100)
assert.NoError(self.T(), err)

_, err = reader.ReadAt(buff, 0)
Expand All @@ -110,7 +107,7 @@ func (self *TestSuite) TestPagedReader() {

// Make sure that it is ok to close the reader at any time -
// the next read will be valid.
reader, err := NewPagedReader(self.scope, "file", self.filenames[1], 100)
reader, err := NewAccessorReader(self.scope, "file", self.filenames[1], 100)
assert.NoError(self.T(), err)

for i := 0; i < 10; i++ {
Expand Down
5 changes: 2 additions & 3 deletions vql/server/downloads/downloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,15 +801,14 @@ func createHuntDownloadFile(
return
}

options := result_sets.ResultSetOptions{}
options := services.FlowSearchOptions{BasicInformation: true}
flow_chan, _, err := hunt_dispatcher.GetFlows(sub_ctx,
config_obj, options, scope, hunt_id, 0)
if err != nil {
return
}

for flow_details := range flow_chan {

if flow_details == nil || flow_details.Context == nil {
continue
}
Expand Down Expand Up @@ -875,7 +874,7 @@ func generateCombinedResults(
defer maybeClose(json_writer)
defer maybeClose(csv_writer)

options := result_sets.ResultSetOptions{}
options := services.FlowSearchOptions{BasicInformation: true}
flow_chan, _, err := hunt_dispatcher.GetFlows(ctx,
config_obj, options, scope, hunt_details.HuntId, 0)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions vql/server/flows/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/Velocidex/ordereddict"
"github.com/alecthomas/assert"
"github.com/sebdah/goldie"
"github.com/sebdah/goldie/v2"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/types/known/emptypb"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
Expand Down Expand Up @@ -107,7 +107,11 @@ func (self *FilestoreTestSuite) TestEnumerateFlow() {
Set("flow_id", self.flow_id).
Set("client_id", self.client_id)))

goldie.Assert(self.T(), "TestEnumerateFlow", json.MustMarshalIndent(result))
g := goldie.New(self.T(),
goldie.WithFixtureDir("fixtures"),
goldie.WithDiffEngine(goldie.ClassicDiff))

g.Assert(self.T(), "TestEnumerateFlow", json.MustMarshalIndent(result))
}

func TestFilestorePlugin(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions vql/server/flows/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/Velocidex/ordereddict"
"www.velocidex.com/golang/velociraptor/acls"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/vql"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
Expand Down Expand Up @@ -227,14 +226,18 @@ func breakHuntIntoScopes(
return
}

options := result_sets.ResultSetOptions{}
options := services.FlowSearchOptions{BasicInformation: true}
flow_chan, _, err := hunt_dispatcher.GetFlows(
ctx, config_obj, options, scope, arg.HuntId, 0)
if err != nil {
return
}

for flow_details := range flow_chan {
if flow_details == nil || flow_details.Context == nil {
continue
}

flow_job, err := breakIntoScopes(ctx, config_obj, scope,
&ParallelPluginArgs{
Artifact: arg.Artifact,
Expand Down
9 changes: 7 additions & 2 deletions vql/server/flows/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/Velocidex/ordereddict"
"github.com/alecthomas/assert"
"github.com/sebdah/goldie"
"github.com/sebdah/goldie/v2"
"github.com/stretchr/testify/suite"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/file_store"
Expand Down Expand Up @@ -232,7 +232,12 @@ func (self *TestSuite) TestHuntsSource() {

// Stable sort the section list so we can goldie it.
sort.Strings(sections)
goldie.Assert(self.T(), "TestHuntsSource", json.MustMarshalIndent(sections))

g := goldie.New(self.T(),
goldie.WithFixtureDir("fixtures"),
goldie.WithDiffEngine(goldie.ClassicDiff))

g.Assert(self.T(), "TestHuntsSource", json.MustMarshalIndent(sections))

vql, err := vfilter.Parse(`
SELECT * FROM parallelize(
Expand Down
Loading

0 comments on commit 474af2f

Please sign in to comment.