Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mproch committed Jul 20, 2022
1 parent 36c996b commit 8ee6a00
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 23 deletions.
4 changes: 4 additions & 0 deletions ui/server/src/main/resources/defaultUiConfig.conf
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,7 @@ testDataSettings: {
testDataMaxBytes: 200000
resultsMaxBytes: 50000000
}

notifications {
duration: 10 minutes
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import fr.davit.akka.http.metrics.core.{HttpMetricsRegistry, HttpMetricsSettings
import fr.davit.akka.http.metrics.dropwizard.{DropwizardRegistry, DropwizardSettings}
import io.dropwizard.metrics5.MetricRegistry
import io.dropwizard.metrics5.jmx.JmxReporter
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import pl.touk.nussknacker.engine.ProcessingTypeData
import pl.touk.nussknacker.engine.api.component.AdditionalPropertyConfig
import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor
Expand All @@ -29,7 +30,7 @@ import pl.touk.nussknacker.ui.initialization.Initialization
import pl.touk.nussknacker.ui.listener.ProcessChangeListenerLoader
import pl.touk.nussknacker.ui.listener.services.NussknackerServices
import pl.touk.nussknacker.ui.metrics.RepositoryGauges
import pl.touk.nussknacker.ui.notifications.{NotificationService, NotificationsListener}
import pl.touk.nussknacker.ui.notifications.{ManagementActorCurrentDeployments, NotificationConfig, NotificationService, NotificationsListener}
import pl.touk.nussknacker.ui.process._
import pl.touk.nussknacker.ui.process.deployment.{DeploymentService, ManagementActor, ScenarioResolver}
import pl.touk.nussknacker.ui.process.migrate.{HttpRemoteEnvironment, TestModelMigrations}
Expand Down Expand Up @@ -122,7 +123,7 @@ trait NusskanckerDefaultAppRouter extends NusskanckerAppRouter {
val processRepository = DBFetchingProcessRepository.create(dbConfig)
val writeProcessRepository = ProcessRepository.create(dbConfig, modelData)

val notificationListener = new NotificationsListener
val notificationListener = new NotificationsListener(config.as[NotificationConfig]("notifications"))
val processChangeListener = ProcessChangeListenerLoader
.loadListeners(getClass.getClassLoader, config, NussknackerServices(new PullProcessRepository(processRepository)), notificationListener)

Expand Down Expand Up @@ -154,7 +155,7 @@ trait NusskanckerDefaultAppRouter extends NusskanckerAppRouter {

val componentService = DefaultComponentService(config, typeToConfig, processService, processCategoryService)

val notificationService = new NotificationService(managementActor, notificationListener)
val notificationService = new NotificationService(new ManagementActorCurrentDeployments(managementActor), notificationListener)

initMetrics(metricsRegistry, processRepository)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ object NotificationAction extends Enumeration {
implicit val typeDecoder: Decoder[NotificationAction.Value] = Decoder.decodeEnumeration(NotificationAction)

type NotificationAction = Value
val deploymentFailed, deploymentFinished = Value
val deploymentFailed, deploymentFinished, deploymentInProgress = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,47 @@ import pl.touk.nussknacker.ui.notifications.NotificationAction._
import pl.touk.nussknacker.ui.process.deployment.{DeployInfo, DeploymentActionType, DeploymentStatus, DeploymentStatusResponse}
import pl.touk.nussknacker.ui.security.api.LoggedUser

import java.time.Instant
import java.time.{Clock, Instant}
import java.time.temporal.ChronoUnit
import java.util.UUID
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

class NotificationsListener extends ProcessChangeListener {
case class NotificationConfig(duration: FiniteDuration)

private val timeout = 10 minutes
class NotificationsListener(config: NotificationConfig, clock: Clock = Clock.systemUTC()) extends ProcessChangeListener {

//not too efficient, but we don't expect too much data...
private val data: ArrayBuffer[NotificationEvent] = ArrayBuffer()

override def handle(event: ProcessChangeEvent)(implicit ec: ExecutionContext, user: User): Unit = synchronized {
val now = Instant.now()
val now = Instant.now(clock)
data.append(NotificationEvent(UUID.randomUUID().toString, event, now, user))
filterOldNotifications(now)
}

private def filterOldNotifications(now: Instant): Unit = {
data.zipWithIndex.find(_._1.date.isBefore(now.minus(timeout.toMillis, ChronoUnit.MILLIS))).foreach(i => data.remove(i._2))
data.zipWithIndex.filter(_._1.date.isBefore(now.minus(config.duration.toMillis, ChronoUnit.MILLIS))).foreach(i => data.remove(i._2))
}

private[notifications] def dataFor(user: LoggedUser, notificationsAfter: Option[Instant]): List[NotificationEvent] = synchronized {
filterOldNotifications(Instant.now())
data.filter(event => event.user.id == user.id && !notificationsAfter.exists(_.isBefore(event.date))).toList
filterOldNotifications(Instant.now(clock))
data.filter(event => event.user.id == user.id && !notificationsAfter.exists(_.isAfter(event.date))).toList
}


}

class NotificationService(managementActor: ActorRef,
trait CurrentDeployments {
def retrieve(implicit timeout: Timeout): Future[DeploymentStatusResponse]
}

class ManagementActorCurrentDeployments(managementActor: ActorRef) extends CurrentDeployments {
override def retrieve(implicit timeout: Timeout): Future[DeploymentStatusResponse] = (managementActor ? DeploymentStatus).mapTo[DeploymentStatusResponse]
}

class NotificationService(currentDeployments: CurrentDeployments,
store: NotificationsListener) {

def notifications(user: LoggedUser, notificationsAfter: Option[Instant])(implicit ec: ExecutionContext, timeout: Timeout): Future[List[Notification]] = {
Expand All @@ -61,15 +70,12 @@ class NotificationService(managementActor: ActorRef,
}

private def prepareDeploymentNotifications(user: LoggedUser)(implicit ec: ExecutionContext, timeout: Timeout): Future[List[Notification]] = {
(managementActor ? DeploymentStatus)
.mapTo[DeploymentStatusResponse]
.map {
case DeploymentStatusResponse(deploymentInfos) =>
deploymentInfos
//no need to inform current user
.filterNot(_._2.userId == user.id)
.map { case (k, v) => currentDeploymentToNotification(k, v) }.toList
}
currentDeployments.retrieve.map { case DeploymentStatusResponse(deploymentInfos) =>
deploymentInfos
//no need to inform current user
.filterNot(_._2.userId == user.id)
.map { case (k, v) => currentDeploymentToNotification(k, v) }.toList
}
}

private def currentDeploymentToNotification(processName: ProcessName, deploymentInfo: DeployInfo): Notification = {
Expand All @@ -78,7 +84,7 @@ class NotificationService(managementActor: ActorRef,
case DeploymentActionType.Cancel => "cancelled"
}
//TODO: should it be displayed only once?
Notification(UUID.randomUUID().toString, s"Scenario ${processName.value} is being $actionString by ${deploymentInfo.userId}", NotificationType.info, None)
Notification(UUID.randomUUID().toString, s"Scenario ${processName.value} is being $actionString by ${deploymentInfo.userId}", NotificationType.info, Some(deploymentInProgress))
}

}
Expand Down
4 changes: 4 additions & 0 deletions ui/server/src/test/resources/ui.conf
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ testDataSettings: {
testDataMaxBytes: 10000
resultsMaxBytes: 50000000
}

notifications {
duration: 1 minute
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pl.touk.nussknacker.ui.notifications

import akka.util.Timeout
import org.scalatest.{FunSuite, Matchers}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName}
import pl.touk.nussknacker.test.PatientScalaFutures
import pl.touk.nussknacker.ui.api.ListenerApiUser
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.OnDeployActionFailed
import pl.touk.nussknacker.ui.notifications.NotificationAction.{deploymentFailed, deploymentInProgress}
import pl.touk.nussknacker.ui.process.deployment.DeploymentActionType.Deployment
import pl.touk.nussknacker.ui.process.deployment.{DeployInfo, DeploymentStatusResponse}
import pl.touk.nussknacker.ui.security.api.LoggedUser

import java.time.temporal.ChronoUnit
import java.time.{Clock, Instant, ZoneId}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

class NotificationServiceTest extends FunSuite with Matchers with PatientScalaFutures {

private implicit val timeout: Timeout = Timeout(1 second)

test("Should return only events for user in given time") {
var currentInstant: Instant = Instant.now()

val clock: Clock = clockForInstant(() => currentInstant)

val currentDeployments = new CurrentDeployments {
override def retrieve(implicit timeout: Timeout): Future[DeploymentStatusResponse] = Future.successful(DeploymentStatusResponse(
Map(ProcessName("id1") -> DeployInfo("deployingUser", clock.millis(), Deployment))))
}
val listener = new NotificationsListener(NotificationConfig(20 minutes), clock)
val notificationService = new NotificationService(currentDeployments, listener)
def notificationsFor(user: String, after: Option[Instant] = None) = notificationService.notifications(LoggedUser(user, ""), after).futureValue


notificationsFor("deployingUser") shouldBe 'empty
notificationsFor("randomUser").map(_.action) shouldBe List(Some(deploymentInProgress))

val userId = "user1"
listener.handle(OnDeployActionFailed(ProcessId(1), new RuntimeException("Failure")))(global, ListenerApiUser(LoggedUser(userId, "")))
notificationsFor(userId).map(_.action) shouldBe List(Some(deploymentInProgress), Some(deploymentFailed))
notificationsFor("user2").map(_.action) shouldBe List(Some(deploymentInProgress))

notificationsFor(userId, Some(currentInstant.minusSeconds(20))).map(_.action) shouldBe List(Some(deploymentInProgress), Some(deploymentFailed))
notificationsFor(userId, Some(currentInstant.plusSeconds(20))).map(_.action) shouldBe List(Some(deploymentInProgress))

currentInstant = currentInstant.plus(1, ChronoUnit.HOURS)
notificationsFor(userId).map(_.action) shouldBe List(Some(deploymentInProgress))
}

private def clockForInstant(currentInstant: () => Instant): Clock = {
new Clock {
override def getZone: ZoneId = ZoneId.systemDefault()

override def withZone(zone: ZoneId): Clock = ???

override def instant(): Instant = currentInstant()
}
}
}

0 comments on commit 8ee6a00

Please sign in to comment.