Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: clean up miscellaneous code #33

Merged
merged 10 commits into from
Sep 13, 2023
Merged
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ type testData struct {
}

func (data *testData) getSchema() delta.SchemaTypeStruct {

// schema := GetSchema(data)
schema := delta.SchemaTypeStruct{
Fields: []delta.SchemaField{
Expand Down Expand Up @@ -198,7 +197,6 @@ type payload struct {
}

func writeParquet[T any](data []T, filename string) (*payload, error) {

p := new(payload)

// if err := parquet.WriteFile(filename, data); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions action.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@ func StatsFromJson(b []byte) (*Stats, error) {
// the struct property is passed in as a pointer to ensure that it can be evaluated as nil[NULL]
// TODO Handle struct types
func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T) {

var v T
if s.NullCount == nil {
s.NullCount = make(map[string]int64)
Expand Down Expand Up @@ -524,5 +523,4 @@ func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T) {
s.MaxValues[k] = v

}

}
9 changes: 0 additions & 9 deletions action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func TestLogEntryFromActions(t *testing.T) {
}

func TestLogEntryFromAction(t *testing.T) {

commit := make(CommitInfo)
commit["path"] = "part-1.snappy.parquet"
commit["size"] = 1
Expand All @@ -91,7 +90,6 @@ func TestLogEntryFromAction(t *testing.T) {

// Test from example at https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata
func TestLogEntryFromActionChangeMetaData(t *testing.T) {

expectedStr := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(`
{
"metaData":{
Expand Down Expand Up @@ -140,7 +138,6 @@ func TestLogEntryFromActionChangeMetaData(t *testing.T) {

// TestUpdateStats tests gathering stats over a data set that includes pointers
func TestUpdateStats(t *testing.T) {

type rowType struct {
Id int `parquet:"id,snappy"`
Label string `parquet:"label,dict,snappy"`
Expand All @@ -161,7 +158,6 @@ func TestUpdateStats(t *testing.T) {

stats := Stats{}
for _, row := range data {

stats.NumRecords++
UpdateStats(&stats, "id", &row.Id)
UpdateStats(&stats, "label", &row.Label)
Expand All @@ -174,7 +170,6 @@ func TestUpdateStats(t *testing.T) {
if statsString != expectedStr {
t.Errorf("has:\n%s\nwant:\n%s", statsString, expectedStr)
}

}

func TestStatsFromJSON(t *testing.T) {
Expand Down Expand Up @@ -224,18 +219,15 @@ func TestStatsFromJSON(t *testing.T) {
}

func TestFormatDefault(t *testing.T) {

format := new(Format).Default()
b, _ := json.Marshal(format)
expectedStr := `{"provider":"parquet","options":{}}`
if string(b) != expectedStr {
t.Errorf("has:\n%s\nwant:\n%s", string(b), expectedStr)
}

}

func TestWriteOperationParameters(t *testing.T) {

write := Write{Mode: Append, PartitionBy: []string{"date"}}
commit := write.GetCommitInfo()
commit["timestamp"] = 1675020556534
Expand All @@ -253,7 +245,6 @@ func TestWriteOperationParameters(t *testing.T) {
if !reflect.DeepEqual(expectedStr, string(logs)) {
t.Errorf("expected %s, but got %s", expectedStr, string(logs))
}

}

func TestWrite_GetCommitInfo(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func checkpointFromBytes(bytes []byte) (*CheckPoint, error) {
return checkpoint, nil
}

func lastCheckpointPath() *storage.Path {
func lastCheckpointPath() storage.Path {
path := storage.PathFromIter([]string{"_delta_log", "_last_checkpoint"})
return &path
return path
}

// / Return the checkpoint version and total parts, and the current part index if the URI is a valid checkpoint filename
// / If the checkpoint is single-part then part and checkpoint.Parts will both be zero
// / If the URI is not a valid checkpoint filename then checkpoint will be nil
func checkpointInfoFromURI(path *storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) {
func checkpointInfoFromURI(path storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) {
// Check for a single-part checkpoint
groups := checkpointRegex.FindStringSubmatch(path.Base())
if len(groups) == 2 {
Expand Down Expand Up @@ -145,7 +145,7 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version int64, valida
// List all files starting with the version prefix. This will also find commit logs and possible crc files
str := fmt.Sprintf("%020d", version)
path := storage.PathFromIter([]string{"_delta_log", str})
possibleCheckpointFiles, err := store.List(&path, nil)
possibleCheckpointFiles, err := store.List(path, nil)
if err != nil {
return false, err
}
Expand All @@ -155,7 +155,7 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version int64, valida
totalParts := int32(0)

for _, possibleCheckpointFile := range possibleCheckpointFiles.Objects {
checkpoint, currentPart, err := checkpointInfoFromURI(&possibleCheckpointFile.Location)
checkpoint, currentPart, err := checkpointInfoFromURI(possibleCheckpointFile.Location)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -231,11 +231,11 @@ func createCheckpointFor(tableState *DeltaTableState, store storage.ObjectStore,
checkpointFileName = fmt.Sprintf("%020d.checkpoint.%010d.%010d.parquet", tableState.Version, part+1, numParts)
}
checkpointPath := storage.PathFromIter([]string{"_delta_log", checkpointFileName})
_, err = store.Head(&checkpointPath)
_, err = store.Head(checkpointPath)
if !errors.Is(err, storage.ErrorObjectDoesNotExist) {
return ErrorCheckpointAlreadyExists
}
err = store.Put(&checkpointPath, parquetBytes)
err = store.Put(checkpointPath, parquetBytes)
if err != nil {
return err
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []DeletionCandida
lastMaybeToDelete := maybeToDelete[len(maybeToDelete)-1]
if lastMaybeToDelete.Version < beforeVersion && lastMaybeToDelete.Meta.LastModified.UnixMilli() <= maxTimestamp.UnixMilli() {
for _, deleteFile := range maybeToDelete {
err := store.Delete(&deleteFile.Meta.Location)
err := store.Delete(deleteFile.Meta.Location)
if err != nil {
return deleted, err
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func removeExpiredLogsAndCheckpoints(beforeVersion int64, maxTimestamp time.Time
if errors.Is(err, storage.ErrorObjectDoesNotExist) {
break
}
isValid, version := CommitOrCheckpointVersionFromUri(&meta.Location)
isValid, version := CommitOrCheckpointVersionFromUri(meta.Location)
// Spark and Rust clients also use the file's last updated timestamp rather than opening the commit and using internal state
if isValid && version < beforeVersion && meta.LastModified.Before(maxTimestamp) {
candidatesForDeletion = append(candidatesForDeletion, DeletionCandidate{Version: version, Meta: *meta})
Expand Down
34 changes: 16 additions & 18 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestSimpleCheckpoint(t *testing.T) {
}

// Remove the previous log to make sure we use the checkpoint when loading
err = store.Delete(table.CommitUriFromVersion(4))
err = store.Delete(CommitUriFromVersion(4))
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestSimpleCheckpoint(t *testing.T) {
}
}
// Remove the previous log to make sure we use the checkpoint when loading
err = store.Delete(table.CommitUriFromVersion(9))
err = store.Delete(CommitUriFromVersion(9))
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestTombstones(t *testing.T) {

// Load the checkpoint
// Remove the previous log to make sure we use the checkpoint when loading
err = store.Delete(table.CommitUriFromVersion(1))
err = store.Delete(CommitUriFromVersion(1))
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestMultiPartCheckpoint(t *testing.T) {
}

// Remove the previous commit to make sure we load the checkpoint files
err = store.Delete(table.CommitUriFromVersion(11))
err = store.Delete(CommitUriFromVersion(11))
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -898,9 +898,7 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) {
}

for _, enableCleanupInTableConfig := range tests {

for _, disableCleanupInCheckpointConfig := range tests {

store, stateStore, lock, checkpointLock := setupCheckpointTest(t, "", false)

table := NewDeltaTable(store, lock, stateStore)
Expand All @@ -925,15 +923,15 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) {
}
now := time.Now()
// With cleanup enabled, 25 and 15 minutes ago should be deleted, 5 should not
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1052,27 +1050,27 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) {
// 3: 13 min ago
// 4: 12 min ago
// 5: 6 min ago
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute))
if err != nil {
t.Fatal(err)
}
Expand All @@ -1098,21 +1096,21 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) {
t.Fatal("did not remove version 1")
}
// We can't load versions 2 and 3 but the logs should persist
_, err = store.Head(table.CommitUriFromVersion(2))
_, err = store.Head(CommitUriFromVersion(2))
if errors.Is(err, storage.ErrorObjectDoesNotExist) {
t.Fatal("should not remove version 2")
}
if err != nil {
t.Fatal(err)
}
_, err = store.Head(table.CommitUriFromVersion(3))
_, err = store.Head(CommitUriFromVersion(3))
if errors.Is(err, storage.ErrorObjectDoesNotExist) {
t.Fatal("should not remove version 3")
}
if err != nil {
t.Fatal(err)
}
_, err = store.Head(table.CommitUriFromVersion(4))
_, err = store.Head(CommitUriFromVersion(4))
if errors.Is(err, storage.ErrorObjectDoesNotExist) {
t.Fatal("should not remove version 4")
}
Expand Down
Loading