Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proto): introduce StatefulColumn concept #31

Merged
merged 2 commits into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cht/cht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,12 @@ PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)`,
})

require.NoError(t, client.Do(ctx, ch.Query{
do(ctx, t, ch.Query{
Result: (&proto.Results{}).Auto(),
OnResult: func(ctx context.Context, block proto.Block) error { return nil },
Body: `CREATE TABLE hits_distributed ON CLUSTER 'nexus' AS hits
ENGINE = Distributed('nexus', default, hits, rand())`,
}))
})
t.Run("Insert", func(t *testing.T) {
for i := 0; i < 20; i++ {
require.NoError(t, client.Do(ctx, ch.Query{
Expand Down
8 changes: 4 additions & 4 deletions proto/_golden/col_arr_low_cardinality_u8_str.hex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
00000000 03 00 00 00 00 00 00 00 04 00 00 00 00 00 00 00 |................|
00000010 06 00 00 00 00 00 00 00 08 00 00 00 00 00 00 00 |................|
00000020 0c 00 00 00 00 00 00 00 01 00 00 00 00 00 00 00 |................|
00000030 00 06 00 00 00 00 00 00 03 00 00 00 00 00 00 00 |................|
00000040 03 66 6f 6f 03 62 61 72 03 62 61 7a 0c 00 00 00 |.foo.bar.baz....|
00000050 00 00 00 00 00 01 02 00 01 01 00 00 01 01 01 01 |................|
00000020 0c 00 00 00 00 00 00 00 00 06 00 00 00 00 00 00 |................|
00000030 03 00 00 00 00 00 00 00 03 66 6f 6f 03 62 61 72 |.........foo.bar|
00000040 03 62 61 7a 0c 00 00 00 00 00 00 00 00 01 02 00 |.baz............|
00000050 01 01 00 00 01 01 01 01 |........|
Binary file modified proto/_golden/col_arr_low_cardinality_u8_str.raw
Binary file not shown.
10 changes: 5 additions & 5 deletions proto/_golden/col_low_cardinality_i_str_k_8.hex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
00000000 01 00 00 00 00 00 00 00 00 06 00 00 00 00 00 00 |................|
00000010 03 00 00 00 00 00 00 00 03 6e 65 6f 07 74 72 69 |.........neo.tri|
00000020 6e 69 74 79 08 6d 6f 72 70 68 65 75 73 19 00 00 |nity.morpheus...|
00000030 00 00 00 00 00 00 01 02 00 01 02 00 01 02 00 01 |................|
00000040 02 00 01 02 00 01 02 00 01 02 00 01 02 00 |..............|
00000000 00 06 00 00 00 00 00 00 03 00 00 00 00 00 00 00 |................|
00000010 03 6e 65 6f 07 74 72 69 6e 69 74 79 08 6d 6f 72 |.neo.trinity.mor|
00000020 70 68 65 75 73 19 00 00 00 00 00 00 00 00 01 02 |pheus...........|
00000030 00 01 02 00 01 02 00 01 02 00 01 02 00 01 02 00 |................|
00000040 01 02 00 01 02 00 |......|
Binary file modified proto/_golden/col_low_cardinality_i_str_k_8.raw
Binary file not shown.
Binary file added proto/_testdata/select_lc.raw
Binary file not shown.
6 changes: 6 additions & 0 deletions proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (b Block) EncodeBlock(buf *Buffer, version int, input []InputColumn) error
return errors.Wrap(err, "prepare")
}
}
if col.Data.Rows() == 0 {
continue
}
if v, ok := col.Data.(StateEncoder); ok {
v.EncodeState(buf)
}
col.Data.EncodeColumn(buf)
}
return nil
Expand Down
128 changes: 14 additions & 114 deletions proto/col_arr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,132 +17,32 @@ var (
_ Column = (*ColArr)(nil)
)

// Result of Query.
type Result interface {
DecodeResult(r *Reader, b Block) error
}

// Results wrap []ResultColumn to implement Result.
type Results []ResultColumn

type autoResults struct {
results *Results
}

func (s autoResults) DecodeResult(r *Reader, b Block) error {
return s.results.decodeAuto(r, b)
func (c ColArr) EncodeColumn(b *Buffer) {
c.Offsets.EncodeColumn(b)
c.Data.EncodeColumn(b)
}

func (s Results) Rows() int {
if len(s) == 0 {
return 0
}
return s[0].Data.Rows()
func (c ColArr) Type() ColumnType {
return c.Data.Type().Array()
}

func (s *Results) Auto() Result {
return autoResults{results: s}
func (c ColArr) Rows() int {
return len(c.Offsets)
}

func (s *Results) decodeAuto(r *Reader, b Block) error {
if len(*s) > 0 {
// Already inferred.
return s.DecodeResult(r, b)
}
for i := 0; i < b.Columns; i++ {
columnName, err := r.Str()
if err != nil {
return errors.Wrapf(err, "column [%d] name", i)
}
columnTypeRaw, err := r.Str()
if err != nil {
return errors.Wrapf(err, "column [%d] type", i)
func (c *ColArr) DecodeState(r *Reader) error {
if s, ok := c.Data.(StateDecoder); ok {
if err := s.DecodeState(r); err != nil {
return errors.Wrap(err, "data state")
}
var (
colType = ColumnType(columnTypeRaw)
col = &ColAuto{}
)
if err := col.Infer(colType); err != nil {
return errors.Wrap(err, "column type inference")
}
col.Data.Reset()
if err := col.Data.DecodeColumn(r, b.Rows); err != nil {
return errors.Wrap(err, columnName)
}
*s = append(*s, ResultColumn{
Name: columnName,
Data: col.Data,
})
}
return nil
}

func (s Results) DecodeResult(r *Reader, b Block) error {
var (
noTarget = len(s) == 0
noRows = b.Rows == 0
columnsMismatch = b.Columns != len(s)
allowMismatch = noTarget && noRows
)
if columnsMismatch && !allowMismatch {
return errors.Errorf("%d (columns) != %d (target)", b.Columns, len(s))
func (c *ColArr) EncodeState(b *Buffer) {
if s, ok := c.Data.(StateEncoder); ok {
s.EncodeState(b)
}
for i := 0; i < b.Columns; i++ {
columnName, err := r.Str()
if err != nil {
return errors.Wrapf(err, "column [%d] name", i)
}
columnType, err := r.Str()
if err != nil {
return errors.Wrapf(err, "column [%d] type", i)
}
if noTarget {
// Just reading types and names.
continue
}

// Checking column name and type.
t := s[i]
if t.Name == "" {
// Inferring column name.
t.Name = columnName
s[i] = t
}
if t.Name != columnName {
return errors.Errorf("[%d]: unexpected column %q (%q expected)", i, columnName, t.Name)
}
gotType := ColumnType(columnType)
if infer, ok := t.Data.(InferColumn); ok {
if err := infer.Infer(gotType); err != nil {
return errors.Wrap(err, "infer")
}
}
hasType := t.Data.Type()
if gotType.Conflicts(hasType) {
return errors.Errorf("[%d]: %s: unexpected type %q (got) instead of %q (has)",
i, columnName, gotType, hasType,
)
}
t.Data.Reset()
if err := t.Data.DecodeColumn(r, b.Rows); err != nil {
return errors.Wrap(err, columnName)
}
}

return nil
}

func (c ColArr) EncodeColumn(b *Buffer) {
c.Offsets.EncodeColumn(b)
c.Data.EncodeColumn(b)
}

func (c ColArr) Type() ColumnType {
return c.Data.Type().Array()
}

func (c ColArr) Rows() int {
return len(c.Offsets)
}

func (c *ColArr) DecodeColumn(r *Reader, rows int) error {
Expand Down
38 changes: 26 additions & 12 deletions proto/col_low_cardinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ type ColLowCardinality struct {
Keys64 ColUInt64
}

// DecodeState reads the low cardinality state from r.
//
// It consumes the key serialization version as an int64 and returns an error
// unless it equals sharedDictionariesWithAdditionalKeys. If the index column
// implements StateDecoder, its state is decoded afterwards.
func (c *ColLowCardinality) DecodeState(r *Reader) error {
	version, err := r.Int64()
	if err != nil {
		return errors.Wrap(err, "version")
	}
	if expected := int64(sharedDictionariesWithAdditionalKeys); version != expected {
		return errors.Errorf("got version %d, expected %d",
			version, sharedDictionariesWithAdditionalKeys,
		)
	}

	// Index columns without state have nothing more to read.
	index, ok := c.Index.(StateDecoder)
	if !ok {
		return nil
	}
	if err := index.DecodeState(r); err != nil {
		return errors.Wrap(err, "state")
	}
	return nil
}

// EncodeState writes the low cardinality state to b: the key serialization
// version as an int64, followed by the state of the index column when that
// column implements StateEncoder.
func (c ColLowCardinality) EncodeState(b *Buffer) {
	b.PutInt64(int64(sharedDictionariesWithAdditionalKeys))

	index, ok := c.Index.(StateEncoder)
	if !ok {
		return
	}
	index.EncodeState(b)
}

// Constants for the low cardinality metadata value, which is represented as an
// int64 consisting of bitflags and the key type.
//
Expand Down Expand Up @@ -119,15 +145,6 @@ func (c *ColLowCardinality) DecodeColumn(r *Reader, rows int) error {
// Skipping entirely if no rows.
return nil
}
keySerialization, err := r.Int64()
if err != nil {
return errors.Wrap(err, "version")
}
if keySerialization != int64(sharedDictionariesWithAdditionalKeys) {
return errors.Errorf("got version %d, expected %d",
keySerialization, sharedDictionariesWithAdditionalKeys,
)
}
meta, err := r.Int64()
if err != nil {
return errors.Wrap(err, "meta")
Expand Down Expand Up @@ -184,9 +201,6 @@ func (c ColLowCardinality) EncodeColumn(b *Buffer) {
return
}

// Writing key serialization version.
b.PutInt64(int64(sharedDictionariesWithAdditionalKeys))

// Meta encodes whether reader should update
// low cardinality metadata and keys column type.
meta := cardinalityUpdateAll | int64(c.Key)
Expand Down
23 changes: 23 additions & 0 deletions proto/col_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ type ColMap struct {
Values Column
}

// DecodeState reads the state of the map's sub-columns from r, keys first and
// values second. A sub-column that does not implement StateDecoder is skipped.
// Errors are wrapped with "keys state" or "values state" respectively.
func (c *ColMap) DecodeState(r *Reader) error {
	if keys, ok := c.Keys.(StateDecoder); ok {
		err := keys.DecodeState(r)
		if err != nil {
			return errors.Wrap(err, "keys state")
		}
	}

	values, ok := c.Values.(StateDecoder)
	if !ok {
		return nil
	}
	if err := values.DecodeState(r); err != nil {
		return errors.Wrap(err, "values state")
	}
	return nil
}

// EncodeState writes the state of the map's sub-columns to b, keys first and
// values second. A sub-column that does not implement StateEncoder writes
// nothing.
func (c ColMap) EncodeState(b *Buffer) {
	keys, hasKeys := c.Keys.(StateEncoder)
	if hasKeys {
		keys.EncodeState(b)
	}

	values, hasValues := c.Values.(StateEncoder)
	if hasValues {
		values.EncodeState(b)
	}
}

func (c ColMap) Type() ColumnType {
return ColumnTypeMap.Sub(c.Keys.Type(), c.Values.Type())
}
Expand Down
19 changes: 19 additions & 0 deletions proto/col_tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@ import "github.com/go-faster/errors"
// Basically it is just a group of columns.
type ColTuple []Column

// DecodeState reads the state of every tuple element that implements
// StateDecoder, in element order. The first failure is returned wrapped with
// the element index, formatted as "[i]".
func (c ColTuple) DecodeState(r *Reader) error {
	for idx := range c {
		dec, ok := c[idx].(StateDecoder)
		if !ok {
			// Stateless element: nothing to read.
			continue
		}
		if err := dec.DecodeState(r); err != nil {
			return errors.Wrapf(err, "[%d]", idx)
		}
	}
	return nil
}

// EncodeState writes the state of every tuple element that implements
// StateEncoder to b, in element order.
func (c ColTuple) EncodeState(b *Buffer) {
	for idx := range c {
		enc, ok := c[idx].(StateEncoder)
		if !ok {
			// Stateless element: nothing to write.
			continue
		}
		enc.EncodeState(b)
	}
}

func (c ColTuple) Type() ColumnType {
var types []ColumnType
for _, v := range c {
Expand Down
13 changes: 13 additions & 0 deletions proto/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ type Column interface {
ColInput
}

type (
	// StateEncoder is implemented by columns that write state to the buffer
	// in addition to their column data.
	StateEncoder interface {
		// EncodeState writes the column state to b.
		EncodeState(b *Buffer)
	}

	// StateDecoder is implemented by columns that read state from the reader
	// in addition to their column data.
	StateDecoder interface {
		// DecodeState reads the column state from r.
		DecodeState(r *Reader) error
	}

	// StatefulColumn combines StateEncoder and StateDecoder.
	StatefulColumn interface {
		StateEncoder
		StateDecoder
	}
)

// Preparable is Column that should be prepared before encoding or decoding.
type Preparable interface {
Prepare() error
Expand Down
15 changes: 15 additions & 0 deletions proto/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,18 @@ func TestDump(t *testing.T) {
}),
)
}

// TestDumpLowCardinality decodes the raw block stored in
// _testdata/select_lc.raw into a single result column named "v" and requires
// that decoding succeeds. The target column is a ColArr wrapping a
// ColLowCardinality whose index is a ColStr.
func TestDumpLowCardinality(t *testing.T) {
	raw, err := os.ReadFile(filepath.Join("_testdata", "select_lc.raw"))
	require.NoError(t, err)

	target := Results{
		{
			Name: "v",
			Data: &ColArr{
				Data: &ColLowCardinality{Index: &ColStr{}},
			},
		},
	}

	var block Block
	err = block.DecodeRawBlock(NewReader(bytes.NewReader(raw)), target)
	require.NoError(t, err)
}
Loading