Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDCE-4709 Upgrade RAS to Upscan #206

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 0 additions & 87 deletions app/uk/gov/hmrc/rasapi/connectors/FileUploadConnector.scala

This file was deleted.

45 changes: 45 additions & 0 deletions app/uk/gov/hmrc/rasapi/connectors/UpscanConnector.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2023 HM Revenue & Customs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.hmrc.rasapi.connectors

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.StreamConverters
import play.api.Logging
import uk.gov.hmrc.rasapi.config.{AppContext, WSHttp}

import java.io.InputStream
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}

class UpscanConnector @Inject()(val wsHttp: WSHttp,
val appContext: AppContext,
implicit val ec: ExecutionContext
) extends Logging {

def getUpscanFile(downloadUrl: String, reference: String, userId: String): Future[Option[InputStream]] = {
implicit val system: ActorSystem = ActorSystem()
logger.info(s"[UpscanConnector][getUpscanFile] Trying to retrieve file from Upscan: $reference")

wsHttp.buildRequestWithStream(uri = downloadUrl).map { res =>
Some(res.bodyAsSource.runWith(StreamConverters.asInputStream()))
} recover {
case ex: Throwable =>
logger.error(s"[UpscanConnector][getUpscanFile] Exception thrown while retrieving file / converting to InputStream for userId ($userId). ${ex.getMessage}}.", ex)
throw new RuntimeException("Error Streaming file from Upscan service")
}
}
}
36 changes: 20 additions & 16 deletions app/uk/gov/hmrc/rasapi/controllers/FileProcessingController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import play.api.Logging
import play.api.libs.json.JsSuccess
import play.api.mvc._
import uk.gov.hmrc.play.bootstrap.backend.controller.BackendController
import uk.gov.hmrc.rasapi.models.{ApiVersion, CallbackData, V1_0, V2_0}
import uk.gov.hmrc.rasapi.models.{ApiVersion, UpscanCallbackData, V1_0, V2_0}
import uk.gov.hmrc.rasapi.services.{FileProcessingService, RasFilesSessionService}

import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}
import scala.util.{Failure, Success, Try}

class FileProcessingController @Inject()(
val sessionCacheService: RasFilesSessionService,
Expand All @@ -34,8 +34,8 @@ class FileProcessingController @Inject()(
implicit val ec: ExecutionContext
) extends BackendController(cc) with Logging {

val STATUS_AVAILABLE: String = "AVAILABLE"
val STATUS_ERROR: String = "ERROR"
val STATUS_READY: String = "READY"
val STATUS_FAILED: String = "FAILED"

def statusCallback(userId: String, version: String): Action[AnyContent] = Action.async {
implicit request =>
Expand All @@ -46,38 +46,42 @@ class FileProcessingController @Inject()(
}
(optVersion, withValidJson()) match {
case (Some(apiVersion), Some(callbackData)) =>
callbackData.status match {
case STATUS_AVAILABLE =>
callbackData.fileStatus match {
case STATUS_READY =>
logger.info(s"[FileProcessingController][statusCallback] Callback request received with status available: file processing " +
s"started for userId ($userId).")
if (Try(fileProcessingService.processFile(userId, callbackData, apiVersion)).isFailure) {
if (fileProcessingService.processFile(userId, callbackData, apiVersion)) {
sessionCacheService.updateFileSession(userId, callbackData, None, None)
}
case STATUS_ERROR => logger.error(s"[FileProcessingController][statusCallback] There is a problem with the " +
s"file for userId ($userId) ERROR (${callbackData.fileId}), the status is: ${callbackData.status} and the reason is: ${callbackData.reason.get}")
case STATUS_FAILED => logger.error(s"[FileProcessingController][statusCallback] There is a problem with the " +
s"file for userId ($userId) and fileId: (${callbackData.reference}), the failure reason is: ${callbackData.failureReason} and the message is: ${callbackData.message}")
sessionCacheService.updateFileSession(userId, callbackData, None, None)
case _ => logger.warn(s"[FileProcessingController][statusCallback] There is a problem with the file (${callbackData.fileId}) for userId ($userId), the status is:" +
s" ${callbackData.status}")
case _ => logger.warn(s"[FileProcessingController][statusCallback] There is a problem with the file (${callbackData.reference}) for userId ($userId), the status is:" +
s" ${callbackData.fileStatus}")
}
Future(Ok(""))
case (optVer, optData) => handleInvalidRequest(optVer, optData)
}
}

private def handleInvalidRequest(optVersion: Option[ApiVersion], optCallBackData: Option[CallbackData]): Future[Result] = {
private def handleInvalidRequest(optVersion: Option[ApiVersion], optCallBackData: Option[UpscanCallbackData]): Future[Result] = {
(optVersion, optCallBackData) match {
case (None, _) => logger.warn("[FileProcessingController][handleInvalidRequest] Unsupported api version supplied")
case _ => logger.warn("[FileProcessingController][handleInvalidRequest] Invalid Json supplied")
}
Future.successful(BadRequest(""))
}

private def withValidJson()(implicit request: Request[AnyContent]): Option[CallbackData] = {
private def withValidJson()(implicit request: Request[AnyContent]): Option[UpscanCallbackData] = {
request.body.asJson match {
case Some(json) =>
Try(json.validate[CallbackData]) match {
case Success(JsSuccess(payload, _)) => Some(payload)
case _ => logger.warn(s"[FileProcessingController][withValidJson] Json could not be parsed. Json Data: $json"); None
Try(json.validate[UpscanCallbackData]) match {
case Success(JsSuccess(payload, _)) =>
Some(payload)
case Failure(exception)=> {
logger.warn(s"[FileProcessingController][withValidJson] Json could not be parsed. Json Data: $json, exception: ${exception.getMessage}")
}
None
}
case _ => logger.warn("[FileProcessingController][withValidJson] No json provided."); None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FileSessionController @Inject()(val filesSessionService: RasFilesSessionSe
request.body.asJson match {
case Some(json) => Try(json.validate[CreateFileSessionRequest]) match {
case Success(JsSuccess(payload, _)) =>
filesSessionService.createFileSession(payload.userId, payload.envelopeId).flatMap {
filesSessionService.createFileSession(payload.userId, payload.reference).flatMap {
case true => Future.successful(Created)
case false =>
logger.warn(s"[FileSessionController][createFileSession] Could not create FileSession. Json Data: $json")
Expand Down
37 changes: 30 additions & 7 deletions app/uk/gov/hmrc/rasapi/models/CallbackData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,42 @@ import org.mongodb.scala.bson.ObjectId
import play.api.libs.json.{Format, Json, OFormat}
import uk.gov.hmrc.mongo.play.json.formats.MongoFormats

import java.time.Instant

case class FileMetadata(id: String, name: Option[String], created: Option[String])

object FileMetadata {
implicit val format: OFormat[FileMetadata] = Json.format[FileMetadata]
}

case class CallbackData(envelopeId: String, fileId: String, status: String, reason: Option[String])
case class UploadDetails(uploadTimestamp: Instant, checksum: String, fileMimeType: String, fileName: String, size: Int)

case class FailureDetails(failureReason: String, message: String)
object FailureDetails{
implicit val formats: OFormat[FailureDetails] = Json.format[FailureDetails]
}

object UploadDetails {
implicit val formats: OFormat[UploadDetails] = Json.format[UploadDetails]
}

case class UpscanCallbackData(reference: String, downloadUrl: Option[String], fileStatus: String, uploadDetails: Option[UploadDetails], failureDetails: Option[FailureDetails]) {
val failureReason: String = this.failureDetails.getOrElse(FailureDetails("unknown", "")).failureReason
val message: String = this.failureDetails.getOrElse(FailureDetails("unknown", "")).message

def toFileMetadata: FileMetadata =
FileMetadata(
this.reference,
Some(this.uploadDetails.getOrElse(UploadDetails(Instant.now(), "", "", "", 0)).fileName),
Some(this.uploadDetails.getOrElse(UploadDetails(Instant.now(), "", "", "", 0)).uploadTimestamp.toString)
)
}

object CallbackData {
implicit val formats: OFormat[CallbackData] = Json.format[CallbackData]
object UpscanCallbackData {
implicit val formats: OFormat[UpscanCallbackData] = Json.format[UpscanCallbackData]
}

case class ResultsFileMetaData (id: String, filename: String,uploadDate: Long, chunkSize: Int, length: Long)
case class ResultsFileMetaData(id: String, filename: String, uploadDate: Long, chunkSize: Int, length: Long)

object ResultsFileMetaData {
implicit val formats: OFormat[ResultsFileMetaData] = Json.format[ResultsFileMetaData]
Expand All @@ -42,13 +65,13 @@ case class Chunks(_id: ObjectId, files_id: ObjectId)

object Chunks {
implicit val objectIdformats: Format[ObjectId] = MongoFormats.objectIdFormat
implicit val format: OFormat[Chunks] = Json.format[Chunks]
implicit val format: OFormat[Chunks] = Json.format[Chunks]
}

case class FileSession(userFile: Option[CallbackData],
case class FileSession(userFile: Option[UpscanCallbackData],
resultsFile: Option[ResultsFileMetaData],
userId: String,
uploadTimeStamp : Option[Long],
uploadTimeStamp: Option[Long],
fileMetadata: Option[FileMetadata]
)

Expand Down
4 changes: 2 additions & 2 deletions app/uk/gov/hmrc/rasapi/models/CreateFileSessionRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package uk.gov.hmrc.rasapi.models

import play.api.libs.json._
import play.api.libs.functional.syntax._
case class CreateFileSessionRequest(userId: String, envelopeId: String)
case class CreateFileSessionRequest(userId: String, reference: String)

object CreateFileSessionRequest {
def nonEmptyString(fieldName: String): Reads[String] = Reads.StringReads.filter(JsonValidationError(s"$fieldName cannot be empty"))(_.nonEmpty)

implicit val reads: Reads[CreateFileSessionRequest] = (
(__ \ "userId").read[String](nonEmptyString("userId")) and
(__ \ "envelopeId").read[String](nonEmptyString("envelopeId"))
(__ \ "reference").read[String](nonEmptyString("reference"))
)(CreateFileSessionRequest.apply _)

implicit val writes: OWrites[CreateFileSessionRequest] = Json.writes[CreateFileSessionRequest]
Expand Down
14 changes: 7 additions & 7 deletions app/uk/gov/hmrc/rasapi/repository/RasFileRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@ class RasFilesRepository @Inject()(val mongoComponent: MongoComponent,
private val bucketName: String = "resultsFiles"
val gridFSG: GridFSBucket = GridFSBucket(mongoComponent.database, bucketName)

def saveFile(userId:String, envelopeId: String, filePath: Path, fileId: String): Future[ResultsFile] = {
def saveFile(userId: String, reference: String, filePath: Path): Future[ResultsFile] = {
log.info("[RasFilesRepository][saveFile] Starting to save file")

val observableToUploadFrom: Observable[ByteBuffer] = Observable(
Seq(ByteBuffer.wrap(Files.readAllBytes(filePath))
))
))

val options: GridFSUploadOptions = new GridFSUploadOptions()
.metadata(Document(
"envelopeId" -> envelopeId,
"fileId" -> fileId,
"reference" -> reference,
"fileId" -> reference,
"userId" -> userId,
"contentType" -> contentType))

gridFSG.uploadFromObservable(fileId, observableToUploadFrom, options).head().flatMap { res =>
log.warn(s"[RasFilesRepository][saveFile] Saved file $fileId for user $userId")
gridFSG.uploadFromObservable(reference, observableToUploadFrom, options).head().flatMap { res =>
log.warn(s"[RasFilesRepository][saveFile] Saved file $reference for user $userId")
checkAndEnsureTTL(mongoComponent.database, s"$bucketName.files").flatMap { ttlIndexExists =>
if (ttlIndexExists) {
gridFSG.find(Document("_id" -> res)).head()
Expand All @@ -76,7 +76,7 @@ class RasFilesRepository @Inject()(val mongoComponent: MongoComponent,
}
}.recover {
case e: Throwable =>
log.error(s"[RasFilesRepository][saveFile] Error saving the file $fileId for user $userId. Exception: ${e.getMessage}")
log.error(s"[RasFilesRepository][saveFile] Error saving the file $reference for user $userId. Exception: ${e.getMessage}")
throw new RuntimeException(s"Failed to save file due to error: ${e.getMessage}")
}
}
Expand Down
Loading