Skip to content

Commit

Permalink
feat(cluster): ShardStatus V2 endpoint - optimizing response size for…
Browse files Browse the repository at this point in the history
… clusteringV2 ShardAssignmentStrategy (#1926)

* feat(cluster): ShardStatus V2 endpoint - optimizing response size for clusteringV2 ShardAssignmentStrategy
  • Loading branch information
sandeep6189 authored Jan 7, 2025
1 parent 990dbf0 commit 7c49227
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,20 @@ object NodeClusterActor {
// Error reply identifying the dataset reference that was not recognised.
final case class DatasetUnknown(ref: DatasetRef) extends ErrorResponse
// Error reply carrying a free-form message. NOTE(review): presumably for schema validation failures - confirm.
final case class BadSchema(message: String) extends ErrorResponse
// Error reply carrying a free-form message. NOTE(review): presumably for invalid input data - confirm.
final case class BadData(message: String) extends ErrorResponse
// Generic failure reply carrying a free-form message. NewNodeCoordinatorActor sends it when handling
// GetShardMap or GetShardMapV2 throws, so that the asker is not left waiting for its timeout.
final case class InternalServiceError(message: String) extends ErrorResponse

// Cluste state info commands
// Cluster state info commands
// Returns a Seq[DatasetRef]
case object ListRegisteredDatasets
// Returns CurrentShardSnapshot or DatasetUnknown
final case class GetShardMap(ref: DatasetRef)

/**
* Requests the compressed shard map of a dataset, which is cheaper to send over the wire than the
* reply to GetShardMap.
* Returns ShardSnapshot, or InternalServiceError if the request could not be processed.
* IMPORTANT: Only works with ClusteringV2 shard assignment strategy.
*
* @param ref reference to the dataset whose shard map is requested
*/
final case class GetShardMapV2(ref: DatasetRef)

/** Registers sending actor to receive `ShardMapUpdate` whenever it changes. DeathWatch
* will be used on the sending actors to watch for updates. On subscribe, will
* immediately send back the current state via a `ShardMapUpdate` message.
Expand Down
24 changes: 24 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@ sealed trait ShardAction extends Serializable
final case class CurrentShardSnapshot(ref: DatasetRef,
map: ShardMapper) extends ShardAction with Response

/**
* Optimized form of the ShardMapper state representation.
* NOTE: It doesn't track the shard status updates from coordinator or Ingestion actors. It is just
* a wrapper which compresses the response of ShardMapper state to reduce network transmission costs.
*
* NOTE(review): shardState is an Array[Byte], so the generated equals/hashCode compare that field by
* reference. Compare the array contents explicitly (e.g. with sameElements) when checking two instances.
*
* @param nodeCountInCluster Number of nodes in the filodb cluster. NewNodeCoordinatorActor fills this from
* settings.minNumNodes.
* @param numShards Number of shards of the dataset. NewNodeCoordinatorActor fills this from the dataset's
* ingestion config.
* @param k8sHostFormat K8s host format. Valid ONLY for ClusterV2 shard assignment strategy
* @param shardState ByteArray with one bit per shard, as produced by
* NewNodeCoordinatorActor.shardMapperBitMapRepresentation. Shard i is stored in byte (i / 8);
* within a byte the lowest numbered shard is the most significant bit. A bit is 1 only if the
* shard is ShardStatusActive; every other status is 0. Unused trailing bits of the last byte
* are padded with 0.
* For example: lets say we have 4 shards with following status:
* Seq[ShardStatusActive, ShardStatusRecovery, ShardStatusAssigned, ShardStatusAssigned]
* Then the shardState would be an array of single byte whose bit representation is - 1000 0000
* Explanation - only shard 0 is active, so only the first bit is set; the last 4 bits are padding
*/
final case class ShardMapperV2(nodeCountInCluster: Int, numShards: Int, k8sHostFormat: String,
shardState: Array[Byte])

/**
* Response to GetShardMapV2 request. Uses the optimized ShardMapperV2 representation. Only applicable
* for ClusterV2 shard assignment strategy.
* Unlike CurrentShardSnapshot it does not carry the dataset reference, only the compressed state.
* @param map compressed shard state (ShardMapperV2) of the requested dataset
*/
final case class ShardSnapshot(map: ShardMapperV2) extends ShardAction with Response

/**
* Full state of all shards, sent to all ingestion actors. They react by starting/stopping
* ingestion for the shards they own or no longer own. The version is expected to be global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,20 @@ trait ClusterOps extends ClientBase with StrictLogging {
}
}
}

/**
 * Asks the node coordinator for the compact ShardMapperV2 view of a dataset's shard state.
 * ShardMapperV2 is an optimization of the response size over the ShardMapper and GetShardMap ask call.
 *
 * @param dataset   reference to the dataset whose shard state is requested
 * @param v2Enabled must be true; the operation is only supported with the ClusterV2 shard assignment
 * @param timeout   timeout passed to the ask call, 30 seconds by default
 * @return Some(ShardMapperV2) if the coordinator replies with a ShardSnapshot; None for ANY other reply,
 *         so an unknown dataset and an error reply such as InternalServiceError are indistinguishable here
 * @throws IllegalArgumentException if v2Enabled is false
 */
def getShardMapperV2(dataset: DatasetRef, v2Enabled: Boolean,
                     timeout: FiniteDuration = 30.seconds): Option[ShardMapperV2] = {
  require(v2Enabled, "ClusterV2 ShardAssignment is must for this operation")
  // nodeCoordinator is used directly: wrapping it in Some(...) only to flatMap over it added nothing.
  Client.actorAsk(nodeCoordinator, GetShardMapV2(dataset), timeout) {
    case ShardSnapshot(shardMapperV2) => Some(shardMapperV2)
    case _                            => None
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import filodb.query.QueryCommand
// Message carrying a dataset reference.
// NOTE(review): presumably the request exchanged between peer coordinators while serving GetShardMap - confirm.
final case class GetShardMapScatter(ref: DatasetRef)
// Request message without payload; NewNodeCoordinatorActor logs under this name when processing it fails.
case object LocalShardsHealthRequest
// Status of a single shard of a single dataset.
case class DatasetShardHealth(dataset: DatasetRef, shard: Int, status: ShardStatus)
// Wraps a sequence of DatasetShardHealth entries.
// NOTE(review): presumably the reply to LocalShardsHealthRequest - confirm.
case class LocalShardsHealthResponse(shardStatus: Seq[DatasetShardHealth])

object NewNodeCoordinatorActor {

Expand All @@ -30,6 +29,36 @@ object NewNodeCoordinatorActor {
clusterDiscovery: FiloDbClusterDiscovery,
settings: FilodbSettings): Props =
Props(new NewNodeCoordinatorActor(memStore, clusterDiscovery, settings))


/**
 * Packs the shard statuses of a ShardMapper into a bitmap holding one bit per shard.
 *
 * A shard's bit is:
 *  - 1, if its status is ShardStatusActive
 *  - 0, for any other status such as ShardStatusAssigned, ShardStatusRecovery etc.
 * Only active shards are flagged because the QueryActor can only execute queries on active shards.
 *
 * Layout: shard i is stored in byte (i / 8), and within a byte the lowest numbered shard is the most
 * significant bit. The bitmap is byte aligned, so unused trailing bits of the last byte stay 0.
 *
 * Examples:
 *  Statuses (shard 0 first)                         | Bitmap     | Hex
 *  -------------------------------------------------------------------
 *  Assigned, Active, Recovery, Error                | 0100 0000  | 0x40
 *  Active, Active, Active, Active                   | 1111 0000  | 0xF0
 *  Error, Active, Active, Error, Active, Active     | 0110 1100  | 0x6C
 *
 * @param shardMapper ShardMapper whose statuses are encoded
 * @return byte array in which every byte covers 8 shards; a bit is 1 if the corresponding shard is active
 *         and padding bits are 0
 */
def shardMapperBitMapRepresentation(shardMapper: ShardMapper): Array[Byte] = {
  val statuses = shardMapper.statuses
  // round up so that a partially filled last byte is still allocated
  val bitmap = new Array[Byte]((statuses.length + 7) / 8)
  statuses.indices.foreach { shard =>
    if (statuses(shard) == ShardStatusActive) {
      val slot = shard / 8
      // 0x80 is the most significant bit; shifting right walks towards the least significant one
      val mask = 0x80 >> (shard % 8)
      bitmap(slot) = (bitmap(slot) | mask).toByte
    }
  }
  bitmap
}
}

private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
Expand Down Expand Up @@ -172,6 +201,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
withQueryActor(originator, dataset) { _.tell(QueryActor.ThrowException(dataset), originator) }
}

// scalastyle:off method.length
def shardManagementHandlers: Receive = LoggingReceive {
// sent by ingestion actors when shard status changes
case ev: ShardEvent => try {
Expand All @@ -186,6 +216,32 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
sender() ! CurrentShardSnapshot(g.ref, clusterDiscovery.shardMapper(g.ref))
} catch { case e: Exception =>
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
// send a response to avoid blocking of akka caller for long time
sender() ! InternalServiceError(s"Exception while executing GetShardMap for dataset: ${g.ref.dataset}")
}
/*
* requested from HTTP API
* What is the trade-off between GetShardMap vs GetShardMapV2 ?
*
* No | Ask Call | Size of Response (256 Shards) | Compute Used
* -------------------------------------------------------------------------------------------------------------
* 1 | GetShardMap | ~37KB | Baseline - Uses ShardMapper for shard update tracking
* 2 | GetShardMapV2 | 172 Bytes with padding | Additional CPU used to convert ShardMapper to BitMap
* | Will save CPU at the caller by avoiding string parsing
* */
case g: GetShardMapV2 =>
try {
val shardBitMap = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(clusterDiscovery.shardMapper(g.ref))
val shardMapperV2 = ShardMapperV2(
settings.minNumNodes.get,
ingestionConfigs(g.ref).numShards,
settings.k8sHostFormat.get,
shardBitMap)
sender() ! ShardSnapshot(shardMapperV2)
} catch { case e: Exception =>
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
// send a response to avoid blocking of akka caller for long time
sender() ! InternalServiceError(s"Exception while executing GetShardMapV2 for dataset: ${g.ref.dataset}")
}

// requested from peer NewNodeCoordActors upon them receiving GetShardMap call
Expand Down Expand Up @@ -214,8 +270,8 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
} catch { case e: Exception =>
logger.error(s"[ClusterV2] Error occurred when processing message LocalShardsHealthRequest", e)
}

}
// scalastyle:on method.length

def initHandler: Receive = {
case InitNewNodeCoordinatorActor => initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package filodb.coordinator

import akka.actor.ActorRef
import akka.testkit._

import filodb.coordinator.v2.NewNodeCoordinatorActor
import filodb.core._

object ShardMapperSpec extends ActorSpecConfig
Expand Down Expand Up @@ -303,4 +303,100 @@ class ShardMapperSpec extends ActorTest(ShardMapperSpec.getNewSystem) {
Seq(0, 3).forall(s => map.coordForShard(s) == newCoord) shouldEqual true
assert(coord = newCoord, shards = Seq(0, 3), numAssignedShards = 4, unassignedShards = 28)
}

// Walks 32 shards through unassigned -> assigned -> partly active and checks after every step that only
// active shards show up as set bits in the bitmap.
it ("test bitmap conversion of shard mapper") {
val numShards = 32
val shardMapper = new ShardMapper(numShards) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 4 // 32 shards / 8 bits per byte
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point

// move everyone to assigned
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 32

// status updated to assigned but bitmap representation should NOT yet be set to 1
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 4
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // assigned is not active, so still no bit is set

// move first 8 and last 8 shards to active
for (i <- 0 to 7) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
for (i <- 24 to 31) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
shardMapper.activeShards().size shouldEqual 16
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
// expected bitmap, shard 0 being the leftmost bit:
// 1111 1111 0000 0000 0000 0000 1111 1111
bitRep(0) shouldEqual 0xFF.toByte
bitRep(1) shouldEqual 0x00.toByte
bitRep(2) shouldEqual 0x00.toByte
bitRep(3) shouldEqual 0xFF.toByte
}

// Activates all 256 shards, then moves shards 60-63 to recovery and 64-67 to down, and checks that exactly
// those 8 bits are cleared in the bitmap.
it ("test bitmap conversion of shard mapper with 256 shards") {
val shardMapper = new ShardMapper(256) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 32 // 256 shards / 8 bits per byte
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 256

// make all shards active
for (i <- 0 to 255) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
// check if all the bits are set correctly
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.forall (x => x == 0xFF.toByte) shouldEqual true

// make some shards in recovery mode
for (i <- 60 to 63) {
shardMapper.updateFromEvent(RecoveryInProgress(dataset, i, TestProbe().ref, 50))
}
// make some shards in down mode
for (i <- 64 to 67) {
shardMapper.updateFromEvent(ShardDown(dataset, i, TestProbe().ref))
}

shardMapper.activeShards().size shouldEqual 248
shardMapper.notActiveShards().size shouldEqual 8
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)

// bytes 0-6 cover shards 0-55, which are all active
for (i <- 0 to 6) {
bitRep(i) shouldEqual 0xFF.toByte
}

// shards 56-63 should be 1111 0000 (56-59 active, 60-63 in recovery)
bitRep(7) shouldEqual 0xF0.toByte

// shards 64-71 should be 0000 1111 (64-67 down, 68-71 active)
bitRep(8) shouldEqual 0x0F.toByte

// bytes 9-31 cover shards 72-255, which are all still active
for (i <- 9 to 31) {
bitRep(i) shouldEqual 0xFF.toByte
}
}

// Covers a shard count that is not a multiple of 8: with 2 shards only the 2 most significant bits of the
// single byte carry data and the remaining 6 bits must stay 0.
it ("test padding is set correctly in non 8 byte aligned number of shards") {
val shardMapper = new ShardMapper(2) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 1 // 2 shards still need one whole byte
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 2
for (i <- 0 to 1) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep(0) shouldEqual 0xC0.toByte // 1100 0000 - both shards active, last 6 bits are padding
shardMapper.updateFromEvent(ShardDown(dataset, 1, TestProbe().ref))

bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep(0) shouldEqual 0x80.toByte // 1000 0000 - only shard 0 is still active
}
}
17 changes: 16 additions & 1 deletion http/src/main/scala/filodb/http/ClusterApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.http.scaladsl.server.Directives._
import com.typesafe.scalalogging.StrictLogging
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport

import filodb.coordinator.{CurrentShardSnapshot, NodeClusterActor}
import filodb.coordinator.{CurrentShardSnapshot, NodeClusterActor, ShardSnapshot}
import filodb.core.{DatasetRef, ErrorResponse, Success => SuccessResponse}
import filodb.core.store.{AssignShardConfig, UnassignShardConfig}
import filodb.http.apiv1.{HttpSchema, HttpShardDetails, HttpShardState, HttpShardStateByAddress}
Expand All @@ -31,9 +31,24 @@ class ClusterApiRoute(clusterProxy: ActorRef) extends FiloRoute with StrictLoggi
complete(httpList(statusList))
case DatasetUnknown(_) =>
complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
case InternalServiceError(errorMessage) =>
complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
}
}
} ~
// GET /api/v1/cluster/<dataset>/statusV2 - compact shard status of the dataset (ShardMapperV2, in which
// shard state is a bitmap instead of one entry per shard)
// NOTE: statusV2 will only work with ClusteringV2 ShardAssignment strategy
// NOTE(review): there is no catch-all case, so a reply of any other type is not handled by this match.
path(Segment / "statusV2") { dataset =>
get {
onSuccess(asyncAsk(clusterProxy, GetShardMapV2(DatasetRef.fromDotString(dataset)))) {
// success: the single ShardMapperV2 is wrapped in a Seq to go through the common list response helper
case ShardSnapshot(shardMapperV2) =>
complete(httpList(Seq(shardMapperV2)))
// 404 when the dataset is reported as unknown
case DatasetUnknown(_) =>
complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
// 500 carrying the message supplied by the responding actor
case InternalServiceError(errorMessage) =>
complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
}
}
} ~
// GET /api/v1/cluster/<dataset>/statusByAddress - shard health status grouped by node address
// Sample output as follows:
// {{{
Expand Down

0 comments on commit 7c49227

Please sign in to comment.