diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/Ray.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/Ray.scala index f50ea49e9..eb39fe6ee 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/Ray.scala +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/Ray.scala @@ -175,17 +175,24 @@ class Ray(override val uid: String) extends SQLAlg with VersionCompatibility wit convert(Row.fromSeq(Seq(irow.host, irow.port, irow.timezone))).copy() }.iterator val javaContext = new JavaContext() - val commonTaskContext = new AppContextImpl(javaContext, batch) - val columnarBatchIter = batch.compute(Iterator(newIter), 0, commonTaskContext) - val data = columnarBatchIter.flatMap { batch => - batch.rowIterator.asScala.map(f => - stage1_schema_encoder(f) - ) - }.toList - javaContext.markComplete - javaContext.close - val rdd = session.sparkContext.makeRDD[Row](data) - session.createDataFrame(rdd, stage1_schema) + try { + val commonTaskContext = new AppContextImpl(javaContext, batch) + val columnarBatchIter = batch.compute(Iterator(newIter), 0, commonTaskContext) + val data = columnarBatchIter.flatMap { batch => + batch.rowIterator.asScala.map(f => + stage1_schema_encoder(f) + ) + }.toList + val rdd = session.sparkContext.makeRDD[Row](data) + session.createDataFrame(rdd, stage1_schema) + } catch { + case e: Exception => + throw new MLSQLException("An exception was encountered in the execution of this python task! " + + "Please check your code and try again after modification.", e) + } finally { + javaContext.markComplete + javaContext.close + } } if (dataMode == "data") {