Skip to content

Commit

Permalink
feat: Expose max_row_group_length option for Parquet writer (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
disq authored Nov 7, 2024
1 parent f2be0a6 commit 4a4b389
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
28 changes: 26 additions & 2 deletions parquet/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"github.com/invopop/jsonschema"
)

const defaultMaxRowGroupLength = 128 * 1024 * 1024

var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"}
var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"}

// nolint:revive
type ParquetSpec struct {
Version string `json:"version,omitempty"`
RootRepetition string `json:"root_repetition,omitempty"`
Version string `json:"version,omitempty"`
RootRepetition string `json:"root_repetition,omitempty"`
MaxRowGroupLength *int64 `json:"max_row_group_length,omitempty"`
}

func (s *ParquetSpec) GetVersion() parquet.Version {
Expand Down Expand Up @@ -46,6 +49,13 @@ func (s *ParquetSpec) GetRootRepetition() parquet.Repetition {
return parquet.Repetitions.Repeated
}

func (s *ParquetSpec) GetMaxRowGroupLength() int64 {
if s.MaxRowGroupLength == nil {
return defaultMaxRowGroupLength
}
return *s.MaxRowGroupLength
}

func (ParquetSpec) JSONSchema() *jsonschema.Schema {
properties := jsonschema.NewProperties()
allowedVersionsAsAny := make([]any, len(allowedVersions))
Expand All @@ -70,6 +80,13 @@ func (ParquetSpec) JSONSchema() *jsonschema.Schema {
Default: "repeated",
})

properties.Set("max_row_group_length", &jsonschema.Schema{
Type: "integer",
Description: "Max row group length",
Default: defaultMaxRowGroupLength,
Minimum: "0",
})

return &jsonschema.Schema{
Description: "CloudQuery Parquet file output spec.",
Properties: properties,
Expand All @@ -85,6 +102,10 @@ func (s *ParquetSpec) SetDefaults() {
if s.RootRepetition == "" {
s.RootRepetition = "repeated"
}
if s.MaxRowGroupLength == nil {
i := int64(defaultMaxRowGroupLength)
s.MaxRowGroupLength = &i
}
}

func (s *ParquetSpec) Validate() error {
Expand All @@ -94,5 +115,8 @@ func (s *ParquetSpec) Validate() error {
if !slices.Contains(allowedRootRepetitions, s.RootRepetition) {
return fmt.Errorf("invalid rootRepetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", "))
}
if s.MaxRowGroupLength != nil && *s.MaxRowGroupLength < 0 {
return fmt.Errorf("invalid: maxRowGroupLength: %v. Must be zero or positive", *s.MaxRowGroupLength)
}
return nil
}
9 changes: 9 additions & 0 deletions parquet/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,14 @@ func TestSpec_JSONSchema(t *testing.T) {
ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'",
Spec: `{"root_repetition":"invalid"}`,
},
{
Name: "valid max_row_group_length",
Spec: `{"max_row_group_length":256}`,
},
{
Name: "invalid max_row_group_length",
ErrorMessage: "at '/max_row_group_length': minimum: got",
Spec: `{"max_row_group_length":-4}`,
},
})
}
2 changes: 1 addition & 1 deletion parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ ftypes.Handle = (*Handle)(nil)

func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
props := parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
parquet.WithMaxRowGroupLength(c.spec.GetMaxRowGroupLength()),
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithVersion(c.spec.GetVersion()),
parquet.WithRootRepetition(c.spec.GetRootRepetition()),
Expand Down
6 changes: 6 additions & 0 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@
],
"description": "Root repetition",
"default": "repeated"
},
"max_row_group_length": {
"type": "integer",
"minimum": 0,
"description": "Max row group length",
"default": 134217728
}
},
"additionalProperties": false,
Expand Down

0 comments on commit 4a4b389

Please sign in to comment.