From 2b9fdc8bbdd42e08e5a20a8bfcc6709d7a3a61b5 Mon Sep 17 00:00:00 2001 From: pierrebrunin Date: Tue, 15 Dec 2020 16:09:59 +0100 Subject: [PATCH] :bug: Add backpressure buffer in configuration (#472) --- .../src/main/paradox/settings/settings.md | 3 + .../app/domains/events/events.scala | 424 ++++++++++-------- .../domains/events/impl/BasicEventStore.scala | 20 +- .../impl/DistributedPubSubEventStore.scala | 25 +- .../domains/events/impl/RedisEventStore.scala | 33 +- izanami-server/app/env/configuration.scala | 166 +++---- izanami-server/app/libs/streams/streams.scala | 29 +- izanami-server/conf/application.conf | 6 +- izanami-server/test/test/izanami.scala | 2 +- 9 files changed, 385 insertions(+), 323 deletions(-) diff --git a/izanami-documentation/src/main/paradox/settings/settings.md b/izanami-documentation/src/main/paradox/settings/settings.md index 0de243608..86b210282 100644 --- a/izanami-documentation/src/main/paradox/settings/settings.md +++ b/izanami-documentation/src/main/paradox/settings/settings.md @@ -25,7 +25,10 @@ Here, you can find settings about the event store configuration. | `izanami.events.store` | `IZANAMI_EVENT_STORE` | Type of the event store. Could be `InMemory`, `Kafka`, `Redis`, `Distributed`. | `InMemory` | | `izanami.events.distributed.topic` | `DISTRIBUTED_TOPIC` | Name of the topic when `izanami.events.store`=`Distributed` | `izanami` | | `izanami.events.redis.topic` | `REDIS_TOPIC` | Name of the topic when `izanami.events.store`=`Redis` | `izanami:events` | +| `izanami.events.redis.backpressure-buffer-size` | `REDIS_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `Redis` event store and notification (SSE, WebHook and hook). | `500` | | `izanami.events.kafka.topic` | `KAFKA_TOPIC` | Name of the topic when `izanami.events.store`=`Kafka` | `izanami` | +| `izanami.events.kafka.backpressure-buffer-size` | `KAFKA_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `Kafka` event store and notification (SSE, WebHook and hook). | `500` | +| `izanami.events.inmemory.backpressure-buffer-size` | `INMEMORY_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `InMemory` event store and notification (SSE, WebHook and hook). | `500` | | `izanami.cluster.seed-node-host` | `AKKA_CLUSTER SEED_NODE_HOST` | The host of the seed used to form a cluster | `127.0.0.1` | | `izanami.cluster.seed-node-port` | `AKKA_CLUSTER SEED_NODE_PORT` | The port of the seed used to form a cluster | 2551 | | `izanami.cluster.akka.remote.netty.tcp.hostname` | `AKKA_CLUSTER_HOST` | The host of the current node | `127.0.0.1` | diff --git a/izanami-server/app/domains/events/events.scala b/izanami-server/app/domains/events/events.scala index 03c83afee..234383c1e 100644 --- a/izanami-server/app/domains/events/events.scala +++ b/izanami-server/app/domains/events/events.scala @@ -322,9 +322,7 @@ package object events { JsError("events.unknow.type") } - private val writes: Writes[IzanamiEvent] = Writes[IzanamiEvent] { event => - event.toJson - } + private val writes: Writes[IzanamiEvent] = Writes[IzanamiEvent](event => event.toJson) implicit val format = Format(reads, writes) @@ -336,42 +334,46 @@ package object events { override def domain = Domain.Config } - case class ConfigCreated(key: ConfigKey, - config: Config, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ConfigEvent { + case class ConfigCreated( + key: ConfigKey, + config: Config, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ConfigEvent { val `type`: String = "CONFIG_CREATED" val payload: JsValue = ConfigInstances.format.writes(config) } - case class ConfigUpdated(key: ConfigKey, - oldValue: Config, - config: Config, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ConfigEvent { + case class ConfigUpdated( + key: ConfigKey, + oldValue: Config, + config: Config, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ConfigEvent { val `type`: String = "CONFIG_UPDATED" val payload: JsValue = ConfigInstances.format.writes(config) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> ConfigInstances.format.writes(oldValue)) } - case class ConfigDeleted(key: ConfigKey, - config: Config, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ConfigEvent { + case class ConfigDeleted( + key: ConfigKey, + config: Config, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ConfigEvent { val `type`: String = "CONFIG_DELETED" val payload: JsValue = ConfigInstances.format.writes(config) } - case class ConfigsDeleted(_id: Long = gen.nextId(), - patterns: Seq[String], - count: Long, - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ConfigEvent { + case class ConfigsDeleted( + _id: Long = gen.nextId(), + patterns: Seq[String], + count: Long, + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ConfigEvent { val `type`: String = "CONFIGS_DELETED" val key: ConfigKey = Key.Empty val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -389,43 +391,47 @@ package object events { override def domain = Domain.Feature } - case class FeatureCreated(key: FeatureKey, - feature: Feature, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends FeatureEvent { + case class FeatureCreated( + key: FeatureKey, + feature: Feature, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends FeatureEvent { val `type`: String = "FEATURE_CREATED" val payload: JsValue = FeatureInstances.format.writes(feature) } - case class FeatureUpdated(key: FeatureKey, - oldValue: Feature, - feature: Feature, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends FeatureEvent { + case class FeatureUpdated( + key: FeatureKey, + oldValue: Feature, + feature: Feature, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends FeatureEvent { val `type`: String = "FEATURE_UPDATED" val payload: JsValue = FeatureInstances.format.writes(feature) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> FeatureInstances.format.writes(oldValue)) } - case class FeatureDeleted(key: FeatureKey, - feature: Feature, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends FeatureEvent { + case class FeatureDeleted( + key: FeatureKey, + feature: Feature, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends FeatureEvent { val `type`: String = "FEATURE_DELETED" val payload: JsValue = FeatureInstances.format.writes(feature) } - case class FeaturesDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends FeatureEvent { + case class FeaturesDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends FeatureEvent { val key: FeatureKey = Key.Empty val `type`: String = "FEATURES_DELETED" val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -443,42 +449,46 @@ package object events { override def domain = Domain.Script } - case class GlobalScriptCreated(key: GlobalScriptKey, - globalScript: GlobalScript, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends GlobalScriptEvent { + case class GlobalScriptCreated( + key: GlobalScriptKey, + globalScript: GlobalScript, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends GlobalScriptEvent { val `type`: String = "GLOBALSCRIPT_CREATED" val payload: JsValue = GlobalScriptInstances.format.writes(globalScript) } - case class GlobalScriptUpdated(key: GlobalScriptKey, - oldValue: GlobalScript, - globalScript: GlobalScript, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends GlobalScriptEvent { + case class GlobalScriptUpdated( + key: GlobalScriptKey, + oldValue: GlobalScript, + globalScript: GlobalScript, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends GlobalScriptEvent { val `type`: String = "GLOBALSCRIPT_UPDATED" val payload: JsValue = GlobalScriptInstances.format.writes(globalScript) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> GlobalScriptInstances.format.writes(oldValue)) } - case class GlobalScriptDeleted(key: GlobalScriptKey, - globalScript: GlobalScript, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends GlobalScriptEvent { + case class GlobalScriptDeleted( + key: GlobalScriptKey, + globalScript: GlobalScript, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends GlobalScriptEvent { val `type`: String = "GLOBALSCRIPT_DELETED" val payload: JsValue = GlobalScriptInstances.format.writes(globalScript) } - case class GlobalScriptsDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends GlobalScriptEvent { + case class GlobalScriptsDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends GlobalScriptEvent { val key = Key.Empty val `type`: String = "GLOBALSCRIPTS_DELETED" val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -490,45 +500,49 @@ package object events { override def domain = Domain.User } - case class UserCreated(key: UserKey, - user: User, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends UserEvent { + case class UserCreated( + key: UserKey, + user: User, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends UserEvent { val `type`: String = "USER_CREATED" val payload: JsValue = UserInstances.format.writes(user) } - case class UserUpdated(key: UserKey, - oldValue: User, - user: User, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends UserEvent { + case class UserUpdated( + key: UserKey, + oldValue: User, + user: User, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends UserEvent { val `type`: String = "USER_UPDATED" val payload: JsValue = UserInstances.format.writes(user) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> UserInstances.format.writes(oldValue)) } - case class UserDeleted(key: UserKey, - user: User, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends UserEvent { + case class UserDeleted( + key: UserKey, + user: User, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends UserEvent { val `type`: String = "USER_DELETED" val payload: JsValue = UserInstances.format.writes(user) } - case class UsersDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends UserEvent { + case class UsersDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends UserEvent { val `type`: String = "USERS_DELETED" val key: UserKey = Key.Empty val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -546,42 +560,46 @@ package object events { override def domain = Domain.Webhook } - case class WebhookCreated(key: WebhookKey, - webhook: Webhook, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends WebhookEvent { + case class WebhookCreated( + key: WebhookKey, + webhook: Webhook, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends WebhookEvent { val `type`: String = "WEBHOOK_CREATED" val payload: JsValue = WebhookInstances.format.writes(webhook) } - case class WebhookUpdated(key: WebhookKey, - oldValue: Webhook, - webhook: Webhook, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends WebhookEvent { + case class WebhookUpdated( + key: WebhookKey, + oldValue: Webhook, + webhook: Webhook, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends WebhookEvent { val `type`: String = "WEBHOOK_UPDATED" val payload: JsValue = WebhookInstances.format.writes(webhook) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> WebhookInstances.format.writes(oldValue)) } - case class WebhookDeleted(key: WebhookKey, - webhook: Webhook, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends WebhookEvent { + case class WebhookDeleted( + key: WebhookKey, + webhook: Webhook, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends WebhookEvent { val `type`: String = "WEBHOOK_DELETED" val payload: JsValue = WebhookInstances.format.writes(webhook) } - case class WebhooksDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends WebhookEvent { + case class WebhooksDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends WebhookEvent { val key = Key.Empty val `type`: String = "WEBHOOKS_DELETED" val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -598,45 +616,49 @@ package object events { implicit val apikeyUpdated = Json.format[ApikeyUpdated] } - case class ApikeyCreated(key: ApikeyKey, - apikey: Apikey, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ApikeyEvent { + case class ApikeyCreated( + key: ApikeyKey, + apikey: Apikey, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ApikeyEvent { val `type`: String = "APIKEY_CREATED" val payload: JsValue = Json.toJson(apikey) } - case class ApikeyUpdated(key: ApikeyKey, - oldValue: Apikey, - apikey: Apikey, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ApikeyEvent { + case class ApikeyUpdated( + key: ApikeyKey, + oldValue: Apikey, + apikey: Apikey, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ApikeyEvent { val `type`: String = "APIKEY_UPDATED" val payload: JsValue = Json.toJson(apikey) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> Json.toJson(oldValue)) } - case class ApikeyDeleted(key: ApikeyKey, - apikey: Apikey, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ApikeyEvent { + case class ApikeyDeleted( + key: ApikeyKey, + apikey: Apikey, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ApikeyEvent { val `type`: String = "APIKEY_DELETED" val payload: JsValue = Json.toJson(apikey) } - case class ApikeysDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ApikeyEvent { + case class ApikeysDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ApikeyEvent { val `type`: String = "APIKEYS_DELETED" val key: ApikeyKey = Key.Empty val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) @@ -647,45 +669,49 @@ package object events { override def domain = Domain.Experiment } - case class ExperimentCreated(key: ExperimentKey, - experiment: Experiment, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentEvent { + case class ExperimentCreated( + key: ExperimentKey, + experiment: Experiment, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentEvent { val `type`: String = "EXPERIMENT_CREATED" val payload: JsValue = ExperimentInstances.format.writes(experiment) } - case class ExperimentUpdated(key: ExperimentKey, - oldValue: Experiment, - experiment: Experiment, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentEvent { + case class ExperimentUpdated( + key: ExperimentKey, + oldValue: Experiment, + experiment: Experiment, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentEvent { val `type`: String = "EXPERIMENT_UPDATED" val payload: JsValue = ExperimentInstances.format.writes(experiment) override def toJson: JsValue = super.toJson.as[JsObject] ++ Json.obj("oldValue" -> ExperimentInstances.format.writes(oldValue)) } - case class ExperimentDeleted(key: ExperimentKey, - experiment: Experiment, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentEvent { + case class ExperimentDeleted( + key: ExperimentKey, + experiment: Experiment, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentEvent { val `type`: String = "EXPERIMENT_DELETED" val payload: JsValue = ExperimentInstances.format.writes(experiment) } - case class ExperimentsDeleted(count: Long, - patterns: Seq[String], - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentEvent { + case class ExperimentsDeleted( + count: Long, + patterns: Seq[String], + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentEvent { val `type`: String = "EXPERIMENTS_DELETED" val payload: JsValue = Json.obj("count" -> count, "patterns" -> patterns) val key: ExperimentKey = Key.Empty @@ -695,22 +721,24 @@ package object events { sealed trait ExperimentVariantEventEvent extends ExperimentEvent - case class ExperimentVariantEventCreated(id: ExperimentVariantEventKey, - data: ExperimentVariantEvent, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentVariantEventEvent { + case class ExperimentVariantEventCreated( + id: ExperimentVariantEventKey, + data: ExperimentVariantEvent, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentVariantEventEvent { override def `type`: String = "EXPERIMENT_VARIANT_EVENT_CREATED" override def key: ExperimentKey = id.key override def payload: JsValue = ExperimentVariantEventInstances.format.writes(data) } - case class ExperimentVariantEventsDeleted(experiment: Experiment, - _id: Long = gen.nextId(), - timestamp: LocalDateTime = LocalDateTime.now(), - authInfo: Option[AuthInfo.Service]) - extends ExperimentVariantEventEvent { + case class ExperimentVariantEventsDeleted( + experiment: Experiment, + _id: Long = gen.nextId(), + timestamp: LocalDateTime = LocalDateTime.now(), + authInfo: Option[AuthInfo.Service] + ) extends ExperimentVariantEventEvent { override def `type`: String = "EXPERIMENT_VARIANT_EVENT_DELETED" override def key: ExperimentKey = experiment.id override def payload: JsValue = @@ -751,9 +779,11 @@ package object events { def publish(event: IzanamiEvent): ZIO[ZLogger with AuthInfo, IzanamiErrors, Done] - def events(domains: Seq[Domain] = Seq.empty[Domain], - patterns: Seq[String] = Seq.empty[String], - lastEventId: Option[Long] = None): Source[IzanamiEvent, NotUsed] + def events( + domains: Seq[Domain] = Seq.empty[Domain], + patterns: Seq[String] = Seq.empty[String], + lastEventId: Option[Long] = None + ): Source[IzanamiEvent, NotUsed] def check(): RIO[ZLogger with AuthInfo, Unit] @@ -768,7 +798,7 @@ package object events { izanamiConfig: IzanamiConfig ): ZLayer[DataStoreLayerContext, Throwable, EventStore] = izanamiConfig.events match { - case InMemoryEvents(_) => BasicEventStore.live + case InMemoryEvents(c) => BasicEventStore.live(c) case KafkaEvents(c) => KafkaEventStore.live(c) case RedisEvents(c) => Drivers.redisClientLayer.passthrough >>> RedisEventStore.live(c) case DistributedEvents(c) => DistributedPubSubEventStore.live(c) @@ -778,9 +808,11 @@ package object events { def publish(event: IzanamiEvent): ZIO[EventStoreContext, IzanamiErrors, Done] = ZIO.accessM(_.get[EventStore.Service].publish(event)) - def events(domains: Seq[Domain] = Seq.empty[Domain], - patterns: Seq[String] = Seq.empty[String], - lastEventId: Option[Long] = None): RIO[EventStoreContext, Source[IzanamiEvent, NotUsed]] = + def events( + domains: Seq[Domain] = Seq.empty[Domain], + patterns: Seq[String] = Seq.empty[String], + lastEventId: Option[Long] = None + ): RIO[EventStoreContext, Source[IzanamiEvent, NotUsed]] = ZIO.access(_.get[EventStore.Service].events(domains, patterns, lastEventId)) def check(): ZIO[EventStoreContext, IzanamiErrors, Unit] = diff --git a/izanami-server/app/domains/events/impl/BasicEventStore.scala b/izanami-server/app/domains/events/impl/BasicEventStore.scala index 561f2f2ef..66e1ae555 100644 --- a/izanami-server/app/domains/events/impl/BasicEventStore.scala +++ b/izanami-server/app/domains/events/impl/BasicEventStore.scala @@ -11,23 +11,25 @@ import domains.events.EventStore import domains.events.Events.IzanamiEvent import libs.streams.CacheableQueue import domains.errors.IzanamiErrors +import env.InMemoryEventsConfig import store.datastore.DataStoreLayerContext import zio.{IO, Task, ZLayer} import scala.util.Try object BasicEventStore { - val live: ZLayer[DataStoreLayerContext, Throwable, EventStore] = ZLayer.fromFunction { mix => - implicit val system: ActorSystem = mix.get[PlayModule.Service].system - new BasicEventStore + def live(config: InMemoryEventsConfig): ZLayer[DataStoreLayerContext, Throwable, EventStore] = ZLayer.fromFunction { + mix => + implicit val system: ActorSystem = mix.get[PlayModule.Service].system + new BasicEventStore(config) } } -class BasicEventStore(implicit system: ActorSystem) extends EventStore.Service { +class BasicEventStore(config: InMemoryEventsConfig)(implicit system: ActorSystem) extends EventStore.Service { logger.info("Starting default event store") - private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500) + private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize) system.actorOf(EventStreamActor.props(queue)) override def publish(event: IzanamiEvent): IO[IzanamiErrors, Done] = @@ -37,9 +39,11 @@ class BasicEventStore(implicit system: ActorSystem) extends EventStore.Service { Done }.orDie - override def events(domains: Seq[Domain], - patterns: Seq[String], - lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] = + override def events( + domains: Seq[Domain], + patterns: Seq[String], + lastEventId: Option[Long] + ): Source[IzanamiEvent, NotUsed] = lastEventId match { case Some(_) => queue.sourceWithCache diff --git a/izanami-server/app/domains/events/impl/DistributedPubSubEventStore.scala b/izanami-server/app/domains/events/impl/DistributedPubSubEventStore.scala index 384962670..84773d727 100644 --- a/izanami-server/app/domains/events/impl/DistributedPubSubEventStore.scala +++ b/izanami-server/app/domains/events/impl/DistributedPubSubEventStore.scala @@ -37,12 +37,8 @@ object DistributedPubSubEventStore { ZLogger.info(s"Starting akka cluster with config ${globalConfig.getConfig("cluster")}") *> Task( ActorSystem(actorSystemName, globalConfig.getConfig("cluster")) ) - )( - s => Task.fromFuture(_ => s.terminate()).ignore - ) - .map { implicit actorSystem => - new DistributedPubSubEventStore(config) - } + )(s => Task.fromFuture(_ => s.terminate()).ignore) + .map(implicit actorSystem => new DistributedPubSubEventStore(config)) .provide(mix) } } @@ -51,7 +47,7 @@ class DistributedPubSubEventStore(config: DistributedEventsConfig)(implicit s: A logger.info(s"Creating distributed event store") - private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500) + private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize) private val actor = s.actorOf(DistributedEventsPublisherActor.props(queue, config)) @@ -62,9 +58,11 @@ class DistributedPubSubEventStore(config: DistributedEventsConfig)(implicit s: A IO.succeed(Done) } - override def events(domains: Seq[Domain], - patterns: Seq[String], - lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] = + override def events( + domains: Seq[Domain], + patterns: Seq[String], + lastEventId: Option[Long] + ): Source[IzanamiEvent, NotUsed] = lastEventId match { case Some(_) => queue.sourceWithCache @@ -119,9 +117,10 @@ object DistributedEventsPublisherActor { Props(new DistributedEventsPublisherActor(queue, config)) } -private[events] class DistributedEventsPublisherActor(queue: CacheableQueue[IzanamiEvent], - config: DistributedEventsConfig) - extends Actor { +private[events] class DistributedEventsPublisherActor( + queue: CacheableQueue[IzanamiEvent], + config: DistributedEventsConfig +) extends Actor { import context.dispatcher diff --git a/izanami-server/app/domains/events/impl/RedisEventStore.scala b/izanami-server/app/domains/events/impl/RedisEventStore.scala index d4aeb6a92..4da77f41b 100644 --- a/izanami-server/app/domains/events/impl/RedisEventStore.scala +++ b/izanami-server/app/domains/events/impl/RedisEventStore.scala @@ -29,9 +29,11 @@ object RedisEventStore { create(redisDriver, config, playModule.system) } - def create(client: RedisWrapper, - config: RedisEventsConfig, - system: ActorSystem): Managed[Throwable, RedisEventStore] = + def create( + client: RedisWrapper, + config: RedisEventsConfig, + system: ActorSystem + ): Managed[Throwable, RedisEventStore] = (client.managedConnection <*> client.connectPubSub).map { case (connection, connectionPubSub) => @@ -39,11 +41,12 @@ object RedisEventStore { } } -class RedisEventStore(connection: StatefulRedisConnection[String, String], - connectionPubSub: StatefulRedisPubSubConnection[String, String], - config: RedisEventsConfig, - system: ActorSystem) - extends EventStore.Service { +class RedisEventStore( + connection: StatefulRedisConnection[String, String], + connectionPubSub: StatefulRedisPubSubConnection[String, String], + config: RedisEventsConfig, + system: ActorSystem +) extends EventStore.Service { import EventLogger._ import system.dispatcher @@ -52,7 +55,7 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String], logger.info(s"Starting redis event store") - private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500) + private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize) connectionPubSub.addListener(new RedisPubSubListener[String, String] { private def publishMessage(message: String) = { @@ -106,9 +109,11 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String], .orDie } - override def events(domains: Seq[Domain], - patterns: Seq[String], - lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] = + override def events( + domains: Seq[Domain], + patterns: Seq[String], + lastEventId: Option[Long] + ): Source[IzanamiEvent, NotUsed] = lastEventId match { case Some(_) => queue.sourceWithCache @@ -123,12 +128,12 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String], connection .async() .get("test") - .whenComplete((_, e) => { + .whenComplete { (_, e) => if (e != null) { cb(IO.fail(e)) } else { cb(IO.succeed(())) } - }) + } } } diff --git a/izanami-server/app/env/configuration.scala b/izanami-server/app/env/configuration.scala index b7168ba86..6c07292c7 100644 --- a/izanami-server/app/env/configuration.scala +++ b/izanami-server/app/env/configuration.scala @@ -92,10 +92,10 @@ object IzanamiConfig { } implicit val keyTypeHint: ConfigConvert[KeyType] = - viaString[KeyType](catchReadError { KeyType.parse }, _.getValue) + viaString[KeyType](catchReadError(KeyType.parse), _.getValue) implicit val dbTypeHint: ConfigConvert[DbType] = - viaString[DbType](catchReadError { DbType.fromString }, _.toString) + viaString[DbType](catchReadError(DbType.fromString), _.toString) implicit val inetAddressCC: ConfigConvert[InetAddress] = viaString[InetAddress](catchReadError(InetAddress.getByName), _.getHostAddress) @@ -185,17 +185,21 @@ case class MetricsElasticConfig(enabled: Boolean, index: String, pushInterval: F case class LogoutConfig(url: String) case class ApiKeyHeaders(headerClientId: String, headerClientSecret: String) -case class OtoroshiFilterConfig(sharedKey: String, - issuer: String, - headerClaim: String, - headerRequestId: String, - headerGatewayState: String, - headerGatewayStateResp: String) -case class DefaultFilter(allowedPaths: Seq[String], - issuer: String, - sharedKey: String, - cookieClaim: String, - apiKeys: ApiKeyHeaders) +case class OtoroshiFilterConfig( + sharedKey: String, + issuer: String, + headerClaim: String, + headerRequestId: String, + headerGatewayState: String, + headerGatewayStateResp: String +) +case class DefaultFilter( + allowedPaths: Seq[String], + issuer: String, + sharedKey: String, + cookieClaim: String, + apiKeys: ApiKeyHeaders +) sealed trait IzanamiFilter case class Otoroshi(otoroshi: OtoroshiFilterConfig) extends IzanamiFilter case class Default(default: DefaultFilter) extends IzanamiFilter @@ -209,38 +213,42 @@ case class RSA(enabled: Boolean, size: Int, publicKey: String, privateKey: Optio case class JWKS(enabled: Boolean, url: String, headers: Option[Map[String, String]], timeout: Option[FiniteDuration]) extends AlgoSettingsConfig -case class Oauth2Config(enabled: Boolean, - authorizeUrl: String, - tokenUrl: String, - userInfoUrl: String, - introspectionUrl: String, - loginUrl: String, - logoutUrl: String, - clientId: String, - clientSecret: Option[String], - mtls: Option[MtlsConfig], - scope: Option[String] = None, - claims: String = "email name", - accessTokenField: String = "access_token", - jwtVerifier: Option[AlgoSettingsConfig], - readProfileFromToken: Boolean = false, - useCookie: Boolean = true, - useJson: Boolean = true, - idField: String, - nameField: String, - emailField: String, - adminField: String, - authorizedPatternField: String, - defaultPatterns: String, - izanamiManagedUser: Boolean, - admins: Option[Seq[String]] = None) +case class Oauth2Config( + enabled: Boolean, + authorizeUrl: String, + tokenUrl: String, + userInfoUrl: String, + introspectionUrl: String, + loginUrl: String, + logoutUrl: String, + clientId: String, + clientSecret: Option[String], + mtls: Option[MtlsConfig], + scope: Option[String] = None, + claims: String = "email name", + accessTokenField: String = "access_token", + jwtVerifier: Option[AlgoSettingsConfig], + readProfileFromToken: Boolean = false, + useCookie: Boolean = true, + useJson: Boolean = true, + idField: String, + nameField: String, + emailField: String, + adminField: String, + authorizedPatternField: String, + defaultPatterns: String, + izanamiManagedUser: Boolean, + admins: Option[Seq[String]] = None +) case class MtlsConfig(enabled: Boolean, config: Option[CertificateConfig]) -case class CertificateConfig(truststorePath: Option[String], - truststorePassword: Option[String], - truststoreType: String, - keystorePath: Option[String], - keystorePassword: Option[String], - keystoreType: String) +case class CertificateConfig( + truststorePath: Option[String], + truststorePassword: Option[String], + truststoreType: String, + keystorePath: Option[String], + keystorePassword: Option[String], + keystoreType: String +) case class ConfigConfig(db: DbDomainConfig) case class FeaturesConfig(db: DbDomainConfig) case class GlobalScriptConfig(db: DbDomainConfig) @@ -267,9 +275,9 @@ case class DistributedEvents(distributed: DistributedEventsConfig) extends Event case class RedisEvents(redis: RedisEventsConfig) extends EventsConfig case class KafkaEvents(kafka: KafkaEventsConfig) extends EventsConfig -case class InMemoryEventsConfig() -case class DistributedEventsConfig(topic: String) -case class RedisEventsConfig(topic: String) +case class InMemoryEventsConfig(backpressureBufferSize: Int) +case class DistributedEventsConfig(topic: String, backpressureBufferSize: Int) +case class RedisEventsConfig(topic: String, backpressureBufferSize: Int) case class KafkaEventsConfig(topic: String) case class DbConfig( @@ -310,42 +318,50 @@ case class RedisOneSentinelConfig(host: String, port: Int) case class LevelDbConfig(parentPath: String) -case class CassandraConfig(addresses: Seq[String], - clusterName: Option[String], - replicationFactor: Int, - keyspace: String, - username: Option[String] = None, - password: Option[String] = None) - -case class DynamoConfig(tableName: String, - eventsTableName: String, - region: String, - host: String, - port: Int, - tls: Boolean = true, - parallelism: Int = 32, - accessKey: Option[String] = None, - secretKey: Option[String] = None) +case class CassandraConfig( + addresses: Seq[String], + clusterName: Option[String], + replicationFactor: Int, + keyspace: String, + username: Option[String] = None, + password: Option[String] = None +) + +case class DynamoConfig( + tableName: String, + eventsTableName: String, + region: String, + host: String, + port: Int, + tls: Boolean = true, + parallelism: Int = 32, + accessKey: Option[String] = None, + secretKey: Option[String] = None +) case class KafkaConfig(servers: String, keyPass: Option[String], keystore: Location, truststore: Location) case class Location(location: Option[String]) -case class ElasticConfig(host: String, - port: Int, - scheme: String, - user: Option[String], - password: Option[String], - automaticRefresh: Boolean = false) +case class ElasticConfig( + host: String, + port: Int, + scheme: String, + user: Option[String], + password: Option[String], + automaticRefresh: Boolean = false +) case class MongoConfig(url: String, database: Option[String], name: Option[String]) -case class PostgresqlConfig(driver: String, - url: String, - username: String, - password: String, - connectionPoolSize: Int, - tmpfolder: Option[String]) +case class PostgresqlConfig( + driver: String, + url: String, + username: String, + password: String, + connectionPoolSize: Int, + tmpfolder: Option[String] +) case class DbDomainConfig(`type`: DbType, conf: DbDomainConfigDetails, `import`: Option[Path]) case class InitialUserConfig(userId: String, password: String) diff --git a/izanami-server/app/libs/streams/streams.scala b/izanami-server/app/libs/streams/streams.scala index 42245fd49..b85b2080d 100644 --- a/izanami-server/app/libs/streams/streams.scala +++ b/izanami-server/app/libs/streams/streams.scala @@ -24,9 +24,8 @@ object syntax { { err => IzanamiLogger.error(s"Error parsing $v : $err") List.empty[(Key, V)] - }, { v => - List((k, v)) - } + }, + v => List((k, v)) ) } } @@ -41,9 +40,7 @@ object Flows { val bcast = b.add(Broadcast[In](2)) val zip = b.add(Zip[Out, Int]()) - val count = Flow[In].fold(0) { (acc, _) => - acc + 1 - } + val count = Flow[In].fold(0)((acc, _) => acc + 1) bcast ~> count ~> zip.in1 bcast ~> aFlow ~> zip.in0 @@ -54,15 +51,17 @@ object Flows { } -case class CacheableQueue[T](queue: SourceQueueWithComplete[QueueElement[T]], - sourceWithCache: Source[T, NotUsed], - rawSource: Source[T, NotUsed]) - extends SourceQueueWithComplete[T] { - - override def offer(elem: T): Future[QueueOfferResult] = queue.offer(Element(elem)) - override def watchCompletion() = queue.watchCompletion() - override def complete() = queue.complete() - override def fail(ex: Throwable): Unit = queue.fail(ex) +case class CacheableQueue[T]( + queue: SourceQueueWithComplete[QueueElement[T]], + sourceWithCache: Source[T, NotUsed], + rawSource: Source[T, NotUsed] +) extends SourceQueueWithComplete[T] { + + override def offer(elem: T): Future[QueueOfferResult] = + queue.offer(Element(elem)) + override def watchCompletion() = queue.watchCompletion() + override def complete() = queue.complete() + override def fail(ex: Throwable): Unit = queue.fail(ex) } object CacheableQueue { diff --git a/izanami-server/conf/application.conf b/izanami-server/conf/application.conf index a2c803f0d..37d422126 100644 --- a/izanami-server/conf/application.conf +++ b/izanami-server/conf/application.conf @@ -74,16 +74,20 @@ izanami { distributed { topic = "izanami" topic = ${?DISTRIBUTED_TOPIC} + backpressure-buffer-size = 500 } redis { topic = "izanami:events" topic = ${?REDIS_TOPIC} + backpressure-buffer-size = 500 } kafka { topic = "izanami" topic = ${?KAFKA_TOPIC} } - inmemory {} + inmemory { + backpressure-buffer-size = 500 + } } db { default = "LevelDB" // Cassandra, Redis, LevelDB, InMemory, Elastic, Mongo, InMemoryWithDb, Dynamo diff --git a/izanami-server/test/test/izanami.scala b/izanami-server/test/test/izanami.scala index bfa99e9e7..fa6a5903b 100644 --- a/izanami-server/test/test/izanami.scala +++ b/izanami-server/test/test/izanami.scala @@ -143,7 +143,7 @@ object FakeConfig { WebhookConfig(dbConfig, WebhookEventsConfig(5, 1.second, 1, 1.second)), UserConfig(dbConfig, InitialUserConfig("", "")), ApikeyConfig(dbConfig, InitializeApiKey(None, None, "*")), - InMemoryEvents(InMemoryEventsConfig()), + InMemoryEvents(InMemoryEventsConfig(500)), PatchConfig(dbConfig), MetricsConfig( false,