Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Concurrency Issues in Iceberg Tables and Eliminate Scala Compiler Warnings #3302

Merged
merged 9 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.web.model.websocket.request.ResultExportRequest
import edu.uci.ics.texera.web.model.websocket.response.ResultExportResponse
import edu.uci.ics.texera.web.service.ResultExportService
import io.dropwizard.auth.Auth

import javax.ws.rs._
import javax.ws.rs.core.{MediaType, Response}
import javax.ws.rs.core.Response.Status
import scala.jdk.CollectionConverters._

@Path("/result")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,30 @@ object WorkflowExecutionsResource {
}
}

def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] =
if (AmberConfig.isUserSystemEnabled)
context
.select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
.from(OPERATOR_EXECUTIONS)
.where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
.fetchInto(classOf[String])
.asScala
.toList
.map(URI.create)
else Nil

def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] =
if (AmberConfig.isUserSystemEnabled)
Option(
context
.select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
.from(WORKFLOW_EXECUTIONS)
.where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
.fetchOneInto(classOf[String])
).filter(_.nonEmpty)
.map(URI.create)
else None

def clearUris(eid: ExecutionIdentity): Unit = {
if (AmberConfig.isUserSystemEnabled) {
context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class ExecutionConsoleService(
logger.error("Failed to close console message writer", e)
}
}
case _ =>
}
)

Expand All @@ -152,6 +153,9 @@ class ExecutionConsoleService(
Array(consoleMessage.toProtoString)
)
writer.putOne(tuple)
} catch {
case e: Exception =>
logger.error(s"Error while writing console message for operator $opId", e)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class ExecutionStatsService(
logger.error("Failed to close runtime statistics writer", e)
}
}
case _ =>
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.engine.common.Utils.retry
import edu.uci.ics.amber.util.{ArrowUtils, PathUtils}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.request.ResultExportRequest
Expand All @@ -17,20 +16,16 @@ import edu.uci.ics.texera.web.resource.dashboard.user.workflow.{
WorkflowVersionResource
}
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
import org.jooq.types.UInteger

import java.io.{FilterOutputStream, IOException, OutputStream, PipedInputStream, PipedOutputStream}
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util
import java.util.concurrent.{Executors, ThreadPoolExecutor}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Using
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.IterableHasAsScala

import edu.uci.ics.amber.core.storage.result.iceberg.OnIceberg

object WorkflowService {
private val workflowServiceMapping = new ConcurrentHashMap[String, WorkflowService]()
val cleanUpDeadlineInSeconds: Int = AmberConfig.executionStateCleanUpInSecs
Expand Down Expand Up @@ -99,6 +101,8 @@ class WorkflowService(
case _: Throwable => // exception can be raised if the document is already cleared
}
)

expireSnapshotsForExecution(eid)
})
WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId))
if (executionService.getValue != null) {
Expand Down Expand Up @@ -189,6 +193,7 @@ class WorkflowService(
case _: Throwable =>
}
)
expireSnapshotsForExecution(eid)
}) // TODO: change this behavior after enabling cache.

workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution(
Expand Down Expand Up @@ -295,4 +300,24 @@ class WorkflowService(
resultService.unsubscribeAll()
}

private def expireSnapshots(uri: URI): Unit = {
try {
DocumentFactory.openDocument(uri)._1 match {
case iceberg: OnIceberg =>
iceberg.expireSnapshots()
case other =>
logger.error(
s"Cannot expire snapshots: document from URI [$uri] is of type ${other.getClass.getName}. Expected an instance of ${classOf[OnIceberg].getName}."
)
}
} catch {
case _: Throwable => logger.error("Cannot expire snapshots")
}
}

private def expireSnapshotsForExecution(eid: ExecutionIdentity): Unit = {
WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid).foreach(expireSnapshots)
WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).foreach(expireSnapshots)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
val tableSchema: org.apache.iceberg.Schema,
val serde: (org.apache.iceberg.Schema, T) => Record,
val deserde: (org.apache.iceberg.Schema, Record) => T
) extends VirtualDocument[T] {
) extends VirtualDocument[T]
with OnIceberg {

private val lock = new ReentrantReadWriteLock()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package edu.uci.ics.amber.core.storage.result.iceberg

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.exceptions.NoSuchTableException

trait OnIceberg {
def catalog: org.apache.iceberg.catalog.Catalog
def tableNamespace: String
def tableName: String

/**
* Expire snapshots for the table.
*/
def expireSnapshots(): Unit = {
val table = IcebergUtil
.loadTableMetadata(catalog, tableNamespace, tableName)
.getOrElse(throw new NoSuchTableException(s"table $tableNamespace.$tableName doesn't exist"))

// Begin the snapshot expiration process:
table
.expireSnapshots() // Initiate snapshot expiration.
.retainLast(1) // Retain only the most recent snapshot.
.expireOlderThan(
System.currentTimeMillis()
) // Expire all snapshots older than the current time.
.cleanExpiredFiles(true) // Remove the files associated with expired snapshots.
.commit() // Commit the changes to make expiration effective.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,12 @@ object IcebergUtil {
overrideIfExists: Boolean
): Table = {

val baseProperties = Map(
val tableProperties = Map(
TableProperties.COMMIT_NUM_RETRIES -> StorageConfig.icebergTableCommitNumRetries.toString,
TableProperties.COMMIT_MAX_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMaxRetryWaitMs.toString,
TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString
)

val tableProperties =
if (
Set(
StorageConfig.icebergTableRuntimeStatisticsNamespace,
StorageConfig.icebergTableConsoleMessagesNamespace
).contains(tableNamespace)
) {
baseProperties ++ Map(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED -> "true",
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX -> "1"
)
} else {
baseProperties
}
val identifier = TableIdentifier.of(tableNamespace, tableName)
if (catalog.tableExists(identifier) && overrideIfExists) {
catalog.dropTable(identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class IcebergDocumentConsoleMessagesSpec

override def getDocument: VirtualDocument[Tuple] = {
DocumentFactory.openDocument(uri)._1 match {
case doc: VirtualDocument[Tuple] => doc
case _ => fail("Failed to open document as VirtualDocument[Tuple]")
case doc: VirtualDocument[_] => doc.asInstanceOf[VirtualDocument[Tuple]]
case _ => fail("Failed to open document as VirtualDocument[Tuple]")
}
}
}
Loading