Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Concurrency Issues in Iceberg Tables and Eliminate Scala Compiler Warnings #3302

Merged
merged 9 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,30 @@ object WorkflowExecutionsResource {
}
}

def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] =
if (AmberConfig.isUserSystemEnabled)
context
.select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
.from(OPERATOR_EXECUTIONS)
.where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
.fetchInto(classOf[String])
.asScala
.toList
.map(URI.create)
else Nil

def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] =
if (AmberConfig.isUserSystemEnabled)
Option(
context
.select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
.from(WORKFLOW_EXECUTIONS)
.where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
.fetchOneInto(classOf[String])
).filter(_.nonEmpty)
.map(URI.create)
else None

def clearUris(eid: ExecutionIdentity): Unit = {
if (AmberConfig.isUserSystemEnabled) {
context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class ExecutionConsoleService(
logger.error("Failed to close console message writer", e)
}
}
case _ =>
}
)

Expand All @@ -152,6 +153,9 @@ class ExecutionConsoleService(
Array(consoleMessage.toProtoString)
)
writer.putOne(tuple)
} catch {
case e: Exception =>
logger.error(s"Error while writing console message for operator $opId", e)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class ExecutionStatsService(
logger.error("Failed to close runtime statistics writer", e)
}
}
case _ =>
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class WorkflowService(
case _: Throwable => // exception can be raised if the document is already cleared
}
)

expireSnapshotsForExecution(eid)
})
WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId))
if (executionService.getValue != null) {
Expand Down Expand Up @@ -177,19 +179,21 @@ class WorkflowService(
var controllerConf = ControllerConfig.default

// clean up results from previous run
val previousExecutionId = WorkflowExecutionService.getLatestExecutionId(workflowId)
previousExecutionId.foreach(eid => {
val uris = WorkflowExecutionsResource
.getResultUrisByExecutionId(eid)
WorkflowExecutionsResource.clearUris(eid)
uris.foreach(uri =>
try {
DocumentFactory.openDocument(uri)._1.clear()
} catch { // exception can happen if the resource is already cleared
case _: Throwable =>
}
)
}) // TODO: change this behavior after enabling cache.
WorkflowExecutionService
.getLatestExecutionId(workflowId)
.foreach(eid => {
val uris = WorkflowExecutionsResource
.getResultUrisByExecutionId(eid)
WorkflowExecutionsResource.clearUris(eid)
uris.foreach(uri =>
try {
DocumentFactory.openDocument(uri)._1.clear()
} catch { // exception can happen if the resource is already cleared
case _: Throwable =>
}
)
expireSnapshotsForExecution(eid)
}) // TODO: change this behavior after enabling cache.

workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution(
workflowContext.workflowId,
Expand Down Expand Up @@ -295,4 +299,18 @@ class WorkflowService(
resultService.unsubscribeAll()
}

private def expireSnapshots(uri: URI): Unit = {
try {
// Call the expireSnapshots on the opened document
DocumentFactory.openDocument(uri)._1.expireSnapshots()
} catch {
case _: Throwable => // exception can be raised if the document is already cleared
}
}

private def expireSnapshotsForExecution(eid: ExecutionIdentity): Unit = {
WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid).foreach(expireSnapshots)
WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).foreach(expireSnapshots)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {
*/
def getTableStatistics: Map[String, Map[String, Any]] =
throw new NotImplementedError("getTableStatistics method is not implemented")

/**
* Expire snapshots if the document supports it.
*/
def expireSnapshots(): Unit =
throw new NotImplementedError("expireSnapshots method is not implemented")
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,4 +388,22 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
field -> stats.toMap
}.toMap
}

/**
* Expire snapshots for the table.
*/
override def expireSnapshots(): Unit = {
val table = IcebergUtil
.loadTableMetadata(catalog, tableNamespace, tableName)
.getOrElse(
throw new NoSuchTableException(s"table $tableNamespace.$tableName doesn't exist")
)

table
.expireSnapshots()
.retainLast(1)
.expireOlderThan(System.currentTimeMillis())
.cleanExpiredFiles(true)
.commit()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,12 @@ object IcebergUtil {
overrideIfExists: Boolean
): Table = {

val baseProperties = Map(
val tableProperties = Map(
TableProperties.COMMIT_NUM_RETRIES -> StorageConfig.icebergTableCommitNumRetries.toString,
TableProperties.COMMIT_MAX_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMaxRetryWaitMs.toString,
TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString
)

val tableProperties =
if (
Set(
StorageConfig.icebergTableRuntimeStatisticsNamespace,
StorageConfig.icebergTableConsoleMessagesNamespace
).contains(tableNamespace)
) {
baseProperties ++ Map(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED -> "true",
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX -> "1"
)
} else {
baseProperties
}
val identifier = TableIdentifier.of(tableNamespace, tableName)
if (catalog.tableExists(identifier) && overrideIfExists) {
catalog.dropTable(identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class IcebergDocumentConsoleMessagesSpec

override def getDocument: VirtualDocument[Tuple] = {
DocumentFactory.openDocument(uri)._1 match {
case doc: VirtualDocument[Tuple] => doc
case _ => fail("Failed to open document as VirtualDocument[Tuple]")
case doc: VirtualDocument[_] => doc.asInstanceOf[VirtualDocument[Tuple]]
case _ => fail("Failed to open document as VirtualDocument[Tuple]")
}
}
}
Loading