Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1772] ScenarioActivity-based activity log #7039

Merged
merged 13 commits into from
Oct 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class ScenarioActivityApiHttpService(

private def editComment(request: EditCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.editComment(
Expand All @@ -542,14 +542,14 @@ class ScenarioActivityApiHttpService(

private def deleteComment(request: DeprecatedDeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(scenarioActivityRepository.deleteComment(scenarioId, request.commentId))
).leftMap(_ => NoComment(request.commentId))

private def deleteComment(request: DeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.deleteComment(scenarioId, ScenarioActivityId(request.scenarioActivityId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,13 @@ class DBProcessService(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = false),
scenarioActionRepository
.markProcessAsUnArchived(processId = process.processIdUnsafe, process.processVersionId)
.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.UnArchive,
None,
None
)
)
)
} else {
Expand Down Expand Up @@ -527,7 +533,13 @@ class DBProcessService(
.runInTransaction(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = true),
scenarioActionRepository.markProcessAsArchived(processId = process.processIdUnsafe, process.processVersionId)
scenarioActionRepository.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.Archive,
None,
None
)
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package pl.touk.nussknacker.ui.process

import cats.effect.IO
import com.typesafe.scalalogging.Logger
import org.slf4j.{LoggerFactory, MDC}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.process.ScenarioAttachmentService.AttachmentToAdd
import pl.touk.nussknacker.ui.security.api.LoggedUser

object ScenarioActivityAuditLog {

private val logger = Logger(LoggerFactory.getLogger(s"scenario-activity-audit"))

def onCreateScenarioActivity(
scenarioActivity: ScenarioActivity
): IO[Unit] =
logWithContext(scenarioActivity.scenarioId, scenarioActivity.scenarioVersionId, scenarioActivity.user.name.value)(
s"New activity: ${stringify(scenarioActivity)}"
)

def onEditComment(
processId: ProcessId,
user: LoggedUser,
scenarioActivityId: ScenarioActivityId,
comment: String
): IO[Unit] = {
logWithContext(ScenarioId(processId.value), None, user.username)(
s"[commentId=${scenarioActivityId.value.toString}] Comment edited, new value: [$comment]"
)
}

def onDeleteComment(
processId: ProcessId,
rowId: Long,
user: LoggedUser,
): IO[Unit] =
logWithContext(ScenarioId(processId.value), None, user.username)(
s"Comment with rowId=$rowId deleted"
)

def onDeleteComment(
processId: ProcessId,
activityId: ScenarioActivityId,
user: LoggedUser,
): IO[Unit] =
logWithContext(ScenarioId(processId.value), None, user.username)(
s"Comment for activityId=${activityId.value} deleted"
)

def onAddAttachment(
attachmentToAdd: AttachmentToAdd,
user: LoggedUser,
): IO[Unit] =
logWithContext(
ScenarioId(attachmentToAdd.scenarioId.value),
Some(ScenarioVersionId.from(attachmentToAdd.scenarioVersionId)),
user.username
)(s"Attachment added: [${attachmentToAdd.fileName}]")

def onDeleteAttachment(
scenarioId: ProcessId,
attachmentId: Long,
user: LoggedUser,
): IO[Unit] =
logWithContext(
ScenarioId(scenarioId.value),
None,
user.username
)(s"Attachment deleted: [attachmentId=$attachmentId]")

def onScenarioImmediateAction(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] =
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Immediate scenario action [actionName=${actionName.value},actionId=${processActionId.value}]"
)

def onScenarioActionStarted(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] =
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] started"
)

def onScenarioActionFinishedWithSuccess(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
user: LoggedUser
): IO[Unit] = {
val commentValue = comment match {
case Some(content) => s"comment [$content]"
case None => "without comment"
}
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with success and $commentValue "
)
}

def onScenarioActionFinishedWithFailure(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
failureMessage: String,
user: LoggedUser
): IO[Unit] = {
val commentValue = comment match {
case Some(content) => s"with comment [$content]"
case None => "without comment"
}
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with failure [$failureMessage] $commentValue"
)
}

def onScenarioActionRemoved(
processActionId: ProcessActionId,
processId: ProcessId,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] = {
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionId=${processActionId.value}] removed"
)
}

private def stringify(scenarioActivity: ScenarioActivity): String = scenarioActivity match {
case ScenarioActivity.ScenarioDeployed(_, _, _, _, _, comment, result) =>
s"ScenarioDeployed(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.ScenarioPaused(_, _, _, _, _, comment, result) =>
s"ScenarioPaused(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.ScenarioCanceled(_, _, _, _, _, comment, result) =>
s"ScenarioCanceled(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.CustomAction(_, _, _, _, _, actionName, comment, result) =>
s"CustomAction(action=$actionName,comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.PerformedSingleExecution(_, _, _, _, _, comment, result) =>
s"PerformedSingleExecution(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.PerformedScheduledExecution(_, _, _, _, _, status, _, scheduleName, _, _, _) =>
s"PerformedScheduledExecution(scheduleName=$scheduleName,scheduledExecutionStatus=${status.entryName})"
case ScenarioActivity.ScenarioCreated(_, _, _, _, _) =>
"ScenarioCreated"
case ScenarioActivity.ScenarioArchived(_, _, _, _, _) =>
"ScenarioArchived"
case ScenarioActivity.ScenarioUnarchived(_, _, _, _, _) =>
"ScenarioUnarchived"
case ScenarioActivity.ScenarioModified(_, _, _, _, _, _, comment) =>
s"ScenarioModified(comment=${stringify(comment)})"
case ScenarioActivity.ScenarioNameChanged(_, _, _, _, _, oldName, newName) =>
s"ScenarioNameChanged(oldName=$oldName,newName=$newName)"
case ScenarioActivity.CommentAdded(_, _, _, _, _, comment) =>
s"CommentAdded(comment=${stringify(comment)})"
case ScenarioActivity.AttachmentAdded(_, _, _, _, _, attachment) =>
s"AttachmentAdded(fileName=${stringify(attachment)})"
case ScenarioActivity.ChangedProcessingMode(_, _, _, _, _, from, to) =>
s"ChangedProcessingMode(from=$from,to=$to)"
case ScenarioActivity.IncomingMigration(_, _, _, _, _, sourceEnvironment, sourceUser, sourceVersionId, _) =>
s"IncomingMigration(sourceEnvironment=${sourceEnvironment.name},sourceUser=${sourceUser.value},sourceVersionId=${sourceVersionId
.map(_.value.toString)
.getOrElse("[none]")})"
case ScenarioActivity.OutgoingMigration(_, _, _, _, _, destinationEnvironment) =>
s"OutgoingMigration(destinationEnvironment=${destinationEnvironment.name})"
case ScenarioActivity.AutomaticUpdate(_, _, _, _, _, changes) =>
s"AutomaticUpdate(changes=$changes)"
}

private def stringify(attachment: ScenarioAttachment): String = attachment match {
case ScenarioAttachment.Available(_, attachmentFilename, _, _) => s"Available(${attachmentFilename.value})"
case ScenarioAttachment.Deleted(attachmentFilename, _, _) => s"Deleted(${attachmentFilename.value})"
}

private def stringify(comment: ScenarioComment): String = comment match {
case ScenarioComment.WithContent(comment, _, _) => comment
case ScenarioComment.WithoutContent(_, _) => "none"
}

private def stringify(result: DeploymentResult): String = result match {
case DeploymentResult.Success(_) => "Success"
case DeploymentResult.Failure(_, errorMessage) => s"Failure($errorMessage)"
}

private def logWithContext(
scenarioId: ScenarioId,
scenarioVersionId: Option[ScenarioVersionId],
username: String,
)(log: String): IO[Unit] = IO.delay {
MDC.clear()
MDC.put("scenarioId", scenarioId.value.toString)
MDC.put("scenarioVersionId", scenarioVersionId.map(_.value.toString).getOrElse("none"))
MDC.put("username", username)
logger.info(log)
MDC.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object DefaultProcessingTypeDeployedScenariosProvider {
val dumbModelInfoProvier = ProcessingTypeDataProvider.withEmptyCombinedData(
Map(processingType -> ValueWithRestriction.anyUser(Map.empty[String, String]))
)
val actionRepository = new DbScenarioActionRepository(dbRef, dumbModelInfoProvier)
val actionRepository = DbScenarioActionRepository.create(dbRef, dumbModelInfoProvier)
val scenarioLabelsRepository = new ScenarioLabelsRepository(dbRef)
val processRepository = DBFetchingProcessRepository.create(dbRef, actionRepository, scenarioLabelsRepository)
val futureProcessRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.implicits.{toFoldableOps, toTraverseOps}
import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy}
import pl.touk.nussknacker.engine.api.deployment._
Expand All @@ -19,7 +20,6 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser}
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.ui.process.ProcessStateProvider
import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
Expand Down Expand Up @@ -48,7 +48,7 @@ import scala.util.{Failure, Success}
class DeploymentService(
dispatcher: DeploymentManagerDispatcher,
processRepository: FetchingProcessRepository[DB],
actionRepository: DbScenarioActionRepository,
actionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
processValidator: ProcessingTypeDataProvider[UIProcessValidator, _],
scenarioResolver: ProcessingTypeDataProvider[ScenarioResolver, _],
Expand Down Expand Up @@ -141,7 +141,7 @@ class DeploymentService(
actionResult <- validateBeforeDeploy(ctx.latestScenarioDetails, deployedScenarioData, updateStrategy)
.transformWith {
case Failure(ex) =>
removeInvalidAction(ctx.actionId).transform(_ => Failure(ex))
removeInvalidAction(ctx).transform(_ => Failure(ex))
case Success(_) =>
// we notify of deployment finish/fail only if initial validation succeeded
val deploymentFuture = runActionAndHandleResults(
Expand Down Expand Up @@ -170,10 +170,9 @@ class DeploymentService(
getBuildInfoProcessingType: ScenarioWithDetailsEntity[PS] => Option[ProcessingType]
)(implicit user: LoggedUser): Future[CommandContext[PS]] = {
implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
dbioRunner.runInTransaction(
// 1.1 lock for critical section
transactionallyRunCriticalSection(
for {
// 1.1 lock for critical section
_ <- actionRepository.lockActionsTable
// 1.2. fetch scenario data
processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[PS](processId.id)
processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processId.name))
Expand Down Expand Up @@ -207,6 +206,10 @@ class DeploymentService(
)
}

private def transactionallyRunCriticalSection[T](dbioAction: DB[T]) = {
dbioRunner.runInTransaction(actionRepository.withLockedTable(dbioAction))
}

// TODO: Use buildInfo explicitly instead of ProcessingType-that-is-used-to-calculate-buildInfo
private case class CommandContext[PS: ScenarioShapeFetchStrategy](
latestScenarioDetails: ScenarioWithDetailsEntity[PS],
Expand Down Expand Up @@ -387,14 +390,22 @@ class DeploymentService(
// Before we can do that we should check if we somewhere rely on fact that version is always defined -
// see ProcessAction.processVersionId
logger.info(s"Action $actionString finished for action without version id - skipping listener notification")
removeInvalidAction(ctx.actionId)
removeInvalidAction(ctx)
}
.map(_ => result)
}
}

private def removeInvalidAction(actionId: ProcessActionId): Future[Unit] = {
dbioRunner.runInTransaction(actionRepository.removeAction(actionId))
private def removeInvalidAction[PS: ScenarioShapeFetchStrategy](
context: CommandContext[PS]
)(implicit user: LoggedUser): Future[Unit] = {
dbioRunner.runInTransaction(
actionRepository.removeAction(
context.actionId,
context.latestScenarioDetails.processId,
context.versionOnWhichActionIsDone
)
)
}

// TODO: check deployment id to be sure that returned status is for given deployment
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.touk.nussknacker.ui.process.repository

import db.util.DBIOActionInstances.DB
import slick.lifted.{AbstractTable, TableQuery => LTableQuery}

import scala.concurrent.ExecutionContext

trait LockableTable {

def withLockedTable[T](
dbioAction: DB[T]
): DB[T]

}

trait DbLockableTable { this: DbioRepository =>

import profile.api._

type ENTITY <: AbstractTable[_]

protected implicit def executionContext: ExecutionContext

protected def table: LTableQuery[ENTITY]

def withLockedTable[T](
dbioAction: DB[T]
): DB[T] = for {
_ <- lockTable
result <- dbioAction
} yield result

private def lockTable: DB[Unit] = {
run(table.filter(_ => false).forUpdate.result.map(_ => ()))
}

}
Loading
Loading