Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lzlfred committed Feb 19, 2025
1 parent a16a2fd commit 095bd73
Showing 1 changed file with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.ExpireSnapshots
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
Expand Down Expand Up @@ -64,7 +64,8 @@ class IcebergConversionTransaction(
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging {
protected val lastConvertedDeltaVersion: Option[Long] = None
) extends DeltaLogging {

///////////////////////////
// Nested Helper Classes //
Expand Down Expand Up @@ -100,7 +101,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
add,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand Down Expand Up @@ -141,7 +142,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
add,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand All @@ -154,7 +155,7 @@ class IcebergConversionTransaction(
convertDeltaRemoveFileToIcebergDataFile(
remove,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
)
Expand All @@ -178,7 +179,7 @@ class IcebergConversionTransaction(
convertDeltaRemoveFileToIcebergDataFile(
f,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
}.toSet.asJava
Expand All @@ -188,7 +189,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
f,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand All @@ -212,8 +213,15 @@ class IcebergConversionTransaction(
protected val tablePath = postCommitSnapshot.deltaLog.dataPath
protected val icebergSchema =
convertDeltaSchemaToIcebergSchema(postCommitSnapshot.metadata.schema)
// Initial partition spec converted from Delta
protected val partitionSpec =
createPartitionSpec(icebergSchema, postCommitSnapshot.metadata.partitionColumns)

// Current partition spec from iceberg table
def currentPartitionSpec: PartitionSpec = {
Some(txn.table()).map(_.spec()).getOrElse(partitionSpec)
}

private val logicalToPhysicalPartitionNames =
getPartitionPhysicalNameMapping(postCommitSnapshot.metadata.partitionSchema)

Expand Down

0 comments on commit 095bd73

Please sign in to comment.