From a91499b54b3ab7fa2cafb15588aae3ca31701788 Mon Sep 17 00:00:00 2001 From: Adam Nichols Date: Thu, 16 Nov 2023 18:07:32 -0500 Subject: [PATCH] WX-1333 Improve logging visibility for load management (#7253) --- .../main/scala/cromwell/engine/io/IoActor.scala | 1 + .../services/ServiceRegistryActor.scala | 14 +++++++++++++- .../common/api/PipelinesApiRequestManager.scala | 17 +++++++++++++---- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/io/IoActor.scala b/engine/src/main/scala/cromwell/engine/io/IoActor.scala index b4b3a0b191f..80a362a7b78 100644 --- a/engine/src/main/scala/cromwell/engine/io/IoActor.scala +++ b/engine/src/main/scala/cromwell/engine/io/IoActor.scala @@ -136,6 +136,7 @@ final class IoActor(ioConfig: IoConfig, override def onBackpressure(scale: Option[Double] = None): Unit = { incrementBackpressure() + log.warning("IoActor notifying HighLoad") serviceRegistryActor ! LoadMetric("IO", HighLoad) val uncappedDelay = scale.getOrElse(1.0d) * LoadConfig.IoNormalWindowMinimum diff --git a/services/src/main/scala/cromwell/services/ServiceRegistryActor.scala b/services/src/main/scala/cromwell/services/ServiceRegistryActor.scala index 0c613f307ba..2e43e651b2b 100644 --- a/services/src/main/scala/cromwell/services/ServiceRegistryActor.scala +++ b/services/src/main/scala/cromwell/services/ServiceRegistryActor.scala @@ -6,6 +6,7 @@ import akka.routing.Listen import cats.data.NonEmptyList import com.typesafe.config.{Config, ConfigFactory, ConfigObject} import cromwell.core.Dispatcher.ServiceDispatcher +import cromwell.services.loadcontroller.LoadControllerService.LoadMetric import cromwell.util.GracefulShutdownHelper import cromwell.util.GracefulShutdownHelper.ShutdownCommand import net.ceedubs.ficus.Ficus._ @@ -82,7 +83,9 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging def receive = { case msg: ServiceRegistryMessage => services.get(msg.serviceName) match { - case 
Some(ref) => ref.tell(transform(msg, sender()), sender()) + case Some(ref) => + debugLogLoadMessages(msg, sender()) + ref.tell(transform(msg, sender()), sender()) case None => log.error("Received ServiceRegistryMessage requesting service '{}' for which no service is configured. Message: {}", msg.serviceName, msg) sender() ! ServiceRegistryFailure(msg.serviceName) @@ -107,6 +110,15 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging sender() ! ServiceRegistryFailure("Message is not a ServiceRegistryMessage: " + fool) } + private def debugLogLoadMessages(msg: ServiceRegistryMessage, sender: ActorRef): Unit = { + msg match { + case msg: LoadMetric => + log.debug(s"Service Registry Actor receiving $msg message from $sender") + case _ => + () + } + } + /** * Set the supervision strategy such that any of the individual service actors fails to initialize that we'll pass * the error up the chain diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestManager.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestManager.scala index 02950e8bec8..81e3c2d9fda 100644 --- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestManager.scala +++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestManager.scala @@ -2,7 +2,6 @@ package cromwell.backend.google.pipelines.common.api import java.io.IOException import java.util.UUID - import akka.actor.{Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated, Timers} import akka.dispatch.ControlMessage import cats.data.NonEmptyList @@ -18,7 +17,7 @@ import cromwell.core.Dispatcher.BackendDispatcher import cromwell.core.retry.SimpleExponentialBackoff import cromwell.core.{CromwellFatalExceptionMarker, 
LoadConfig, Mailbox, WorkflowId} import cromwell.services.instrumentation.CromwellInstrumentationScheduler -import cromwell.services.loadcontroller.LoadControllerService.{HighLoad, LoadMetric, NormalLoad} +import cromwell.services.loadcontroller.LoadControllerService.{HighLoad, LoadLevel, LoadMetric, NormalLoad} import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric._ @@ -89,6 +88,8 @@ class PipelinesApiRequestManager(val qps: Int Refined Positive, requestWorkers: protected[api] var statusPollers: Vector[ActorRef] = Vector.empty self ! ResetAllRequestWorkers + private var previousLoad: LoadLevel = NormalLoad + override def preStart() = { log.info("Running with {} PAPI request workers", requestWorkers.value) startInstrumentationTimer() @@ -96,8 +97,16 @@ class PipelinesApiRequestManager(val qps: Int Refined Positive, requestWorkers: } def monitorQueueSize() = { - val load = if (workQueue.size > LoadConfig.PAPIThreshold) HighLoad else NormalLoad - serviceRegistryActor ! LoadMetric("PAPIQueryManager", load) + val newLoad = if (workQueue.size > LoadConfig.PAPIThreshold) HighLoad else NormalLoad + + if (previousLoad == NormalLoad && newLoad == HighLoad) + log.warning(s"PAPI Request Manager transitioned to HighLoad with queue size ${workQueue.size} exceeding limit of ${LoadConfig.PAPIThreshold}") + else if (previousLoad == HighLoad && newLoad == NormalLoad) + log.info("PAPI Request Manager transitioned back to NormalLoad") + + previousLoad = newLoad + + serviceRegistryActor ! LoadMetric("PAPIQueryManager", newLoad) updateQueueSize(workQueue.size) }