Submodules #108

Merged: 97 commits, Sep 11, 2020
Commits
ca5a6cf Renamed package to stanza (djaglowski, Aug 24, 2020)
bbb31de Merge branch 'master' into stanza (djaglowski, Aug 24, 2020)
299e800 Updated new operator package (djaglowski, Aug 24, 2020)
70b145b Updated changelog ahead of release (djaglowski, Aug 24, 2020)
44c9adb Submodule exploration (camdencheek, Aug 27, 2020)
1dfcc6e Finished preliminary reorg of operators into modules (djaglowski, Sep 1, 2020)
46841b9 Merged master (djaglowski, Sep 1, 2020)
c26f895 Fixed google cloud test that was failing on infinite loop (djaglowski, Sep 1, 2020)
af46c61 Fix all tests (djaglowski, Sep 1, 2020)
f51025f Merged master (djaglowski, Sep 1, 2020)
420acc7 Update changelog (djaglowski, Sep 1, 2020)
0fb2bfd Fix CI build (djaglowski, Sep 1, 2020)
25b9986 Rename init.go to init_common.go, so that it will not be ignored by a… (djaglowski, Sep 1, 2020)
0a75e8f WIP - try fixing tests on windows CI (djaglowski, Sep 1, 2020)
8567b61 WIP - Try another windows testing approach (djaglowski, Sep 1, 2020)
32ffdc7 Try uploading joined coverage file (djaglowski, Sep 1, 2020)
076c485 Run all unit tests for windows, and report all coverage (djaglowski, Sep 1, 2020)
aa15f29 Fix return paths on windows ci tests (djaglowski, Sep 1, 2020)
e0a3134 Try more powershelly command (djaglowski, Sep 1, 2020)
0cd0842 Clean up makefile and try codecov upload with glob (djaglowski, Sep 1, 2020)
980cfc3 WIP (camdencheek, Aug 11, 2020)
1428a52 WIP (camdencheek, Aug 12, 2020)
14c1b3d Make disk buffer work (camdencheek, Aug 21, 2020)
d5019df Add ReadWait (camdencheek, Aug 21, 2020)
56a019e Make benchmark for disk buffer (camdencheek, Aug 21, 2020)
560bb1f WIP (camdencheek, Aug 24, 2020)
6e1c1a1 WIP (camdencheek, Aug 25, 2020)
7fe5b11 WORKING (camdencheek, Aug 25, 2020)
1a90c8d Remove debug printlns (camdencheek, Aug 25, 2020)
9286e20 WIP broken (camdencheek, Aug 26, 2020)
fcb7c7c Rename to stanza (camdencheek, Aug 27, 2020)
2a43d87 WIP (camdencheek, Aug 27, 2020)
711555a WIP (camdencheek, Aug 28, 2020)
e6ec767 WORKING (camdencheek, Aug 28, 2020)
364fabe Remove unnecessary counter (camdencheek, Aug 28, 2020)
9da4a59 Clean up unused (camdencheek, Aug 28, 2020)
0f8ccd0 Start of memory buffer (camdencheek, Aug 31, 2020)
e957920 Add slow memory buffer implementation (camdencheek, Aug 31, 2020)
b425ea5 Reorganize package (camdencheek, Aug 31, 2020)
d54d251 Update comments (camdencheek, Aug 31, 2020)
c3f9c28 WIP (camdencheek, Aug 31, 2020)
f9c001e Improve performance by only seeking when necessary (camdencheek, Aug 31, 2020)
ebe0ad0 WIP (camdencheek, Aug 31, 2020)
24308f1 WIP (camdencheek, Sep 1, 2020)
d188aab WIP (camdencheek, Sep 1, 2020)
fa51beb Fix failure to release semaphore (camdencheek, Sep 1, 2020)
e0c2b94 Some code hygiene (camdencheek, Sep 1, 2020)
6a2a16d Add some small tests (camdencheek, Sep 1, 2020)
71f9159 Make NewConfig return a pointer (camdencheek, Sep 1, 2020)
0961983 Fix tests (camdencheek, Sep 1, 2020)
8b75cd5 Tidy (djaglowski, Sep 1, 2020)
7a52d76 Fix make tidy target (djaglowski, Sep 2, 2020)
e620fd2 Merged in disk-buffers (djaglowski, Sep 3, 2020)
ee559ea Fix tests and integrate with Google Cloud (camdencheek, Sep 3, 2020)
a04e59b Fix remaining tests (camdencheek, Sep 3, 2020)
a796934 Appease linter (camdencheek, Sep 3, 2020)
51d296b Merge remote-tracking branch 'origin/disk-buffer' into submod-diskbuff (djaglowski, Sep 3, 2020)
747ac8e Merged disk buffers again (djaglowski, Sep 3, 2020)
a75331f Add comments to public functions (camdencheek, Sep 3, 2020)
1e8d25a Add test for closing and reopening (camdencheek, Sep 3, 2020)
a2c603e Add comments to flusher (camdencheek, Sep 3, 2020)
3bdb9d8 Remove TODO (camdencheek, Sep 3, 2020)
f0051fd Update diskSizeSemaphore comment (camdencheek, Sep 3, 2020)
17ce9ab Update changelog (camdencheek, Sep 3, 2020)
f184e0d Fis issue with creating files in the buffer package (camdencheek, Sep 3, 2020)
dd952bf Merged disk-buffer again (djaglowski, Sep 3, 2020)
5b6b775 Add disk buffers (camdencheek, Sep 3, 2020)
59b2bb6 Tidy dependencies (camdencheek, Sep 3, 2020)
2da5b7a Merged disk-buffer again (djaglowski, Sep 3, 2020)
18add81 Fix all existing tests (camdencheek, Sep 7, 2020)
dd629c1 Move pollForNewFiles into its own method (camdencheek, Sep 8, 2020)
e7934c6 Deduplicate NewFileReader (camdencheek, Sep 8, 2020)
580784c Fix race condition (camdencheek, Sep 8, 2020)
f8965c2 Resolved merge conflicts (djaglowski, Sep 8, 2020)
8c53231 Update tests (camdencheek, Sep 8, 2020)
e8ca3a8 Fix some lints (camdencheek, Sep 8, 2020)
48bbe84 Minor fixes (camdencheek, Sep 8, 2020)
174c138 Close file during move on Windows (camdencheek, Sep 8, 2020)
8e927d4 Merge branch 'submod-diskbuff' into rc-0.10.0 (djaglowski, Sep 8, 2020)
1e5d36c Fixed failing test (djaglowski, Sep 8, 2020)
5a26d73 Added comments, cleaned up stutter (djaglowski, Sep 8, 2020)
8156ab6 Added multi file test (djaglowski, Sep 8, 2020)
60b8b03 Improved coverage (#110) (jmwilliams89, Sep 9, 2020)
53457f2 Added file rotation test (djaglowski, Sep 9, 2020)
ff56186 Merged master (djaglowski, Sep 9, 2020)
a20666b Merged in master, and improved test coverage (djaglowski, Sep 9, 2020)
99f3ed5 Merged in submodules (djaglowski, Sep 9, 2020)
6276c2f Add LastSeenTime to readers (camdencheek, Sep 9, 2020)
07dc982 Fix data race (camdencheek, Sep 9, 2020)
6c8f2d4 Synchronize reading to simplify logic (camdencheek, Sep 9, 2020)
e4ed15e Update fingerprint on initialize (camdencheek, Sep 9, 2020)
dbb2eb1 Add comments (camdencheek, Sep 9, 2020)
3f6a663 Remove unnecessary setOffset function (camdencheek, Sep 9, 2020)
a0422ff Update fingerprint on truncate (camdencheek, Sep 9, 2020)
82d351e Add a few tests (camdencheek, Sep 9, 2020)
49d23ab Ignore empty lines (camdencheek, Sep 9, 2020)
9bc0838 File fixes (#113) (camdencheek, Sep 11, 2020)
Changes from 1 commit: 7fe5b110acf5f0d2fd7c8f9fe21d8a633ee8530b ("WORKING"), committed by camdencheek on Sep 1, 2020.
operator/buffer/disk/disk_buffer.go (155 changes: 112 additions & 43 deletions)

```diff
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
@@ -115,17 +116,28 @@ func (d *DiskBuffer) ReadWait(dst []*entry.Entry, timeout <-chan time.Time) (fun
 	return d.Read(dst)
 }
 
-func (d *DiskBuffer) Read(dst []*entry.Entry) (func(), int, error) {
+func (d *DiskBuffer) Read(dst []*entry.Entry) (f func(), i int, err error) {
 	d.Lock()
 	defer d.Unlock()
+	defer func() {
+		if err != nil {
+			mf, _ := os.Create("/tmp/metadata")
+			d.metadata.file.Seek(0, 0)
+			io.Copy(mf, d.metadata.file)
+
+			df, _ := os.Create("/tmp/data")
+			d.data.Seek(0, 0)
+			io.Copy(df, d.data)
+		}
+	}()
 
 	if d.metadata.unreadCount == 0 {
 		return func() {}, 0, nil
 	}
 
-	_, err := d.data.Seek(d.metadata.unreadStartOffset, 0)
+	_, err = d.data.Seek(d.metadata.unreadStartOffset, 0)
 	if err != nil {
-		return nil, 0, err
+		return nil, 0, fmt.Errorf("seek to unread: %s", err)
 	}
 
 	readCount := min(len(dst), int(d.metadata.unreadCount))
@@ -138,7 +150,7 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (func(), int, error) {
 		var entry entry.Entry
 		err := dec.Decode(&entry)
 		if err != nil {
-			return nil, 0, err
+			return nil, 0, fmt.Errorf("decode: %s", err)
 		}
 		dst[i] = &entry
@@ -152,7 +164,7 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (func(), int, error) {
 	}
 
 	d.metadata.read = append(d.metadata.read, inFlight...)
-	d.metadata.unreadStartOffset = currentOffset
+	d.metadata.unreadStartOffset = currentOffset + 1 // Add one for the trailing newline
 	d.metadata.unreadCount -= int64(readCount)
 	markFlushed := func() {
 		d.Lock()
@@ -181,49 +193,89 @@ func (d *DiskBuffer) Compact() error {
 	d.Lock()
 	defer d.Unlock()
 
-	deletedBytes := int64(0)
-	for {
-		start, end, ok := d.metadata.nextFlushedRange()
-		if !ok {
-			break
-		}
+	m := d.metadata
+	for i := 0; i < len(m.read); {
+		if m.read[i].flushed {
+			// If the next entry is flushed, find the range of flushed entries, then
+			// update the length of the dead space to include the range, delete
+			// the flushed entries from metadata, then sync metadata
+
+			// Find the end index of the slice of flushed entries
+			j := i + 1
+			for ; j < len(m.read); j++ {
+				if !m.read[i].flushed {
+					break
+				}
+			}
+
+			// Expand the dead range
+			m.deadRangeLength += onDiskSize(m.read[i:j])
+
+			// Delete the range from metadata
+			m.read = append(m.read[:i], m.read[j:]...)
+		} else {
+			// If the next entry is unflushed, find the range of unflushed entries
+			// that can fit completely inside the dead space. Copy those into the dead
+			// space, update their offsets, update nextIndex, update the offset of
+			// the dead space, then sync metadata
+
+			// Find the end index of the slice of unflushed entries, or the end index
+			// of the range that fits inside the dead range
+			j := i + 1
+			for ; j < len(m.read); j++ {
+				if m.read[i].flushed {
+					break
+				}
 
-		firstFlushed := d.metadata.read[start]
-		lastFlushed := d.metadata.read[end-1]
+				if onDiskSize(m.read[i:j]) > m.deadRangeLength {
+					break
+				}
+			}
 
-		startOffset := firstFlushed.startOffset
-		endOffset := lastFlushed.startOffset + lastFlushed.length
-		bytesMoved, err := d.overwriteRange(startOffset, endOffset)
-		if err != nil {
-			return err
-		}
+			// Move the range into the dead space
+			bytesMoved, err := d.moveRange(
+				m.deadRangeStart,
+				m.deadRangeLength,
+				m.read[i].startOffset,
+				m.read[j-1].length,
+			)
+			if err != nil {
+				return err
+			}
 
-		for _, diskEntry := range d.metadata.read[end:] {
-			diskEntry.startOffset -= (endOffset - startOffset)
-		}
+			// TODO this logic is wrong. We should be moving all read entries to the beginning
+			// of the file, then deleting the dead range created in the space before the unread entries
+			// Update the offsets of the moved range
+			offsetDelta := m.read[i].startOffset - m.deadRangeStart
+			for _, diskEntry := range m.read {
+				diskEntry.startOffset -= offsetDelta
+			}
 
-		// Remove range 1 from tracked diskEntries
-		d.metadata.read = append(d.metadata.read[:start], d.metadata.read[end:]...)
+			// Update the offset of the dead space
+			m.deadRangeStart += int64(bytesMoved)
 
-		d.metadata.deadRangeStart = endOffset
-		d.metadata.deadRangeLength = int64(bytesMoved)
+			// Update i
+			i = j
+		}
 
-		err = d.metadata.Sync()
+		// Sync after every operation
+		err := d.metadata.Sync()
 		if err != nil {
 			return err
 		}
+	}
 
-		d.metadata.unreadStartOffset -= (endOffset - startOffset)
-		deletedBytes += (endOffset - startOffset)
-	}
-
-	info, err := d.data.Stat()
-	if err != nil {
-		return err
-	}
-	return d.data.Truncate(info.Size() - deletedBytes)
+	// Bubble the dead space through the unflushed entries, then truncate
+	return d.deleteDeadRange()
+}
+
+// onDiskSize calculates the size in bytes on disk for a contiguous
+// range of diskEntries
+func onDiskSize(entries []*diskEntry) int64 {
+	if len(entries) == 0 {
+		return 0
+	}
+
+	last := entries[len(entries)-1]
+	return last.startOffset + last.length - entries[0].startOffset
 }
@@ -232,12 +284,16 @@ func (d *DiskBuffer) deleteDeadRange() error {
 		return nil
 	}
 
+	// Keep atomically overwriting ranges until we're at the end of the file
 	for {
-		// Replace the range with the proceeding range of bytes
 		start := d.metadata.deadRangeStart
 		length := d.metadata.deadRangeLength
-		n, err := d.overwriteRange(start, start+length)
+		n, err := d.moveRange(
+			start,
+			length,
+			start+length,
+			length,
+		)
 		if err != nil {
 			return err
 		}
@@ -267,21 +323,35 @@ func (d *DiskBuffer) deleteDeadRange() error {
 	return d.metadata.setDeadRange(0, 0)
 }
 
-func (d *DiskBuffer) overwriteRange(start, end int64) (int, error) {
-	readPosition := end
-	writePosition := start
+func (d *DiskBuffer) moveRange(start1, length1, start2, length2 int64) (int, error) {
+	if length2 > length1 {
+		return 0, fmt.Errorf("cannot move a range into a space smaller than itself")
+	}
+
+	readPosition := start2
+	writePosition := start1
 	bytesRead := 0
+
+	rd := io.LimitReader(d.data, length2)
+
 	eof := false
 	for !eof {
+		// Seek to last read position
+		_, err := d.data.Seek(readPosition, 0)
+		if err != nil {
+			return 0, err
+		}
+
 		// Read a chunk
-		n, err := d.data.ReadAt(d.copyBuffer, readPosition)
+		n, err := rd.Read(d.copyBuffer)
 		if err != nil {
 			if err != io.EOF {
 				return 0, err
 			}
 			eof = true
 		}
 		readPosition += int64(n)
+		bytesRead += n
 
 		// Write the chunk back into a free region
 		_, err = d.data.WriteAt(d.copyBuffer[:n], writePosition)
@@ -290,7 +360,6 @@ func (d *DiskBuffer) overwriteRange(start, end int64) (int, error) {
 		}
 		writePosition += int64(n)
 
-		bytesRead += n
 	}
 
 	return bytesRead, nil
```
operator/buffer/disk/disk_buffer_test.go (2 changes: 1 addition & 1 deletion)

```diff
@@ -233,7 +233,7 @@ func BenchmarkDiskBuffer(b *testing.B) {
 			panicOnErr(err)
 			i += n
 			go func() {
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(50 * time.Millisecond)
 				flush()
 			}()
 		}
```
operator/buffer/disk/metadata.go (36 changes: 36 additions & 0 deletions)

```diff
@@ -222,3 +222,39 @@ func (m *Metadata) nextFlushedRange() (int, int, bool) {
 
 	return start, end, true
 }
+
+// getCleanableRanges returns the starting and ending indexes of the two ranges of diskEntries
+// where the first range is composed successfully flushed diskEntries and the second
+// range is composed of
+func getCleanableRanges(searchStart int, entries []*diskEntry) (start1, start2, end2 int) {
+	// search for the first flushed entry in range 1
+	for start1 = searchStart; start1 < len(entries); start1++ {
+		if entries[start1].flushed {
+			break
+		}
+	}
+
+	// search for the last flushed entry in range 1
+	for start2 = start1; start2 < len(entries); start2++ {
+		if !entries[start2].flushed {
+			break
+		}
+	}
+
+	range1DiskSize := onDiskSize(entries[start1:start2])
+
+	// search for the last unflushed entry, or the last entry that will allow
+	// range2 to fit inside the space of range 1
+	for end2 = start2; end2 < len(entries); end2++ {
+		if entries[end2].flushed {
+			break
+		}
+
+		range2DiskSize := onDiskSize(entries[start2:end2])
+		if range2DiskSize > range1DiskSize {
+			break
+		}
+	}
+
+	return start1, start2, end2
+}
```