Skip to content

Commit

Permalink
Merge branch 'master' into jiadong-improve-compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 authored Mar 4, 2025
2 parents c332ee5 + 8bdfd11 commit 9e22e14
Show file tree
Hide file tree
Showing 20 changed files with 676 additions and 538 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package edu.uci.ics.texera.web.model.websocket.request

case class ResultExportRequest(
exportType: String,
exportType: String, // e.g. "csv", "google_sheet", "arrow", "data"
workflowId: Int,
workflowName: String,
operatorId: String,
operatorName: String,
operatorIds: List[String], // changed from single operatorId: String -> List of strings
datasetIds: List[Int],
rowIndex: Int,
columnIndex: Int,
filename: String
rowIndex: Int, // used by "data" export
columnIndex: Int, // used by "data" export
filename: String, // optional filename override
destination: String // "dataset" or "local"
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package edu.uci.ics.texera.web.model.websocket.response

import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent

case class ResultExportResponse(status: String, message: String) extends TexeraWebSocketEvent
case class ResultExportResponse(status: String, message: String)
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.web.model.websocket.request.ResultExportRequest
import edu.uci.ics.texera.web.model.websocket.response.ResultExportResponse
import edu.uci.ics.texera.web.service.ResultExportService
import io.dropwizard.auth.Auth

import javax.ws.rs._
import javax.ws.rs.core.Response
import javax.ws.rs.core.{MediaType, Response}
import scala.jdk.CollectionConverters._

@Path("/result")
@Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM))
class ResultResource extends LazyLogging {

@POST
Expand All @@ -22,21 +22,73 @@ class ResultResource extends LazyLogging {
@Auth user: SessionUser
): Response = {

if (request.operatorIds.size <= 0)
Response
.status(Response.Status.BAD_REQUEST)
.`type`(MediaType.APPLICATION_JSON)
.entity(Map("error" -> "No operator selected").asJava)
.build()

try {
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
request.destination match {
case "local" =>
// CASE A: multiple operators => produce ZIP
if (request.operatorIds.size > 1) {
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val (zipStream, zipFileNameOpt) =
resultExportService.exportOperatorsAsZip(user.user, request)

if (zipStream == null) {
throw new RuntimeException("Zip stream is null")
}

val finalFileName = zipFileNameOpt.getOrElse("operators.zip")
return Response
.ok(zipStream, "application/zip")
.header("Content-Disposition", "attachment; filename=\"" + finalFileName + "\"")
.build()
}

val exportResponse: ResultExportResponse =
resultExportService.exportResult(user.user, request)
// CASE B: exactly one operator => single file
if (request.operatorIds.size != 1) {
return Response
.status(Response.Status.BAD_REQUEST)
.`type`(MediaType.APPLICATION_JSON)
.entity(Map("error" -> "Local download supports no operator or many.").asJava)
.build()
}
val singleOpId = request.operatorIds.head

Response.ok(exportResponse).build()
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val (streamingOutput, fileNameOpt) =
resultExportService.exportOperatorResultAsStream(request, singleOpId)

if (streamingOutput == null) {
return Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.`type`(MediaType.APPLICATION_JSON)
.entity(Map("error" -> "Failed to export operator").asJava)
.build()
}

val finalFileName = fileNameOpt.getOrElse("download.dat")
Response
.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
.header("Content-Disposition", "attachment; filename=\"" + finalFileName + "\"")
.build()
case _ =>
// destination = "dataset" by default
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val exportResponse = resultExportService.exportResult(user.user, request)
Response.ok(exportResponse).build()
}
} catch {
case ex: Exception =>
Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.`type`(MediaType.APPLICATION_JSON)
.entity(Map("error" -> ex.getMessage).asJava)
.build()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,30 @@ object WorkflowExecutionsResource {
}
}

def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] =
if (AmberConfig.isUserSystemEnabled)
context
.select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
.from(OPERATOR_EXECUTIONS)
.where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
.fetchInto(classOf[String])
.asScala
.toList
.map(URI.create)
else Nil

def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] =
if (AmberConfig.isUserSystemEnabled)
Option(
context
.select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
.from(WORKFLOW_EXECUTIONS)
.where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
.fetchOneInto(classOf[String])
).filter(_.nonEmpty)
.map(URI.create)
else None

def clearUris(eid: ExecutionIdentity): Unit = {
if (AmberConfig.isUserSystemEnabled) {
context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class ExecutionConsoleService(
logger.error("Failed to close console message writer", e)
}
}
case _ =>
}
)

Expand All @@ -152,6 +153,9 @@ class ExecutionConsoleService(
Array(consoleMessage.toProtoString)
)
writer.putOne(tuple)
} catch {
case e: Exception =>
logger.error(s"Error while writing console message for operator $opId", e)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class ExecutionStatsService(
logger.error("Failed to close runtime statistics writer", e)
}
}
case _ =>
}
)

Expand Down
Loading

0 comments on commit 9e22e14

Please sign in to comment.