Skip to content

Commit

Permalink
fix: Revert "feat(execute): allocate memory for string content. (#5482)"
Browse files Browse the repository at this point in the history
This reverts commit bea9586.
  • Loading branch information
mhilton committed Aug 13, 2024
1 parent b624a57 commit cf7de96
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 136 deletions.
29 changes: 10 additions & 19 deletions execute/allocator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package execute

import (
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/memory"
)

Expand Down Expand Up @@ -158,14 +156,17 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 {
return s
}

// Strings makes a slice of String values.
func (a *Allocator) Strings(l, c int) []String {
// Strings makes a slice of string values.
// Only the string headers are accounted for.
func (a *Allocator) Strings(l, c int) []string {
a.account(c, stringSize)
return make([]String, l, c)
return make([]string, l, c)
}

// AppendStrings appends Strings to a slice.
func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
// AppendStrings appends strings to a slice.
// Only the string headers are accounted for.
func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
// TODO(nathanielc): Account for actual size of strings
if cap(slice)-len(slice) >= len(vs) {
return append(slice, vs...)
}
Expand All @@ -175,14 +176,14 @@ func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
return s
}

func (a *Allocator) GrowStrings(slice []String, n int) []String {
func (a *Allocator) GrowStrings(slice []string, n int) []string {
newCap := len(slice) + n
if newCap < cap(slice) {
return slice[:newCap]
}
// grow capacity same way as built-in append
newCap = newCap*3/2 + 1
s := make([]String, len(slice)+n, newCap)
s := make([]string, len(slice)+n, newCap)
copy(s, slice)
diff := cap(s) - cap(slice)
a.account(diff, stringSize)
Expand Down Expand Up @@ -219,13 +220,3 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time {
a.account(diff, timeSize)
return s
}

// String describes where a string's bytes live inside a separate backing
// buffer. It carries no data of its own; pair it with the buffer it was
// created against to recover the bytes.
type String struct {
	// offset is the index of the string's first byte within the buffer.
	offset int
	// len is the number of bytes the string occupies.
	len int
}

// Bytes returns the portion of buf.Bytes() covered by s, that is the
// half-open range [offset, offset+len).
func (s String) Bytes(buf *arrowmem.Buffer) []byte {
	start, end := s.offset, s.offset+s.len
	return buf.Bytes()[start:end]
}
104 changes: 39 additions & 65 deletions execute/table.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package execute

import (
"bytes"
"fmt"
"sort"
"sync/atomic"

arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
"github.com/google/go-cmp/cmp"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
Expand Down Expand Up @@ -298,9 +295,8 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) {
eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data,
rightBuffer.cols[j].(*floatColumnBuilder).data)
case flux.TString:
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder),
rightBuffer.cols[j].(*stringColumnBuilder),
cmp.Comparer(stringColumnBuilderEqual))
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data,
rightBuffer.cols[j].(*stringColumnBuilder).data)
case flux.TTime:
eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data,
rightBuffer.cols[j].(*timeColumnBuilder).data)
Expand Down Expand Up @@ -328,27 +324,6 @@ func colsMatch(left, right []flux.ColMeta) bool {
return true
}

// stringColumnBuilderEqual reports whether x and y contain the same
// sequence of values. Two builders are equal when they have the same
// length and, at every index, either both entries are nil or both are
// non-nil with byte-for-byte identical contents.
func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool {
	n := x.Len()
	if n != y.Len() {
		return false
	}
	for row := 0; row < n; row++ {
		xNil, yNil := x.IsNil(row), y.IsNil(row)
		if xNil != yNil {
			// Exactly one side is nil.
			return false
		}
		if xNil {
			// Both nil: nothing further to compare for this row.
			continue
		}
		left := x.data[row].Bytes(x.buf)
		right := y.data[row].Bytes(y.buf)
		if !bytes.Equal(left, right) {
			return false
		}
	}
	return true
}

// ColMap writes a mapping of builder index to cols index into colMap.
// When colMap does not have enough capacity a new colMap is allocated.
// The colMap is always returned
Expand Down Expand Up @@ -623,7 +598,6 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
case flux.TString:
b.cols = append(b.cols, &stringColumnBuilder{
columnBuilderBase: colBase,
buf: arrowmem.NewResizableBuffer(b.alloc.Allocator),
})
if b.NRows() > 0 {
if err := b.GrowStrings(newIdx, b.NRows()); err != nil {
Expand Down Expand Up @@ -945,9 +919,8 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error {
if err := b.checkCol(j, flux.TString); err != nil {
return err
}
col := b.cols[j].(*stringColumnBuilder)
col.data[i] = col.makeString(value)
col.SetNil(i, false)
b.cols[j].(*stringColumnBuilder).data[i] = value
b.cols[j].SetNil(i, false)
return nil
}

Expand All @@ -956,7 +929,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error {
return err
}
col := b.cols[j].(*stringColumnBuilder)
col.data = b.alloc.AppendStrings(col.data, col.makeString(value))
col.data = b.alloc.AppendStrings(col.data, value)
b.nrows = len(col.data)
return nil
}
Expand Down Expand Up @@ -1179,6 +1152,11 @@ func (b *ColListTableBuilder) Floats(j int) []float64 {
CheckColType(b.colMeta[j], flux.TFloat)
return b.cols[j].(*floatColumnBuilder).data
}
// Strings returns the data slice of the string column at index j.
// The column's metadata is first passed to CheckColType with flux.TString.
// The builder's own slice is returned, not a copy.
func (b *ColListTableBuilder) Strings(j int) []string {
	CheckColType(b.colMeta[j], flux.TString)
	col := b.cols[j].(*stringColumnBuilder)
	return col.data
}
func (b *ColListTableBuilder) Times(j int) []values.Time {
CheckColType(b.colMeta[j], flux.TTime)
return b.cols[j].(*timeColumnBuilder).data
Expand All @@ -1202,9 +1180,7 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object {
case flux.TFloat:
val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row])
case flux.TString:
// TODO(mhilton): avoid a copy
col := b.cols[j].(*stringColumnBuilder)
val = values.NewString(string(col.data[row].Bytes(col.buf)))
val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row])
case flux.TTime:
val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row])
}
Expand Down Expand Up @@ -1890,38 +1866,46 @@ func (c *stringColumn) Copy() column {

type stringColumnBuilder struct {
columnBuilderBase
data []String

// buf contains a backing buffer containing the bytes of the
// strings.
buf *arrowmem.Buffer
data []string
}

func (c *stringColumnBuilder) Clear() {
c.buf.Release()
c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator)
c.data = c.data[:0]
c.data = c.data[0:0]
}

func (c *stringColumnBuilder) Release() {
c.buf.Release()
c.alloc.Free(cap(c.data), stringSize)
c.data = nil
}

func (c *stringColumnBuilder) Copy() column {
builder := arrow.NewStringBuilder(c.alloc.Allocator)
builder.Reserve(len(c.data))
builder.ReserveData(c.buf.Len())
for i, v := range c.data {
if c.nils[i] {
builder.AppendNull()
continue
var data *array.String
if len(c.nils) > 0 {
b := arrow.NewStringBuilder(c.alloc.Allocator)
b.Reserve(len(c.data))
sz := 0
for i, v := range c.data {
if c.nils[i] {
continue
}
sz += len(v)
}
builder.AppendBytes(v.Bytes(c.buf))
b.ReserveData(sz)
for i, v := range c.data {
if c.nils[i] {
b.AppendNull()
continue
}
b.Append(v)
}
data = b.NewStringArray()
b.Release()
} else {
data = arrow.NewString(c.data, c.alloc.Allocator)
}
col := &stringColumn{
ColMeta: c.ColMeta,
data: builder.NewStringArray(),
data: data,
}
return col
}
Expand All @@ -1932,13 +1916,13 @@ func (c *stringColumnBuilder) Len() int {

func (c *stringColumnBuilder) Equal(i, j int) bool {
return c.EqualFunc(i, j, func(i, j int) bool {
return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf))
return c.data[i] == c.data[j]
})
}

func (c *stringColumnBuilder) Less(i, j int) bool {
return c.LessFunc(i, j, func(i, j int) bool {
return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0
return c.data[i] < c.data[j]
})
}

Expand All @@ -1947,16 +1931,6 @@ func (c *stringColumnBuilder) Swap(i, j int) {
c.data[i], c.data[j] = c.data[j], c.data[i]
}

// makeString appends the bytes of s to the end of the builder's backing
// buffer and returns a String recording the offset and length of the
// newly written region.
func (c *stringColumnBuilder) makeString(s string) String {
	start := c.buf.Len()
	size := len(s)
	// Grow the buffer by exactly the number of bytes being stored.
	c.buf.Resize(start + size)
	copy(c.buf.Bytes()[start:], s)
	return String{offset: start, len: size}
}

type timeColumn struct {
flux.ColMeta
data *array.Int
Expand Down
52 changes: 0 additions & 52 deletions execute/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,58 +148,6 @@ func TestTablesEqual(t *testing.T) {
},
want: false,
},
{
name: "string values",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
want: true,
},
{
name: "string mismatch",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "4"},
},
},
want: false,
},
}
for _, tc := range testCases {
tc := tc
Expand Down

0 comments on commit cf7de96

Please sign in to comment.