Skip to content

Commit

Permalink
feat: return to internal string references (#5486)
Browse files Browse the repository at this point in the history
* feat: stop making string copies

Undo the changes introduced in #5377. This is a trade-off where it
is considered better to have as much memory as possible allocated
by the allocator, with the risk that it will be freed while still
in use, than to have significant amounts of memory allocated outside
of the allocator.

It should be noted that because of Go's garbage collector it is
perfectly safe for memory to be "freed" when still in use. It just
means that any accounting done by the allocator will be inaccurate.

* feat: remove unnecessary StringRef type

Remove the recently added StringRef type as it is now unnecessary.
  • Loading branch information
mhilton authored Jun 5, 2024
1 parent 68c831c commit 32f7947
Show file tree
Hide file tree
Showing 26 changed files with 36 additions and 123 deletions.
55 changes: 0 additions & 55 deletions array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/apache/arrow/go/v7/arrow"
"github.com/apache/arrow/go/v7/arrow/array"
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
Expand Down Expand Up @@ -158,65 +157,11 @@ func (a *String) Value(i int) string {
return a.ValueString(i)
}

// ValueRef returns a reference to the memory buffer and location that
// stores the value at i. The reference is only valid for as long as the
// array is, the buffer needs to be retained if further access is
// required.
func (a *String) ValueRef(i int) StringRef {
if vr, ok := a.binaryArray.(interface{ ValueRef(int) StringRef }); ok {
return vr.ValueRef(i)
}
return StringRef{
buf: a.Data().Buffers()[2],
off: a.ValueOffset(i),
len: a.ValueLen(i),
}
}

// ValueCopy returns the value at the requested position copied into a
// new memory location. This value will remain valid after the array is
// released, but is not tracked by the memory allocator.
//
// This function is intended to be temporary while changes are being
// made to reduce the amount of unaccounted data memory.
func (a *String) ValueCopy(i int) string {
return string(a.ValueRef(i).Bytes())
}

func (a *String) IsConstant() bool {
ic, ok := a.binaryArray.(interface{ IsConstant() bool })
return ok && ic.IsConstant()
}

// StringRef contains a referenct to the storage for a value.
type StringRef struct {
buf *arrowmem.Buffer
off int
len int
}

// Buffer returns the memory buffer that contains the value.
func (r StringRef) Buffer() *arrowmem.Buffer {
return r.buf
}

// Offset returns the offset into the memory buffer at which the value
// starts.
func (r StringRef) Offset() int {
return r.off
}

// Len returns the length of the value.
func (r StringRef) Len() int {
return r.len
}

// Bytes returns the bytes from the memory buffer that contain the
// value.
func (r StringRef) Bytes() []byte {
return r.buf.Bytes()[r.off : r.off+r.len]
}

type sliceable interface {
Slice(i, j int) Array
}
Expand Down
8 changes: 0 additions & 8 deletions array/repeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,3 @@ func (b *repeatedBinary) Slice(i, j int) binaryArray {
buf: b.buf,
}
}

func (b *repeatedBinary) ValueRef(int) StringRef {
return StringRef{
buf: b.buf,
off: 0,
len: b.buf.Len(),
}
}
4 changes: 2 additions & 2 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TestSlice_String(t *testing.T) {

vs := make([]string, l)
for i := 0; i < l; i++ {
vs[i] = arr.ValueCopy(i)
vs[i] = arr.Value(i)
}

if !cmp.Equal(values, vs) {
Expand Down Expand Up @@ -540,7 +540,7 @@ func TestSlice_String(t *testing.T) {

vs = vs[:0]
for i := 0; i < l; i++ {
vs = append(vs, arr.ValueCopy(i))
vs = append(vs, arr.Value(i))
}

if !cmp.Equal(tc.want, vs) {
Expand Down
2 changes: 1 addition & 1 deletion csv/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ func encodeValueFrom(i, j int, c colMeta, cr flux.ColReader) (string, error) {
}
case flux.TString:
if cr.Strings(j).IsValid(i) {
v = cr.Strings(j).ValueCopy(i)
v = cr.Strings(j).Value(i)
}
case flux.TTime:
if cr.Times(j).IsValid(i) {
Expand Down
26 changes: 1 addition & 25 deletions execute/executetest/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,5 @@ import (
)

var UnlimitedAllocator = &memory.ResourceAllocator{
Allocator: Allocator{},
}

// Allocator is an allocator for use in test. When a buffer is freed the
// contents are overwritten with a predictable pattern to help detect
// use-after-free scenarios.
type Allocator struct{}

func (Allocator) Allocate(size int) []byte {
return arrowmem.DefaultAllocator.Allocate(size)
}

func (a Allocator) Reallocate(size int, b []byte) []byte {
b1 := a.Allocate(size)
copy(b1, b)
a.Free(b)
return b1
}

func (a Allocator) Free(b []byte) {
var pattern = [...]byte{0x00, 0x33, 0xcc, 0xff}
for i := range b {
b[i] = pattern[i%len(pattern)]
}
arrowmem.DefaultAllocator.Free(b)
Allocator: arrowmem.DefaultAllocator,
}
2 changes: 1 addition & 1 deletion execute/executetest/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func ConvertTable(tbl flux.Table) (*Table, error) {
}
case flux.TString:
if col := cr.Strings(j); col.IsValid(i) {
row[j] = col.ValueCopy(i)
row[j] = col.Value(i)
}
case flux.TTime:
if col := cr.Times(j); col.IsValid(i) {
Expand Down
2 changes: 1 addition & 1 deletion execute/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func ValueForRow(cr flux.ColReader, i, j int) values.Value {
if cr.Strings(j).IsNull(i) {
return values.NewNull(semantic.BasicString)
}
return values.NewString(cr.Strings(j).ValueCopy(i))
return values.NewString(cr.Strings(j).Value(i))
case flux.TInt:
if cr.Ints(j).IsNull(i) {
return values.NewNull(semantic.BasicInt)
Expand Down
2 changes: 1 addition & 1 deletion execute/table/stringify.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func valueForRow(cr flux.ColReader, i, j int) values.Value {
if cr.Strings(j).IsNull(i) {
return values.NewNull(semantic.BasicString)
}
return values.NewString(cr.Strings(j).ValueCopy(i))
return values.NewString(cr.Strings(j).Value(i))
case flux.TInt:
if cr.Ints(j).IsNull(i) {
return values.NewNull(semantic.BasicInt)
Expand Down
2 changes: 1 addition & 1 deletion internal/arrowutil/array_values.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (v StringArrayValue) Get(i int) values.Value {
if v.arr.IsNull(i) {
return values.Null
}
return values.New(v.arr.ValueCopy(i))
return values.New(v.arr.Value(i))
}

func (v StringArrayValue) Set(i int, value values.Value) {
Expand Down
4 changes: 2 additions & 2 deletions internal/arrowutil/compare.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func StringCompare(x, y *array.String, i, j int) int {
return 1
}

if l, r := x.ValueCopy(i), y.ValueCopy(j); l < r {
if l, r := x.Value(i), y.Value(j); l < r {
return -1
} else if l == r {
return 0
Expand All @@ -256,7 +256,7 @@ func StringCompareDesc(x, y *array.String, i, j int) int {
return 1
}

if l, r := x.ValueCopy(i), y.ValueCopy(j); l > r {
if l, r := x.Value(i), y.Value(j); l > r {
return -1
} else if l == r {
return 0
Expand Down
6 changes: 3 additions & 3 deletions internal/arrowutil/copy.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func CopyStringsTo(b *array.StringBuilder, arr *array.String) {
b.AppendNull()
continue
}
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
}
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func CopyStringsByIndexTo(b *array.StringBuilder, arr *array.String, indices *ar
b.AppendNull()
continue
}
b.Append(arr.ValueCopy(offset))
b.Append(arr.Value(offset))
}
}

Expand All @@ -324,5 +324,5 @@ func CopyStringValue(b *array.StringBuilder, arr *array.String, i int) {
b.AppendNull()
return
}
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
}
2 changes: 1 addition & 1 deletion internal/arrowutil/filter.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func FilterStrings(arr *array.String, bitset []byte, mem memory.Allocator) *arra
for i := 0; i < len(bitset); i++ {
if bitutil.BitIsSet(bitset, i) {
if arr.IsValid(i) {
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
} else {
b.AppendNull()
}
Expand Down
4 changes: 2 additions & 2 deletions internal/arrowutil/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ func IterateStrings(arrs []array.Array) StringIterator {
return StringIterator{Values: values}
}

// ValueCopy returns the current value in the iterator.
// Value returns the current value in the iterator.
func (i *StringIterator) Value() string {
vs := i.Values[0]
return vs.ValueCopy(i.i)
return vs.Value(i.i)
}

// IsValid returns if the current value is valid.
Expand Down
2 changes: 1 addition & 1 deletion internal/arrowutil/types.tmpldata
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"MonoType": "semantic.BasicString",
"IsNumeric": false,
"IsComparable": true,
"Value": "ValueCopy",
"Value": "Value",
"Append": "Append",
"NewArray": "NewStringArray"
}
Expand Down
4 changes: 2 additions & 2 deletions internal/moving_average/array_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (a *ArrayContainer) Value(i int) values.Value {
case *array.Float:
return values.New(float64(a.array.(*array.Float).Value(i)))
case *array.String:
return values.New(string(a.array.(*array.String).ValueCopy(i)))
return values.New(string(a.array.(*array.String).Value(i)))
default:
return nil
}
Expand All @@ -54,7 +54,7 @@ func (a *ArrayContainer) OrigValue(i int) interface{} {
case *array.Float:
return a.array.(*array.Float).Value(i)
case *array.String:
return a.array.(*array.String).ValueCopy(i)
return a.array.(*array.String).Value(i)
default:
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion result_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestQueryResultIterator_Results(t *testing.T) {
for i := 0; i < cr.Len(); i++ {
r := row{
Value: cr.Ints(0).Value(i),
Tag: cr.Strings(1).ValueCopy(i),
Tag: cr.Strings(1).Value(i),
}
got = append(got, r)
}
Expand Down
2 changes: 1 addition & 1 deletion semantic/semantictest/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func getValue(arr array.Array, i int) values.Value {
case *array.Float:
return values.New(arr.Value(i))
case *array.String:
return values.New(arr.ValueCopy(i))
return values.New(arr.Value(i))
case *array.Boolean:
return values.New(arr.Value(i))
default:
Expand Down
6 changes: 3 additions & 3 deletions stdlib/experimental/mqtt/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err
if col.Type != flux.TString {
return errors.Newf(codes.FailedPrecondition, "invalid type for measurement column: %s", col.Type)
}
m.name = er.Strings(j).ValueCopy(i)
m.name = er.Strings(j).Value(i)
case isTag[j]:
if col.Type != flux.TString {
return errors.Newf(codes.FailedPrecondition, "invalid type for tag column: %s", col.Type)
}
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)})

case isValue[j]:
switch col.Type {
Expand All @@ -332,7 +332,7 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err
case flux.TUInt:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)})
case flux.TString:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)})
case flux.TTime:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))})
case flux.TBool:
Expand Down
4 changes: 2 additions & 2 deletions stdlib/influxdata/influxdb/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ outer:
for j, col := range chunk.Cols() {
switch {
case col.Label == spec.MeasurementColumn:
metric.NameStr = er.Strings(j).ValueCopy(i)
metric.NameStr = er.Strings(j).Value(i)
case col.Label == timeColLabel:
valueTime := execute.ValueForRow(&er, i, j)
if valueTime.IsNull() {
Expand All @@ -230,7 +230,7 @@ outer:
return errors.New(codes.Invalid, "invalid type for tag column")
}

value := er.Strings(j).ValueCopy(i)
value := er.Strings(j).Value(i)
if value == "" {
// Skip tag value if it is empty.
continue
Expand Down
6 changes: 3 additions & 3 deletions stdlib/kafka/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e
if col.Type != flux.TString {
return errors.New(codes.FailedPrecondition, "invalid type for measurement column")
}
m.name = er.Strings(j).ValueCopy(i)
m.name = er.Strings(j).Value(i)
case isTag[j]:
if col.Type != flux.TString {
return errors.New(codes.FailedPrecondition, "invalid type for measurement column")
}
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)})
case isValue[j]:
switch col.Type {
case flux.TFloat:
Expand All @@ -368,7 +368,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e
case flux.TUInt:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)})
case flux.TString:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)})
case flux.TTime:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))})
case flux.TBool:
Expand Down
2 changes: 1 addition & 1 deletion stdlib/sql/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []
valueArgs = append(valueArgs, nil)
break
}
valueArgs = append(valueArgs, er.Strings(j).ValueCopy(i))
valueArgs = append(valueArgs, er.Strings(j).Value(i))
case flux.TTime:
if er.Times(j).IsNull(i) {
valueArgs = append(valueArgs, nil)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/universe/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, tbl flux.Table) e
}
nullDistinct = true
} else {
v := cr.Strings(j).ValueCopy(i)
v := cr.Strings(j).Value(i)
if stringDistinct[v] {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion stdlib/universe/key_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (t *keyValuesTransformation) Process(id execute.DatasetID, tbl flux.Table)
}
nullDistinct = true
} else {
v := vs.ValueCopy(i)
v := vs.Value(i)
if stringDistinct[[2]string{c.name, v}] {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion stdlib/universe/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (t *modeTransformation) doString(cr flux.ColReader, tbl flux.Table, builder
if cr.Strings(j).IsNull(i) {
continue
}
v := cr.Strings(j).ValueCopy(i)
v := cr.Strings(j).Value(i)
stringMode[v]++
}

Expand Down
Loading

0 comments on commit 32f7947

Please sign in to comment.