Skip to content

Commit

Permalink
finish the fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Mar 4, 2025
1 parent 89dbb79 commit c332ee5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ class WorkflowCompiler(
// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
.foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }

// Check for Python-based operator errors recorded during code generation
if (physicalOp.isPythonBased) {
val code = physicalOp.getCode
val exceptionPattern = """#EXCEPTION DURING CODE GENERATION:\s*(.*)""".r

exceptionPattern.findFirstMatchIn(code).foreach { matchResult =>
val errorMessage = matchResult.group(1).trim
val error =
new RuntimeException(s"Operator is not configured properly: $errorMessage")

errorList match {
case Some(list) => list.append((logicalOpId, error)) // Store error and continue
case None => throw error // Throw immediately if no error list is provided
}
}
}
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import java.time.Instant
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.util.{Failure, Success, Try}

object WorkflowCompiler {
// util function for extracting the error causes
Expand Down Expand Up @@ -102,31 +101,20 @@ class WorkflowCompiler(

// function to expand logical plan to physical plan
private def expandLogicalPlan(
logicalPlan: LogicalPlan,
errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
): PhysicalPlan = {
logicalPlan: LogicalPlan,
errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
): PhysicalPlan = {
var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)

logicalPlan.getTopologicalOpIds.asScala.foreach { logicalOpId =>
val logicalOp = logicalPlan.getOperator(logicalOpId)
val allUpstreamLinks = logicalPlan.getUpstreamLinks(logicalOp.operatorIdentifier)

// Try to get the physical plan for this operator
val subPlanOpt = try {
Some(logicalOp.getPhysicalPlan(context.workflowId, context.executionId))
} catch {
case err: Throwable =>
errorList match {
case Some(list) =>
list.append((logicalOpId, err))
None // Skip processing this operator
case None => throw err
}
}
try {
val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)

// Continue if we couldn't get a subPlan
subPlanOpt.foreach { subPlan =>
subPlan.topologicalIterator()
subPlan
.topologicalIterator()
.map(subPlan.getOperator)
.foreach { physicalOp =>
val externalLinks = allUpstreamLinks
Expand All @@ -149,9 +137,33 @@ class WorkflowCompiler(
physicalPlan = (externalLinks ++ internalLinks).foldLeft(physicalPlan) { (plan, link) =>
plan.addLink(link)
}

// Check for Python-based operator errors recorded during code generation
if (physicalOp.isPythonBased) {
val code = physicalOp.getCode
val exceptionPattern = """#EXCEPTION DURING CODE GENERATION:\s*(.*)""".r

exceptionPattern.findFirstMatchIn(code).foreach { matchResult =>
val errorMessage = matchResult.group(1).trim
val error =
new RuntimeException(s"Operator is not configured properly: $errorMessage")

errorList match {
case Some(list) => list.append((logicalOpId, error)) // Store error and continue
case None => throw error // Throw immediately if no error list is provided
}
}
}
}
} catch {
case e: Throwable =>
errorList match {
case Some(list) => list.append((logicalOpId, e)) // Store error
case None => throw e // Throw if no list is provided
}
}
}

physicalPlan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ case class PhysicalOp(
}
}

/**
  * Returns the source code carried by this operator's executor initialization info.
  *
  * @return the code string stored in `opExecInitInfo` when it is an `OpExecWithCode`
  * @throws IllegalStateException if `opExecInitInfo` is any other kind of init info
  */
@JsonIgnore
def getCode: String =
  opExecInitInfo match {
    case OpExecWithCode(code, _) => code
    case _ =>
      // IllegalStateException rather than IllegalAccessError: the latter is a JVM
      // LinkageError (a java.lang.Error) that NonFatal/Try deliberately do not handle.
      throw new IllegalStateException("No code information in this physical operator")
  }

/**
* creates a copy with the location preference information
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,20 @@ import edu.uci.ics.amber.core.workflow.{PhysicalOp, PortIdentity, SchemaPropagat

trait PythonOperatorDescriptor extends LogicalOp {
/**
  * Builds the placeholder "code" recorded for an operator whose Python code generation failed.
  *
  * The result is a single marker line of the form
  * `#EXCEPTION DURING CODE GENERATION: <detail>`, where `<detail>` describes the failure.
  *
  * @param ex the failure raised while generating the operator's Python code
  * @return the marker line describing `ex`
  */
private def generatePythonCodeForRaisingException(ex: Throwable): String = {
  // getMessage is null for exceptions created without a message; fall back to toString
  // so the marker names the exception class instead of ending in the literal "null".
  val detail = Option(ex.getMessage).getOrElse(ex.toString)
  s"#EXCEPTION DURING CODE GENERATION: $detail"
}

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
// TODO: make python code be the error reporting code
val pythonCode =
try {
generatePythonCode()
} catch {
case ex: Throwable =>
// Instead of throwing the error directly, embed it in the generated code so that the
// upper-level compiler can detect it without interrupting schema propagation.
generatePythonCodeForRaisingException(ex)
}
val physicalOp = if (asSource()) {
Expand Down

0 comments on commit c332ee5

Please sign in to comment.