Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publicise the mbean traits #3813

Open
wants to merge 14 commits into
base: series/3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,25 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.unsafe.IORuntimeBuilder.this"),
// introduced by #3695, which enabled fiber dumps on native
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.FiberMonitorCompanionPlatform")
"cats.effect.unsafe.FiberMonitorCompanionPlatform"),
// introduced by #3813, which moved metrics to unsafe.metrics
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics$"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.JsCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JsCpuStarvationMetrics$"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.NativeCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.NativeCpuStarvationMetrics$")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
3 changes: 2 additions & 1 deletion core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics}
import cats.effect.metrics.CpuStarvationWarningMetrics
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._
import cats.effect.unsafe.metrics.JsCpuStarvationMetrics

import scala.concurrent.CancellationException
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.IO

Expand Down
3 changes: 2 additions & 1 deletion core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationWarningMetrics, JvmCpuStarvationMetrics}
import cats.effect.metrics.CpuStarvationWarningMetrics
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._
import cats.effect.unsafe.metrics.JvmCpuStarvationMetrics
import cats.syntax.all._

import scala.concurrent.{blocking, CancellationException, ExecutionContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,10 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
if (mBeanServer ne null) {
val registeredMBeans = mutable.Set.empty[ObjectName]

val hash = System.identityHashCode(threadPool).toHexString

try {
val cpsId = ComputePoolSampler.counter.getAndIncrement()
val computePoolSamplerName = new ObjectName(
s"cats.effect.unsafe.metrics:type=ComputePoolSampler-$hash")
s"cats.effect.unsafe.metrics:type=ComputePoolSampler-${cpsId}")
val computePoolSampler = new ComputePoolSampler(threadPool)
mBeanServer.registerMBean(computePoolSampler, computePoolSamplerName)
registeredMBeans += computePoolSamplerName
Expand All @@ -119,13 +118,14 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
val localQueues = threadPool.localQueues
var i = 0
val len = localQueues.length
val lqsId = LocalQueueSampler.counter.getAndIncrement()

while (i < len) {
val localQueue = localQueues(i)

try {
val localQueueSamplerName = new ObjectName(
s"cats.effect.unsafe.metrics:type=LocalQueueSampler-$hash-$i")
s"cats.effect.unsafe.metrics:type=LocalQueueSampler-${lqsId}-$i")
val localQueueSampler = new LocalQueueSampler(localQueue)
mBeanServer.registerMBean(localQueueSampler, localQueueSamplerName)
registeredMBeans += localQueueSamplerName
Expand Down Expand Up @@ -262,11 +262,10 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
}

if (mBeanServer ne null) {
val hash = System.identityHashCode(fiberMonitor).toHexString

try {
val mbeanId = LiveFiberSnapshotTrigger.counter.getAndIncrement()
val liveFiberSnapshotTriggerName = new ObjectName(
s"cats.effect.unsafe.metrics:type=LiveFiberSnapshotTrigger-$hash")
s"cats.effect.unsafe.metrics:type=LiveFiberSnapshotTrigger-${mbeanId}")
val liveFiberSnapshotTrigger = new LiveFiberSnapshotTrigger(fiberMonitor)
mBeanServer.registerMBean(liveFiberSnapshotTrigger, liveFiberSnapshotTriggerName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package cats.effect.unsafe
package metrics

import java.util.concurrent.atomic.AtomicLong

/**
* An implementation of the [[ComputePoolSamplerMBean]] interface which simply delegates to the
* corresponding methods of the [[cats.effect.unsafe.WorkStealingThreadPool]] being monitored.
Expand All @@ -25,11 +27,15 @@ package metrics
* the monitored compute work stealing thread pool
*/
private[unsafe] final class ComputePoolSampler(compute: WorkStealingThreadPool[_])
extends ComputePoolSamplerMBean {
extends UnsealedComputePoolSamplerMBean {
def getWorkerThreadCount(): Int = compute.getWorkerThreadCount()
def getActiveThreadCount(): Int = compute.getActiveThreadCount()
def getSearchingThreadCount(): Int = compute.getSearchingThreadCount()
def getBlockedWorkerThreadCount(): Int = compute.getBlockedWorkerThreadCount()
def getLocalQueueFiberCount(): Long = compute.getLocalQueueFiberCount()
def getSuspendedFiberCount(): Long = compute.getSuspendedFiberCount()
}

private[unsafe] object ComputePoolSampler {
val counter: AtomicLong = new AtomicLong(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package cats.effect.unsafe.metrics

/**
* An MBean interface for monitoring a [[WorkStealingThreadPool]] backed compute thread pool.
* An MBean interface for monitoring a `WorkStealingThreadPool` backed compute thread pool.
*/
private[unsafe] trait ComputePoolSamplerMBean {
sealed trait ComputePoolSamplerMBean {

/**
* Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]].
* Returns the number of `WorkerThread` instances backing the `WorkStealingThreadPool`.
*
* @note
* This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker
* This is a fixed value, as the `WorkStealingThreadPool` has a fixed number of worker
* threads.
*
* @return
Expand All @@ -34,7 +34,7 @@ private[unsafe] trait ComputePoolSamplerMBean {
def getWorkerThreadCount(): Int

/**
* Returns the number of active [[WorkerThread]] instances currently executing fibers on the
* Returns the number of active `WorkerThread` instances currently executing fibers on the
* compute thread pool.
*
* @return
Expand All @@ -43,16 +43,16 @@ private[unsafe] trait ComputePoolSamplerMBean {
def getActiveThreadCount(): Int

/**
* Returns the number of [[WorkerThread]] instances currently searching for fibers to steal
* from other worker threads.
* Returns the number of `WorkerThread` instances currently searching for fibers to steal from
* other worker threads.
*
* @return
* the number of worker threads searching for work
*/
def getSearchingThreadCount(): Int

/**
* Returns the number of [[WorkerThread]] instances which are currently blocked due to running
* Returns the number of `WorkerThread` instances which are currently blocked due to running
* blocking actions on the compute thread pool.
*
* @return
Expand All @@ -76,3 +76,5 @@ private[unsafe] trait ComputePoolSamplerMBean {
*/
def getSuspendedFiberCount(): Long
}

private[unsafe] trait UnsealedComputePoolSamplerMBean extends ComputePoolSamplerMBean
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.IO

import scala.concurrent.duration.FiniteDuration

import java.util.concurrent.atomic.AtomicLong

private[metrics] class CpuStarvation private (
private[unsafe] class CpuStarvation private (
counter: AtomicLong,
currentClockDrift: AtomicLong,
maxClockDrift: AtomicLong)
extends CpuStarvationMBean {
extends UnsealedCpuStarvationMBean {

override def getCpuStarvationCount(): Long = counter.get()

Expand All @@ -48,8 +48,8 @@ private[metrics] class CpuStarvation private (

}

private[metrics] object CpuStarvation {
private[metrics] def apply(): IO[CpuStarvation] = for {
private[unsafe] object CpuStarvation {
private[unsafe] def apply(): IO[CpuStarvation] = for {
counter <- IO.delay(new AtomicLong(0))
currentClockDrift <- IO.delay(new AtomicLong(0))
maxClockDrift <- IO.delay(new AtomicLong(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

/**
* An MBean interfaces for monitoring when CPU starvation occurs.
*/
private[metrics] trait CpuStarvationMBean {
sealed trait CpuStarvationMBean {

/**
* Returns the number of times CPU starvation has occurred.
Expand All @@ -45,3 +45,5 @@ private[metrics] trait CpuStarvationMBean {
*/
def getCurrentClockDriftMs(): Long
}

private[unsafe] trait UnsealedCpuStarvationMBean extends CpuStarvationMBean
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.{IO, Resource}
import cats.effect.std.Console
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package metrics

import scala.collection.mutable.ArrayBuffer

import java.util.concurrent.atomic.AtomicLong

/**
* An implementation of the [[LiveFiberSnapshotTriggerMBean]] interface which simply delegates
* to the corresponding method of the backing [[cats.effect.unsafe.FiberMonitor]].
Expand All @@ -27,10 +29,14 @@ import scala.collection.mutable.ArrayBuffer
* the backing fiber monitor
*/
private[unsafe] final class LiveFiberSnapshotTrigger(monitor: FiberMonitor)
extends LiveFiberSnapshotTriggerMBean {
extends UnsealedLiveFiberSnapshotTriggerMBean {
def liveFiberSnapshot(): Array[String] = {
val buffer = new ArrayBuffer[String]
monitor.liveFiberSnapshot(buffer += _)
buffer.toArray
}
}

private[unsafe] object LiveFiberSnapshotTrigger {
val counter: AtomicLong = new AtomicLong(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package cats.effect.unsafe.metrics
/**
* An MBean interface for triggering live fiber snapshots.
*/
private[unsafe] trait LiveFiberSnapshotTriggerMBean {
sealed trait LiveFiberSnapshotTriggerMBean {

/**
* Obtains a snapshot of the fibers currently live on the [[IORuntime]] which this fiber
Expand All @@ -30,3 +30,6 @@ private[unsafe] trait LiveFiberSnapshotTriggerMBean {
*/
def liveFiberSnapshot(): Array[String]
}

private[unsafe] trait UnsealedLiveFiberSnapshotTriggerMBean
extends LiveFiberSnapshotTriggerMBean
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package cats.effect.unsafe
package metrics

import java.util.concurrent.atomic.AtomicLong

/**
* An implementation of the [[LocalQueueSamplerMBean]] interface which simply delegates to the
* corresponding methods of the [[cats.effect.unsafe.LocalQueue]] being monitored.
Expand All @@ -25,7 +27,7 @@ package metrics
* the monitored local queue
*/
private[unsafe] final class LocalQueueSampler(queue: LocalQueue)
extends LocalQueueSamplerMBean {
extends UnsealedLocalQueueSamplerMBean {
def getFiberCount(): Int = queue.getFiberCount()
def getHeadIndex(): Int = queue.getHeadIndex()
def getTailIndex(): Int = queue.getTailIndex()
Expand All @@ -37,3 +39,7 @@ private[unsafe] final class LocalQueueSampler(queue: LocalQueue)
def getStealHeadTag(): Int = queue.getStealHeadTag()
def getTailTag(): Int = queue.getTailTag()
}

private[unsafe] object LocalQueueSampler {
val counter: AtomicLong = new AtomicLong(0)
}
Loading
Loading