diff --git a/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClient.scala b/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClient.scala index 6ede8c71f70..498a78e030b 100644 --- a/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClient.scala +++ b/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClient.scala @@ -9,19 +9,14 @@ import org.http4s.circe.CirceEntityCodec.circeEntityEncoder import org.http4s.client.Client import org.http4s.headers.Authorization -import java.util.UUID +final case class WorkflowCallbackMessage(workflowId: String, + state: String, + outputs: Object, + failures: List[String]) -case class WorkflowResultMetadata(workflowId: UUID, - state: String, - outputs: Object, - failures: List[String] -) trait CbasClient[F[_]] extends LazyLogging { - def postWorkflowResults(bearerToken: String, - workflowId: UUID, - state: String, - outputs: Object, - failures: List[String]): F[Boolean] + def postWorkflowResults(authHeader: Authorization, + callbackMessage: WorkflowCallbackMessage): F[Boolean] } /* @@ -30,33 +25,30 @@ trait CbasClient[F[_]] extends LazyLogging { */ class CbasClientImpl[F[_]: Concurrent](client: Client[F], baseUrl: Uri) extends CbasClient[F] { val apiVersion = "v1" - implicit val workflowResultMetadataEncoder: Encoder[WorkflowResultMetadata] = Encoder.forProduct4( + + implicit val workflowCallbackMessageEncoder: Encoder[WorkflowCallbackMessage] = Encoder.forProduct4( "workflowId", "state", "outputs", "failures" )(x => (x.workflowId, x.state, x.outputs.toString, x.failures)) - implicit val resourceMetadataRequestEntityEncoder: EntityEncoder[F, WorkflowResultMetadata] = circeEntityEncoder[F, WorkflowResultMetadata] - override def postWorkflowResults(bearerToken: String, - workflowId: UUID, - state: String, - outputs: Object, - failures: List[String]): F[Boolean] = { - val body = WorkflowResultMetadata(workflowId = workflowId, state = state, outputs = outputs, failures = failures) - val entityBody: EntityBody[F] = EntityEncoder[F, WorkflowResultMetadata].toEntity(body).body - val request = Request[F](uri = baseUrl / "api" / "batch"/ apiVersion / "runs" / "results", method=Method.POST, body=entityBody) - .putHeaders( - Authorization(Credentials.Token(AuthScheme.Bearer, bearerToken)), - org.http4s.headers.`Content-Type`(MediaType.application.json) - )//org.http4s.headers.`Authorization`(Credentials.Token(AuthScheme.Bearer, bearerToken)) - - client.run(request).use { resp => - resp.status match { - case Status.Ok => true.pure[F] - case Status.InternalServerError => false.pure[F] - case _ => UnknownError.raiseError - } + + override def postWorkflowResults(authHeader: Authorization, + callbackMessage: WorkflowCallbackMessage): F[Boolean] = { + val body = callbackMessage + val entityBody: EntityBody[F] = EntityEncoder[F, WorkflowCallbackMessage].toEntity(body).body + val request = Request[F](uri = baseUrl / "api" / "batch" / apiVersion / "runs" / "results", method = Method.POST, body = entityBody) + .withHeaders( + org.http4s.headers.`Content-Type`(MediaType.application.json), + authHeader + ) + client.run(request).use { resp => + resp.status match { + case Status.Ok => true.pure[F] + case Status.InternalServerError => false.pure[F] + case _ => UnknownError.raiseError } + } } } diff --git a/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/Helper.scala b/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/Helper.scala index 03577a4a690..1d19a587e59 100644 --- a/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/Helper.scala +++ b/pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/Helper.scala @@ -123,6 +123,7 @@ object PactHelper { .body(responseBody) def buildInteraction(builder: PactDslWithProvider, + state: String, uponReceiving: String, method: String, path: String, @@ -131,11 +132,12 @@ object PactHelper { status: Int, ): PactDslResponse = builder + .`given`(state) .uponReceiving(uponReceiving) .method(method) .path(path) - .body(requestBody) .headers(scala.jdk.CollectionConverters.MapHasAsJava(requestHeaders.toMap).asJava) + .body(requestBody) .willRespondWith() .status(status) diff --git a/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClientSpec.scala b/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClientSpec.scala index 04f6c59e51a..ff552c88130 100644 --- a/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClientSpec.scala +++ b/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/CbasClientSpec.scala @@ -6,14 +6,14 @@ import au.com.dius.pact.consumer.{ConsumerPactBuilder, PactTestExecutionContext} import au.com.dius.pact.core.model.RequestResponsePact import cats.effect.IO import org.broadinstitute.dsde.workbench.cromwell.consumer.PactHelper._ -import org.http4s.Uri import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client +import org.http4s.headers.Authorization +import org.http4s.{AuthScheme, Credentials, Uri} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import pact4s.scalatest.RequestResponsePactForger -import java.util.UUID import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -30,8 +30,6 @@ class CbasClientSpec extends AnyFlatSpec with Matchers with RequestResponsePactF val bearerToken = "my-token" val workflowId = "33333333-3333-3333-3333-333333333333" - val workspaceId = "44444444-3333-3333-3333-333333333333" - val state = "Aborted" val outputs = "[]" val failures = List.empty[String] @@ -56,6 +54,7 @@ class CbasClientSpec extends AnyFlatSpec with Matchers with RequestResponsePactF var pactDslResponse: PactDslResponse = buildInteraction( pactProvider, + state = "post workflow results", uponReceiving = "Request to post workflow results", method = "POST", path = "/api/batch/v1/runs/results", @@ -69,9 +68,12 @@ class CbasClientSpec extends AnyFlatSpec with Matchers with RequestResponsePactF BlazeClientBuilder[IO](ExecutionContext.global).resource.allocated.unsafeRunSync()._1 } - it should "post workflow results" in { + it should "successfully post workflow results" in { new CbasClientImpl[IO](client, Uri.unsafeFromString(mockServer.getUrl)) - .postWorkflowResults(bearerToken, UUID.fromString(workflowId), state, outputs, failures) + .postWorkflowResults( + Authorization(Credentials.Token(AuthScheme.Bearer, bearerToken)), + WorkflowCallbackMessage(workflowId, state, Map.empty, failures) + ) .attempt .unsafeRunSync() shouldBe Right(true) }