diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala
index b4df27a496..0aaf370a59 100644
--- a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala
@@ -95,13 +95,20 @@ object NodeClusterActor {
   final case class DatasetUnknown(ref: DatasetRef) extends ErrorResponse
   final case class BadSchema(message: String) extends ErrorResponse
   final case class BadData(message: String) extends ErrorResponse
+  final case class InternalServiceError(message: String) extends ErrorResponse
 
-  // Cluste state info commands
+  // Cluster state info commands
   // Returns a Seq[DatasetRef]
   case object ListRegisteredDatasets
 
   // Returns CurrentShardSnapshot or DatasetUnknown
   final case class GetShardMap(ref: DatasetRef)
 
+  /**
+   * Returns a compressed form of the ShardMap (a ShardSnapshot) for sending over the wire.
+   * IMPORTANT: Only works with the ClusteringV2 shard assignment strategy.
+   * @param ref the DatasetRef whose shard map is requested
+   */
+  final case class GetShardMapV2(ref: DatasetRef)
+
   /** Registers sending actor to receive `ShardMapUpdate` whenever it changes. DeathWatch
     * will be used on the sending actors to watch for updates. On subscribe, will
     * immediately send back the current state via a `ShardMapUpdate` message.

diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala b/coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
index 77814a91be..eeb6e803de 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
@@ -11,6 +11,30 @@ sealed trait ShardAction extends Serializable
 
 final case class CurrentShardSnapshot(ref: DatasetRef,
                                       map: ShardMapper) extends ShardAction with Response
 
+/**
+ * Optimized form of the ShardMapper state representation.
+ * NOTE: It does not track shard status updates from the coordinator or ingestion actors. It is just
+ * a wrapper which compresses the ShardMapper state in the response to reduce network transmission costs.
+ *
+ * @param nodeCountInCluster Number of replicas in the filodb cluster
+ * @param numShards Number of shards in the filodb cluster
+ * @param k8sHostFormat K8s host format. Valid ONLY for the ClusterV2 shard assignment strategy
+ * @param shardState ByteArray where each bit represents the status of one shard: the bit is set
+ *                   to 1 if the shard is active (ShardStatusActive), else 0.
+ *                   For example, say we have 4 shards with the following statuses:
+ *                   Seq(ShardStatusActive, ShardStatusRecovery, ShardStatusAssigned, ShardStatusError)
+ *                   Then shardState is an array of a single byte whose bit representation is
+ *                   1000 0000 - only the first shard is active; the trailing four bits are padding.
+ */
+final case class ShardMapperV2(nodeCountInCluster: Int, numShards: Int, k8sHostFormat: String,
+                               shardState: Array[Byte])
+
+/**
+ * Response to a GetShardMapV2 request. Uses the optimized ShardMapperV2 representation. Only applicable
+ * for the ClusterV2 shard assignment strategy.
+ * @param map ShardMapperV2
+ */
+final case class ShardSnapshot(map: ShardMapperV2) extends ShardAction with Response
+
 /**
  * Full state of all shards, sent to all ingestion actors. They react by starting/stopping
  * ingestion for the shards they own or no longer own. The version is expected to be global
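For concreteness, a minimal client-side decoding sketch for the shardState bitmap defined above. This is illustrative only: isShardActive is a hypothetical helper and the sample values are made up; the layout mirrors shardMapperBitMapRepresentation, the converter added later in this diff.

import filodb.coordinator.ShardMapperV2

// Hypothetical decoder (not part of this change). Shard i is stored at bit
// (7 - i % 8) of byte (i / 8), and a 1 bit means ShardStatusActive.
object ShardStateBitmap extends App {
  def isShardActive(map: ShardMapperV2, shard: Int): Boolean = {
    require(shard >= 0 && shard < map.numShards, s"shard $shard out of range")
    ((map.shardState(shard / 8) >> (7 - (shard % 8))) & 1) == 1
  }

  // The 4-shard example from the scaladoc above: bits 1000 0000 == 0x80.
  // nodeCountInCluster and k8sHostFormat are placeholder values here.
  val m = ShardMapperV2(nodeCountInCluster = 2, numShards = 4,
                        k8sHostFormat = "filodb-%s", shardState = Array(0x80.toByte))
  println(isShardActive(m, 0)) // true  (ShardStatusActive)
  println(isShardActive(m, 1)) // false (ShardStatusRecovery)
}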
diff --git a/coordinator/src/main/scala/filodb.coordinator/client/ClusterOps.scala b/coordinator/src/main/scala/filodb.coordinator/client/ClusterOps.scala
index e3780f8ba1..208b55ad6d 100644
--- a/coordinator/src/main/scala/filodb.coordinator/client/ClusterOps.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/client/ClusterOps.scala
@@ -47,4 +47,20 @@ trait ClusterOps extends ClientBase with StrictLogging {
       }
     }
   }
+
+  /**
+   * ShardMapperV2 optimizes the response size relative to the ShardMapper returned by the
+   * GetShardMap ask call.
+   * @return Some(ShardMapperV2) if the dataset is registered, None if the dataset is not found
+   */
+  def getShardMapperV2(dataset: DatasetRef, v2Enabled: Boolean,
+                       timeout: FiniteDuration = 30.seconds): Option[ShardMapperV2] = {
+    require(v2Enabled, "the ClusterV2 shard assignment strategy is required for this operation")
+    val actor = Some(nodeCoordinator)
+    actor.flatMap { ref =>
+      Client.actorAsk(ref, GetShardMapV2(dataset), timeout) {
+        case ShardSnapshot(shardMapperV2) => Some(shardMapperV2)
+        case _ => None
+      }
+    }
+  }
 }
\ No newline at end of file
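A usage sketch for the new client call. Illustrative only: `client` stands for any concrete ClusterOps implementation, and printShardCounts and the dataset name are assumptions, not part of the change.

import scala.concurrent.duration._
import filodb.core.DatasetRef
import filodb.coordinator.client.ClusterOps

// Sketch: the call require()s v2Enabled == true, because only the ClusterV2
// shard assignment strategy can answer GetShardMapV2.
object GetShardMapperV2Example {
  def printShardCounts(client: ClusterOps): Unit =
    client.getShardMapperV2(DatasetRef("prometheus"), v2Enabled = true, timeout = 30.seconds) match {
      case Some(v2) => println(s"${v2.numShards} shards across ${v2.nodeCountInCluster} nodes")
      case None     => println("dataset unknown, or the ask failed")
    }
}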
diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
index 6f93038b6e..120d5d045b 100644
--- a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
+++ b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
@@ -20,7 +20,6 @@ import filodb.query.QueryCommand
 
 final case class GetShardMapScatter(ref: DatasetRef)
 case object LocalShardsHealthRequest
 case class DatasetShardHealth(dataset: DatasetRef, shard: Int, status: ShardStatus)
-case class LocalShardsHealthResponse(shardStatus: Seq[DatasetShardHealth])
 
 object NewNodeCoordinatorActor {
 
@@ -30,6 +29,36 @@ object NewNodeCoordinatorActor {
                 clusterDiscovery: FiloDbClusterDiscovery,
                 settings: FilodbSettings): Props =
     Props(new NewNodeCoordinatorActor(memStore, clusterDiscovery, settings))
+
+
+  /**
+   * Converts ShardMapper.statuses to a bitmap representation where a bit is set to:
+   *   - 1, if ShardStatus == ShardStatusActive
+   *   - 0, for any other ShardStatus (ShardStatusAssigned, ShardStatusRecovery etc.)
+   * Why only active shards? Because the QueryActor can only execute queries against active shards.
+   *
+   * NOTE: the bitmap is byte aligned, so any extra bits are padded with 0.
+   *
+   * EXAMPLE - some shard statuses and their bit representations:
+   * Status                                       | BitMap Representation | Hex Representation
+   * -----------------------------------------------------------------------------------------
+   * Assigned, Active, Recovery, Error            | 0100 0000             | 0x40
+   * Active, Active, Active, Active               | 1111 0000             | 0xF0
+   * Error, Active, Active, Error, Active, Active | 0110 1100             | 0x6C
+   *
+   * @param shardMapper the ShardMapper whose shard statuses are to be converted
+   * @return a byte array where each byte represents 8 shards and a bit is set to 1 iff the
+   *         corresponding shard is active; extra bits are padded with 0
+   */
+  def shardMapperBitMapRepresentation(shardMapper: ShardMapper): Array[Byte] = {
+    val byteArray = new Array[Byte]((shardMapper.statuses.length + 7) / 8)
+    for (i <- shardMapper.statuses.indices) {
+      if (shardMapper.statuses(i) == ShardStatusActive) {
+        // set bit (7 - i % 8) of byte (i / 8), most significant bit first
+        byteArray(i / 8) = (byteArray(i / 8) | (1 << (7 - (i % 8)))).toByte
+      }
+    }
+    byteArray
+  }
 }
 
 private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
@@ -172,6 +201,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
       withQueryActor(originator, dataset) { _.tell(QueryActor.ThrowException(dataset), originator) }
   }
 
+  // scalastyle:off method.length
  def shardManagementHandlers: Receive = LoggingReceive {
    // sent by ingestion actors when shard status changes
    case ev: ShardEvent => try {
@@ -186,6 +216,32 @@
         sender() ! CurrentShardSnapshot(g.ref, clusterDiscovery.shardMapper(g.ref))
       } catch { case e: Exception =>
         logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
+        // send a response so the Akka caller is not left blocked until the ask times out
+        sender() ! InternalServiceError(s"Exception while executing GetShardMap for dataset: ${g.ref.dataset}")
       }
+    /*
+     * requested from the HTTP API
+     * What is the trade-off between GetShardMap and GetShardMapV2?
+     *
+     * No | Ask Call      | Size of Response (256 Shards) | Compute Used
+     * ------------------------------------------------------------------------------------------------
+     * 1  | GetShardMap   | ~37KB                         | Baseline - uses ShardMapper for shard update tracking
+     * 2  | GetShardMapV2 | 172 bytes with padding        | Additional CPU to convert the ShardMapper to a bitmap;
+     *    |               |                               | saves CPU at the caller by avoiding string parsing
+     */
+    case g: GetShardMapV2 =>
+      try {
+        val shardBitMap = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(clusterDiscovery.shardMapper(g.ref))
+        val shardMapperV2 = ShardMapperV2(
+          settings.minNumNodes.get,
+          ingestionConfigs(g.ref).numShards,
+          settings.k8sHostFormat.get,
+          shardBitMap)
+        sender() ! ShardSnapshot(shardMapperV2)
+      } catch { case e: Exception =>
+        logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
+        // send a response so the Akka caller is not left blocked until the ask times out
+        sender() ! InternalServiceError(s"Exception while executing GetShardMapV2 for dataset: ${g.ref.dataset}")
+      }
     // requested from peer NewNodeCoordActors upon them receiving GetShardMap call
@@ -214,8 +270,8 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
       } catch { case e: Exception =>
         logger.error(s"[ClusterV2] Error occurred when processing message LocalShardsHealthRequest", e)
       }
-  }
+  }
+  // scalastyle:on method.length
 
   def initHandler: Receive = {
     case InitNewNodeCoordinatorActor => initialize()
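The size gap in the trade-off table above follows directly from the encoding: one bit per shard, rounded up to whole bytes. A quick sketch of the arithmetic (the ~37KB and 172-byte figures also include the other response fields and serialization overhead, which are not modeled here; the object name is illustrative):

object BitmapSizeCheck extends App {
  // One bit per shard, byte aligned - the same rounding as shardMapperBitMapRepresentation.
  def bitmapSizeBytes(numShards: Int): Int = (numShards + 7) / 8

  println(bitmapSizeBytes(256)) // 32 bytes of bitmap vs ~37KB for the full ShardMapper response
  println(bitmapSizeBytes(4))   // 1 byte: 4 status bits plus 4 bits of zero padding
}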
InternalServiceError(s"Exception while executing GetShardMapV2 for dataset: ${g.ref.dataset}") } // requested from peer NewNodeCoordActors upon them receiving GetShardMap call @@ -214,8 +270,8 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore, } catch { case e: Exception => logger.error(s"[ClusterV2] Error occurred when processing message LocalShardsHealthRequest", e) } - } + // scalastyle:on method.length def initHandler: Receive = { case InitNewNodeCoordinatorActor => initialize() diff --git a/coordinator/src/test/scala/filodb.coordinator/ShardMapperSpec.scala b/coordinator/src/test/scala/filodb.coordinator/ShardMapperSpec.scala index 874e209f31..cadbf9a97f 100644 --- a/coordinator/src/test/scala/filodb.coordinator/ShardMapperSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/ShardMapperSpec.scala @@ -2,7 +2,7 @@ package filodb.coordinator import akka.actor.ActorRef import akka.testkit._ - +import filodb.coordinator.v2.NewNodeCoordinatorActor import filodb.core._ object ShardMapperSpec extends ActorSpecConfig @@ -303,4 +303,100 @@ class ShardMapperSpec extends ActorTest(ShardMapperSpec.getNewSystem) { Seq(0, 3).forall(s => map.coordForShard(s) == newCoord) shouldEqual true assert(coord = newCoord, shards = Seq(0, 3), numAssignedShards = 4, unassignedShards = 28) } + + it ("test bitmap conversion of shard mapper") { + val numShards = 32 + val shardMapper = new ShardMapper(numShards) // default init to ShardStatusUnassigned + var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper) + bitRep.length shouldEqual 4 + bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point + + // move everyone to assigned + shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref) + shardMapper.assignedShards.length shouldEqual 32 + + // status updated to assigned but bitmap representation should NOT yet be set to 1 + bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper) + bitRep.length shouldEqual 4 + bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point + + // move first 8 and last 8 shards to active + for (i <- 0 to 7) { + shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref)) + } + for (i <- 24 to 31) { + shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref)) + } + shardMapper.activeShards().size shouldEqual 16 + bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper) + // 1111 1111 0000 0000 0000 0000 1111 1111 + bitRep(0) shouldEqual 0xFF.toByte + bitRep(1) shouldEqual 0x00.toByte + bitRep(2) shouldEqual 0x00.toByte + bitRep(3) shouldEqual 0xFF.toByte + } + + it ("test bitmap conversion of shard mapper with 256 shards") { + val shardMapper = new ShardMapper(256) // default init to ShardStatusUnassigned + var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper) + bitRep.length shouldEqual 32 + bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point + shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref) + shardMapper.assignedShards.length shouldEqual 256 + + // make all shards active + for (i <- 0 to 255) { + shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref)) + } + // check if all the bits are set correctly + bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper) + bitRep.forall (x => x == 0xFF.toByte) shouldEqual true + + // make some shards 
+    for (i <- 60 to 63) {
+      shardMapper.updateFromEvent(RecoveryInProgress(dataset, i, TestProbe().ref, 50))
+    }
+    // make some shards go down
+    for (i <- 64 to 67) {
+      shardMapper.updateFromEvent(ShardDown(dataset, i, TestProbe().ref))
+    }
+
+    shardMapper.activeShards().size shouldEqual 248
+    shardMapper.notActiveShards().size shouldEqual 8
+    bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
+
+    // shards 0-59 are active; bytes 0-6 cover shards 0-55, so they are fully set
+    for (i <- 0 to 6) {
+      bitRep(i) shouldEqual 0xFF.toByte
+    }
+
+    // byte 7 covers shards 56-63: 56-59 active, 60-63 in recovery -> 1111 0000
+    bitRep(7) shouldEqual 0xF0.toByte
+
+    // byte 8 covers shards 64-71: 64-67 down, 68-71 active -> 0000 1111
+    bitRep(8) shouldEqual 0x0F.toByte
+
+    // shards 68-255 are active; bytes 9-31 cover shards 72-255, so they are fully set
+    for (i <- 9 to 31) {
+      bitRep(i) shouldEqual 0xFF.toByte
+    }
+  }
+
+  it ("test padding is set correctly when the number of shards is not byte aligned") {
+    val shardMapper = new ShardMapper(2) // default init to ShardStatusUnassigned
+    var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
+    bitRep.length shouldEqual 1
+    bitRep.forall(x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point
+    shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
+    shardMapper.assignedShards.length shouldEqual 2
+    for (i <- 0 to 1) {
+      shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
+    }
+    bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
+    bitRep(0) shouldEqual 0xC0.toByte // 1100 0000 - the trailing 6 bits are zero padding
+
+    shardMapper.updateFromEvent(ShardDown(dataset, 1, TestProbe().ref))
+    bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
+    bitRep(0) shouldEqual 0x80.toByte // 1000 0000
+  }
 }
diff --git a/http/src/main/scala/filodb/http/ClusterApiRoute.scala b/http/src/main/scala/filodb/http/ClusterApiRoute.scala
index 2b23e33401..1dd4b457d1 100644
--- a/http/src/main/scala/filodb/http/ClusterApiRoute.scala
+++ b/http/src/main/scala/filodb/http/ClusterApiRoute.scala
@@ -6,7 +6,7 @@ import akka.http.scaladsl.server.Directives._
 import com.typesafe.scalalogging.StrictLogging
 import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport
 
-import filodb.coordinator.{CurrentShardSnapshot, NodeClusterActor}
+import filodb.coordinator.{CurrentShardSnapshot, NodeClusterActor, ShardSnapshot}
 import filodb.core.{DatasetRef, ErrorResponse, Success => SuccessResponse}
 import filodb.core.store.{AssignShardConfig, UnassignShardConfig}
 import filodb.http.apiv1.{HttpSchema, HttpShardDetails, HttpShardState, HttpShardStateByAddress}
@@ -31,9 +31,24 @@ class ClusterApiRoute(clusterProxy: ActorRef) extends FiloRoute with StrictLoggi
            complete(httpList(statusList))
          case DatasetUnknown(_) =>
            complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
+          case InternalServiceError(errorMessage) =>
+            complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
        }
      }
    } ~
+    // GET /api/v1/cluster/<dataset>/statusV2 - compressed shard status via ShardMapperV2
+    // NOTE: statusV2 will only work with the ClusteringV2 shard assignment strategy
+    path(Segment / "statusV2") { dataset =>
+      get {
+        onSuccess(asyncAsk(clusterProxy, GetShardMapV2(DatasetRef.fromDotString(dataset)))) {
+          case ShardSnapshot(shardMapperV2) =>
+            complete(httpList(Seq(shardMapperV2)))
+          case DatasetUnknown(_) =>
+            complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
+          case InternalServiceError(errorMessage) =>
+            complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
+        }
+      }
+    } ~
    // GET /api/v1/cluster/<dataset>/statusByAddress - shard health status grouped by node address
    // Sample output as follows:
    // {{{
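Finally, a minimal way to exercise the new statusV2 endpoint against a running server. The host, port, and dataset name below are placeholders, not part of the change; the path follows the /api/v1/cluster/<dataset>/... convention documented in the route comments above.

import scala.io.Source

// Fetches the compressed shard status; the JSON body should carry the
// ShardMapperV2 fields (nodeCountInCluster, numShards, k8sHostFormat, shardState).
object StatusV2Probe extends App {
  val url = "http://localhost:8080/api/v1/cluster/prometheus/statusV2"
  println(Source.fromURL(url).mkString)
}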