Skip to content

Commit

Permalink
Cleaning up S3.
Browse files — browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 7, 2024
1 parent a874cf2 commit 7eef047
Showing 1 changed file with 2 additions and 13 deletions.
15 changes: 2 additions & 13 deletions clients/s3/s3.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/parquetutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)
Expand Down Expand Up @@ -79,15 +77,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
return nil
}

var cols []columns.Column
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
if col.KindDetails == typing.Invalid {
continue
}

cols = append(cols, col)
}

cols := tableData.ReadOnlyInMemoryCols().ValidColumns()
schema, err := parquetutil.GenerateJSONSchema(cols)
if err != nil {
return fmt.Errorf("failed to generate parquet schema: %w", err)
Expand All @@ -106,10 +96,9 @@ func (s *Store) Merge(tableData *optimization.TableData) error {

additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
pw.CompressionType = parquet.CompressionCodec_GZIP
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, val := range tableData.Rows() {
row := make(map[string]any)
for _, col := range columns {
for _, col := range cols {
value, err := parquetutil.ParseValue(val[col.Name()], col, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %q", err, val[col.Name()], col.Name())
Expand Down

0 comments on commit 7eef047

Please sign in to comment.