Skip to content

Commit

Permalink
feat: Compression (#245)
Browse files Browse the repository at this point in the history
Supports gzip compression by adding `compression: gzip` to the spec.
  • Loading branch information
disq authored Jul 21, 2023
1 parent ebb8f4d commit 9794651
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 9 deletions.
11 changes: 5 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
)

type Client struct {
spec *FileSpec

types.FileType
spec *FileSpec
filetype types.FileType

csv *csvFile.Client
json *jsonFile.Client
Expand Down Expand Up @@ -52,7 +51,7 @@ func NewClient(spec *FileSpec) (*Client, error) {
return &Client{
spec: spec,
csv: client,
FileType: client,
filetype: client,
}, nil

case FormatTypeJSON:
Expand All @@ -63,7 +62,7 @@ func NewClient(spec *FileSpec) (*Client, error) {
return &Client{
spec: spec,
json: client,
FileType: client,
filetype: client,
}, nil

case FormatTypeParquet:
Expand All @@ -74,7 +73,7 @@ func NewClient(spec *FileSpec) (*Client, error) {
return &Client{
spec: spec,
parquet: client,
FileType: client,
filetype: client,
}, nil

default:
Expand Down
15 changes: 15 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package filetypes

import (
"bytes"
"compress/gzip"
"io"

"github.com/apache/arrow/go/v13/arrow"
Expand All @@ -14,6 +16,19 @@ type ReaderAtSeeker interface {
}

func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
if cl.spec.Compression == CompressionTypeGZip {
rr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer rr.Close()
b, err := io.ReadAll(rr)
if err != nil {
return err
}
f = bytes.NewReader(b)
}

switch cl.spec.Format {
case FormatTypeCSV:
if err := cl.csv.Read(f, table, res); err != nil {
Expand Down
37 changes: 35 additions & 2 deletions spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ const (
FormatTypeParquet = "parquet"
)

// CompressionType is the value of the `compression` spec field; it selects
// how file contents are compressed.
type CompressionType string

// Recognised compression settings.
const (
	// CompressionTypeNone is the empty setting: no compression is applied.
	CompressionTypeNone = CompressionType("")
	// CompressionTypeGZip selects gzip compression.
	CompressionTypeGZip = CompressionType("gzip")
)

type FileSpec struct {
Format FormatType `json:"format,omitempty"`
FormatSpec any `json:"format_spec,omitempty"`
Format FormatType `json:"format,omitempty"`
FormatSpec any `json:"format_spec,omitempty"`
Compression CompressionType `json:"compression,omitempty"`
csvSpec *csv.Spec
jsonSpec *jsonFile.Spec
parquetSpec *parquet.Spec
Expand All @@ -38,6 +46,9 @@ func (s *FileSpec) SetDefaults() {
}

func (s *FileSpec) Validate() error {
if !s.Compression.IsValid() {
return fmt.Errorf("`compression` must be either empty or `%s`", CompressionTypeGZip)
}
if s.Format == "" {
return fmt.Errorf("format is required")
}
Expand All @@ -47,6 +58,10 @@ func (s *FileSpec) Validate() error {
case FormatTypeJSON:
return s.jsonSpec.Validate()
case FormatTypeParquet:
if s.Compression != CompressionTypeNone {
return fmt.Errorf("compression is not supported for parquet format") // This won't work even if we wanted to, because parquet writer prematurely closes the file handle
}

return s.parquetSpec.Validate()
default:
return fmt.Errorf("unknown format %s", s.Format)
Expand Down Expand Up @@ -76,3 +91,21 @@ func (s *FileSpec) UnmarshalSpec() error {
return fmt.Errorf("unknown format %s", s.Format)
}
}

// IsValid reports whether c is one of the recognised compression settings:
// CompressionTypeNone or CompressionTypeGZip.
func (c CompressionType) IsValid() bool {
	return c == CompressionTypeNone || c == CompressionTypeGZip
}

// Extension returns the file-name suffix associated with c: ".gz" for
// CompressionTypeGZip and the empty string for every other value.
func (c CompressionType) Extension() string {
	if c == CompressionTypeGZip {
		return ".gz"
	}
	return ""
}
39 changes: 38 additions & 1 deletion write.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package filetypes

import (
"compress/gzip"
"fmt"
"io"

"github.com/apache/arrow/go/v13/arrow"
Expand All @@ -9,5 +11,40 @@ import (
)

// WriteTableBatchFile writes records for table to w via types.WriteAll.
//
// The client itself is handed to types.WriteAll (rather than the wrapped
// format-specific file type) so that the client's own WriteHeader is the one
// used, which is where the configured compression is applied.
// NOTE(review): types.WriteAll is defined outside this file; presumably it
// writes the header, the records and then the footer — confirm.
func (cl *Client) WriteTableBatchFile(w io.Writer, table *schema.Table, records []arrow.Record) error {
	return types.WriteAll(cl, w, table, records)
}

// WriteHeader starts a file for table t on w, honouring the compression
// configured in the client's spec.
//
// Without compression the call is forwarded unchanged to the underlying file
// type. With gzip, the underlying file type writes into a gzip writer layered
// over w, and the returned handle closes that gzip writer once the footer has
// been written successfully. Any other compression value yields an error.
func (cl *Client) WriteHeader(w io.Writer, t *schema.Table) (types.Handle, error) {
	compression := cl.spec.Compression

	if compression == CompressionTypeNone {
		return cl.filetype.WriteHeader(w, t)
	}
	if compression != CompressionTypeGZip {
		return nil, fmt.Errorf("unhandled compression type %s", compression)
	}

	zw := gzip.NewWriter(w)
	inner, err := cl.filetype.WriteHeader(zw, t)
	if err != nil {
		return nil, err
	}
	// Closing the gzip writer is deferred to the handle's WriteFooter.
	return newClosableHandle(inner, zw.Close), nil
}

// closableHandle wraps a types.Handle and runs an extra function once the
// wrapped handle's footer has been written. WriteHeader uses it to close the
// gzip writer that sits between the file type and the destination writer.
type closableHandle struct {
// Handle is the wrapped handle; its methods are promoted, except
// WriteFooter, which closableHandle overrides.
types.Handle
// afterCloseFunc is called by WriteFooter after the wrapped handle's
// WriteFooter returns without error.
afterCloseFunc func() error
}

// Compile-time check that *closableHandle satisfies types.Handle.
var _ types.Handle = (*closableHandle)(nil)

// newClosableHandle wraps h so that afterCloseFunc runs after h's footer has
// been written successfully.
func newClosableHandle(h types.Handle, afterCloseFunc func() error) types.Handle {
	wrapped := new(closableHandle)
	wrapped.Handle = h
	wrapped.afterCloseFunc = afterCloseFunc
	return wrapped
}

// WriteFooter writes the wrapped handle's footer and, only if that succeeds,
// invokes afterCloseFunc and returns its result. If the wrapped footer fails,
// that error is returned and afterCloseFunc is not called.
func (c *closableHandle) WriteFooter() error {
	err := c.Handle.WriteFooter()
	if err == nil {
		err = c.afterCloseFunc()
	}
	return err
}

0 comments on commit 9794651

Please sign in to comment.