Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BW-1206 - Combine all Wes Endpoints & add Tests #6833

Merged
merged 9 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import cromwell.services.instrumentation.CromwellInstrumentationActor
import cromwell.webservice.SwaggerService
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.wes.WesRouteSupport
import cromwell.webservice.routes.wes.WesRunRoutes

import scala.concurrent.Future
import scala.util.{Failure, Success}
Expand All @@ -37,7 +36,6 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
with CromwellApiService
with CromwellInstrumentationActor
with WesRouteSupport
with WesRunRoutes
with SwaggerService
with ActorLogging {
implicit val actorSystem = context.system
Expand All @@ -53,7 +51,7 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
* cromwell.yaml is broken unless the swagger index.html is patched. Copy/paste the code from rawls or cromiam if
* actual cromwell+swagger+oauth+/api support is needed.
*/
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes, runRoutes))
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes))
val nonApiRoutes: Route = concat(engineRoutes, swaggerUiResourceRoute)
val allRoutes: Route = concat(apiRoutes, nonApiRoutes)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
package cromwell.webservice.routes.wes

import akka.actor.ActorRef
import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.pattern.{AskTimeoutException, ask}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import cromwell.core.WorkflowId
import cromwell.core.abort.SuccessfulAbortResponse
import cromwell.engine.instrumentation.HttpInstrumentation
import cromwell.services.metadata.MetadataService.{GetStatus, MetadataServiceResponse, StatusLookupFailed}
import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata}
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.server.CromwellShutdown
import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse}
import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction, GetStatus, MetadataServiceResponse, StatusLookupFailed}
import cromwell.webservice.WebServiceUtils.EnhancedThrowable
import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata}
import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest}
import cromwell.webservice.routes.wes.WesResponseJsonSupport._
import cromwell.webservice.routes.wes.WesRouteSupport._
import cromwell.webservice.routes.{CromwellApiService, WesCromwellRouteSupport}
import net.ceedubs.ficus.Ficus._

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}
import WesResponseJsonSupport._
import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import WesRouteSupport._
import cromwell.core.abort.SuccessfulAbortResponse
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.server.CromwellShutdown
import cromwell.services.SuccessfulMetadataJsonResponse
import cromwell.webservice.routes.CromwellApiService

trait WesRouteSupport extends HttpInstrumentation {


trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport {

val serviceRegistryActor: ActorRef
val workflowManagerActor: ActorRef
val workflowStoreActor: ActorRef

implicit val ec: ExecutionContext
implicit val timeout: Timeout
Expand All @@ -49,36 +56,53 @@ trait WesRouteSupport extends HttpInstrumentation {
pathPrefix("ga4gh" / "wes" / "v1") {
concat(
path("service-info") {
complete(ServiceInfo.toWesResponse(workflowStoreActor))
get {
complete(ServiceInfo.toWesResponse(workflowStoreActor))
}
},
pathPrefix("runs") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've removed this path prefix and added a "runs" to the start of all the cases. That's fine, just curious whether it was a deliberate choice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on curiosity, I do think pathPrefix is more conventional to reduce repetition, but I'm sure this works fine too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally intended to use pathPrefix for repetitions sake, but doing so confused all of the Routes for some reason, and the fix I found was to simply give all the routes their own runs.

concat(
path(Segment / "status") { possibleWorkflowId =>
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse])
// WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have
onComplete(response) {
case Success(SuccessfulMetadataJsonResponse(_, jsObject)) =>
val wesState = WesState.fromCromwellStatusJson(jsObject)
complete(WesRunStatus(possibleWorkflowId, wesState))
case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Success(m: MetadataServiceResponse) =>
// This should never happen, but ....
val error = new IllegalStateException("Unexpected response from Metadata service: " + m)
error.errorRequest(StatusCodes.InternalServerError)
case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError)
case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue))
}
},
path(Segment / "cancel") { possibleWorkflowId =>
post {
CromwellApiService.abortWorkflow(possibleWorkflowId,
workflowStoreActor,
workflowManagerActor,
successHandler = WesAbortSuccessHandler,
errorHandler = WesAbortErrorHandler)
path("runs") {
get {
parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) =>
completeCromwellResponse(listRuns(pageSize, pageToken, serviceRegistryActor))
}
} ~
post {
extractSubmission() { submission =>
submitRequest(submission.entity,
isSingleSubmission = true)
}
}
)
},
path("runs" / Segment) { workflowId =>
get {
// this is what it was like in code found in the project… it perhaps isn’t ideal but doesn’t seem to hurt, so leaving it like this for now.
completeCromwellResponse(runLog(workflowId, (w: WorkflowId) => GetSingleWorkflowMetadataAction(w, None, None, expandSubWorkflows = false), serviceRegistryActor))
}
},
path("runs" / Segment / "status") { possibleWorkflowId =>
kpierre13 marked this conversation as resolved.
Show resolved Hide resolved
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse])
// WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have
onComplete(response) {
case Success(SuccessfulMetadataJsonResponse(_, jsObject)) =>
val wesState = WesState.fromCromwellStatusJson(jsObject)
complete(WesRunStatus(possibleWorkflowId, wesState))
case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Success(m: MetadataServiceResponse) =>
// This should never happen, but ....
val error = new IllegalStateException("Unexpected response from Metadata service: " + m)
error.errorRequest(StatusCodes.InternalServerError)
case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError)
case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue))
}
},
path("runs" / Segment / "cancel") { possibleWorkflowId =>
post {
CromwellApiService.abortWorkflow(possibleWorkflowId,
workflowStoreActor,
workflowManagerActor,
successHandler = WesAbortSuccessHandler,
errorHandler = WesAbortErrorHandler)
}
}
)
}
Expand All @@ -87,6 +111,13 @@ trait WesRouteSupport extends HttpInstrumentation {
}

object WesRouteSupport {
import WesResponseJsonSupport._

implicit lazy val duration: FiniteDuration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout")
implicit lazy val timeout: Timeout = duration
import scala.concurrent.ExecutionContext.Implicits.global


val NotFoundError = WesErrorResponse("The requested workflow run wasn't found", StatusCodes.NotFound.intValue)

def WesAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = {
Expand All @@ -104,4 +135,39 @@ object WesRouteSupport {
private def respondWithWesError(errorMsg: String, status: StatusCode): Route = {
complete((status, WesErrorResponse(errorMsg, status.intValue)))
}

def extractSubmission(): Directive1[WesSubmission] = {
formFields((
"workflow_params".?,
"workflow_type".?,
"workflow_type_version".?,
"tags".?,
"workflow_engine_parameters".?,
"workflow_url".?,
"workflow_attachment".as[String].*
)).as(WesSubmission)
}

def completeCromwellResponse(future: => Future[WesResponse]): Route = {
onComplete(future) {
case Success(response: WesResponse) => complete(response)
case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue))
}
}

def listRuns(pageSize: Option[Int], pageToken: Option[String], serviceRegistryActor: ActorRef): Future[WesResponse] = {
// FIXME: to handle - page_size, page_token
// FIXME: How to handle next_page_token in response?
metadataQueryRequest(Seq.empty[(String, String)], serviceRegistryActor).map(RunListResponse.fromMetadataQueryResponse)
}

def runLog(workflowId: String, request: WorkflowId => BuildMetadataJsonAction, serviceRegistryActor: ActorRef): Future[WesResponse] = {
val metadataJsonResponse = metadataBuilderActorRequest(workflowId, request, serviceRegistryActor)

metadataJsonResponse.map {
case SuccessfulMetadataJsonResponse(_, responseJson) => WesResponseWorkflowMetadata(WesRunLog.fromJson(responseJson.toString()))
case FailedMetadataJsonResponse(_, reason) => WesErrorResponse(reason.getMessage, StatusCodes.InternalServerError.intValue)
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package cromwell.webservice.routes

import java.time.OffsetDateTime

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.ContentTypes._
Expand All @@ -17,13 +15,13 @@ import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._
import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{WorkflowOnHoldToSubmittedFailure, WorkflowOnHoldToSubmittedSuccess}
import cromwell.engine.workflow.workflowstore.WorkflowStoreSubmitActor.{WorkflowSubmittedToStore, WorkflowsBatchSubmittedToStore}
import cromwell.services._
import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse, SubsystemStatus}
import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage
import cromwell.services.metadata.MetadataArchiveStatus._
import cromwell.services.metadata.MetadataService._
import cromwell.services.metadata._
import cromwell.services.metadata.impl.builder.MetadataBuilderActor
import cromwell.services._
import cromwell.services.womtool.WomtoolServiceMessages.{DescribeFailure, DescribeRequest, DescribeSuccess}
import cromwell.services.womtool.models.WorkflowDescription
import cromwell.util.SampleWdl.HelloWorld
Expand All @@ -34,6 +32,7 @@ import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import spray.json._

import java.time.OffsetDateTime
import scala.concurrent.duration._

class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with Matchers {
Expand Down Expand Up @@ -529,6 +528,7 @@ object CromwellApiServiceSpec {
val WorkflowIdExistingOnlyInSummaryTable = WorkflowId.fromString("f0000000-0000-0000-0000-000000000011")
val ArchivedWorkflowId = WorkflowId.fromString("c4c6339c-2145-47fb-acc5-b5cb8d2809f5")
val ArchivedAndDeletedWorkflowId = WorkflowId.fromString("abc1234d-2145-47fb-acc5-b5cb8d2809f5")
val wesWorkflowId = WorkflowId.randomId()
val SummarizedWorkflowIds = Set(
SummarizedWorkflowId,
WorkflowIdExistingOnlyInSummaryTable,
Expand All @@ -545,7 +545,8 @@ object CromwellApiServiceSpec {
FailedWorkflowId,
SummarizedWorkflowId,
ArchivedWorkflowId,
ArchivedAndDeletedWorkflowId
ArchivedAndDeletedWorkflowId,
wesWorkflowId
)

class MockApiService()(implicit val system: ActorSystem) extends CromwellApiService {
Expand All @@ -564,13 +565,21 @@ object CromwellApiServiceSpec {
List(
MetadataEvent(MetadataKey(workflowId, None, "testKey1a"), MetadataValue("myValue1a", MetadataString)),
MetadataEvent(MetadataKey(workflowId, None, "testKey1b"), MetadataValue("myValue1b", MetadataString)),
MetadataEvent(MetadataKey(workflowId, None, "testKey2a"), MetadataValue("myValue2a", MetadataString))
MetadataEvent(MetadataKey(workflowId, None, "testKey2a"), MetadataValue("myValue2a", MetadataString)),
)
}
private def wesFullMetadataResponse(workflowId: WorkflowId) = {
List(
MetadataEvent(MetadataKey(workflowId, None, "status"), MetadataValue("Running", MetadataString)),
MetadataEvent(MetadataKey(workflowId, None, "submittedFiles:workflow"), MetadataValue("myValue2a", MetadataString)),

)
}

def responseMetadataValues(workflowId: WorkflowId, withKeys: List[String], withoutKeys: List[String]): JsObject = {
def keyFilter(keys: List[String])(m: MetadataEvent) = keys.exists(k => m.key.key.startsWith(k))
val events = fullMetadataResponse(workflowId)
val metadataEvents = if (workflowId == wesWorkflowId) wesFullMetadataResponse(workflowId) else fullMetadataResponse(workflowId)
val events = metadataEvents
.filter(m => withKeys.isEmpty || keyFilter(withKeys)(m))
.filter(m => withoutKeys.isEmpty || !keyFilter(withoutKeys)(m))

Expand Down
Loading