From 32f7947c7ad374145aa1983d78e6c91d584d00f6 Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Wed, 5 Jun 2024 05:43:48 +0100 Subject: [PATCH] feat: return to internal string references (#5486) * feat: stop making string copies Undo the changes introduced in #5377. This is a trade-off where it is considered better to have as much memory as possible allocated by the allocator, with the risk that it will be freed when still in use, than to have significant amounts of memory allocated outside of the allocator. It should be noted that because of Go's garbage collector it is perfectly safe for memory to be "freed" when still in use. It just means that any accounting done by the allocator will be inaccurate. * feat: remove unnecessary StringRef type Remove the recently added StringRef type as it is now unnecessary. --- array/array.go | 55 ---------------------- array/repeat.go | 8 ---- arrow/arrow_test.go | 4 +- csv/result.go | 2 +- execute/executetest/allocator.go | 26 +--------- execute/executetest/table.go | 2 +- execute/table.go | 2 +- execute/table/stringify.go | 2 +- internal/arrowutil/array_values.gen.go | 2 +- internal/arrowutil/compare.gen.go | 4 +- internal/arrowutil/copy.gen.go | 6 +-- internal/arrowutil/filter.gen.go | 2 +- internal/arrowutil/iterator.gen.go | 4 +- internal/arrowutil/types.tmpldata | 2 +- internal/moving_average/array_container.go | 4 +- result_iterator_test.go | 2 +- semantic/semantictest/cmp.go | 2 +- stdlib/experimental/mqtt/to.go | 6 +-- stdlib/influxdata/influxdb/to.go | 4 +- stdlib/kafka/to.go | 6 +-- stdlib/sql/to.go | 2 +- stdlib/universe/distinct.go | 2 +- stdlib/universe/key_values.go | 2 +- stdlib/universe/mode.go | 2 +- stdlib/universe/pivot.go | 2 +- stdlib/universe/table_fns.go | 4 +- 26 files changed, 36 insertions(+), 123 deletions(-) diff --git a/array/array.go b/array/array.go index 5fffb8df2c..48138d10fa 100644 --- a/array/array.go +++ b/array/array.go @@ -5,7 +5,6 @@ import ( "github.com/apache/arrow/go/v7/arrow" 
"github.com/apache/arrow/go/v7/arrow/array" - arrowmem "github.com/apache/arrow/go/v7/arrow/memory" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/internal/errors" @@ -158,65 +157,11 @@ func (a *String) Value(i int) string { return a.ValueString(i) } -// ValueRef returns a reference to the memory buffer and location that -// stores the value at i. The reference is only valid for as long as the -// array is, the buffer needs to be retained if further access is -// required. -func (a *String) ValueRef(i int) StringRef { - if vr, ok := a.binaryArray.(interface{ ValueRef(int) StringRef }); ok { - return vr.ValueRef(i) - } - return StringRef{ - buf: a.Data().Buffers()[2], - off: a.ValueOffset(i), - len: a.ValueLen(i), - } -} - -// ValueCopy returns the value at the requested position copied into a -// new memory location. This value will remain valid after the array is -// released, but is not tracked by the memory allocator. -// -// This function is intended to be temporary while changes are being -// made to reduce the amount of unaccounted data memory. -func (a *String) ValueCopy(i int) string { - return string(a.ValueRef(i).Bytes()) -} - func (a *String) IsConstant() bool { ic, ok := a.binaryArray.(interface{ IsConstant() bool }) return ok && ic.IsConstant() } -// StringRef contains a referenct to the storage for a value. -type StringRef struct { - buf *arrowmem.Buffer - off int - len int -} - -// Buffer returns the memory buffer that contains the value. -func (r StringRef) Buffer() *arrowmem.Buffer { - return r.buf -} - -// Offset returns the offset into the memory buffer at which the value -// starts. -func (r StringRef) Offset() int { - return r.off -} - -// Len returns the length of the value. -func (r StringRef) Len() int { - return r.len -} - -// Bytes returns the bytes from the memory buffer that contain the -// value. 
-func (r StringRef) Bytes() []byte { - return r.buf.Bytes()[r.off : r.off+r.len] -} - type sliceable interface { Slice(i, j int) Array } diff --git a/array/repeat.go b/array/repeat.go index 22cc447117..fcdb7603bd 100644 --- a/array/repeat.go +++ b/array/repeat.go @@ -83,11 +83,3 @@ func (b *repeatedBinary) Slice(i, j int) binaryArray { buf: b.buf, } } - -func (b *repeatedBinary) ValueRef(int) StringRef { - return StringRef{ - buf: b.buf, - off: 0, - len: b.buf.Len(), - } -} diff --git a/arrow/arrow_test.go b/arrow/arrow_test.go index 4be0b65a37..cb29f05bcd 100644 --- a/arrow/arrow_test.go +++ b/arrow/arrow_test.go @@ -472,7 +472,7 @@ func TestSlice_String(t *testing.T) { vs := make([]string, l) for i := 0; i < l; i++ { - vs[i] = arr.ValueCopy(i) + vs[i] = arr.Value(i) } if !cmp.Equal(values, vs) { @@ -540,7 +540,7 @@ func TestSlice_String(t *testing.T) { vs = vs[:0] for i := 0; i < l; i++ { - vs = append(vs, arr.ValueCopy(i)) + vs = append(vs, arr.Value(i)) } if !cmp.Equal(tc.want, vs) { diff --git a/csv/result.go b/csv/result.go index 0a6bbd0e02..b3244fcda5 100644 --- a/csv/result.go +++ b/csv/result.go @@ -1257,7 +1257,7 @@ func encodeValueFrom(i, j int, c colMeta, cr flux.ColReader) (string, error) { } case flux.TString: if cr.Strings(j).IsValid(i) { - v = cr.Strings(j).ValueCopy(i) + v = cr.Strings(j).Value(i) } case flux.TTime: if cr.Times(j).IsValid(i) { diff --git a/execute/executetest/allocator.go b/execute/executetest/allocator.go index 64161cc876..af8ff9a255 100644 --- a/execute/executetest/allocator.go +++ b/execute/executetest/allocator.go @@ -7,29 +7,5 @@ import ( ) var UnlimitedAllocator = &memory.ResourceAllocator{ - Allocator: Allocator{}, -} - -// Allocator is an allocator for use in test. When a buffer is freed the -// contents are overwritten with a predictable pattern to help detect -// use-after-free scenarios. 
-type Allocator struct{} - -func (Allocator) Allocate(size int) []byte { - return arrowmem.DefaultAllocator.Allocate(size) -} - -func (a Allocator) Reallocate(size int, b []byte) []byte { - b1 := a.Allocate(size) - copy(b1, b) - a.Free(b) - return b1 -} - -func (a Allocator) Free(b []byte) { - var pattern = [...]byte{0x00, 0x33, 0xcc, 0xff} - for i := range b { - b[i] = pattern[i%len(pattern)] - } - arrowmem.DefaultAllocator.Free(b) + Allocator: arrowmem.DefaultAllocator, } diff --git a/execute/executetest/table.go b/execute/executetest/table.go index 918259c943..6e9dc591f0 100644 --- a/execute/executetest/table.go +++ b/execute/executetest/table.go @@ -544,7 +544,7 @@ func ConvertTable(tbl flux.Table) (*Table, error) { } case flux.TString: if col := cr.Strings(j); col.IsValid(i) { - row[j] = col.ValueCopy(i) + row[j] = col.Value(i) } case flux.TTime: if col := cr.Times(j); col.IsValid(i) { diff --git a/execute/table.go b/execute/table.go index 34168bb006..9d0c93c1ab 100644 --- a/execute/table.go +++ b/execute/table.go @@ -437,7 +437,7 @@ func ValueForRow(cr flux.ColReader, i, j int) values.Value { if cr.Strings(j).IsNull(i) { return values.NewNull(semantic.BasicString) } - return values.NewString(cr.Strings(j).ValueCopy(i)) + return values.NewString(cr.Strings(j).Value(i)) case flux.TInt: if cr.Ints(j).IsNull(i) { return values.NewNull(semantic.BasicInt) diff --git a/execute/table/stringify.go b/execute/table/stringify.go index 2edf3bb564..4c48785b75 100644 --- a/execute/table/stringify.go +++ b/execute/table/stringify.go @@ -124,7 +124,7 @@ func valueForRow(cr flux.ColReader, i, j int) values.Value { if cr.Strings(j).IsNull(i) { return values.NewNull(semantic.BasicString) } - return values.NewString(cr.Strings(j).ValueCopy(i)) + return values.NewString(cr.Strings(j).Value(i)) case flux.TInt: if cr.Ints(j).IsNull(i) { return values.NewNull(semantic.BasicInt) diff --git a/internal/arrowutil/array_values.gen.go b/internal/arrowutil/array_values.gen.go index 
6a86de1bd2..316506dde0 100644 --- a/internal/arrowutil/array_values.gen.go +++ b/internal/arrowutil/array_values.gen.go @@ -498,7 +498,7 @@ func (v StringArrayValue) Get(i int) values.Value { if v.arr.IsNull(i) { return values.Null } - return values.New(v.arr.ValueCopy(i)) + return values.New(v.arr.Value(i)) } func (v StringArrayValue) Set(i int, value values.Value) { diff --git a/internal/arrowutil/compare.gen.go b/internal/arrowutil/compare.gen.go index 2deefef85b..907577b581 100644 --- a/internal/arrowutil/compare.gen.go +++ b/internal/arrowutil/compare.gen.go @@ -237,7 +237,7 @@ func StringCompare(x, y *array.String, i, j int) int { return 1 } - if l, r := x.ValueCopy(i), y.ValueCopy(j); l < r { + if l, r := x.Value(i), y.Value(j); l < r { return -1 } else if l == r { return 0 @@ -256,7 +256,7 @@ func StringCompareDesc(x, y *array.String, i, j int) int { return 1 } - if l, r := x.ValueCopy(i), y.ValueCopy(j); l > r { + if l, r := x.Value(i), y.Value(j); l > r { return -1 } else if l == r { return 0 diff --git a/internal/arrowutil/copy.gen.go b/internal/arrowutil/copy.gen.go index 11a7e1a834..3cfc673323 100644 --- a/internal/arrowutil/copy.gen.go +++ b/internal/arrowutil/copy.gen.go @@ -284,7 +284,7 @@ func CopyStringsTo(b *array.StringBuilder, arr *array.String) { b.AppendNull() continue } - b.Append(arr.ValueCopy(i)) + b.Append(arr.Value(i)) } } @@ -315,7 +315,7 @@ func CopyStringsByIndexTo(b *array.StringBuilder, arr *array.String, indices *ar b.AppendNull() continue } - b.Append(arr.ValueCopy(offset)) + b.Append(arr.Value(offset)) } } @@ -324,5 +324,5 @@ func CopyStringValue(b *array.StringBuilder, arr *array.String, i int) { b.AppendNull() return } - b.Append(arr.ValueCopy(i)) + b.Append(arr.Value(i)) } diff --git a/internal/arrowutil/filter.gen.go b/internal/arrowutil/filter.gen.go index dc41d18cf4..5567dbc172 100644 --- a/internal/arrowutil/filter.gen.go +++ b/internal/arrowutil/filter.gen.go @@ -108,7 +108,7 @@ func FilterStrings(arr *array.String, 
bitset []byte, mem memory.Allocator) *arra for i := 0; i < len(bitset); i++ { if bitutil.BitIsSet(bitset, i) { if arr.IsValid(i) { - b.Append(arr.ValueCopy(i)) + b.Append(arr.Value(i)) } else { b.AppendNull() } diff --git a/internal/arrowutil/iterator.gen.go b/internal/arrowutil/iterator.gen.go index 27acf30461..169f1eea63 100644 --- a/internal/arrowutil/iterator.gen.go +++ b/internal/arrowutil/iterator.gen.go @@ -289,10 +289,10 @@ func IterateStrings(arrs []array.Array) StringIterator { return StringIterator{Values: values} } -// ValueCopy returns the current value in the iterator. +// Value returns the current value in the iterator. func (i *StringIterator) Value() string { vs := i.Values[0] - return vs.ValueCopy(i.i) + return vs.Value(i.i) } // IsValid returns if the current value is valid. diff --git a/internal/arrowutil/types.tmpldata b/internal/arrowutil/types.tmpldata index 57fcdbf4a4..38d86298b7 100644 --- a/internal/arrowutil/types.tmpldata +++ b/internal/arrowutil/types.tmpldata @@ -50,7 +50,7 @@ "MonoType": "semantic.BasicString", "IsNumeric": false, "IsComparable": true, - "Value": "ValueCopy", + "Value": "Value", "Append": "Append", "NewArray": "NewStringArray" } diff --git a/internal/moving_average/array_container.go b/internal/moving_average/array_container.go index 3bf7759c4a..54a5965495 100644 --- a/internal/moving_average/array_container.go +++ b/internal/moving_average/array_container.go @@ -37,7 +37,7 @@ func (a *ArrayContainer) Value(i int) values.Value { case *array.Float: return values.New(float64(a.array.(*array.Float).Value(i))) case *array.String: - return values.New(string(a.array.(*array.String).ValueCopy(i))) + return values.New(string(a.array.(*array.String).Value(i))) default: return nil } @@ -54,7 +54,7 @@ func (a *ArrayContainer) OrigValue(i int) interface{} { case *array.Float: return a.array.(*array.Float).Value(i) case *array.String: - return a.array.(*array.String).ValueCopy(i) + return a.array.(*array.String).Value(i) default: 
return nil } diff --git a/result_iterator_test.go b/result_iterator_test.go index 3bbe6b7b89..2804e6e0d6 100644 --- a/result_iterator_test.go +++ b/result_iterator_test.go @@ -192,7 +192,7 @@ func TestQueryResultIterator_Results(t *testing.T) { for i := 0; i < cr.Len(); i++ { r := row{ Value: cr.Ints(0).Value(i), - Tag: cr.Strings(1).ValueCopy(i), + Tag: cr.Strings(1).Value(i), } got = append(got, r) } diff --git a/semantic/semantictest/cmp.go b/semantic/semantictest/cmp.go index cbc06e4d00..f80a71fc37 100644 --- a/semantic/semantictest/cmp.go +++ b/semantic/semantictest/cmp.go @@ -219,7 +219,7 @@ func getValue(arr array.Array, i int) values.Value { case *array.Float: return values.New(arr.Value(i)) case *array.String: - return values.New(arr.ValueCopy(i)) + return values.New(arr.Value(i)) case *array.Boolean: return values.New(arr.Value(i)) default: diff --git a/stdlib/experimental/mqtt/to.go b/stdlib/experimental/mqtt/to.go index c5b7ceb5a8..5fd9b9db41 100644 --- a/stdlib/experimental/mqtt/to.go +++ b/stdlib/experimental/mqtt/to.go @@ -316,12 +316,12 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err if col.Type != flux.TString { return errors.Newf(codes.FailedPrecondition, "invalid type for measurement column: %s", col.Type) } - m.name = er.Strings(j).ValueCopy(i) + m.name = er.Strings(j).Value(i) case isTag[j]: if col.Type != flux.TString { return errors.Newf(codes.FailedPrecondition, "invalid type for tag column: %s", col.Type) } - m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)}) + m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)}) case isValue[j]: switch col.Type { @@ -332,7 +332,7 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err case flux.TUInt: m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)}) case flux.TString: - m.fields = append(m.fields, &protocol.Field{Key: col.Label, 
Value: er.Strings(j).ValueCopy(i)}) + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)}) case flux.TTime: m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))}) case flux.TBool: diff --git a/stdlib/influxdata/influxdb/to.go b/stdlib/influxdata/influxdb/to.go index 6c04ceea05..6689e1bdaf 100644 --- a/stdlib/influxdata/influxdb/to.go +++ b/stdlib/influxdata/influxdb/to.go @@ -217,7 +217,7 @@ outer: for j, col := range chunk.Cols() { switch { case col.Label == spec.MeasurementColumn: - metric.NameStr = er.Strings(j).ValueCopy(i) + metric.NameStr = er.Strings(j).Value(i) case col.Label == timeColLabel: valueTime := execute.ValueForRow(&er, i, j) if valueTime.IsNull() { @@ -230,7 +230,7 @@ outer: return errors.New(codes.Invalid, "invalid type for tag column") } - value := er.Strings(j).ValueCopy(i) + value := er.Strings(j).Value(i) if value == "" { // Skip tag value if it is empty. continue diff --git a/stdlib/kafka/to.go b/stdlib/kafka/to.go index b818b37f31..db8dd16db5 100644 --- a/stdlib/kafka/to.go +++ b/stdlib/kafka/to.go @@ -353,12 +353,12 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e if col.Type != flux.TString { return errors.New(codes.FailedPrecondition, "invalid type for measurement column") } - m.name = er.Strings(j).ValueCopy(i) + m.name = er.Strings(j).Value(i) case isTag[j]: if col.Type != flux.TString { return errors.New(codes.FailedPrecondition, "invalid type for measurement column") } - m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)}) + m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)}) case isValue[j]: switch col.Type { case flux.TFloat: @@ -368,7 +368,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e case flux.TUInt: m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)}) case 
flux.TString: - m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).ValueCopy(i)}) + m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)}) case flux.TTime: m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))}) case flux.TBool: diff --git a/stdlib/sql/to.go b/stdlib/sql/to.go index f1d866c077..2e52492a7a 100644 --- a/stdlib/sql/to.go +++ b/stdlib/sql/to.go @@ -471,7 +471,7 @@ func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames [] valueArgs = append(valueArgs, nil) break } - valueArgs = append(valueArgs, er.Strings(j).ValueCopy(i)) + valueArgs = append(valueArgs, er.Strings(j).Value(i)) case flux.TTime: if er.Times(j).IsNull(i) { valueArgs = append(valueArgs, nil) diff --git a/stdlib/universe/distinct.go b/stdlib/universe/distinct.go index 644859edcd..b4942d4c69 100644 --- a/stdlib/universe/distinct.go +++ b/stdlib/universe/distinct.go @@ -309,7 +309,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, tbl flux.Table) e } nullDistinct = true } else { - v := cr.Strings(j).ValueCopy(i) + v := cr.Strings(j).Value(i) if stringDistinct[v] { continue } diff --git a/stdlib/universe/key_values.go b/stdlib/universe/key_values.go index 3a7b7199ff..0537c2b2a7 100644 --- a/stdlib/universe/key_values.go +++ b/stdlib/universe/key_values.go @@ -396,7 +396,7 @@ func (t *keyValuesTransformation) Process(id execute.DatasetID, tbl flux.Table) } nullDistinct = true } else { - v := vs.ValueCopy(i) + v := vs.Value(i) if stringDistinct[[2]string{c.name, v}] { continue } diff --git a/stdlib/universe/mode.go b/stdlib/universe/mode.go index 9054f2bee4..16347edd3d 100644 --- a/stdlib/universe/mode.go +++ b/stdlib/universe/mode.go @@ -220,7 +220,7 @@ func (t *modeTransformation) doString(cr flux.ColReader, tbl flux.Table, builder if cr.Strings(j).IsNull(i) { continue } - v := cr.Strings(j).ValueCopy(i) + v := cr.Strings(j).Value(i) 
stringMode[v]++ } diff --git a/stdlib/universe/pivot.go b/stdlib/universe/pivot.go index e83c46371d..e478ffbb3c 100644 --- a/stdlib/universe/pivot.go +++ b/stdlib/universe/pivot.go @@ -384,7 +384,7 @@ func valueToStr(cr flux.ColReader, c flux.ColMeta, row, col int) string { } case flux.TString: if v := cr.Strings(col); v.IsValid(row) { - result = v.ValueCopy(row) + result = v.Value(row) } case flux.TTime: if v := cr.Times(col); v.IsValid(row) { diff --git a/stdlib/universe/table_fns.go b/stdlib/universe/table_fns.go index fb986c86b4..f2cf0b5914 100644 --- a/stdlib/universe/table_fns.go +++ b/stdlib/universe/table_fns.go @@ -189,7 +189,7 @@ func arrayFromColumn(idx int, tbl flux.Table) (values.Array, error) { switch typ { case flux.TString: if vs := cr.Strings(idx); vs.IsValid(i) { - vsSlice = append(vsSlice, values.New(vs.ValueCopy(i))) + vsSlice = append(vsSlice, values.New(vs.Value(i))) } else { vsSlice = append(vsSlice, values.NewNull(semantic.BasicString)) } @@ -279,7 +279,7 @@ func objectFromRow(idx int, cr flux.ColReader) values.Object { switch c.Type { case flux.TString: if vs := cr.Strings(j); vs.IsValid(idx) { - v = values.New(vs.ValueCopy(idx)) + v = values.New(vs.Value(idx)) } else { v = values.NewNull(semantic.BasicString) }