Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submodules #108

Merged
merged 97 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
ca5a6cf
Renamed package to stanza
djaglowski Aug 24, 2020
bbb31de
Merge branch 'master' into stanza
djaglowski Aug 24, 2020
299e800
Updated new operator package
djaglowski Aug 24, 2020
70b145b
Updated changelog ahead of release
djaglowski Aug 24, 2020
44c9adb
Submodule exploration
camdencheek Aug 27, 2020
1dfcc6e
Finished preliminary reorg of operators into modules
djaglowski Sep 1, 2020
46841b9
Merged master
djaglowski Sep 1, 2020
c26f895
Fixed google cloud test that was failing on infinite loop
djaglowski Sep 1, 2020
af46c61
Fix all tests
djaglowski Sep 1, 2020
f51025f
Merged master
djaglowski Sep 1, 2020
420acc7
Update changelog
djaglowski Sep 1, 2020
0fb2bfd
Fix CI build
djaglowski Sep 1, 2020
25b9986
Rename init.go to init_common.go, so that it will not be ignored by a…
djaglowski Sep 1, 2020
0a75e8f
WIP - try fixing tests on windows CI
djaglowski Sep 1, 2020
8567b61
WIP - Try another windows testing approach
djaglowski Sep 1, 2020
32ffdc7
Try uploading joined coverage file
djaglowski Sep 1, 2020
076c485
Run all unit tests for windows, and report all coverage
djaglowski Sep 1, 2020
aa15f29
Fix return paths on windows ci tests
djaglowski Sep 1, 2020
e0a3134
Try more powershelly command
djaglowski Sep 1, 2020
0cd0842
Clean up makefile and try codecov upload with glob
djaglowski Sep 1, 2020
980cfc3
WIP
camdencheek Aug 11, 2020
1428a52
WIP
camdencheek Aug 12, 2020
14c1b3d
Make disk buffer work
camdencheek Aug 21, 2020
d5019df
Add ReadWait
camdencheek Aug 21, 2020
56a019e
Make benchmark for disk buffer
camdencheek Aug 21, 2020
560bb1f
WIP
camdencheek Aug 24, 2020
6e1c1a1
WIP
camdencheek Aug 25, 2020
7fe5b11
WORKING
camdencheek Aug 25, 2020
1a90c8d
Remove debug printlns
camdencheek Aug 25, 2020
9286e20
WIP broken
camdencheek Aug 26, 2020
fcb7c7c
Rename to stanza
camdencheek Aug 27, 2020
2a43d87
WIP
camdencheek Aug 27, 2020
711555a
WIP
camdencheek Aug 28, 2020
e6ec767
WORKING
camdencheek Aug 28, 2020
364fabe
Remove unnecessary counter
camdencheek Aug 28, 2020
9da4a59
Clean up unused
camdencheek Aug 28, 2020
0f8ccd0
Start of memory buffer
camdencheek Aug 31, 2020
e957920
Add slow memory buffer implementation
camdencheek Aug 31, 2020
b425ea5
Reorganize package
camdencheek Aug 31, 2020
d54d251
Update comments
camdencheek Aug 31, 2020
c3f9c28
WIP
camdencheek Aug 31, 2020
f9c001e
Improve performance by only seeking when necessary
camdencheek Aug 31, 2020
ebe0ad0
WIP
camdencheek Aug 31, 2020
24308f1
WIP
camdencheek Sep 1, 2020
d188aab
WIP
camdencheek Sep 1, 2020
fa51beb
Fix failure to release semaphore
camdencheek Sep 1, 2020
e0c2b94
Some code hygiene
camdencheek Sep 1, 2020
6a2a16d
Add some small tests
camdencheek Sep 1, 2020
71f9159
Make NewConfig return a pointer
camdencheek Sep 1, 2020
0961983
Fix tests
camdencheek Sep 1, 2020
8b75cd5
Tidy
djaglowski Sep 1, 2020
7a52d76
Fix make tidy target
djaglowski Sep 2, 2020
e620fd2
Merged in disk-buffers
djaglowski Sep 3, 2020
ee559ea
Fix tests and integrate with Google Cloud
camdencheek Sep 3, 2020
a04e59b
Fix remaining tests
camdencheek Sep 3, 2020
a796934
Appease linter
camdencheek Sep 3, 2020
51d296b
Merge remote-tracking branch 'origin/disk-buffer' into submod-diskbuff
djaglowski Sep 3, 2020
747ac8e
Merged disk buffers again
djaglowski Sep 3, 2020
a75331f
Add comments to public functions
camdencheek Sep 3, 2020
1e8d25a
Add test for closing and reopening
camdencheek Sep 3, 2020
a2c603e
Add comments to flusher
camdencheek Sep 3, 2020
3bdb9d8
Remove TODO
camdencheek Sep 3, 2020
f0051fd
Update diskSizeSemaphore comment
camdencheek Sep 3, 2020
17ce9ab
Update changelog
camdencheek Sep 3, 2020
f184e0d
Fix issue with creating files in the buffer package
camdencheek Sep 3, 2020
dd952bf
Merged disk-buffer again
djaglowski Sep 3, 2020
5b6b775
Add disk buffers
camdencheek Sep 3, 2020
59b2bb6
Tidy dependencies
camdencheek Sep 3, 2020
2da5b7a
Merged disk-buffer again
djaglowski Sep 3, 2020
18add81
Fix all existing tests
camdencheek Sep 7, 2020
dd629c1
Move pollForNewFiles into its own method
camdencheek Sep 8, 2020
e7934c6
Deduplicate NewFileReader
camdencheek Sep 8, 2020
580784c
Fix race condition
camdencheek Sep 8, 2020
f8965c2
Resolved merge conflicts
djaglowski Sep 8, 2020
8c53231
Update tests
camdencheek Sep 8, 2020
e8ca3a8
Fix some lints
camdencheek Sep 8, 2020
48bbe84
Minor fixes
camdencheek Sep 8, 2020
174c138
Close file during move on Windows
camdencheek Sep 8, 2020
8e927d4
Merge branch 'submod-diskbuff' into rc-0.10.0
djaglowski Sep 8, 2020
1e5d36c
Fixed failing test
djaglowski Sep 8, 2020
5a26d73
Added comments, cleaned up stutter
djaglowski Sep 8, 2020
8156ab6
Added multi file test
djaglowski Sep 8, 2020
60b8b03
Improved coverage (#110)
jmwilliams89 Sep 9, 2020
53457f2
Added file rotation test
djaglowski Sep 9, 2020
ff56186
Merged master
djaglowski Sep 9, 2020
a20666b
Merged in master, and improved test coverage
djaglowski Sep 9, 2020
99f3ed5
Merged in submodules
djaglowski Sep 9, 2020
6276c2f
Add LastSeenTime to readers
camdencheek Sep 9, 2020
07dc982
Fix data race
camdencheek Sep 9, 2020
6c8f2d4
Synchronize reading to simplify logic
camdencheek Sep 9, 2020
e4ed15e
Update fingerprint on initialize
camdencheek Sep 9, 2020
dbb2eb1
Add comments
camdencheek Sep 9, 2020
3f6a663
Remove unnecessary setOffset function
camdencheek Sep 9, 2020
a0422ff
Update fingerprint on truncate
camdencheek Sep 9, 2020
82d351e
Add a few tests
camdencheek Sep 9, 2020
49d23ab
Ignore empty lines
camdencheek Sep 9, 2020
9bc0838
File fixes (#113)
camdencheek Sep 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add comments to public functions
  • Loading branch information
camdencheek committed Sep 3, 2020
commit a75331f32b4404dd81b2c6fec44961c8bacb0586
6 changes: 4 additions & 2 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

// Buffer stores entries until they are read and flushed to their final
// destination. (The page scrape duplicated the pre- and post-change method
// signatures; this is the post-change interface using FlushFunc.)
type Buffer interface {
	// Add inserts an entry into the buffer, blocking until space is
	// available or the context is done.
	Add(context.Context, *entry.Entry) error

	// Read copies buffered entries into dst. It returns a FlushFunc that,
	// when called, marks the read entries as flushed, along with the number
	// of entries read.
	Read([]*entry.Entry) (FlushFunc, int, error)

	// ReadWait is like Read, but blocks until there are enough entries to
	// fill dst or the context is cancelled.
	ReadWait(context.Context, []*entry.Entry) (FlushFunc, int, error)

	// Close releases any resources held by the buffer.
	Close() error
}

Expand Down Expand Up @@ -75,3 +75,5 @@ func (bc *Config) unmarshal(unmarshal func(interface{}) error) error {

return nil
}

// FlushFunc is returned by Buffer.Read and Buffer.ReadWait. Calling it marks
// the entries returned by that read as flushed, and reports any error that
// occurs while doing so.
type FlushFunc func() error
79 changes: 72 additions & 7 deletions operator/buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package buffer

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -9,23 +10,28 @@ import (

func TestBufferUnmarshalYAML(t *testing.T) {
cases := []struct {
name string
input []byte
expected Config
name string
yaml []byte
json []byte
expected Config
expectError bool
}{
{
"SimpleMemory",
[]byte("type: memory\nmax_entries: 30\n"),
[]byte(`{"type": "memory", "max_entries": 30}`),
Config{
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 30,
},
},
false,
},
{
"SimpleDisk",
[]byte("type: disk\nmax_size: 1234\npath: /var/log/testpath\n"),
[]byte(`{"type": "disk", "max_size": 1234, "path": "/var/log/testpath"}`),
Config{
Type: "disk",
BufferBuilder: &DiskBufferConfig{
Expand All @@ -34,15 +40,74 @@ func TestBufferUnmarshalYAML(t *testing.T) {
Sync: true,
},
},
false,
},
{
"UnknownType",
[]byte("type: invalid\n"),
[]byte(`{"type": "invalid"}`),
Config{
Type: "disk",
BufferBuilder: &DiskBufferConfig{
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
},
},
true,
},
{
"InvalidType",
[]byte("type: !!float 123\n"),
[]byte(`{"type": 12}`),
Config{
Type: "disk",
BufferBuilder: &DiskBufferConfig{
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
},
},
true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var b Config
err := yaml.Unmarshal(tc.input, &b)
require.NoError(t, err)
require.Equal(t, tc.expected, b)
t.Run("YAML", func(t *testing.T) {
var b Config
err := yaml.Unmarshal(tc.yaml, &b)
if tc.expectError {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expected, b)
})

t.Run("JSON", func(t *testing.T) {
var b Config
err := json.Unmarshal(tc.json, &b)
if tc.expectError {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expected, b)
})
})
}
}

func TestBuffer(t *testing.T) {
t.Run("Default", func(t *testing.T) {
cfg := NewConfig()
expected := Config{
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 1 << 20,
},
}
require.Equal(t, expected, cfg)
})
}
38 changes: 24 additions & 14 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ import (
"golang.org/x/sync/semaphore"
)

// DiskBufferConfig is a configuration struct for a DiskBuffer.
// (The page scrape kept both the pre- and post-change field declarations;
// this is the post-change, documented version of the struct.)
type DiskBufferConfig struct {
	// MaxSize is the maximum size in bytes of the data file on disk
	MaxSize int `json:"max_size" yaml:"max_size"`

	// Path is a path to a directory which contains the data and metadata files
	Path string `json:"path" yaml:"path"`

	// Sync indicates whether to open the files with O_SYNC. If this is set to false,
	// in cases like power failures or unclean shutdowns, logs may be lost or the
	// database may become corrupted.
	Sync bool `json:"sync" yaml:"sync"`
}

// NewDiskBufferConfig creates a new default disk buffer config
Expand All @@ -31,6 +38,7 @@ func NewDiskBufferConfig() *DiskBufferConfig {
}
}

// Build creates a new Buffer from a DiskBufferConfig
func (c DiskBufferConfig) Build(context operator.BuildContext, _ string) (Buffer, error) {
b := NewDiskBuffer(c.MaxSize)
if err := b.Open(c.Path, c.Sync); err != nil {
Expand All @@ -39,6 +47,8 @@ func (c DiskBufferConfig) Build(context operator.BuildContext, _ string) (Buffer
return b, nil
}

// DiskBuffer is a buffer for storing entries on disk until they are flushed to their
// final destination.
type DiskBuffer struct {
// Metadata holds information about the current state of the buffered entries
metadata *Metadata
Expand Down Expand Up @@ -187,7 +197,7 @@ func (d *DiskBuffer) addUnreadCount(i int64) {
// buffer to fill dst or the context is cancelled. This amortizes the cost of reading from the
// disk. It returns a function that, when called, marks the read entries as flushed, the
// number of entries read, and an error.
func (d *DiskBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (func(), int, error) {
func (d *DiskBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (FlushFunc, int, error) {
d.readerLock.Lock()
defer d.readerLock.Unlock()

Expand All @@ -209,13 +219,13 @@ LOOP:

// Read copies entries from the disk into the destination buffer. It returns a function that,
// when called, marks the entries as flushed, the number of entries read, and an error.
func (d *DiskBuffer) Read(dst []*entry.Entry) (f func(), i int, err error) {
func (d *DiskBuffer) Read(dst []*entry.Entry) (f FlushFunc, i int, err error) {
d.Lock()
defer d.Unlock()

// Return fast if there are no unread entries
if d.metadata.unreadCount == 0 {
return d.checkCompact, 0, nil
return d.newFlushFunc(nil), 0, nil
}

// Seek to the start of the range of unread entries
Expand Down Expand Up @@ -262,33 +272,33 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f func(), i int, err error) {
}

// newFlushFunc returns a function that marks read entries as flushed
func (d *DiskBuffer) newFlushFunc(newRead []*readEntry) func() {
return func() {
func (d *DiskBuffer) newFlushFunc(newRead []*readEntry) FlushFunc {
return func() error {
d.Lock()
for _, entry := range newRead {
entry.flushed = true
d.flushedBytes += entry.length
}
d.Unlock()
d.checkCompact()
return d.checkCompact()
}
}

// checkCompact checks if a compaction should be performed, then kicks one off
func (d *DiskBuffer) checkCompact() {
func (d *DiskBuffer) checkCompact() error {
d.Lock()
switch {
case d.flushedBytes > d.maxBytes/2:
fallthrough
case time.Since(d.lastCompaction) > 5*time.Second:
d.Unlock()
err := d.Compact()
if err != nil {
panic(err) // TODO how to report this error back to caller?
if err := d.Compact(); err != nil {
return err
}
default:
d.Unlock()
}
return nil
}

// Compact removes all flushed entries from disk
Expand Down
7 changes: 5 additions & 2 deletions operator/buffer/disk_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"os"
)

// Metadata is a representation of the on-disk metadata file. It contains
// information about the layout, location, and flushed status of entries
// stored in the data file
type Metadata struct {
// File is a handle to the on-disk metadata store
//
Expand All @@ -20,8 +23,8 @@ type Metadata struct {
// - 8 byte ReadCount as LittleEndian int64
// - Repeated ReadCount times:
// - 1 byte Flushed bool LittleEndian
// - 8 byte Length as LittleEndian uint64
// - 8 byte StartOffset as LittleEndian uint64
// - 8 byte Length as LittleEndian int64
// - 8 byte StartOffset as LittleEndian int64
file *os.File

// read is the collection of entries that have been read
Expand Down
119 changes: 86 additions & 33 deletions operator/buffer/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,43 +162,96 @@ func TestDiskBuffer(t *testing.T) {
readN(t, b2, 10, 10)
})

t.Run("Write10kRandomFlushReadCompact", func(t *testing.T) {
t.Run("ReadWaitTimesOut", func(t *testing.T) {
t.Parallel()
b := openBuffer(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
dst := make([]*entry.Entry, 10)
_, n, err := b.ReadWait(ctx, dst)
require.NoError(t, err)
require.Equal(t, 0, n)
})

t.Run("AddTimesOut", func(t *testing.T) {
t.Parallel()
b := NewDiskBuffer(100) // Enough space for 1, but not 2 entries
dir := testutil.NewTempDir(t)
err := b.Open(dir, false)
require.NoError(t, err)

// Add a first entry
err = b.Add(context.Background(), entry.New())
require.NoError(t, err)

// Second entry should block and be cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
err = b.Add(ctx, entry.New())
require.Error(t, err)
cancel()

// Read, flush, and compact
dst := make([]*entry.Entry, 1)
f, n, err := b.Read(dst)
require.NoError(t, err)
require.Equal(t, 1, n)
f()
require.NoError(t, b.Compact())

// Now there should be space for another entry
err = b.Add(context.Background(), entry.New())
require.NoError(t, err)
})

t.Run("Write1kRandomFlushReadCompact", func(t *testing.T) {
t.Parallel()
rand.Seed(time.Now().Unix())
for i := 0; i < 10; i++ {
seed := rand.Int63()
t.Run(strconv.Itoa(int(seed)), func(t *testing.T) {
t.Parallel()
r := rand.New(rand.NewSource(seed))

b := NewDiskBuffer(1 << 30)
dir := testutil.NewTempDir(t)
err := b.Open(dir, false)
require.NoError(t, err)

writes := 0
reads := 0

for i := 0; i < 10000; i++ {
j := r.Int() % 1000
switch {
case j < 900:
writeN(t, b, 1, writes)
writes++
case j < 990:
readCount := (writes - reads) / 2
f := readN(t, b, readCount, reads)
if j%2 == 0 {
f()
}
reads += readCount
default:
err := b.Compact()
require.NoError(t, err)
seed := rand.Int63()
t.Run(strconv.Itoa(int(seed)), func(t *testing.T) {
t.Parallel()
r := rand.New(rand.NewSource(seed))

b := NewDiskBuffer(1 << 30)
dir := testutil.NewTempDir(t)
err := b.Open(dir, false)
require.NoError(t, err)

writes := 0
reads := 0

for i := 0; i < 1000; i++ {
j := r.Int() % 1000
switch {
case j < 900:
writeN(t, b, 1, writes)
writes++
case j < 990:
readCount := (writes - reads) / 2
f := readN(t, b, readCount, reads)
if j%2 == 0 {
f()
}
reads += readCount
default:
err := b.Compact()
require.NoError(t, err)
}
})
}
}
})
})
}

// TestDiskBufferBuild verifies that building a buffer from the default
// DiskBufferConfig yields the documented initial state.
func TestDiskBufferBuild(t *testing.T) {
	t.Run("Default", func(t *testing.T) {
		cfg := NewDiskBufferConfig()
		b, err := cfg.Build(testutil.NewBuildContext(t), "test")
		require.NoError(t, err)
		diskBuffer := b.(*DiskBuffer)
		// testify's require.Equal takes (expected, actual); the original had
		// the arguments swapped, which produces misleading failure output.
		require.False(t, diskBuffer.atEnd)
		require.Len(t, diskBuffer.entryAdded, 1)
		require.Equal(t, int64(1<<32), diskBuffer.maxBytes)
		require.Equal(t, int64(0), diskBuffer.flushedBytes)
		require.Len(t, diskBuffer.copyBuffer, 1<<16)
	})
}

Expand Down
Loading