Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the compilation of visualization operators by suppressing the exception to codes #3265

Merged
merged 7 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ class WorkflowCompiler(
// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
.foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }

// **Check for Python-based operator errors during code generation**
if (physicalOp.isPythonBased) {
val code = physicalOp.getCode
val exceptionPattern = """#EXCEPTION DURING CODE GENERATION:\s*(.*)""".r

exceptionPattern.findFirstMatchIn(code).foreach { matchResult =>
val errorMessage = matchResult.group(1).trim
val error =
new RuntimeException(s"Operator is not configured properly: $errorMessage")

errorList match {
case Some(list) => list.append((logicalOpId, error)) // Store error and continue
case None => throw error // Throw immediately if no error list is provided
}
}
}
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import java.time.Instant
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.util.{Failure, Success, Try}

object WorkflowCompiler {
// util function for extracting the error causes
Expand Down Expand Up @@ -107,48 +106,64 @@ class WorkflowCompiler(
): PhysicalPlan = {
var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)

logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId =>
Try {
val logicalOp = logicalPlan.getOperator(logicalOpId)
val allUpstreamLinks = logicalPlan
.getUpstreamLinks(logicalOp.operatorIdentifier)
logicalPlan.getTopologicalOpIds.asScala.foreach { logicalOpId =>
val logicalOp = logicalPlan.getOperator(logicalOpId)
val allUpstreamLinks = logicalPlan.getUpstreamLinks(logicalOp.operatorIdentifier)

try {
val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)

subPlan
.topologicalIterator()
.map(subPlan.getOperator)
.foreach({ physicalOp =>
{
val externalLinks = allUpstreamLinks
.filter(link => physicalOp.inputPorts.contains(link.toPortId))
.flatMap { link =>
physicalPlan
.getPhysicalOpsOfLogicalOp(link.fromOpId)
.find(_.outputPorts.contains(link.fromPortId))
.map(fromOp =>
PhysicalLink(fromOp.id, link.fromPortId, physicalOp.id, link.toPortId)
)
}
.foreach { physicalOp =>
val externalLinks = allUpstreamLinks
.filter(link => physicalOp.inputPorts.contains(link.toPortId))
.flatMap { link =>
physicalPlan
.getPhysicalOpsOfLogicalOp(link.fromOpId)
.find(_.outputPorts.contains(link.fromPortId))
.map(fromOp =>
PhysicalLink(fromOp.id, link.fromPortId, physicalOp.id, link.toPortId)
)
}

val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id)

// Add the operator to the physical plan
physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema())

// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks).foldLeft(physicalPlan) { (plan, link) =>
plan.addLink(link)
}

val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id)
// **Check for Python-based operator errors during code generation**
if (physicalOp.isPythonBased) {
val code = physicalOp.getCode
val exceptionPattern = """#EXCEPTION DURING CODE GENERATION:\s*(.*)""".r

// Add the operator to the physical plan
physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema())
exceptionPattern.findFirstMatchIn(code).foreach { matchResult =>
val errorMessage = matchResult.group(1).trim
val error =
new RuntimeException(s"Operator is not configured properly: $errorMessage")

// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
.foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }
errorList match {
case Some(list) => list.append((logicalOpId, error)) // Store error and continue
case None => throw error // Throw immediately if no error list is provided
}
}
}
})
} match {
case Success(_) =>
case Failure(err) =>
}
} catch {
case e: Throwable =>
errorList match {
case Some(list) => list.append((logicalOpId, err))
case None => throw err
case Some(list) => list.append((logicalOpId, e)) // Store error
case None => throw e // Throw if no list is provided
}
}
)
}

physicalPlan
}

Expand All @@ -172,11 +187,9 @@ class WorkflowCompiler(
// 3. expand the logical plan to the physical plan
val physicalPlan = expandLogicalPlan(logicalPlan, Some(errorList))

if (errorList.isEmpty) {
// no error during the expansion, then do:
// - collect the input schema for each op
opIdToInputSchema = collectInputSchemaFromPhysicalPlan(physicalPlan, errorList)
}
// 4. collect the input schema for each logical op
// even if an error is encountered while expanding the logical plan into the physical plan, we still want the input schemas of the remaining error-free operators
opIdToInputSchema = collectInputSchemaFromPhysicalPlan(physicalPlan, errorList)

WorkflowCompilationResult(
physicalPlan = if (errorList.nonEmpty) None else Some(physicalPlan),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ case class PhysicalOp(
}
}

/**
  * Returns the source code carried by this operator's `opExecInitInfo`.
  *
  * @return the code held by the `OpExecWithCode` init info
  * @throws IllegalAccessError if `opExecInitInfo` is not an `OpExecWithCode`
  */
// NOTE(review): IllegalAccessError is a LinkageError, which scala.util.control.NonFatal
// treats as fatal; the type is kept as-is here to preserve existing behavior.
@JsonIgnore
def getCode: String =
  opExecInitInfo match {
    case OpExecWithCode(sourceCode, _) =>
      sourceCode
    case _ =>
      throw new IllegalAccessError("No code information in this physical operator")
  }

/**
* creates a copy with the location preference information
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,36 @@ import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdenti
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PortIdentity, SchemaPropagationFunc}

trait PythonOperatorDescriptor extends LogicalOp {
/**
  * Renders a code-generation failure as a single Python comment line that starts with
  * the `#EXCEPTION DURING CODE GENERATION:` marker.
  *
  * @param ex the exception raised while generating the Python code
  * @return one comment line consisting of the marker followed by a description of `ex`
  */
private def generatePythonCodeForRaisingException(ex: Throwable): String = {
  // getMessage may be null (or blank) for exceptions created without a message;
  // fall back to toString so the marker never reads "null" or ends up empty.
  val description = Option(ex.getMessage).filter(_.trim.nonEmpty).getOrElse(ex.toString)
  // Keep everything on one line: only the first line is covered by the leading '#',
  // and a pattern capturing with `(.*)` after the marker stops at the first line break.
  val singleLine = description.replaceAll("\\s*[\\r\\n]+\\s*", " ").trim
  s"#EXCEPTION DURING CODE GENERATION: $singleLine"
}

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
val pythonCode =
try {
generatePythonCode()
} catch {
case ex: Throwable =>
// instead of throwing the error directly, we embed it in the generated code
// this lets the upper-level compiler detect the error without interrupting schema propagation
generatePythonCodeForRaisingException(ex)
}
val physicalOp = if (asSource()) {
PhysicalOp.sourcePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(generatePythonCode(), "python")
OpExecWithCode(pythonCode, "python")
)
} else {
PhysicalOp.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(generatePythonCode(), "python")
OpExecWithCode(pythonCode, "python")
)
}

Expand Down
Loading