[Spark] Add Scala clone, cloneAtVersion, and cloneAtTimestamp API (#3392)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Resolves #3391 

Add Scala APIs for cloning a Delta table: `clone`, `cloneAtVersion`,
and `cloneAtTimestamp`.

The implementation simply creates a `CloneTableStatement`, as the SQL parser
does, and lets the analyzer handle the rest. I tried to mimic what limited
information I could find on the current Databricks implementation:
https://docs.databricks.com/en/delta/clone.html#language-scala.
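
One detail of this approach is how the `target` string is interpreted: an absolute path is wrapped as a path-based Delta identifier, and anything else is passed through as a table name. The following is a rough, dependency-free sketch of that step. It assumes `java.net.URI` as a stand-in for the Hadoop `Path.isAbsolute()` check used in the patch, so edge cases may differ, and `CloneTargetSketch`, `isAbsolutePath`, and `targetIdentifier` are illustrative names that are not part of the patch.

```scala
import java.net.URI
import scala.util.Try

// Illustrative sketch only: CloneTargetSketch and its methods are hypothetical names.
object CloneTargetSketch {

  // Approximates Hadoop's Path.isAbsolute() by checking whether the URI path
  // component starts with "/". Strings that do not parse as a URI (or have no
  // path component) fall back to a plain prefix check on the raw string.
  def isAbsolutePath(target: String): Boolean = {
    val path = Try(new URI(target).getPath).toOption.flatMap(Option(_)).getOrElse(target)
    path.startsWith("/")
  }

  // Mirrors the patch: absolute paths become delta.`<path>`, table names pass through.
  def targetIdentifier(target: String): String =
    if (isAbsolutePath(target)) s"delta.`$target`" else target

  def main(args: Array[String]): Unit = {
    Seq("/tmp/tables/copy", "s3://bucket/tables/copy", "my_db.my_table")
      .foreach(t => println(s"$t => ${targetIdentifier(t)}"))
  }
}
```

Under this logic a relative path such as `tables/copy` is not absolute, so it would be handed to the SQL parser as a table name rather than resolved as a path.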

## How was this patch tested?

Added a new suite extending `CloneTableSuiteBase`.

## Does this PR introduce _any_ user-facing changes?

Yes. This adds a new Scala API on `DeltaTable` for cloning.
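
For illustration, here is a minimal usage sketch of the new methods. It assumes a local `SparkSession` with the Delta extension and catalog configured, and a Delta build that includes this change (`@since 3.3.0` per the Scaladoc). `CloneApiSketch`, `localSession`, `run`, the temporary paths, and the `foo` property are placeholders chosen for the example.

```scala
import java.nio.file.Files

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: object and method names are hypothetical.
object CloneApiSketch {

  // Local session with the Delta SQL extension and catalog enabled.
  def localSession(): SparkSession =
    SparkSession.builder()
      .master("local[*]")
      .appName("clone-api-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

  // Returns the row counts of (clone of the latest version, clone of version 0).
  def run(spark: SparkSession): (Long, Long) = {
    val base = Files.createTempDirectory("delta-clone-sketch").toString
    val src = s"$base/source"

    spark.range(0, 3).write.format("delta").save(src)                // version 0
    spark.range(3, 5).write.format("delta").mode("append").save(src) // version 1

    val source = DeltaTable.forPath(spark, src)

    // Clone the current state of the table to a new path.
    val latest = source.clone(s"$base/latest")

    // Clone the table as it was at version 0.
    val v0 = source.cloneAtVersion(0, s"$base/v0")

    // Clone with replace = true and a table property override on the target.
    source.clone(s"$base/with_props", true, Map("foo" -> "bar"))

    (latest.toDF.count(), v0.toDF.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    try println(run(spark)) finally spark.stop()
  }
}
```

Each call returns a `DeltaTable` handle for the target, so the clone can be queried immediately, as `run` does with `toDF`.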

---------

Co-authored-by: Thang Long VU <long.vu@databricks.com>
Co-authored-by: Thang Long Vu <107926660+longvu-db@users.noreply.github.com>
3 people authored Aug 19, 2024
1 parent 64d4602 commit b2d8235
Showing 4 changed files with 374 additions and 3 deletions.
199 changes: 199 additions & 0 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -562,6 +562,205 @@ class DeltaTable private[tables](
      TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED))
    toDataset(sparkSession, alterTableCmd)
  }

  /**
   * Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
   *
   * Specifying properties here means that the target will override any properties with the same key
   * in the source table with the user-defined properties.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.clone(
   *     "/some/path/to/table",
   *     true,
   *     Map("foo" -> "bar"))
   * }}}
   *
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   * @param properties The table properties to override in the clone.
   *
   * @since 3.3.0
   */
  def clone(target: String, replace: Boolean, properties: Map[String, String]): DeltaTable = {
    executeClone(table, target, replace, properties)
  }

  /**
   * Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.clone("/some/path/to/table", true)
   * }}}
   *
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   *
   * @since 3.3.0
   */
  def clone(target: String, replace: Boolean): DeltaTable = {
    clone(target, replace, properties = Map.empty)
  }

  /**
   * Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.clone("/some/path/to/table")
   * }}}
   *
   * @param target The path or table name to create the clone.
   *
   * @since 3.3.0
   */
  def clone(target: String): DeltaTable = {
    clone(target, replace = false)
  }

  /**
   * Clone a DeltaTable at a specific version to a given destination to mirror the existing
   * table's data and metadata at that version.
   *
   * Specifying properties here means that the target will override any properties with the same key
   * in the source table with the user-defined properties.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtVersion(
   *     5,
   *     "/some/path/to/table",
   *     true,
   *     Map("foo" -> "bar"))
   * }}}
   *
   * @param version The version of this table to clone from.
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   * @param properties The table properties to override in the clone.
   *
   * @since 3.3.0
   */
  def cloneAtVersion(
      version: Long,
      target: String,
      replace: Boolean,
      properties: Map[String, String]): DeltaTable = {
    executeClone(table, target, replace, properties, versionAsOf = Some(version))
  }

  /**
   * Clone a DeltaTable at a specific version to a given destination to mirror the existing
   * table's data and metadata at that version.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtVersion(5, "/some/path/to/table", true)
   * }}}
   *
   * @param version The version of this table to clone from.
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   *
   * @since 3.3.0
   */
  def cloneAtVersion(version: Long, target: String, replace: Boolean): DeltaTable = {
    cloneAtVersion(version, target, replace, properties = Map.empty)
  }

  /**
   * Clone a DeltaTable at a specific version to a given destination to mirror the existing
   * table's data and metadata at that version.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtVersion(5, "/some/path/to/table")
   * }}}
   *
   * @param version The version of this table to clone from.
   * @param target The path or table name to create the clone.
   *
   * @since 3.3.0
   */
  def cloneAtVersion(version: Long, target: String): DeltaTable = {
    cloneAtVersion(version, target, replace = false)
  }

  /**
   * Clone a DeltaTable at a specific timestamp to a given destination to mirror the existing
   * table's data and metadata at that timestamp.
   *
   * Timestamp can be of the format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.
   *
   * Specifying properties here means that the target will override any properties with the same key
   * in the source table with the user-defined properties.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtTimestamp(
   *     "2019-01-01",
   *     "/some/path/to/table",
   *     true,
   *     Map("foo" -> "bar"))
   * }}}
   *
   * @param timestamp The timestamp of this table to clone from.
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   * @param properties The table properties to override in the clone.
   *
   * @since 3.3.0
   */
  def cloneAtTimestamp(
      timestamp: String,
      target: String,
      replace: Boolean,
      properties: Map[String, String]): DeltaTable = {
    executeClone(table, target, replace, properties, timestampAsOf = Some(timestamp))
  }

  /**
   * Clone a DeltaTable at a specific timestamp to a given destination to mirror the existing
   * table's data and metadata at that timestamp.
   *
   * Timestamp can be of the format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtTimestamp("2019-01-01", "/some/path/to/table", true)
   * }}}
   *
   * @param timestamp The timestamp of this table to clone from.
   * @param target The path or table name to create the clone.
   * @param replace Whether to replace the destination with the clone command.
   *
   * @since 3.3.0
   */
  def cloneAtTimestamp(timestamp: String, target: String, replace: Boolean): DeltaTable = {
    cloneAtTimestamp(timestamp, target, replace, properties = Map.empty)
  }

  /**
   * Clone a DeltaTable at a specific timestamp to a given destination to mirror the existing
   * table's data and metadata at that timestamp.
   *
   * Timestamp can be of the format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.
   *
   * An example would be
   * {{{
   *   io.delta.tables.DeltaTable.cloneAtTimestamp("2019-01-01", "/some/path/to/table")
   * }}}
   *
   * @param timestamp The timestamp of this table to clone from.
   * @param target The path or table name to create the clone.
   *
   * @since 3.3.0
   */
  def cloneAtTimestamp(timestamp: String, target: String): DeltaTable = {
    cloneAtTimestamp(timestamp, target, replace = false)
  }
}

/**
@@ -24,11 +24,11 @@ import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, DescribeDeltaDetailCommand, VacuumCommand}
import org.apache.spark.sql.delta.util.AnalysisHelper
-import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{functions, Column, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.Identifier
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
/**
 * Interface to provide the actual implementations of DeltaTable operations.
 */
-trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
+trait DeltaTableOperations extends AnalysisHelper { self: io.delta.tables.DeltaTable =>

  protected def executeDelete(condition: Option[Expression]): Unit = improveUnsupportedOpError {
    withActiveSession(sparkSession) {
@@ -112,6 +112,52 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
    toDataset(sparkSession, restore)
  }

  protected def executeClone(
      table: DeltaTableV2,
      target: String,
      replace: Boolean,
      properties: Map[String, String],
      versionAsOf: Option[Long] = None,
      timestampAsOf: Option[String] = None
  ): io.delta.tables.DeltaTable = withActiveSession(sparkSession) {
    val sourceIdentifier = table.getTableIdentifierIfExists.map(id =>
      Identifier.of(id.database.toArray, id.table))
    val sourceRelation = DataSourceV2Relation.create(table, None, sourceIdentifier)

    val maybeTimeTravelSource = if (versionAsOf.isDefined || timestampAsOf.isDefined) {
      TimeTravel(
        sourceRelation,
        timestampAsOf.map(Literal(_)),
        versionAsOf,
        Some("deltaTable")
      )
    } else {
      sourceRelation
    }

    val targetIsAbsolutePath = new Path(target).isAbsolute()
    val targetIdentifier = if (targetIsAbsolutePath) s"delta.`$target`" else target
    val targetRelation = UnresolvedRelation(
      sparkSession.sessionState.sqlParser.parseTableIdentifier(targetIdentifier))

    val clone = CloneTableStatement(
      maybeTimeTravelSource,
      targetRelation,
      ifNotExists = false,
      replace,
      isCreateCommand = true,
      tablePropertyOverrides = properties.toMap,
      targetLocation = None)

    toDataset(sparkSession, clone)

    if (targetIsAbsolutePath) {
      io.delta.tables.DeltaTable.forPath(sparkSession, target)
    } else {
      io.delta.tables.DeltaTable.forName(sparkSession, target)
    }
  }

  protected def toStrColumnMap(map: Map[String, String]): Map[String, Column] = {
    map.toSeq.map { case (k, v) => k -> functions.expr(v) }.toMap
  }
77 changes: 77 additions & 0 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -242,6 +242,9 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
      "addFeatureSupport",
      "as",
      "alias",
      "clone",
      "cloneAtTimestamp",
      "cloneAtVersion",
      "delete",
      "detail",
      "generate",
@@ -422,6 +425,80 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
    }
  }

  test("clone - with filesystem options") {
    withTempDir { dir =>
      val baseDir = fakeFileSystemPath(dir)

      val srcDir = new File(baseDir, "source").getCanonicalPath
      val dstDir = new File(baseDir, "destination").getCanonicalPath

      spark.range(10).write.options(fakeFileSystemOptions).format("delta").save(srcDir)

      val srcTable =
        io.delta.tables.DeltaTable.forPath(spark, srcDir, fakeFileSystemOptions)
      srcTable.clone(dstDir)

      val srcLog = DeltaLog.forTable(spark, new Path(srcDir), fakeFileSystemOptions)
      val dstLog = DeltaLog.forTable(spark, new Path(dstDir), fakeFileSystemOptions)

      checkAnswer(
        spark.baseRelationToDataFrame(srcLog.createRelation()),
        spark.baseRelationToDataFrame(dstLog.createRelation())
      )
    }
  }

  test("cloneAtVersion/timestamp - with filesystem options") {
    Seq(true, false).foreach { cloneWithVersion =>
      withTempDir { dir =>
        val baseDir = fakeFileSystemPath(dir)
        val fsOptions = fakeFileSystemOptions

        val srcDir = new File(baseDir, "source").getCanonicalPath
        val dstDir = new File(baseDir, "destination").getCanonicalPath

        val df1 = Seq(1, 2, 3).toDF("id")
        val df2 = Seq(4, 5).toDF("id")
        val df3 = Seq(6, 7).toDF("id")

        // version 0.
        df1.write.format("delta").options(fsOptions).save(srcDir)

        // version 1.
        df2.write.format("delta").options(fsOptions).mode("append").save(srcDir)

        // version 2.
        df3.write.format("delta").options(fsOptions).mode("append").save(srcDir)

        val srcTable =
          io.delta.tables.DeltaTable.forPath(spark, srcDir, fakeFileSystemOptions)

        if (cloneWithVersion) {
          srcTable.cloneAtVersion(0, dstDir)
        } else {
          // clone with timestamp.
          //
          // Set the modification time of the first commit file to an early time and verify
          // that the Delta table can be cloned as of that time.
          val desiredTime = "1983-01-01"
          val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
          val time = format.parse(desiredTime).getTime

          val logPath = new Path(srcDir, "_delta_log")
          val file = new File(FileNames.unsafeDeltaFile(logPath, 0).toString)
          assert(file.setLastModified(time))
          srcTable.cloneAtTimestamp(desiredTime, dstDir)
        }

        val dstLog = DeltaLog.forTable(spark, new Path(dstDir), fakeFileSystemOptions)

        checkAnswer(
          df1,
          spark.baseRelationToDataFrame(dstLog.createRelation())
        )
      }
    }
  }

  test("optimize - with filesystem options") {
    withTempDir { dir =>