Skip to content

Commit

Permalink
WX-1333 Improve logging visibility for load management (#7253)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored and AlexITC committed Jan 14, 2024
1 parent 84e88a7 commit a91499b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
1 change: 1 addition & 0 deletions engine/src/main/scala/cromwell/engine/io/IoActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ final class IoActor(ioConfig: IoConfig,

override def onBackpressure(scale: Option[Double] = None): Unit = {
incrementBackpressure()
log.warning("IoActor notifying HighLoad")
serviceRegistryActor ! LoadMetric("IO", HighLoad)

val uncappedDelay = scale.getOrElse(1.0d) * LoadConfig.IoNormalWindowMinimum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.routing.Listen
import cats.data.NonEmptyList
import com.typesafe.config.{Config, ConfigFactory, ConfigObject}
import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.services.loadcontroller.LoadControllerService.LoadMetric
import cromwell.util.GracefulShutdownHelper
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import net.ceedubs.ficus.Ficus._
Expand Down Expand Up @@ -82,7 +83,9 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging
def receive = {
case msg: ServiceRegistryMessage =>
services.get(msg.serviceName) match {
case Some(ref) => ref.tell(transform(msg, sender()), sender())
case Some(ref) =>
debugLogLoadMessages(msg, sender())
ref.tell(transform(msg, sender()), sender())
case None =>
log.error("Received ServiceRegistryMessage requesting service '{}' for which no service is configured. Message: {}", msg.serviceName, msg)
sender() ! ServiceRegistryFailure(msg.serviceName)
Expand All @@ -107,6 +110,15 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging
sender() ! ServiceRegistryFailure("Message is not a ServiceRegistryMessage: " + fool)
}

private def debugLogLoadMessages(msg: ServiceRegistryMessage, sender: ActorRef): Unit = {
msg match {
case msg: LoadMetric =>
log.debug(s"Service Registry Actor receiving $msg message from $sender")
case _ =>
()
}
}

/**
* Set the supervision strategy such that any of the individual service actors fails to initialize that we'll pass
* the error up the chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cromwell.backend.google.pipelines.common.api

import java.io.IOException
import java.util.UUID

import akka.actor.{Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated, Timers}
import akka.dispatch.ControlMessage
import cats.data.NonEmptyList
Expand All @@ -18,7 +17,7 @@ import cromwell.core.Dispatcher.BackendDispatcher
import cromwell.core.retry.SimpleExponentialBackoff
import cromwell.core.{CromwellFatalExceptionMarker, LoadConfig, Mailbox, WorkflowId}
import cromwell.services.instrumentation.CromwellInstrumentationScheduler
import cromwell.services.loadcontroller.LoadControllerService.{HighLoad, LoadMetric, NormalLoad}
import cromwell.services.loadcontroller.LoadControllerService.{HighLoad, LoadLevel, LoadMetric, NormalLoad}
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric._

Expand Down Expand Up @@ -89,15 +88,25 @@ class PipelinesApiRequestManager(val qps: Int Refined Positive, requestWorkers:
protected[api] var statusPollers: Vector[ActorRef] = Vector.empty
self ! ResetAllRequestWorkers

private var previousLoad: LoadLevel = NormalLoad

override def preStart() = {
log.info("Running with {} PAPI request workers", requestWorkers.value)
startInstrumentationTimer()
super.preStart()
}

def monitorQueueSize() = {
val load = if (workQueue.size > LoadConfig.PAPIThreshold) HighLoad else NormalLoad
serviceRegistryActor ! LoadMetric("PAPIQueryManager", load)
val newLoad = if (workQueue.size > LoadConfig.PAPIThreshold) HighLoad else NormalLoad

if (previousLoad == NormalLoad && newLoad == HighLoad)
log.warning(s"PAPI Request Manager transitioned to HighLoad with queue size ${workQueue.size} exceeding limit of ${LoadConfig.PAPIThreshold}")
else if (previousLoad == HighLoad && newLoad == NormalLoad)
log.info("PAPI Request Manager transitioned back to NormaLoad")

previousLoad = newLoad

serviceRegistryActor ! LoadMetric("PAPIQueryManager", newLoad)
updateQueueSize(workQueue.size)
}

Expand Down

0 comments on commit a91499b

Please sign in to comment.