Make fields that are required in the protocol also required in Parquet. Remove timestamp logical type to match Spark schema. Remove Delta data type definitions.
chelseajonesr committed Jul 7, 2023
1 parent 6522c19 commit 0bf30fc
Showing 12 changed files with 466 additions and 554 deletions.
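In short, the commit removes the DeltaDataType* aliases in favor of plain Go types, and fields the Delta protocol marks as required lose their pointer indirection, so they are also required when written to Parquet. A hypothetical sketch of the resulting pattern (the real definitions in action.go are generic and carry more fields):

type Add struct {
	Path             string  `json:"path"`             // was *string
	Size             int64   `json:"size"`             // was *DeltaDataTypeLong
	ModificationTime int64   `json:"modificationTime"` // was *DeltaDataTypeTimestamp
	DataChange       bool    `json:"dataChange"`       // was *bool
	Stats            *string `json:"stats,omitempty"`  // genuinely optional, stays a pointer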
8 changes: 4 additions & 4 deletions README.md
@@ -60,9 +60,9 @@ func main() {
 
 	add := delta.Add{
 		Path:             fileName,
-		Size:             delta.DeltaDataTypeLong(p.Size),
+		Size:             p.Size,
 		DataChange:       true,
-		ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
+		ModificationTime: time.Now().UnixMilli(),
 		Stats:            string(stats.Json()),
 		PartitionValues:  make(map[string]string),
 	}
@@ -104,9 +104,9 @@ func main() {
 
 	add := delta.Add{
 		Path:             fileName,
-		Size:             delta.DeltaDataTypeLong(p.Size),
+		Size:             p.Size,
 		DataChange:       true,
-		ModificationTime: delta.DeltaDataTypeTimestamp(time.Now().UnixMilli()),
+		ModificationTime: time.Now().UnixMilli(),
 		Stats:            string(stats.Json()),
 		PartitionValues:  make(map[string]string),
 	}
463 changes: 233 additions & 230 deletions action.go

Large diffs are not rendered by default.
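The action.go diff itself is not rendered, but the test changes below show the resulting shapes. For instance, the Protocol action presumably becomes something like this (inferred from the tests, not from the rendered diff; the int32 width is an assumption):

type Protocol struct {
	MinReaderVersion int32 `json:"minReaderVersion"`
	MinWriterVersion int32 `json:"minWriterVersion"`
}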

61 changes: 22 additions & 39 deletions action_test.go
@@ -29,24 +29,17 @@ type emptyTestStruct struct {
 }
 
 func TestLogEntryFromActions(t *testing.T) {
-	path1 := "part-1.snappy.parquet"
-	size1 := DeltaDataTypeLong(1)
-	time1 := DeltaDataTypeTimestamp(1675020556534)
-	dataChange := false
 	add1 := AddPartitioned[emptyTestStruct, emptyTestStruct]{
-		Path:             &path1,
-		Size:             &size1,
-		ModificationTime: &time1,
-		DataChange:       &dataChange,
-	}
-	path2 := "part-2.snappy.parquet"
-	size2 := DeltaDataTypeLong(2)
-	time2 := DeltaDataTypeTimestamp(1675020556534)
+		Path:             "part-1.snappy.parquet",
+		Size:             1,
+		ModificationTime: 1675020556534,
+		DataChange:       false,
+	}
 	add2 := &AddPartitioned[emptyTestStruct, emptyTestStruct]{
-		Path:             &path2,
-		Size:             &size2,
-		ModificationTime: &time2,
-		DataChange:       &dataChange,
+		Path:             "part-2.snappy.parquet",
+		Size:             2,
+		ModificationTime: 1675020556534,
+		DataChange:       false,
 	}
 
 	write := Write{Mode: ErrorIfExists}
@@ -89,7 +82,7 @@ func TestLogEntryFromAction(t *testing.T) {
 	commit := make(CommitInfo)
 	commit["path"] = "part-1.snappy.parquet"
 	commit["size"] = 1
-	commit["ModificationTime"] = DeltaDataTypeTimestamp(time.Now().UnixMilli())
+	commit["ModificationTime"] = time.Now().UnixMilli()
 
 	abytes, err := logEntryFromAction[emptyTestStruct, emptyTestStruct](commit)
 	if err != nil {
@@ -121,19 +114,19 @@ func TestLogEntryFromActionChangeMetaData(t *testing.T) {
 	provider := "parquet"
 	options := make(map[string]string)
 	format := Format{
-		Provider: &provider,
-		Options:  &options,
+		Provider: provider,
+		Options:  options,
 	}
 	config := make(map[string]string)
 	config[string(AppendOnlyDeltaConfigKey)] = "true"
 	id, _ := uuid.Parse("af23c9d7-fff1-4a5a-a2c8-55c59bd782aa")
 	schemaString := "..."
 	action := MetaData{
 		Id:               id,
-		Format:           &format,
-		SchemaString:     &schemaString,
-		PartitionColumns: &[]string{},
-		Configuration:    &config,
+		Format:           format,
+		SchemaString:     schemaString,
+		PartitionColumns: []string{},
+		Configuration:    config,
 	}
 
 	b, err := logEntryFromAction[emptyTestStruct, emptyTestStruct](action)
@@ -333,14 +326,7 @@ func TestActionFromLogEntry(t *testing.T) {
 		unstructuredResult map[string]json.RawMessage
 	}
 
-	path := "mypath.parquet"
-	size := DeltaDataTypeLong(8382)
-	partitionValues := map[string]string{"date": "2021-03-09"}
-	modificationTime := DeltaDataTypeTimestamp(1679610144893)
-	dataChange := true
 	statsString := `{"numRecords":155,"tightBounds":false,"minValues":{"timestamp":1615338375007003},"maxValues":{"timestamp":1615338377517216},"nullCount":null}`
-	minReader := DeltaDataTypeInt(2)
-	minWriter := DeltaDataTypeInt(7)
 
 	// Caveats:
 	// CommitInfo's operationParameters is not being tested because the result from the unmarshal process is a map[string]interface{} and I haven't
@@ -352,13 +338,13 @@
 		want               Action
 		wantErr            error
 	}{
 		{name: "Add", args: args{unstructuredResult: map[string]json.RawMessage{"add": []byte(`{"path":"mypath.parquet","size":8382,"partitionValues":{"date":"2021-03-09"},"modificationTime":1679610144893,"dataChange":true,"stats":"{\"numRecords\":155,\"tightBounds\":false,\"minValues\":{\"timestamp\":1615338375007003},\"maxValues\":{\"timestamp\":1615338377517216},\"nullCount\":null}"}`)}},
-			want: &AddPartitioned[emptyTestStruct, emptyTestStruct]{Path: &path, Size: &size, PartitionValues: &partitionValues, ModificationTime: &modificationTime, DataChange: &dataChange,
+			want: &AddPartitioned[emptyTestStruct, emptyTestStruct]{Path: "mypath.parquet", Size: 8382, PartitionValues: map[string]string{"date": "2021-03-09"}, ModificationTime: 1679610144893, DataChange: true,
 				Stats: &statsString}, wantErr: nil},
 		{name: "CommitInfo", args: args{unstructuredResult: map[string]json.RawMessage{"commitInfo": []byte(`{"clientVersion":"delta-go.alpha-0.0.0","isBlindAppend":true,"operation":"delta-go.Write","timestamp":1679610144893}`)}},
 			want: &CommitInfo{"clientVersion": "delta-go.alpha-0.0.0", "isBlindAppend": true, "operation": "delta-go.Write",
 				"timestamp": float64(1679610144893)}, wantErr: nil},
 		{name: "Protocol", args: args{unstructuredResult: map[string]json.RawMessage{"protocol": []byte(`{"minReaderVersion":2,"minWriterVersion":7}`)}},
-			want: &Protocol{MinReaderVersion: &minReader, MinWriterVersion: &minWriter}, wantErr: nil},
+			want: &Protocol{MinReaderVersion: 2, MinWriterVersion: 7}, wantErr: nil},
 		{name: "Fail on invalid JSON", args: args{unstructuredResult: map[string]json.RawMessage{"add": []byte(`"path":"s3a://bucket/table","size":8382,"partitionValues":{"date":"2021-03-09"},"modificationTime":1679610144893,"dataChange":true}`)}},
 			want: nil, wantErr: ErrorActionJSONFormat},
 		{name: "Fail on unknown", args: args{unstructuredResult: map[string]json.RawMessage{"fake": []byte(`{}`)}}, want: nil, wantErr: ErrorActionUnknown},
@@ -388,14 +374,11 @@ func TestActionsFromLogEntries(t *testing.T) {
 		NumRecords: 123,
 	}
 
-	path := "part-1.snappy.parquet"
-	size := DeltaDataTypeLong(1)
-	time := DeltaDataTypeTimestamp(1675020556534)
 	statsString := string(stats.Json())
 	add := AddPartitioned[emptyTestStruct, emptyTestStruct]{
-		Path:             &path,
-		Size:             &size,
-		ModificationTime: &time,
+		Path:             "part-1.snappy.parquet",
+		Size:             1,
+		ModificationTime: 1675020556534,
 		Stats:            &statsString,
 	}
 
@@ -498,7 +481,7 @@ func TestMetadataGetSchema(t *testing.T) {
 	// Simple
 	schemaString := "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
 	md := new(MetaData)
-	md.SchemaString = &schemaString
+	md.SchemaString = schemaString
 	schema, err := md.GetSchema()
 	if err != nil {
 		t.Error(err)
89 changes: 44 additions & 45 deletions checkpoint.go
@@ -19,22 +19,21 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/rivian/delta-go/state"
 	"github.com/rivian/delta-go/storage"
 )
 
 // / Metadata for a checkpoint file
 // / This gets written out to _last_checkpoint
 type CheckPoint struct {
 	/// Delta table version
-	Version state.DeltaDataTypeVersion `json:"version"`
+	Version int64 `json:"version"`
 	// The number of actions in the checkpoint. -1 if not available.
-	Size DeltaDataTypeLong `json:"size"`
+	Size int64 `json:"size"`
 	// The number of parts if the checkpoint has multiple parts. Omit if single part.
-	Parts *DeltaDataTypeInt `json:"parts,omitempty"`
+	Parts *int32 `json:"parts,omitempty"`
 	// Size of the checkpoint in bytes
-	SizeInBytes   DeltaDataTypeLong `json:"sizeInBytes"`
-	NumOfAddFiles DeltaDataTypeLong `json:"numOfAddFiles"`
+	SizeInBytes   int64 `json:"sizeInBytes"`
+	NumOfAddFiles int64 `json:"numOfAddFiles"`
 }
 
 // / A single checkpoint entry in the checkpoint Parquet file
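One consequence of the new CheckPoint struct above: Parts is the only remaining pointer, so omitempty drops "parts" from _last_checkpoint for single-part checkpoints. A runnable sketch (field layout copied from the diff; values invented):

package main

import (
	"encoding/json"
	"fmt"
)

type CheckPoint struct {
	Version       int64  `json:"version"`
	Size          int64  `json:"size"`
	Parts         *int32 `json:"parts,omitempty"`
	SizeInBytes   int64  `json:"sizeInBytes"`
	NumOfAddFiles int64  `json:"numOfAddFiles"`
}

func main() {
	single := CheckPoint{Version: 10, Size: 25, SizeInBytes: 4096, NumOfAddFiles: 23}
	b, _ := json.Marshal(single)
	fmt.Println(string(b)) // {"version":10,"size":25,"sizeInBytes":4096,"numOfAddFiles":23}

	parts := int32(3)
	multi := CheckPoint{Version: 10, Size: 25, Parts: &parts, SizeInBytes: 4096, NumOfAddFiles: 23}
	b, _ = json.Marshal(multi)
	fmt.Println(string(b)) // now includes "parts":3
}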
@@ -97,17 +96,17 @@ func lastCheckpointPath() *storage.Path {
 // / Return the checkpoint version and total parts, and the current part index if the URI is a valid checkpoint filename
 // / If the checkpoint is single-part then part and checkpoint.Parts will both be zero
 // / If the URI is not a valid checkpoint filename then checkpoint will be nil
-func checkpointInfoFromURI(path *storage.Path) (checkpoint *CheckPoint, part DeltaDataTypeInt, parseErr error) {
+func checkpointInfoFromURI(path *storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) {
 	// Check for a single-part checkpoint
 	groups := checkpointRegex.FindStringSubmatch(path.Base())
 	if len(groups) == 2 {
-		checkpointVersionInt, err := strconv.ParseInt(groups[1], 10, 64)
-		if err != nil {
-			parseErr = err
+		var version int64
+		version, parseErr = strconv.ParseInt(groups[1], 10, 64)
+		if parseErr != nil {
 			return
 		}
 		checkpoint = new(CheckPoint)
-		checkpoint.Version = state.DeltaDataTypeVersion(checkpointVersionInt)
+		checkpoint.Version = version
 		checkpoint.Size = 0
 		part = 0
 		return
@@ -116,33 +115,34 @@ func checkpointInfoFromURI(path *storage.Path) (checkpoint *CheckPoint, part DeltaDataTypeInt, parseErr error) {
 	// Check for a multi part checkpoint
 	groups = checkpointPartsRegex.FindStringSubmatch(path.Base())
 	if len(groups) == 4 {
-		checkpointVersionInt, err := strconv.ParseInt(groups[1], 10, 64)
-		if err != nil {
-			parseErr = err
+		var version int64
+		version, parseErr = strconv.ParseInt(groups[1], 10, 64)
+		if parseErr != nil {
 			return
 		}
-		partInt64, err := strconv.ParseInt(groups[2], 10, 32)
-		if err != nil {
-			parseErr = err
+		var partInt64 int64
+		var partsInt64 int64
+		partInt64, parseErr = strconv.ParseInt(groups[2], 10, 32)
+		if parseErr != nil {
 			return
 		}
-		partsInt64, err := strconv.ParseInt(groups[3], 10, 32)
-		if err != nil {
-			parseErr = err
+		part = int32(partInt64)
+		partsInt64, parseErr = strconv.ParseInt(groups[3], 10, 32)
+		if parseErr != nil {
 			return
 		}
+		parts := int32(partsInt64)
 
 		checkpoint = new(CheckPoint)
-		checkpoint.Version = state.DeltaDataTypeVersion(checkpointVersionInt)
+		checkpoint.Version = version
 		checkpoint.Size = 0
-		partsDeltaInt := DeltaDataTypeInt(partsInt64)
-		checkpoint.Parts = &partsDeltaInt
-		part = DeltaDataTypeInt(partInt64)
+		checkpoint.Parts = &parts
 	}
 	return
 }
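For reference, the two filename shapes this function parses look like 00000000000000000010.checkpoint.parquet (single-part) and 00000000000000000010.checkpoint.0000000002.0000000003.parquet (part 2 of 3). checkpointRegex and checkpointPartsRegex are not shown in this diff, so the patterns below are assumed equivalents in a standalone sketch:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Assumed equivalents of the package's unexported checkpoint regexes.
var (
	singleRe = regexp.MustCompile(`^(\d{20})\.checkpoint\.parquet$`)
	partsRe  = regexp.MustCompile(`^(\d{20})\.checkpoint\.(\d{10})\.(\d{10})\.parquet$`)
)

func main() {
	if g := singleRe.FindStringSubmatch("00000000000000000010.checkpoint.parquet"); len(g) == 2 {
		v, _ := strconv.ParseInt(g[1], 10, 64)
		fmt.Println("single-part checkpoint, version", v) // version 10
	}
	if g := partsRe.FindStringSubmatch("00000000000000000010.checkpoint.0000000002.0000000003.parquet"); len(g) == 4 {
		part, _ := strconv.ParseInt(g[2], 10, 32)
		parts, _ := strconv.ParseInt(g[3], 10, 32)
		fmt.Println("multi-part checkpoint, part", part, "of", parts) // part 2 of 3
	}
}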

 // / Check whether the given checkpoint version exists, either as a single- or multi-part checkpoint
-func doesCheckpointVersionExist(store storage.ObjectStore, version state.DeltaDataTypeVersion, validateAllPartsExist bool) (bool, error) {
+func doesCheckpointVersionExist(store storage.ObjectStore, version int64, validateAllPartsExist bool) (bool, error) {
 	// List all files starting with the version prefix. This will also find commit logs and possible crc files
 	str := fmt.Sprintf("%020d", version)
 	path := storage.PathFromIter([]string{"_delta_log", str})
@@ -152,8 +152,8 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version state.DeltaDataTypeVersion, validateAllPartsExist bool) (bool, error) {
 	}
 
 	// Multi-part validation
-	partsFound := make(map[DeltaDataTypeInt]bool, 10)
-	totalParts := DeltaDataTypeInt(0)
+	partsFound := make(map[int32]bool, 10)
+	totalParts := int32(0)
 
 	for _, possibleCheckpointFile := range possibleCheckpointFiles.Objects {
 		checkpoint, currentPart, err := checkpointInfoFromURI(&possibleCheckpointFile.Location)
@@ -174,7 +174,7 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version state.DeltaDataTypeVersion, validateAllPartsExist bool) (bool, error) {
 	}
 	// Found a multi-part checkpoint and we want to validate that all parts exist
 	if len(partsFound) > 0 {
-		for i := DeltaDataTypeInt(0); i < totalParts; i++ {
+		for i := int32(0); i < totalParts; i++ {
 			found, ok := partsFound[i+1]
 			if !ok || !found {
 				return false, ErrorCheckpointIncomplete
@@ -210,7 +210,7 @@ func createCheckpointWithAddType[RowType any, PartitionType any, AddType AddPart…
 	tableState.prepareStateForCheckpoint()
 
 	totalRows := len(tableState.Files) + len(tableState.Tombstones) + len(tableState.AppTransactionVersion) + 2
-	numParts := DeltaDataTypeInt(((totalRows - 1) / checkpointConfiguration.MaxRowsPerPart) + 1)
+	numParts := int32(((totalRows - 1) / checkpointConfiguration.MaxRowsPerPart) + 1)
 
 	// From https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoints:
 	// When writing multi-part checkpoints, the data must be clustered (either through hash or range partitioning)
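A quick worked example of the numParts line above — ((totalRows - 1) / MaxRowsPerPart) + 1 is integer ceiling division (the values below are invented for illustration):

package main

import "fmt"

func main() {
	// Same integer ceiling division createCheckpointWithAddType uses for numParts
	maxRowsPerPart := 100
	for _, totalRows := range []int{1, 100, 101, 250} {
		numParts := ((totalRows - 1) / maxRowsPerPart) + 1
		fmt.Printf("%d rows -> %d part(s)\n", totalRows, numParts) // 1, 1, 2, 3
	}
}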
@@ -224,7 +224,7 @@ func createCheckpointWithAddType[RowType any, PartitionType any, AddType AddPart…
 
 	var totalBytes int64 = 0
 	offsetRow := 0
-	for part := DeltaDataTypeInt(0); part < numParts; part++ {
+	for part := int32(0); part < numParts; part++ {
 		records, err := checkpointRows[RowType, PartitionType, AddType](tableState, offsetRow, checkpointConfiguration.MaxRowsPerPart)
 		if err != nil {
 			return err
@@ -256,18 +256,18 @@ func createCheckpointWithAddType[RowType any, PartitionType any, AddType AddPart…
 		return ErrorCheckpointRowCountMismatch
 	}
 
-	var reportedParts *DeltaDataTypeInt
+	var reportedParts *int32
 	if numParts > 1 {
 		// Only multipart checkpoints list the parts
 		reportedParts = &numParts
 	}
 
 	checkpoint := CheckPoint{
 		Version:       tableState.Version,
-		Size:          DeltaDataTypeLong(totalRows),
-		SizeInBytes:   DeltaDataTypeLong(totalBytes),
+		Size:          int64(totalRows),
+		SizeInBytes:   totalBytes,
 		Parts:         reportedParts,
-		NumOfAddFiles: DeltaDataTypeLong(len(tableState.Files)),
+		NumOfAddFiles: int64(len(tableState.Files)),
 	}
 	checkpointBytes, err := json.Marshal(checkpoint)
 	if err != nil {
@@ -296,13 +296,13 @@ func checkpointAdd[RowType any, PartitionType any, AddType AddPartitioned[RowTyp…
 	switch typedAdd := any(checkpointAdd).(type) {
 	case *AddPartitioned[RowType, PartitionType]:
 		// *typedAdd = *add
-		typedAdd.DataChange = &addDataChange
+		typedAdd.DataChange = addDataChange
 		typedAdd.ModificationTime = add.ModificationTime
 		typedAdd.PartitionValues = add.PartitionValues
 		typedAdd.Path = add.Path
 		typedAdd.Size = add.Size
-		if typedAdd.Size == nil || *typedAdd.Size == 0 {
-			return nil, errors.Join(ErrorCheckpointAddZeroSize, fmt.Errorf("zero size add for path %s", *add.Path))
+		if typedAdd.Size == 0 {
+			return nil, errors.Join(ErrorCheckpointAddZeroSize, fmt.Errorf("zero size add for path %s", add.Path))
 		}
 		typedAdd.Stats = add.Stats
 		typedAdd.Tags = add.Tags
@@ -313,14 +313,13 @@ func checkpointAdd[RowType any, PartitionType any, AddType AddPartitioned[RowTyp…
 		// }
 		// typedAdd.PartitionValuesParsed = *partitionValuesParsed
 	case *Add[RowType]:
-		typedAdd.DataChange = &addDataChange
+		typedAdd.DataChange = addDataChange
 		typedAdd.ModificationTime = add.ModificationTime
 		typedAdd.PartitionValues = add.PartitionValues
-		pathCopy := *add.Path
-		typedAdd.Path = &pathCopy
+		typedAdd.Path = add.Path
 		typedAdd.Size = add.Size
-		if typedAdd.Size == nil || *typedAdd.Size == 0 {
-			return nil, errors.Join(ErrorCheckpointAddZeroSize, fmt.Errorf("zero size add for path %s", pathCopy))
+		if typedAdd.Size == 0 {
+			return nil, errors.Join(ErrorCheckpointAddZeroSize, fmt.Errorf("zero size add for path %s", add.Path))
 		}
 		typedAdd.Stats = add.Stats
 		typedAdd.Tags = add.Tags
@@ -330,15 +329,15 @@
 }
 
 type DeletionCandidate struct {
-	Version state.DeltaDataTypeVersion
+	Version int64
 	Meta    storage.ObjectMeta
 }
 
 // / If the maybeToDelete files are safe to delete, delete them. Otherwise, clear them
 // / "Safe to delete" is determined by the version and timestamp of the last file in the maybeToDelete list.
 // / For more details see BufferingLogDeletionIterator() in https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
 // / Returns the number of files deleted.
-func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []DeletionCandidate, beforeVersion state.DeltaDataTypeVersion, maxTimestamp time.Time) (int, error) {
+func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []DeletionCandidate, beforeVersion int64, maxTimestamp time.Time) (int, error) {
 	deleted := 0
 
 	if len(maybeToDelete) > 0 {
@@ -361,7 +360,7 @@ func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []DeletionCandidate, beforeVersion state.DeltaDataTypeVersion, maxTimestamp time.Time) (int, error) {
 // / Remove any logs and checkpoints that have a last updated date before maxTimestamp and a version before beforeVersion
 // / Last updated timestamps are required to be monotonically increasing, so there may be some time adjustment required
 // / For more detail see BufferingLogDeletionIterator() in https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
-func removeExpiredLogsAndCheckpoints(beforeVersion state.DeltaDataTypeVersion, maxTimestamp time.Time, store storage.ObjectStore) (int, error) {
+func removeExpiredLogsAndCheckpoints(beforeVersion int64, maxTimestamp time.Time, store storage.ObjectStore) (int, error) {
 	if !store.IsListOrdered() {
 		// Currently all object stores return list results sorted
 		return 0, errors.Join(ErrorNotImplemented, errors.New("removing expired logs is not implemented for this object store"))
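Per the comment on flushDeleteFiles, deletion safety hinges on the version and timestamp of the last buffered file. A simplified standalone sketch of that guard — the shape of storage.ObjectMeta (Location, LastModified) and the strict comparisons are assumptions, not the actual implementation:

package sketch

import "time"

// ObjectMeta is an assumed stand-in for storage.ObjectMeta.
type ObjectMeta struct {
	Location     string
	LastModified time.Time
}

type DeletionCandidate struct {
	Version int64
	Meta    ObjectMeta
}

// safeToDelete reports whether the whole buffer may be flushed: listing is
// ordered, so checking the last candidate against both cut-offs suffices.
func safeToDelete(maybeToDelete []DeletionCandidate, beforeVersion int64, maxTimestamp time.Time) bool {
	if len(maybeToDelete) == 0 {
		return false
	}
	last := maybeToDelete[len(maybeToDelete)-1]
	return last.Version < beforeVersion && last.Meta.LastModified.Before(maxTimestamp)
}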