Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch parquet library #19

Merged
merged 10 commits into from
Jul 11, 2023
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func main() {

add := delta.Add{
Path: fileName,
Size: delta.DeltaDataTypeLong(p.Size),
Size: p.Size,
DataChange: true,
ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
ModificationTime: time.Now().UnixMilli(),
Stats: string(stats.Json()),
PartitionValues: make(map[string]string),
}
Expand Down Expand Up @@ -104,9 +104,9 @@ func main() {

add := delta.Add{
Path: fileName,
Size: delta.DeltaDataTypeLong(p.Size),
Size: p.Size,
DataChange: true,
ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
ModificationTime: time.Now().UnixMilli(),
Stats: string(stats.Json()),
PartitionValues: make(map[string]string),
}
Expand Down Expand Up @@ -246,7 +246,6 @@ Limitations / TODO
Checkpoints:
- The checkpoint checksum is not being written or validated
- Checkpoints with non-string-type partitions require custom JSON marshal/unmarshal code
- Parsed stats does not include null counts

Other:
- Nested schemas (containing nested structs, arrays, or maps) are not supported
Expand Down
488 changes: 415 additions & 73 deletions action.go

Large diffs are not rendered by default.

52 changes: 19 additions & 33 deletions action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func TestLogEntryFromActions(t *testing.T) {
add1 := AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-1.snappy.parquet",
Size: 1,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
ModificationTime: 1675020556534,
DataChange: false,
}
add2 := &AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-2.snappy.parquet",
Size: 2,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
ModificationTime: 1675020556534,
DataChange: false,
}

write := Write{Mode: ErrorIfExists}
Expand Down Expand Up @@ -66,11 +68,11 @@ func TestLogEntryFromActions(t *testing.T) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}

if !strings.Contains(string(logs), `{"add":{"path":"part-1.snappy.parquet","partitionValues":null,"size":1,"modificationTime":1675020556534,"dataChange":false,"stats":""}}`) {
if !strings.Contains(string(logs), `{"add":{"path":"part-1.snappy.parquet","partitionValues":null,"size":1,"modificationTime":1675020556534,"dataChange":false,"stats":null}}`) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}

if !strings.Contains(string(logs), `{"path":"part-2.snappy.parquet","partitionValues":null,"size":2,"modificationTime":1675020556534,"dataChange":false,"stats":""}`) {
if !strings.Contains(string(logs), `{"path":"part-2.snappy.parquet","partitionValues":null,"size":2,"modificationTime":1675020556534,"dataChange":false,"stats":null}`) {
t.Errorf("want:\n%s\nhas:\n%s\n", expectedStr, string(logs))
}
}
Expand All @@ -80,7 +82,7 @@ func TestLogEntryFromAction(t *testing.T) {
commit := make(CommitInfo)
commit["path"] = "part-1.snappy.parquet"
commit["size"] = 1
commit["ModificationTime"] = DeltaDataTypeTimestamp(time.Now().UnixMilli())
commit["ModificationTime"] = time.Now().UnixMilli()

abytes, err := logEntryFromAction[emptyTestStruct, emptyTestStruct](commit)
if err != nil {
Expand Down Expand Up @@ -109,17 +111,20 @@ func TestLogEntryFromActionChangeMetaData(t *testing.T) {
}
}
`, "\n", ""), "\t", ""), " ", "")
provider := "parquet"
options := make(map[string]string)
format := Format{
Provider: "parquet",
Options: make(map[string]string),
Provider: provider,
Options: options,
}
config := make(map[string]string)
config[string(AppendOnlyDeltaConfigKey)] = "true"
id, _ := uuid.Parse("af23c9d7-fff1-4a5a-a2c8-55c59bd782aa")
schemaString := "..."
action := MetaData{
Id: id,
Format: format,
SchemaString: "...",
SchemaString: schemaString,
PartitionColumns: []string{},
Configuration: config,
}
Expand Down Expand Up @@ -321,6 +326,8 @@ func TestActionFromLogEntry(t *testing.T) {
unstructuredResult map[string]json.RawMessage
}

statsString := `{"numRecords":155,"tightBounds":false,"minValues":{"timestamp":1615338375007003},"maxValues":{"timestamp":1615338377517216},"nullCount":null}`

// Caveats:
// CommitInfo's operationParameters is not being tested because the result from the unmarshal process is a map[string]interface{} and I haven't
// been able to set up an expected map that maintains the interface{} type, so DeepEquals() fails
Expand All @@ -332,7 +339,7 @@ func TestActionFromLogEntry(t *testing.T) {
}{
{name: "Add", args: args{unstructuredResult: map[string]json.RawMessage{"add": []byte(`{"path":"mypath.parquet","size":8382,"partitionValues":{"date":"2021-03-09"},"modificationTime":1679610144893,"dataChange":true,"stats":"{\"numRecords\":155,\"tightBounds\":false,\"minValues\":{\"timestamp\":1615338375007003},\"maxValues\":{\"timestamp\":1615338377517216},\"nullCount\":null}"}`)}},
want: &AddPartitioned[emptyTestStruct, emptyTestStruct]{Path: "mypath.parquet", Size: 8382, PartitionValues: map[string]string{"date": "2021-03-09"}, ModificationTime: 1679610144893, DataChange: true,
Stats: `{"numRecords":155,"tightBounds":false,"minValues":{"timestamp":1615338375007003},"maxValues":{"timestamp":1615338377517216},"nullCount":null}`}, wantErr: nil},
Stats: &statsString}, wantErr: nil},
{name: "CommitInfo", args: args{unstructuredResult: map[string]json.RawMessage{"commitInfo": []byte(`{"clientVersion":"delta-go.alpha-0.0.0","isBlindAppend":true,"operation":"delta-go.Write","timestamp":1679610144893}`)}},
want: &CommitInfo{"clientVersion": "delta-go.alpha-0.0.0", "isBlindAppend": true, "operation": "delta-go.Write",
"timestamp": float64(1679610144893)}, wantErr: nil},
Expand Down Expand Up @@ -367,11 +374,12 @@ func TestActionsFromLogEntries(t *testing.T) {
NumRecords: 123,
}

statsString := string(stats.Json())
add := AddPartitioned[emptyTestStruct, emptyTestStruct]{
Path: "part-1.snappy.parquet",
Size: 1,
ModificationTime: DeltaDataTypeTimestamp(1675020556534),
Stats: string(stats.Json()),
ModificationTime: 1675020556534,
Stats: &statsString,
}

write := Write{Mode: ErrorIfExists}
Expand Down Expand Up @@ -469,28 +477,6 @@ func TestStatsAsGenericStats(t *testing.T) {
}
}

func TestPartitionValuesAsGenericPartitions(t *testing.T) {
type TestPartitionType1 struct {
Date DeltaDataTypeDate `json:"date"`
}

input1 := make(map[string]string)
input1["date"] = "2012-07-26"

expectedPartitions1 := TestPartitionType1{Date: DeltaDataTypeDate(time.Date(2012, time.July, 26, 0, 0, 0, 0, time.UTC).Unix() / NUM_SECONDS_IN_DAY)}

results1, err := partitionValuesAsGeneric[TestPartitionType1](input1)
if err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(expectedPartitions1, *results1) {
t.Errorf("StatsAsGenericStats results did not match expected. Got %v expected %v", *results1, expectedPartitions1)
}
}

// TODO non-string type partitions
}

func TestMetadataGetSchema(t *testing.T) {
// Simple
schemaString := "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
Expand Down
Loading