Skip to content

Commit

Permalink
🐛 Add backpressure buffer in configuration (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrebruninmaif committed Dec 15, 2020
1 parent c301e71 commit 2b9fdc8
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 323 deletions.
3 changes: 3 additions & 0 deletions izanami-documentation/src/main/paradox/settings/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ Here, you can find settings about the event store configuration.
| `izanami.events.store` | `IZANAMI_EVENT_STORE` | Type of the event store. Could be `InMemory`, `Kafka`, `Redis`, `Distributed`. | `InMemory` |
| `izanami.events.distributed.topic` | `DISTRIBUTED_TOPIC` | Name of the topic when `izanami.events.store`=`Distributed` | `izanami` |
| `izanami.events.redis.topic` | `REDIS_TOPIC` | Name of the topic when `izanami.events.store`=`Redis` | `izanami:events` |
| `izanami.events.redis.backpressure-buffer-size` | `REDIS_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `Redis` event store and notification (SSE, WebHook and hook). | `500` |
| `izanami.events.kafka.topic` | `KAFKA_TOPIC` | Name of the topic when `izanami.events.store`=`Kafka` | `izanami` |
| `izanami.events.kafka.backpressure-buffer-size` | `KAFKA_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `Kafka` event store and notification (SSE, WebHook and hook). | `500` |
| `izanami.events.inmemory.backpressure-buffer-size` | `INMEMORY_BACKPRESSURE_BUFFER_SIZE` | Size of buffer (of backpressure) between `InMemory` event store and notification (SSE, WebHook and hook). | `500` |
| `izanami.cluster.seed-node-host` | `AKKA_CLUSTER SEED_NODE_HOST` | The host of the seed used to form a cluster | `127.0.0.1` |
| `izanami.cluster.seed-node-port` | `AKKA_CLUSTER SEED_NODE_PORT` | The port of the seed used to form a cluster | 2551 |
| `izanami.cluster.akka.remote.netty.tcp.hostname` | `AKKA_CLUSTER_HOST` | The host of the current node | `127.0.0.1` |
Expand Down
424 changes: 228 additions & 196 deletions izanami-server/app/domains/events/events.scala

Large diffs are not rendered by default.

20 changes: 12 additions & 8 deletions izanami-server/app/domains/events/impl/BasicEventStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ import domains.events.EventStore
import domains.events.Events.IzanamiEvent
import libs.streams.CacheableQueue
import domains.errors.IzanamiErrors
import env.InMemoryEventsConfig
import store.datastore.DataStoreLayerContext
import zio.{IO, Task, ZLayer}

import scala.util.Try

object BasicEventStore {
val live: ZLayer[DataStoreLayerContext, Throwable, EventStore] = ZLayer.fromFunction { mix =>
implicit val system: ActorSystem = mix.get[PlayModule.Service].system
new BasicEventStore
def live(config: InMemoryEventsConfig): ZLayer[DataStoreLayerContext, Throwable, EventStore] = ZLayer.fromFunction {
mix =>
implicit val system: ActorSystem = mix.get[PlayModule.Service].system
new BasicEventStore(config)
}
}

class BasicEventStore(implicit system: ActorSystem) extends EventStore.Service {
class BasicEventStore(config: InMemoryEventsConfig)(implicit system: ActorSystem) extends EventStore.Service {

logger.info("Starting default event store")

private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500)
private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize)
system.actorOf(EventStreamActor.props(queue))

override def publish(event: IzanamiEvent): IO[IzanamiErrors, Done] =
Expand All @@ -37,9 +39,11 @@ class BasicEventStore(implicit system: ActorSystem) extends EventStore.Service {
Done
}.orDie

override def events(domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] =
override def events(
domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]
): Source[IzanamiEvent, NotUsed] =
lastEventId match {
case Some(_) =>
queue.sourceWithCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ object DistributedPubSubEventStore {
ZLogger.info(s"Starting akka cluster with config ${globalConfig.getConfig("cluster")}") *> Task(
ActorSystem(actorSystemName, globalConfig.getConfig("cluster"))
)
)(
s => Task.fromFuture(_ => s.terminate()).ignore
)
.map { implicit actorSystem =>
new DistributedPubSubEventStore(config)
}
)(s => Task.fromFuture(_ => s.terminate()).ignore)
.map(implicit actorSystem => new DistributedPubSubEventStore(config))
.provide(mix)
}
}
Expand All @@ -51,7 +47,7 @@ class DistributedPubSubEventStore(config: DistributedEventsConfig)(implicit s: A

logger.info(s"Creating distributed event store")

private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500)
private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize)

private val actor =
s.actorOf(DistributedEventsPublisherActor.props(queue, config))
Expand All @@ -62,9 +58,11 @@ class DistributedPubSubEventStore(config: DistributedEventsConfig)(implicit s: A
IO.succeed(Done)
}

override def events(domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] =
override def events(
domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]
): Source[IzanamiEvent, NotUsed] =
lastEventId match {
case Some(_) =>
queue.sourceWithCache
Expand Down Expand Up @@ -119,9 +117,10 @@ object DistributedEventsPublisherActor {
Props(new DistributedEventsPublisherActor(queue, config))
}

private[events] class DistributedEventsPublisherActor(queue: CacheableQueue[IzanamiEvent],
config: DistributedEventsConfig)
extends Actor {
private[events] class DistributedEventsPublisherActor(
queue: CacheableQueue[IzanamiEvent],
config: DistributedEventsConfig
) extends Actor {

import context.dispatcher

Expand Down
33 changes: 19 additions & 14 deletions izanami-server/app/domains/events/impl/RedisEventStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,24 @@ object RedisEventStore {
create(redisDriver, config, playModule.system)
}

def create(client: RedisWrapper,
config: RedisEventsConfig,
system: ActorSystem): Managed[Throwable, RedisEventStore] =
def create(
client: RedisWrapper,
config: RedisEventsConfig,
system: ActorSystem
): Managed[Throwable, RedisEventStore] =
(client.managedConnection <*>
client.connectPubSub).map {
case (connection, connectionPubSub) =>
new RedisEventStore(connection, connectionPubSub, config, system)
}
}

class RedisEventStore(connection: StatefulRedisConnection[String, String],
connectionPubSub: StatefulRedisPubSubConnection[String, String],
config: RedisEventsConfig,
system: ActorSystem)
extends EventStore.Service {
class RedisEventStore(
connection: StatefulRedisConnection[String, String],
connectionPubSub: StatefulRedisPubSubConnection[String, String],
config: RedisEventsConfig,
system: ActorSystem
) extends EventStore.Service {

import EventLogger._
import system.dispatcher
Expand All @@ -52,7 +55,7 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String],

logger.info(s"Starting redis event store")

private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = 500)
private val queue = CacheableQueue[IzanamiEvent](500, queueBufferSize = config.backpressureBufferSize)

connectionPubSub.addListener(new RedisPubSubListener[String, String] {
private def publishMessage(message: String) = {
Expand Down Expand Up @@ -106,9 +109,11 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String],
.orDie
}

override def events(domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]): Source[IzanamiEvent, NotUsed] =
override def events(
domains: Seq[Domain],
patterns: Seq[String],
lastEventId: Option[Long]
): Source[IzanamiEvent, NotUsed] =
lastEventId match {
case Some(_) =>
queue.sourceWithCache
Expand All @@ -123,12 +128,12 @@ class RedisEventStore(connection: StatefulRedisConnection[String, String],
connection
.async()
.get("test")
.whenComplete((_, e) => {
.whenComplete { (_, e) =>
if (e != null) {
cb(IO.fail(e))
} else {
cb(IO.succeed(()))
}
})
}
}
}
166 changes: 91 additions & 75 deletions izanami-server/app/env/configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ object IzanamiConfig {
}

implicit val keyTypeHint: ConfigConvert[KeyType] =
viaString[KeyType](catchReadError { KeyType.parse }, _.getValue)
viaString[KeyType](catchReadError(KeyType.parse), _.getValue)

implicit val dbTypeHint: ConfigConvert[DbType] =
viaString[DbType](catchReadError { DbType.fromString }, _.toString)
viaString[DbType](catchReadError(DbType.fromString), _.toString)

implicit val inetAddressCC: ConfigConvert[InetAddress] =
viaString[InetAddress](catchReadError(InetAddress.getByName), _.getHostAddress)
Expand Down Expand Up @@ -185,17 +185,21 @@ case class MetricsElasticConfig(enabled: Boolean, index: String, pushInterval: F

case class LogoutConfig(url: String)
case class ApiKeyHeaders(headerClientId: String, headerClientSecret: String)
case class OtoroshiFilterConfig(sharedKey: String,
issuer: String,
headerClaim: String,
headerRequestId: String,
headerGatewayState: String,
headerGatewayStateResp: String)
case class DefaultFilter(allowedPaths: Seq[String],
issuer: String,
sharedKey: String,
cookieClaim: String,
apiKeys: ApiKeyHeaders)
case class OtoroshiFilterConfig(
sharedKey: String,
issuer: String,
headerClaim: String,
headerRequestId: String,
headerGatewayState: String,
headerGatewayStateResp: String
)
case class DefaultFilter(
allowedPaths: Seq[String],
issuer: String,
sharedKey: String,
cookieClaim: String,
apiKeys: ApiKeyHeaders
)
sealed trait IzanamiFilter
case class Otoroshi(otoroshi: OtoroshiFilterConfig) extends IzanamiFilter
case class Default(default: DefaultFilter) extends IzanamiFilter
Expand All @@ -209,38 +213,42 @@ case class RSA(enabled: Boolean, size: Int, publicKey: String, privateKey: Optio
case class JWKS(enabled: Boolean, url: String, headers: Option[Map[String, String]], timeout: Option[FiniteDuration])
extends AlgoSettingsConfig

case class Oauth2Config(enabled: Boolean,
authorizeUrl: String,
tokenUrl: String,
userInfoUrl: String,
introspectionUrl: String,
loginUrl: String,
logoutUrl: String,
clientId: String,
clientSecret: Option[String],
mtls: Option[MtlsConfig],
scope: Option[String] = None,
claims: String = "email name",
accessTokenField: String = "access_token",
jwtVerifier: Option[AlgoSettingsConfig],
readProfileFromToken: Boolean = false,
useCookie: Boolean = true,
useJson: Boolean = true,
idField: String,
nameField: String,
emailField: String,
adminField: String,
authorizedPatternField: String,
defaultPatterns: String,
izanamiManagedUser: Boolean,
admins: Option[Seq[String]] = None)
case class Oauth2Config(
enabled: Boolean,
authorizeUrl: String,
tokenUrl: String,
userInfoUrl: String,
introspectionUrl: String,
loginUrl: String,
logoutUrl: String,
clientId: String,
clientSecret: Option[String],
mtls: Option[MtlsConfig],
scope: Option[String] = None,
claims: String = "email name",
accessTokenField: String = "access_token",
jwtVerifier: Option[AlgoSettingsConfig],
readProfileFromToken: Boolean = false,
useCookie: Boolean = true,
useJson: Boolean = true,
idField: String,
nameField: String,
emailField: String,
adminField: String,
authorizedPatternField: String,
defaultPatterns: String,
izanamiManagedUser: Boolean,
admins: Option[Seq[String]] = None
)
case class MtlsConfig(enabled: Boolean, config: Option[CertificateConfig])
case class CertificateConfig(truststorePath: Option[String],
truststorePassword: Option[String],
truststoreType: String,
keystorePath: Option[String],
keystorePassword: Option[String],
keystoreType: String)
case class CertificateConfig(
truststorePath: Option[String],
truststorePassword: Option[String],
truststoreType: String,
keystorePath: Option[String],
keystorePassword: Option[String],
keystoreType: String
)
case class ConfigConfig(db: DbDomainConfig)
case class FeaturesConfig(db: DbDomainConfig)
case class GlobalScriptConfig(db: DbDomainConfig)
Expand All @@ -267,9 +275,9 @@ case class DistributedEvents(distributed: DistributedEventsConfig) extends Event
case class RedisEvents(redis: RedisEventsConfig) extends EventsConfig
case class KafkaEvents(kafka: KafkaEventsConfig) extends EventsConfig

case class InMemoryEventsConfig()
case class DistributedEventsConfig(topic: String)
case class RedisEventsConfig(topic: String)
case class InMemoryEventsConfig(backpressureBufferSize: Int)
case class DistributedEventsConfig(topic: String, backpressureBufferSize: Int)
case class RedisEventsConfig(topic: String, backpressureBufferSize: Int)
case class KafkaEventsConfig(topic: String)

case class DbConfig(
Expand Down Expand Up @@ -310,42 +318,50 @@ case class RedisOneSentinelConfig(host: String, port: Int)

case class LevelDbConfig(parentPath: String)

case class CassandraConfig(addresses: Seq[String],
clusterName: Option[String],
replicationFactor: Int,
keyspace: String,
username: Option[String] = None,
password: Option[String] = None)

case class DynamoConfig(tableName: String,
eventsTableName: String,
region: String,
host: String,
port: Int,
tls: Boolean = true,
parallelism: Int = 32,
accessKey: Option[String] = None,
secretKey: Option[String] = None)
case class CassandraConfig(
addresses: Seq[String],
clusterName: Option[String],
replicationFactor: Int,
keyspace: String,
username: Option[String] = None,
password: Option[String] = None
)

case class DynamoConfig(
tableName: String,
eventsTableName: String,
region: String,
host: String,
port: Int,
tls: Boolean = true,
parallelism: Int = 32,
accessKey: Option[String] = None,
secretKey: Option[String] = None
)

case class KafkaConfig(servers: String, keyPass: Option[String], keystore: Location, truststore: Location)

case class Location(location: Option[String])

case class ElasticConfig(host: String,
port: Int,
scheme: String,
user: Option[String],
password: Option[String],
automaticRefresh: Boolean = false)
case class ElasticConfig(
host: String,
port: Int,
scheme: String,
user: Option[String],
password: Option[String],
automaticRefresh: Boolean = false
)

case class MongoConfig(url: String, database: Option[String], name: Option[String])

case class PostgresqlConfig(driver: String,
url: String,
username: String,
password: String,
connectionPoolSize: Int,
tmpfolder: Option[String])
case class PostgresqlConfig(
driver: String,
url: String,
username: String,
password: String,
connectionPoolSize: Int,
tmpfolder: Option[String]
)

case class DbDomainConfig(`type`: DbType, conf: DbDomainConfigDetails, `import`: Option[Path])
case class InitialUserConfig(userId: String, password: String)
Expand Down
Loading

0 comments on commit 2b9fdc8

Please sign in to comment.