Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1772] ScenarioActivity-based activity log #7039

Merged
merged 13 commits into from
Oct 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ class ScenarioActivityApiHttpService(

private def editComment(request: EditCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.editComment(
Expand All @@ -519,14 +519,14 @@ class ScenarioActivityApiHttpService(

private def deleteComment(request: DeprecatedDeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(scenarioActivityRepository.deleteComment(scenarioId, request.commentId))
).leftMap(_ => NoComment(request.commentId))

private def deleteComment(request: DeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.deleteComment(scenarioId, ScenarioActivityId(request.scenarioActivityId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances.DB
import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, ProcessAction, ScenarioActionName}
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.{Comment, ProcessVersion}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.EngineSetupName
import pl.touk.nussknacker.engine.marshall.ProcessMarshaller
Expand Down Expand Up @@ -351,7 +350,13 @@ class DBProcessService(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = false),
scenarioActionRepository
.markProcessAsUnArchived(processId = process.processIdUnsafe, process.processVersionId)
.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.UnArchive,
None,
None
)
)
)
} else {
Expand Down Expand Up @@ -528,7 +533,13 @@ class DBProcessService(
.runInTransaction(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = true),
scenarioActionRepository.markProcessAsArchived(processId = process.processIdUnsafe, process.processVersionId)
scenarioActionRepository.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.Archive,
None,
None
)
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package pl.touk.nussknacker.ui.process

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.process.ScenarioAttachmentService.AttachmentToAdd
import pl.touk.nussknacker.ui.security.api.LoggedUser

object ScenarioActivityAuditLog extends LazyLogging {

def onCreateScenarioActivity(
scenarioActivity: ScenarioActivity
): Unit =
logger.info(
withPrefix(scenarioActivity.scenarioId, scenarioActivity.scenarioVersionId, scenarioActivity.user.name.value)(
s"New activity: ${printScenarioActivity(scenarioActivity)}"
)
)

private def printScenarioActivity(scenarioActivity: ScenarioActivity) = scenarioActivity match {
case ScenarioActivity.ScenarioDeployed(_, _, _, _, _, comment, result) =>
s"ScenarioDeployed(comment=${printComment(comment)},result=${printResult(result)})"
case ScenarioActivity.ScenarioPaused(_, _, _, _, _, comment, result) =>
s"ScenarioPaused(comment=${printComment(comment)},result=${printResult(result)})"
case ScenarioActivity.ScenarioCanceled(_, _, _, _, _, comment, result) =>
s"ScenarioCanceled(comment=${printComment(comment)},result=${printResult(result)})"
case ScenarioActivity.CustomAction(_, _, _, _, _, actionName, comment, result) =>
s"CustomAction(action=$actionName,comment=${printComment(comment)},result=${printResult(result)})"
case ScenarioActivity.PerformedSingleExecution(_, _, _, _, _, comment, result) =>
s"PerformedSingleExecution(comment=${printComment(comment)},result=${printResult(result)})"
case ScenarioActivity.PerformedScheduledExecution(_, _, _, _, _, status, _, scheduleName, _, _, _) =>
s"PerformedScheduledExecution(scheduleName=$scheduleName,scheduledExecutionStatus=${status.entryName})"
case ScenarioActivity.ScenarioCreated(_, _, _, _, _) =>
"ScenarioCreated"
case ScenarioActivity.ScenarioArchived(_, _, _, _, _) =>
"ScenarioArchived"
case ScenarioActivity.ScenarioUnarchived(_, _, _, _, _) =>
"ScenarioUnarchived"
case ScenarioActivity.ScenarioModified(_, _, _, _, _, _, comment) =>
s"ScenarioModified(comment=${printComment(comment)})"
case ScenarioActivity.ScenarioNameChanged(_, _, _, _, _, oldName, newName) =>
s"ScenarioNameChanged(oldName=$oldName,newName=$newName)"
case ScenarioActivity.CommentAdded(_, _, _, _, _, comment) =>
s"CommentAdded(comment=${printComment(comment)})"
case ScenarioActivity.AttachmentAdded(_, _, _, _, _, attachment) =>
s"AttachmentAdded(fileName=${printAttachment(attachment)})"
case ScenarioActivity.ChangedProcessingMode(_, _, _, _, _, from, to) =>
s"ChangedProcessingMode(from=$from,to=$to)"
case ScenarioActivity.IncomingMigration(_, _, _, _, _, sourceEnvironment, sourceUser, sourceVersionId, _) =>
s"IncomingMigration(sourceEnvironment=${sourceEnvironment.name},sourceUser=${sourceUser.value},sourceVersionId=${sourceVersionId
.map(_.value.toString)
.getOrElse("[none]")})"
case ScenarioActivity.OutgoingMigration(_, _, _, _, _, destinationEnvironment) =>
s"OutgoingMigration(destinationEnvironment=${destinationEnvironment.name})"
case ScenarioActivity.AutomaticUpdate(_, _, _, _, _, changes) =>
s"AutomaticUpdate(changes=$changes)"
}

private def printAttachment(attachment: ScenarioAttachment) = attachment match {
case ScenarioAttachment.Available(_, attachmentFilename, _, _) => s"Available(${attachmentFilename.value})"
case ScenarioAttachment.Deleted(attachmentFilename, _, _) => s"Deleted(${attachmentFilename.value})"
}

private def printComment(comment: ScenarioComment) = comment match {
case ScenarioComment.Available(comment, _, _) => comment
case ScenarioComment.Deleted(_, _) => "[none]"
}

private def printResult(result: DeploymentResult) = result match {
case DeploymentResult.Success(_) => "Success"
case DeploymentResult.Failure(_, errorMessage) => s"Failure($errorMessage)"
}

def onAddComment(
processId: ProcessId,
versionId: Option[VersionId],
user: LoggedUser,
scenarioActivityId: ScenarioActivityId,
comment: String,
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), versionId.map(ScenarioVersionId.from), user.username)(
s"[commentId=${scenarioActivityId.value.toString}] Comment added: [$comment]"
)
)

def onEditComment(
processId: ProcessId,
user: LoggedUser,
scenarioActivityId: ScenarioActivityId,
comment: String
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), None, user.username)(
s"[commentId=${scenarioActivityId.value.toString}] Comment edited, new value: [$comment]"
)
)

def onDeleteComment(
processId: ProcessId,
rowId: Long,
user: LoggedUser,
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), None, user.username)(
s"Comment with rowId=$rowId deleted"
)
)

def onDeleteComment(
processId: ProcessId,
activityId: ScenarioActivityId,
user: LoggedUser,
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), None, user.username)(
s"Comment for activityId=${activityId.value} deleted"
)
)

def onAddAttachment(
attachmentToAdd: AttachmentToAdd,
user: LoggedUser,
): Unit =
logger.info(
withPrefix(
ScenarioId(attachmentToAdd.scenarioId.value),
Some(ScenarioVersionId.from(attachmentToAdd.scenarioVersionId)),
user.username
)(
s"Attachment added: [${attachmentToAdd.fileName}]"
)
)

def onScenarioImmediateAction(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Immediate scenario action [actionName=${actionName.value},actionId=${processActionId.value}]"
)
)

def onScenarioActionStarted(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): Unit =
logger.info(
withPrefix(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] started"
)
)

def onScenarioActionFinishedWithSuccess(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
user: LoggedUser
): Unit = {
val commentValue = comment match {
case Some(content) => s"comment [$content]"
case None => "without comment"
}
logger.info(
withPrefix(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with success and $commentValue "
)
)
}

def onScenarioActionFinishedWithFailure(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
failureMessage: String,
user: LoggedUser
): Unit = {
val commentValue = comment match {
case Some(content) => s"with comment [$content]"
case None => "without comment"
}
logger.info(
withPrefix(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with failure [$failureMessage] $commentValue"
)
)
}

def onScenarioActionRemoved(
processActionId: ProcessActionId,
processId: ProcessId,
processVersion: Option[VersionId],
user: LoggedUser
): Unit = {
logger.info(
withPrefix(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionId=${processActionId.value}] removed"
)
)
}

private def withPrefix(scenarioId: ScenarioId, scenarioVersionId: Option[ScenarioVersionId], username: String)(
log: String
) = {
val scenarioIdValue = scenarioId.value
val scenarioVersionIdValue = scenarioVersionId.map(_.value.toString).getOrElse("none")
s"[SCENARIO_AUDIT][scenarioId=$scenarioIdValue][version=$scenarioVersionIdValue][user=$username] $log"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object DefaultProcessingTypeDeployedScenariosProvider {
val dumbModelInfoProvier = ProcessingTypeDataProvider.withEmptyCombinedData(
Map(processingType -> ValueWithRestriction.anyUser(Map.empty[String, String]))
)
val actionRepository = new DbScenarioActionRepository(dbRef, dumbModelInfoProvier)
val actionRepository = DbScenarioActionRepository.create(dbRef, dumbModelInfoProvier)
val scenarioLabelsRepository = new ScenarioLabelsRepository(dbRef)
val processRepository = DBFetchingProcessRepository.create(dbRef, actionRepository, scenarioLabelsRepository)
val futureProcessRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.implicits.{toFoldableOps, toTraverseOps}
import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy}
import pl.touk.nussknacker.engine.api.deployment._
Expand All @@ -19,7 +20,6 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser}
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.ui.process.ProcessStateProvider
import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
Expand Down Expand Up @@ -48,7 +48,7 @@ import scala.util.{Failure, Success}
class DeploymentService(
dispatcher: DeploymentManagerDispatcher,
processRepository: FetchingProcessRepository[DB],
actionRepository: DbScenarioActionRepository,
actionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
processValidator: ProcessingTypeDataProvider[UIProcessValidator, _],
scenarioResolver: ProcessingTypeDataProvider[ScenarioResolver, _],
Expand Down Expand Up @@ -141,7 +141,7 @@ class DeploymentService(
actionResult <- validateBeforeDeploy(ctx.latestScenarioDetails, deployedScenarioData, updateStrategy)
.transformWith {
case Failure(ex) =>
removeInvalidAction(ctx.actionId).transform(_ => Failure(ex))
removeInvalidAction(ctx).transform(_ => Failure(ex))
case Success(_) =>
// we notify of deployment finish/fail only if initial validation succeeded
val deploymentFuture = runActionAndHandleResults(
Expand Down Expand Up @@ -170,10 +170,9 @@ class DeploymentService(
getBuildInfoProcessingType: ScenarioWithDetailsEntity[PS] => Option[ProcessingType]
)(implicit user: LoggedUser): Future[CommandContext[PS]] = {
implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
dbioRunner.runInTransaction(
// 1.1 lock for critical section
transactionallyRunCriticalSection(
for {
// 1.1 lock for critical section
_ <- actionRepository.lockActionsTable
// 1.2. fetch scenario data
processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[PS](processId.id)
processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processId.name))
Expand Down Expand Up @@ -207,6 +206,10 @@ class DeploymentService(
)
}

private def transactionallyRunCriticalSection[T](dbioAction: DB[T]) = {
dbioRunner.runInTransaction(actionRepository.executeCriticalSection(dbioAction))
}

// TODO: Use buildInfo explicitly instead of ProcessingType-that-is-used-to-calculate-buildInfo
private case class CommandContext[PS: ScenarioShapeFetchStrategy](
latestScenarioDetails: ScenarioWithDetailsEntity[PS],
Expand Down Expand Up @@ -387,14 +390,22 @@ class DeploymentService(
// Before we can do that we should check if we somewhere rely on fact that version is always defined -
// see ProcessAction.processVersionId
logger.info(s"Action $actionString finished for action without version id - skipping listener notification")
removeInvalidAction(ctx.actionId)
removeInvalidAction(ctx)
}
.map(_ => result)
}
}

private def removeInvalidAction(actionId: ProcessActionId): Future[Unit] = {
dbioRunner.runInTransaction(actionRepository.removeAction(actionId))
private def removeInvalidAction[PS: ScenarioShapeFetchStrategy](
context: CommandContext[PS]
)(implicit user: LoggedUser): Future[Unit] = {
dbioRunner.runInTransaction(
actionRepository.removeAction(
context.actionId,
context.latestScenarioDetails.processId,
context.versionOnWhichActionIsDone
)
)
}

// TODO: check deployment id to be sure that returned status is for given deployment
Expand Down
Loading
Loading