Skip to content

Commit

Permalink
Switch parquet library (#19)
Browse files Browse the repository at this point in the history
* Start switching parquet library

* Remove delta data type aliases

* Remove _parsed fields, DeltaDataTypeDate, drop writer version to 1

* Manually parse actions since the schema isn't fixed

* Restore DeltaDataType type definitions.  Convert all action struct members to pointers so that parquet schema is consistent with Spark/Rust

* Fix up map and list reading to work with either pointer or non-pointer entries

* go mod tidy

* Use consistent version type

* Return error if add size is zero or missing when creating a checkpoint

* Make fields that are required in the protocol also required in Parquet.  Remove timestamp logical type to match Spark schema. Remove Delta data type definitions.
  • Loading branch information
chelseajonesr authored Jul 11, 2023
1 parent c243636 commit 6167048
Show file tree
Hide file tree
Showing 16 changed files with 1,080 additions and 440 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func main() {

add := delta.Add{
Path: fileName,
Size: delta.DeltaDataTypeLong(p.Size),
Size: p.Size,
DataChange: true,
ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
ModificationTime: time.Now().UnixMilli(),
Stats: string(stats.Json()),
PartitionValues: make(map[string]string),
}
Expand Down Expand Up @@ -104,9 +104,9 @@ func main() {

add := delta.Add{
Path: fileName,
Size: delta.DeltaDataTypeLong(p.Size),
Size: p.Size,
DataChange: true,
ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
ModificationTime: time.Now().UnixMilli(),
Stats: string(stats.Json()),
PartitionValues: make(map[string]string),
}
Expand Down Expand Up @@ -246,7 +246,6 @@ Limitations / TODO
Checkpoints:
- The checkpoint checksum is not being written or validated
- Checkpoints with non-string-type partitions require custom JSON marshal/unmarshal code
- Parsed stats does not include null counts

Other:
- Nested schemas (containing nested structs, arrays, or maps) are not supported
Expand Down
488 changes: 415 additions & 73 deletions action.go

Large diffs are not rendered by default.

52 changes: 19 additions & 33 deletions action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func TestLogEntryFromActions(t *testing.T) {
add1 := AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-1.snappy.parquet",
Size: 1,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
ModificationTime: 1675020556534,
DataChange: false,
}
add2 := &AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-2.snappy.parquet",
Size: 2,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
ModificationTime: 1675020556534,
DataChange: false,
}

write := Write{Mode: ErrorIfExists}
Expand Down Expand Up @@ -66,11 +68,11 @@ func TestLogEntryFromActions(t *testing.T) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}

if !strings.Contains(string(logs), `{"add":{"path":"part-1.snappy.parquet","partitionValues":null,"size":1,"modificationTime":1675020556534,"dataChange":false,"stats":""}}`) {
if !strings.Contains(string(logs), `{"add":{"path":"part-1.snappy.parquet","partitionValues":null,"size":1,"modificationTime":1675020556534,"dataChange":false,"stats":null}}`) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}

if !strings.Contains(string(logs), `{"path":"part-2.snappy.parquet","partitionValues":null,"size":2,"modificationTime":1675020556534,"dataChange":false,"stats":""}`) {
if !strings.Contains(string(logs), `{"path":"part-2.snappy.parquet","partitionValues":null,"size":2,"modificationTime":1675020556534,"dataChange":false,"stats":null}`) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}
}
Expand All @@ -80,7 +82,7 @@ func TestLogEntryFromAction(t *testing.T) {
commit := make(CommitInfo)
commit["path"] = "part-1.snappy.parquet"
commit["size"] = 1
commit["ModificationTime"] = DeltaDataTypeTimestamp(time.Now().UnixMilli())
commit["ModificationTime"] = time.Now().UnixMilli()

abytes, err := logEntryFromAction[emptyTestStruct, emptyTestStruct](commit)
if err != nil {
Expand Down Expand Up @@ -109,17 +111,20 @@ func TestLogEntryFromActionChangeMetaData(t *testing.T) {
}
}
`, "\n", ""), "\t", ""), " ", "")
provider := "parquet"
options := make(map[string]string)
format := Format{
Provider: "parquet",
Options: make(map[string]string),
Provider: provider,
Options: options,
}
config := make(map[string]string)
config[string(AppendOnlyDeltaConfigKey)] = "true"
id, _ := uuid.Parse("af23c9d7-fff1-4a5a-a2c8-55c59bd782aa")
schemaString := "..."
action := MetaData{
Id: id,
Format: format,
SchemaString: "...",
SchemaString: schemaString,
PartitionColumns: []string{},
Configuration: config,
}
Expand Down Expand Up @@ -321,6 +326,8 @@ func TestActionFromLogEntry(t *testing.T) {
unstructuredResult map[string]json.RawMessage
}

statsString := `{"numRecords":155,"tightBounds":false,"minValues":{"timestamp":1615338375007003},"maxValues":{"timestamp":1615338377517216},"nullCount":null}`

// Caveats:
// CommitInfo's operationParameters is not being tested because the result from the unmarshal process is a map[string]interface{} and I haven't
// been able to set up an expected map that maintains the interface{} type, so DeepEquals() fails
Expand All @@ -332,7 +339,7 @@ func TestActionFromLogEntry(t *testing.T) {
}{
{name: "Add", args: args{unstructuredResult: map[string]json.RawMessage{"add": []byte(`{"path":"mypath.parquet","size":8382,"partitionValues":{"date":"2021-03-09"},"modificationTime":1679610144893,"dataChange":true,"stats":"{\"numRecords\":155,\"tightBounds\":false,\"minValues\":{\"timestamp\":1615338375007003},\"maxValues\":{\"timestamp\":1615338377517216},\"nullCount\":null}"}`)}},
want: &AddPartitioned[emptyTestStruct, emptyTestStruct]{Path: "mypath.parquet", Size: 8382, PartitionValues: map[string]string{"date": "2021-03-09"}, ModificationTime: 1679610144893, DataChange: true,
Stats: `{"numRecords":155,"tightBounds":false,"minValues":{"timestamp":1615338375007003},"maxValues":{"timestamp":1615338377517216},"nullCount":null}`}, wantErr: nil},
Stats: &statsString}, wantErr: nil},
{name: "CommitInfo", args: args{unstructuredResult: map[string]json.RawMessage{"commitInfo": []byte(`{"clientVersion":"delta-go.alpha-0.0.0","isBlindAppend":true,"operation":"delta-go.Write","timestamp":1679610144893}`)}},
want: &CommitInfo{"clientVersion": "delta-go.alpha-0.0.0", "isBlindAppend": true, "operation": "delta-go.Write",
"timestamp": float64(1679610144893)}, wantErr: nil},
Expand Down Expand Up @@ -367,11 +374,12 @@ func TestActionsFromLogEntries(t *testing.T) {
NumRecords: 123,
}

statsString := string(stats.Json())
add := AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-1.snappy.parquet",
Size: 1,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
Stats: string(stats.Json()),
ModificationTime: 1675020556534,
Stats: &statsString,
}

write := Write{Mode: ErrorIfExists}
Expand Down Expand Up @@ -469,28 +477,6 @@ func TestStatsAsGenericStats(t *testing.T) {
}
}

func TestPartitionValuesAsGenericPartitions(t *testing.T) {
type TestPartitionType1 struct {
Date DeltaDataTypeDate `json:"date"`
}

input1 := make(map[string]string)
input1["date"] = "2012-07-26"

expectedPartitions1 := TestPartitionType1{Date: DeltaDataTypeDate(time.Date(2012, time.July, 26, 0, 0, 0, 0, time.UTC).Unix() / NUM_SECONDS_IN_DAY)}

results1, err := partitionValuesAsGeneric[TestPartitionType1](input1)
if err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(expectedPartitions1, *results1) {
t.Errorf("StatsAsGenericStats results did not match expected. Got %v expected %v", *results1, expectedPartitions1)
}
}

// TODO non-string type partitions
}

func TestMetadataGetSchema(t *testing.T) {
// Simple
schemaString := "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
Expand Down
Loading

0 comments on commit 6167048

Please sign in to comment.