Skip to content

Commit

Permalink
Reuse a byte buffer for holding XML (#3118)
Browse files Browse the repository at this point in the history
Previously the data was read into a []byte encoded as UTF16. Then that
data was converted to []uint16 so that we can use utf16.Decode(). Then
the []rune slice was converted to a string which did another data copy.
The XML was unmarshalled from the string.

This PR changes the code to convert the UTF16 []byte directly to UTF8 and
puts the result into a reusable byte buffer (sys.ByteBuffer). The XML is then
unmarshalled directly from the data in the buffer.

```
BenchmarkUTF16ToUTF8-4   	 2000000	      1044 ns/op        4 B/op      1 allocs/op
```

```
git checkout 6ba7700
PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v
=== RUN   TestBenchmarkBatchReadSize
--- PASS: TestBenchmarkBatchReadSize (67.89s)
        bench_test.go:100: batch_size=10, total_events=30000, batch_time=5.119626ms, events_per_sec=1953.2676801000696, bytes_alloced_per_event=44 kB, total_allocs=7385952
        bench_test.go:100: batch_size=100, total_events=30000, batch_time=51.366271ms, events_per_sec=1946.802795943665, bytes_alloced_per_event=44 kB, total_allocs=7354448
        bench_test.go:100: batch_size=500, total_events=25000, batch_time=250.974356ms, events_per_sec=1992.2354138842775, bytes_alloced_per_event=43 kB, total_allocs=6125812
        bench_test.go:100: batch_size=1000, total_events=30000, batch_time=514.796113ms, events_per_sec=1942.5166094834128, bytes_alloced_per_event=43 kB, total_allocs=7350550
PASS
ok      github.com/elastic/beats/winlogbeat/eventlog    67.950s

git checkout 833a806 (#3113)
PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v
=== RUN   TestBenchmarkBatchReadSize
--- PASS: TestBenchmarkBatchReadSize (65.69s)
        bench_test.go:100: batch_size=10, total_events=30000, batch_time=4.858277ms, events_per_sec=2058.3429063431336, bytes_alloced_per_event=25 kB, total_allocs=7385847
        bench_test.go:100: batch_size=100, total_events=30000, batch_time=51.612952ms, events_per_sec=1937.49816906423, bytes_alloced_per_event=24 kB, total_allocs=7354362
        bench_test.go:100: batch_size=500, total_events=25000, batch_time=241.713826ms, events_per_sec=2068.561853801445, bytes_alloced_per_event=24 kB, total_allocs=6125757
        bench_test.go:100: batch_size=1000, total_events=30000, batch_time=494.961643ms, events_per_sec=2020.3585755431961, bytes_alloced_per_event=24 kB, total_allocs=7350474
PASS
ok      github.com/elastic/beats/winlogbeat/eventlog    65.747s

This PR (#3118)
PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v
=== RUN   TestBenchmarkBatchReadSize
--- PASS: TestBenchmarkBatchReadSize (65.80s)
        bench_test.go:100: batch_size=10, total_events=30000, batch_time=4.925281ms, events_per_sec=2030.341009985014, bytes_alloced_per_event=14 kB, total_allocs=7295817
        bench_test.go:100: batch_size=100, total_events=30000, batch_time=48.976134ms, events_per_sec=2041.8108134055658, bytes_alloced_per_event=14 kB, total_allocs=7264329
        bench_test.go:100: batch_size=500, total_events=25000, batch_time=250.314316ms, events_per_sec=1997.4886294557757, bytes_alloced_per_event=14 kB, total_allocs=6050719
        bench_test.go:100: batch_size=1000, total_events=30000, batch_time=499.861923ms, events_per_sec=2000.5524605641945, bytes_alloced_per_event=14 kB, total_allocs=7260400
PASS
ok      github.com/elastic/beats/winlogbeat/eventlog    65.856s
```
  • Loading branch information
andrewkroh authored and ruflin committed Dec 7, 2016
1 parent 7a9f1bf commit 88d68dc
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff]
*Winlogbeat*

- Add `event_logs.batch_read_size` configuration option. {pull}2641[2641]
- Reduced amount of memory allocated while reading event log records. {pull}3113[3113] {pull}3118[3118]

==== Deprecated

Expand Down
35 changes: 20 additions & 15 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package eventlog

import (
"fmt"
"io"
"syscall"
"time"

Expand Down Expand Up @@ -75,9 +76,10 @@ type winEventLog struct {
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.

render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
cache *messageFilesCache // Cached mapping of source name to event message file handles.
render func(event win.EvtHandle, out io.Writer) error // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
outputBuf *sys.ByteBuffer // Buffer for receiving XML
cache *messageFilesCache // Cached mapping of source name to event message file handles.

logPrefix string // String to prefix on log messages.
eventMetadata common.EventMetadata // Field and tags to add to each event.
Expand Down Expand Up @@ -132,20 +134,22 @@ func (l *winEventLog) Read() ([]Record, error) {

var records []Record
for _, h := range handles {
x, err := l.render(h)
l.outputBuf.Reset()
err := l.render(h, l.outputBuf)
if bufErr, ok := err.(sys.InsufficientBufferError); ok {
detailf("%s Increasing render buffer size to %d", l.logPrefix,
bufErr.RequiredSize)
l.renderBuf = make([]byte, bufErr.RequiredSize)
x, err = l.render(h)
l.outputBuf.Reset()
err = l.render(h, l.outputBuf)
}
if err != nil && x == "" {
if err != nil && l.outputBuf.Len() == 0 {
logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
continue
}

r, err := l.buildRecordFromXML(x, err)
r, err := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
if err != nil {
logp.Err("%s Dropping event. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
Expand Down Expand Up @@ -192,8 +196,8 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
}
}

func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML([]byte(x))
func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML(x)
if err != nil {
return Record{}, fmt.Errorf("Failed to unmarshal XML='%s'. %v", x, err)
}
Expand All @@ -213,7 +217,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if logp.IsDebug(detailSelector) {
detailf("%s XML=%s Event=%+v", l.logPrefix, x, e)
detailf("%s XML=%s Event=%+v", l.logPrefix, string(x), e)
}

r := Record{
Expand All @@ -223,7 +227,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if l.config.IncludeXML {
r.XML = x
r.XML = string(x)
}

return r, nil
Expand Down Expand Up @@ -270,6 +274,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
channelName: c.Name,
maxRead: c.BatchReadSize,
renderBuf: make([]byte, renderBufferSize),
outputBuf: sys.NewByteBuffer(renderBufferSize),
cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle),
logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name),
eventMetadata: c.EventMetadata,
Expand All @@ -281,12 +286,12 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
switch {
case c.Forwarded == nil && c.Name == "ForwardedEvents",
c.Forwarded != nil && *c.Forwarded == true:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEventXML(event, l.renderBuf)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEventXML(event, l.renderBuf, out)
}
default:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get, out)
}
}

Expand Down
46 changes: 46 additions & 0 deletions winlogbeat/sys/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sys

// ByteBuffer is a reusable, growable sink for bytes. Data is appended with
// Write and read back with Bytes; Reset empties the buffer while keeping the
// storage that has already been allocated.
type ByteBuffer struct {
	buf    []byte // Backing storage; len(buf) is the space currently usable.
	offset int    // Number of bytes written since creation or the last Reset.
}

// NewByteBuffer returns an empty ByteBuffer whose backing storage is
// initialSize bytes long.
func NewByteBuffer(initialSize int) *ByteBuffer {
	b := new(ByteBuffer)
	b.buf = make([]byte, initialSize)
	return b
}

// Write appends p to the buffer, allocating larger storage first when p does
// not fit in the remaining space. It returns len(p) and a nil error.
func (b *ByteBuffer) Write(p []byte) (int, error) {
	end := b.offset + len(p)
	if end > len(b.buf) {
		// Over-allocate (twice the current size plus some extra) so that a
		// series of small writes does not reallocate and copy every time.
		extra := len(b.buf) - b.offset + len(p)
		grown := make([]byte, 2*len(b.buf)+extra)
		copy(grown, b.buf[:b.offset])
		b.buf = grown
	}

	written := copy(b.buf[b.offset:], p)
	b.offset += written
	return written, nil
}

// Reset discards everything written so far. The underlying storage is kept
// and re-extended to its full capacity so it can be reused.
func (b *ByteBuffer) Reset() {
	b.buf = b.buf[:cap(b.buf)]
	b.offset = 0
}

// Bytes returns the written portion of the buffer as a slice of length
// b.Len(). The slice shares memory with the buffer.
func (b *ByteBuffer) Bytes() []byte {
	return b.buf[:b.offset]
}

// Len reports how many bytes have been written to the buffer.
func (b *ByteBuffer) Len() int {
	return b.offset
}
102 changes: 102 additions & 0 deletions winlogbeat/sys/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package sys

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/assert"
)

var _ io.Writer = &ByteBuffer{}

// TestByteBuffer writes a short string into a buffer that is already large
// enough to hold it and verifies the reported count, contents and length.
func TestByteBuffer(t *testing.T) {
	const msg = "hello"
	b := NewByteBuffer(1024)

	written, err := b.Write([]byte(msg))
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, len(msg), written)

	assert.Equal(t, msg, string(b.Bytes()))
	assert.Equal(t, len(msg), len(b.Bytes()))
	assert.Equal(t, len(msg), b.Len())
}

// TestByteBufferGrow starts from a zero-sized buffer and writes the same
// string twice, checking after each write that the buffer grew and that the
// accumulated contents are intact.
func TestByteBufferGrow(t *testing.T) {
	const msg = "hello"
	b := NewByteBuffer(0)

	// First write: the buffer must grow from nothing.
	written, err := b.Write([]byte(msg))
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, len(msg), written)

	assert.Equal(t, msg, string(b.Bytes()))
	assert.Equal(t, len(msg), len(b.Bytes()))
	assert.Equal(t, len(msg), b.Len())
	assert.Equal(t, len(msg), len(b.buf))

	// Second write: the buffer is full, so it must grow again and keep the
	// previously written bytes.
	written, err = b.Write([]byte(msg))
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, len(msg), written)

	assert.Equal(t, msg+msg, string(b.Bytes()))
	assert.Equal(t, 2*len(msg), len(b.Bytes()))
	assert.Equal(t, 2*len(msg), b.Len())
}

// BenchmarkByteBuffer compares ByteBuffer against bytes.Buffer for a
// write/read/reset cycle on a buffer pre-sized to 1024 bytes, so the input
// fits without any growth.
func BenchmarkByteBuffer(b *testing.B) {
input := []byte("test writing this sentence to a buffer")

b.Run("byteBuffer", func(b *testing.B) {
buf := NewByteBuffer(1024)
// Exclude buffer construction from the measurement.
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write(input)
buf.Bytes()
buf.Reset()
}
})

b.Run("bytes.Buffer", func(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
// Exclude buffer construction from the measurement.
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write(input)
buf.Bytes()
buf.Reset()
}
})
}

// BenchmarkByteBufferGrow compares ByteBuffer against bytes.Buffer when
// starting from an empty buffer and appending one byte per iteration. The
// buffers are never reset, so they keep growing for the whole run.
func BenchmarkByteBufferGrow(b *testing.B) {
b.Run("byteBuffer", func(b *testing.B) {
buf := NewByteBuffer(0)
// Exclude buffer construction from the measurement.
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write([]byte("a"))
buf.Bytes()
}
})

b.Run("bytes.Buffer", func(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0))
// Exclude buffer construction from the measurement.
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write([]byte("a"))
buf.Bytes()
}
})
}
58 changes: 58 additions & 0 deletions winlogbeat/sys/strings.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,69 @@
package sys

import (
"errors"
"fmt"
"io"
"strings"
"unicode/utf16"
"unicode/utf8"
)

// The conditions replacementChar==unicode.ReplacementChar and
// maxRune==unicode.MaxRune are verified in the tests.
// Defining them locally avoids this package depending on package unicode.

const (
replacementChar = '\uFFFD' // Unicode replacement character
maxRune = '\U0010FFFF' // Maximum valid Unicode code point.
)

// Boundaries of the UTF-16 surrogate ranges.
const (
// 0xd800-0xdc00 encodes the high 10 bits of a pair.
// 0xdc00-0xe000 encodes the low 10 bits of a pair.
// the value is those 20 bits plus 0x10000.
surr1 = 0xd800
surr2 = 0xdc00
surr3 = 0xe000

surrSelf = 0x10000
)

// ErrBufferTooSmall is a sentinel error whose message is "buffer too small".
// NOTE(review): it is not referenced in the visible part of this file;
// confirm which callers return or compare against it.
var ErrBufferTooSmall = errors.New("buffer too small")

// UTF16ToUTF8Bytes decodes the little-endian UTF-16 data in in and writes the
// equivalent UTF-8 bytes to out.
//
// in must have an even length, otherwise an error is returned and nothing is
// written. Every unpaired surrogate (a high surrogate that is not followed by
// a low surrogate, or a low surrogate on its own) is replaced by U+FFFD and
// decoding continues with the next code unit, so no valid character is lost.
// The first error returned by out.Write stops the conversion and is returned
// to the caller.
func UTF16ToUTF8Bytes(in []byte, out io.Writer) error {
	if len(in)%2 != 0 {
		return fmt.Errorf("input buffer must have an even length (length=%d)", len(in))
	}

	var runeBuf [utf8.UTFMax]byte
	for i := 0; i < len(in); i += 2 {
		r := rune(uint16(in[i]) | uint16(in[i+1])<<8)

		if utf16.IsSurrogate(r) {
			paired := false
			if i+3 < len(in) {
				next := rune(uint16(in[i+2]) | uint16(in[i+3])<<8)
				// DecodeRune yields U+FFFD unless (r, next) is a valid
				// high/low pair; a valid pair always decodes to >= U+10000.
				if d := utf16.DecodeRune(r, next); d != utf8.RuneError {
					r, paired = d, true
					i += 2 // The low surrogate has been consumed as well.
				}
			}
			if !paired {
				// Only the lone surrogate is replaced; the following code
				// unit is decoded on the next iteration.
				r = utf8.RuneError
			}
		}

		n := utf8.EncodeRune(runeBuf[:], r)
		if _, err := out.Write(runeBuf[:n]); err != nil {
			return err
		}
	}

	return nil
}

// UTF16BytesToString returns a string that is decoded from the UTF-16 bytes.
// The byte slice must be of even length otherwise an error will be returned.
// The integer returned is the offset to the start of the next string with
Expand Down
23 changes: 23 additions & 0 deletions winlogbeat/sys/strings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,26 @@ func BenchmarkUTF16BytesToString(b *testing.B) {
}
})
}

func TestUTF16ToUTF8(t *testing.T) {
input := "abc白鵬翔\u145A6"
utf16Bytes := toUTF16Bytes(input)

outputBuf := &bytes.Buffer{}
err := UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, []byte(input), outputBuf.Bytes())
}

// BenchmarkUTF16ToUTF8 measures converting a short ASCII sentence from UTF-16
// bytes to UTF-8 into a bytes.Buffer that is reset after every iteration.
func BenchmarkUTF16ToUTF8(b *testing.B) {
utf16Bytes := toUTF16Bytes("A logon was attempted using explicit credentials.")
outputBuf := &bytes.Buffer{}
// Exclude input encoding and buffer construction from the measurement.
b.ResetTimer()

for i := 0; i < b.N; i++ {
// The returned error is not checked here.
UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
outputBuf.Reset()
}
}
Loading

0 comments on commit 88d68dc

Please sign in to comment.