Skip to content

Commit

Permalink
Add repeat padding to message chunking (#97)
Browse files Browse the repository at this point in the history
* Add repeat padding to message chunking.

* Fix imports.

* remove export

* Update for condition.
  • Loading branch information
blakerouse authored Jan 16, 2024
1 parent a38734b commit 5f9ca13
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 7 deletions.
5 changes: 3 additions & 2 deletions pkg/client/chunk/expected.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
func Expected(msg *proto.CheckinExpected, maxSize int, opts ...Option) ([]*proto.CheckinExpected, error) {
var options options
options.timestamp = time.Now() // timestamp used for chunk set
options.repeatPadding = defaultRepeatPadding
for _, opt := range opts {
opt(&options)
}
Expand Down Expand Up @@ -71,10 +72,10 @@ func Expected(msg *proto.CheckinExpected, maxSize int, opts ...Option) ([]*proto
// keep adding units until it doesn't fit
for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ {
us := bySize[nextUnit]
if s+us.size < maxSize {
if s+us.size+options.repeatPadding < maxSize {
// unit fits add it
m.Units = append(m.Units, us.unit)
s += us.size
s += us.size + options.repeatPadding
} else {
// doesn't fit, create a new chunk
msgs = append(msgs, m)
Expand Down
47 changes: 46 additions & 1 deletion pkg/client/chunk/expected_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package chunk

import (
"golang.org/x/exp/slices"
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -333,6 +335,49 @@ func TestRecvExpected_Timestamp_Restart(t *testing.T) {
assert.Empty(t, diff)
}

func TestRecvExpected_RepeatPadding(t *testing.T) {
const grpcMaxSize = 1024 * 1024 * 4 // GRPC default max message size

// build the units to ensure that there is more than double the units required for the GRPC configuration
minimumMsgSize := grpcMaxSize * 2 // double it
var units []*proto.UnitExpected
var unitsSize int
var nextUnitID int
for unitsSize < minimumMsgSize {
unit := &proto.UnitExpected{
Id: fmt.Sprintf("fake-input-%d", nextUnitID),
Type: proto.UnitType_INPUT,
State: proto.State_HEALTHY,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{
Id: "testing",
Type: "testing",
Name: "testing",
},
LogLevel: proto.UnitLogLevel_ERROR,
}
units = append(units, unit)
unitsSize += gproto.Size(unit)
nextUnitID++
}

chunked, err := Expected(&proto.CheckinExpected{
FeaturesIdx: 2,
ComponentIdx: 3,
Units: units,
}, grpcMaxSize, WithRepeatPadding(0))
require.NoError(t, err)
require.Greater(t, gproto.Size(chunked[0]), grpcMaxSize)

chunked, err = Expected(&proto.CheckinExpected{
FeaturesIdx: 2,
ComponentIdx: 3,
Units: units,
}, grpcMaxSize)
require.NoError(t, err)
require.Greater(t, grpcMaxSize, gproto.Size(chunked[0]))
}

func sortExpectedUnits(a *proto.UnitExpected, b *proto.UnitExpected) int {
return strings.Compare(a.Id, b.Id)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/client/chunk/observed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
func Observed(msg *proto.CheckinObserved, maxSize int, opts ...Option) ([]*proto.CheckinObserved, error) {
var options options
options.timestamp = time.Now() // timestamp used for chunk set
options.repeatPadding = defaultRepeatPadding
for _, opt := range opts {
opt(&options)
}
Expand Down Expand Up @@ -71,10 +72,10 @@ func Observed(msg *proto.CheckinObserved, maxSize int, opts ...Option) ([]*proto
// keep adding units until it doesn't fit
for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ {
us := bySize[nextUnit]
if s+us.size < maxSize {
if s+us.size+options.repeatPadding < maxSize {
// unit fits add it
m.Units = append(m.Units, us.unit)
s += us.size
s += us.size + options.repeatPadding
} else {
// doesn't fit, create a new chunk
msgs = append(msgs, m)
Expand Down
44 changes: 43 additions & 1 deletion pkg/client/chunk/observed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package chunk

import (
"golang.org/x/exp/slices"
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -341,6 +343,46 @@ func TestRecvObserved_Timestamp_Restart(t *testing.T) {
assert.Empty(t, diff)
}

func TestObserved_RepeatPadding(t *testing.T) {
const grpcMaxSize = 1024 * 1024 * 4 // GRPC default max message size

// build the units to ensure that there is more than double the units required for the GRPC configuration
minimumMsgSize := grpcMaxSize * 2 // double it
var units []*proto.UnitObserved
var unitsSize int
var nextUnitID int
for unitsSize < minimumMsgSize {
unit := &proto.UnitObserved{
Id: fmt.Sprintf("fake-input-%d", nextUnitID),
Type: proto.UnitType_INPUT,
State: proto.State_HEALTHY,
Message: fmt.Sprintf("fake-input-%d is healthy", nextUnitID),
ConfigStateIdx: 1,
}
units = append(units, unit)
unitsSize += gproto.Size(unit)
nextUnitID++
}

chunked, err := Observed(&proto.CheckinObserved{
Token: "token",
FeaturesIdx: 2,
ComponentIdx: 3,
Units: units,
}, grpcMaxSize, WithRepeatPadding(0))
require.NoError(t, err)
require.Greater(t, gproto.Size(chunked[0]), grpcMaxSize)

chunked, err = Observed(&proto.CheckinObserved{
Token: "token",
FeaturesIdx: 2,
ComponentIdx: 3,
Units: units,
}, grpcMaxSize)
require.NoError(t, err)
require.Greater(t, grpcMaxSize, gproto.Size(chunked[0]))
}

func mustStruct(v map[string]interface{}) *structpb.Struct {
s, err := structpb.NewStruct(v)
if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion pkg/client/chunk/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ package chunk

import "time"

const defaultRepeatPadding = 3 // 3 bytes

type options struct {
timestamp time.Time
timestamp time.Time
repeatPadding int
}

// Option is an option for adjusting chunking.
Expand All @@ -21,3 +24,13 @@ func WithTimestamp(t time.Time) Option {
opts.timestamp = t
}
}

// WithRepeatPadding adjusts the padding used on each repeated structure.
//
// Note: Mainly used for testing to validate that without padding that message
// size will be too large.
func WithRepeatPadding(padding int) Option {
return func(opts *options) {
opts.repeatPadding = padding
}
}

0 comments on commit 5f9ca13

Please sign in to comment.