[Delta Uniform] Support expireSnapshot in uniform iceberg table automatically when OPTIMIZE (#3298)

## Description
**_Issue_**: the current Uniform Iceberg table has no mechanism to clean up
old manifest/manifest-list files, which adds significant storage maintenance
overhead.
**_Proposed_**: when `OPTIMIZE` runs on a Uniform Delta table, it triggers the
`expireSnapshot` operation on the corresponding Iceberg table to clean up
manifests. `OPTIMIZE` is chosen as the trigger because it is recommended to run
frequently on Delta tables, and Iceberg's `expireSnapshot` is likewise
recommended to run frequently (e.g. once a day).
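
For reference, the cleanup relies on Iceberg's standard `ExpireSnapshots` API driven through a table transaction. A minimal sketch of that usage (not the exact code in this patch, which goes through the `IcebergConversionTransaction` wrapper; `table` is assumed to be an Iceberg `Table` handle):

```scala
import shadedForDelta.org.apache.iceberg.{Table, Transaction}

// Sketch only: expire old Iceberg snapshots when the Delta commit being
// converted contained an OPTIMIZE, mirroring what the converter does.
def expireIfOptimized(table: Table, sawOptimize: Boolean): Unit = {
  if (sawOptimize) {
    val txn: Transaction = table.newTransaction()
    // By default this honors the table's history.expire.max-snapshot-age-ms setting.
    txn.expireSnapshots().commit()
    txn.commitTransaction()
  }
}
```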

## How was this patch tested?
Manually tested
ChengJi-db authored Jun 25, 2024
1 parent e054904 commit 7bb9792
Showing 4 changed files with 126 additions and 3 deletions.
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.ExpireSnapshots
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser

@@ -184,6 +185,12 @@ class IcebergConversionTransaction(
}
}

class ExpireSnapshotHelper(expireSnapshot: ExpireSnapshots)
extends TransactionHelper(expireSnapshot) {

override def opType: String = "expireSnapshot"
}

//////////////////////
// Member variables //
//////////////////////
@@ -240,6 +247,12 @@ class IcebergConversionTransaction(
ret
}

def getExpireSnapshotHelper(): ExpireSnapshotHelper = {
val ret = new ExpireSnapshotHelper(txn.expireSnapshots())
fileUpdates += ret
ret
}

/**
* Handles the following update scenarios
* - partition update -> throws
@@ -24,6 +24,7 @@ import scala.util.control.Breaks._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaErrors, DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
import org.apache.spark.sql.delta.DeltaOperations.OPTIMIZE_OPERATION_NAME
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
import org.apache.spark.sql.delta.hooks.IcebergConverterHook
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -291,6 +292,10 @@ class IcebergConverter(spark: SparkSession)
// or to the specified batch size.
val actionBatchSize =
spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT)
// If any of the actions to convert came from an OPTIMIZE commit,
// trigger snapshot expiration on the iceberg table.
var needsExpireSnapshot = false

prevConvertedSnapshotOpt match {
case Some(prevSnapshot) =>
// Read the actions directly from the delta json files.
@@ -313,9 +318,12 @@
actionsToConvert.foreach { actionsIter =>
try {
actionsIter.grouped(actionBatchSize).foreach { actionStrs =>
val actions = actionStrs.map(Action.fromJson)
needsExpireSnapshot ||= existsOptimize(actions)

runIcebergConversionForActions(
icebergTxn,
actionStrs.map(Action.fromJson),
actions,
log.dataPath,
prevConvertedSnapshotOpt)
}
@@ -342,6 +350,7 @@

actionsToConvert.grouped(actionBatchSize)
.foreach { actions =>
needsExpireSnapshot ||= existsOptimize(actions)
runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None)
}

@@ -350,6 +359,13 @@
icebergTxn.updateTableMetadata(snapshotToConvert.metadata, snapshotToConvert.metadata)
}
}
if (needsExpireSnapshot) {
logInfo(s"Committing iceberg snapshot expiration for uniform table " +
s"[path = ${log.logPath}] tableId=${log.tableId}]")
val expireSnapshotHelper = icebergTxn.getExpireSnapshotHelper()
expireSnapshotHelper.commit()
}

icebergTxn.commit()
Some(snapshotToConvert.version, snapshotToConvert.timestamp)
}
@@ -458,4 +474,12 @@ class IcebergConverter(spark: SparkSession)
}
}
}

private def existsOptimize(actions: Seq[Action]): Boolean = {
actions.exists { action =>
val sa = action.wrap
sa.commitInfo != null && sa.commitInfo.operation == OPTIMIZE_OPERATION_NAME
}
}

}
@@ -19,20 +19,24 @@ package org.apache.spark.sql.delta.icebergShaded
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfigs, DeltaLog}
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs, DeltaErrors, DeltaLog, DeltaRuntimeException}
import org.apache.spark.sql.delta.DeltaConfigs.parseCalendarInterval
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.Metrics
import shadedForDelta.org.apache.iceberg.TableProperties

// scalastyle:off import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier}
// scalastyle:on import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.hive.HiveCatalog

import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier => SparkTableIdentifier}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.CalendarInterval

object IcebergTransactionUtils
extends DeltaLogging
@@ -150,8 +154,13 @@ object IcebergTransactionUtils
*/
def getIcebergPropertiesFromDeltaProperties(
properties: Map[String, String]): Map[String, String] = {
val additionalPropertyFromDelta = additionalIcebergPropertiesFromDeltaProperties(properties)
val prefix = DeltaConfigs.DELTA_UNIVERSAL_FORMAT_ICEBERG_CONFIG_PREFIX
properties.filterKeys(_.startsWith(prefix)).map(kv => (kv._1.stripPrefix(prefix), kv._2)).toMap
val specifiedProperty =
properties.filterKeys(_.startsWith(prefix)).map(kv => (kv._1.stripPrefix(prefix), kv._2))
.toMap
validateIcebergProperty(additionalPropertyFromDelta, specifiedProperty)
additionalPropertyFromDelta ++ specifiedProperty
}

/** Returns the mapping of logicalPartitionColName -> physicalPartitionColName */
@@ -226,4 +235,78 @@ object IcebergTransactionUtils
}
IcebergTableIdentifier.of(namespace, identifier.table)
}

// Additional iceberg properties inferred from delta properties.
// If the user doesn't specify the property on the iceberg table, we infer it from
// the delta properties; otherwise we validate the user-specified property against
// the inferred one.
// List of additional properties:
// 1. iceberg's history.expire.max-snapshot-age-ms:
//    inferred as the min of delta.logRetentionDuration and delta.deletedFileRetentionDuration
private def additionalIcebergPropertiesFromDeltaProperties(
properties: Map[String, String]): Map[String, String] = {
icebergRetentionPropertyFromDelta(properties)
}

private def icebergRetentionPropertyFromDelta(
deltaProperties: Map[String, String]): Map[String, String] = {
val icebergSnapshotRetentionFromDelta = deltaRetentionMsFrom(deltaProperties)
lazy val icebergDefault = TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT
icebergSnapshotRetentionFromDelta.map { retentionMs =>
Map(TableProperties.MAX_SNAPSHOT_AGE_MS -> (retentionMs min icebergDefault).toString)
}.getOrElse(Map.empty)
}

// Given the additional iceberg properties constrained/inferred by Delta and the
// user-specified iceberg properties, validate that they don't conflict.
private def validateIcebergProperty(
additionalPropertyFromDelta: Map[String, String],
customizedProperty: Map[String, String]): Unit = {
validateIcebergRetentionWithDelta(additionalPropertyFromDelta, customizedProperty)
}

// Validation:
// The user-specified iceberg retention should be <= the delta retention,
// which is the min of logRetentionDuration and deletedFileRetentionDuration.
private def validateIcebergRetentionWithDelta(
additionalPropertyFromDelta: Map[String, String],
usrSpecifiedProperty: Map[String, String]): Unit = {
lazy val defaultRetentionDelta =
calendarStrToMs(DeltaConfigs.LOG_RETENTION.defaultValue) min
calendarStrToMs(DeltaConfigs.TOMBSTONE_RETENTION.defaultValue)
lazy val retentionMsFromDelta = additionalPropertyFromDelta
.getOrElse(TableProperties.MAX_SNAPSHOT_AGE_MS, s"$defaultRetentionDelta").toLong

usrSpecifiedProperty.get(TableProperties.MAX_SNAPSHOT_AGE_MS).foreach { proposedMs =>
if (proposedMs.toLong > retentionMsFromDelta) {
throw new IllegalArgumentException(
s"Uniform iceberg's ${TableProperties.MAX_SNAPSHOT_AGE_MS} should be set >= " +
s" min of delta's ${DeltaConfigs.LOG_RETENTION.key} and" +
s" ${DeltaConfigs.TOMBSTONE_RETENTION.key}." +
s" Current delta retention min in MS: $retentionMsFromDelta," +
s" Proposed iceberg retention in Ms: $proposedMs")
}
}
}

private def deltaRetentionMsFrom(deltaProperties: Map[String, String]): Option[Long] = {
def getCalendarMsFrom(
conf: DeltaConfig[CalendarInterval], properties: Map[String, String]): Option[Long] = {
properties.get(conf.key).map(calendarStrToMs)
}

def minOf(a: Option[Long], b: Option[Long]): Option[Long] = (a, b) match {
case (Some(a), Some(b)) => Some(a min b)
case (a, b) => a orElse b
}

val logRetention = getCalendarMsFrom(DeltaConfigs.LOG_RETENTION, deltaProperties)
val vacuumRetention = getCalendarMsFrom(DeltaConfigs.TOMBSTONE_RETENTION, deltaProperties)
minOf(logRetention, vacuumRetention)
}

// Converts a string in calendar interval format to milliseconds
private def calendarStrToMs(calendarStr: String): Long = {
val interval = parseCalendarInterval(calendarStr)
DeltaConfigs.getMilliSeconds(interval)
}
}
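
For intuition, the retention inference above takes the minimum of the two Delta retention durations and caps it at Iceberg's default maximum snapshot age; a user-supplied value above that bound is rejected. The standalone sketch below is illustrative only (example values, not part of the patch):

```scala
import java.util.concurrent.TimeUnit

// Standalone sketch of the retention inference; values are examples.
object RetentionInferenceSketch extends App {
  val icebergDefaultMaxSnapshotAgeMs = TimeUnit.DAYS.toMillis(5) // Iceberg's default max snapshot age

  // Example Delta settings: logRetentionDuration = 30 days,
  // deletedFileRetentionDuration = 7 days.
  val logRetentionMs = TimeUnit.DAYS.toMillis(30)
  val tombstoneRetentionMs = TimeUnit.DAYS.toMillis(7)

  // Inferred history.expire.max-snapshot-age-ms:
  // min of the Delta retentions, capped by the Iceberg default -> 5 days here.
  val inferredMs = (logRetentionMs min tombstoneRetentionMs) min icebergDefaultMaxSnapshotAgeMs

  // A user-specified value above the inferred bound would be rejected by the validation.
  val userProposedMs = TimeUnit.DAYS.toMillis(14)
  assert(userProposedMs > inferredMs) // would fail validation and raise IllegalArgumentException
  println(s"inferred max-snapshot-age-ms = $inferredMs")
}
```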
@@ -176,6 +176,9 @@ trait DeltaConfigsBase extends DeltaLogging {
case lKey if lKey.startsWith("delta.") =>
Option(entries.get(lKey.stripPrefix("delta."))) match {
case Some(deltaConfig) => deltaConfig(value) // validate the value
case None if lKey.startsWith(DELTA_UNIVERSAL_FORMAT_CONFIG_PREFIX) =>
// always allow any delta universal format config with key converted to lower case
lKey -> value
case None if allowArbitraryProperties =>
logConsole(
s"You are setting a property: $key that is not recognized by this " +
