Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

common-ize phantom-extent checks #154

Merged
merged 2 commits into from
Apr 17, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,10 @@ const (

// InputHostForRemoteExtent is a special (and fake) input host ID for remote extent
InputHostForRemoteExtent = "88888888-8888-8888-8888-888888888888"

// KafkaPhantomExtentInputhost is placeholder/phantom inputhost uuid used for Kafka extents
KafkaPhantomExtentInputhost = "00000000-0000-0000-0000-000000000000"

// KafkaPhantomExtentStorehost is placeholder/phantom storehost uuid used for Kafka extents
KafkaPhantomExtentStorehost = "00000000-0000-0000-0000-000000000000"
)
14 changes: 14 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,17 @@ func SetupSignalHandler(sig os.Signal, hostPort string, endpoint string, timeout
handleFunc(sig, hostPort, endpoint, timeout)
}()
}

// IsKafkaPhantomInput determines whether the given inputhost-uuid for
// an extent indicates that this is a Kafka 'phantom' extent.
func IsKafkaPhantomInput(inputUUID string) bool {

return inputUUID == KafkaPhantomExtentInputhost
}

// AreKafkaPhantomStores determines whether the given list of storehost-uuids
// for an extent indicates that this is a Kafka 'phantom' extent.
func AreKafkaPhantomStores(storeUUIDs []string) bool {

return len(storeUUIDs) == 1 && storeUUIDs[0] == KafkaPhantomExtentStorehost
}
6 changes: 3 additions & 3 deletions services/controllerhost/api_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func isUUIDLengthValid(uuid string) bool {
func isInputHealthy(context *Context, extent *m.DestinationExtent) bool {

// if this is a Kafka phantom extent, then assume "input" is healthy
if extent.GetInputHostUUID() == kafkaPhantomInputUUID {
if common.IsKafkaPhantomInput(extent.GetInputHostUUID()) {
return true
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func isAnyStoreHealthy(context *Context, storeIDs []string) bool {

// special-case Kafka phantom extents that do not really have a physical
// store (in Cherami) and use a placeholder 'phantom' store instead.
if len(storeIDs) == 1 && storeIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeIDs) {
return true
}

Expand All @@ -138,7 +138,7 @@ func areExtentStoresHealthy(context *Context, extent *m.DestinationExtent) bool

// special-case Kafka phantom extents that do not really have a physical
// store (in Cherami) and use a placeholder 'phantom' store instead.
if len(storeIDs) == 1 && storeIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeIDs) {
return true
}

Expand Down
10 changes: 3 additions & 7 deletions services/controllerhost/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ var (
// We can keep serving stale entries for up to an hour,
// when we cannot refresh the cache (say, due to cassandra failure)
outputCacheTTL = 5 * time.Second

// Define phantom store/inputhost for extents belonging to Kafka destinations
kafkaPhantomStoreUUID = "00000000-0000-0000-0000-000000000000"
kafkaPhantomInputUUID = "00000000-0000-0000-0000-000000000000"
)

type cgExtentsByCategory struct {
Expand Down Expand Up @@ -141,7 +137,7 @@ func hostInfoMapToSlice(hosts map[string]*common.HostInfo) ([]string, []string)
func pickOutputHostForStoreHosts(context *Context, storeUUIDs []string) (*common.HostInfo, error) {

// special-case kafka phantom extents, that don't use Cherami stores
if len(storeUUIDs) == 1 && storeUUIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeUUIDs) {
return context.placement.PickOutputHost(nil)
}

Expand Down Expand Up @@ -459,8 +455,8 @@ func createDestExtent(context *Context, dstDesc *shared.DestinationDescription,
func createKafkaPhantomExtent(context *Context, dstUUID string, m3Scope int) (ext *m.DestinationExtent, err error) {

extentUUID := uuid.New()
inputhostUUID := kafkaPhantomInputUUID
storeUUIDs := []string{kafkaPhantomStoreUUID}
inputhostUUID := common.KafkaPhantomExtentInputhost
storeUUIDs := []string{common.KafkaPhantomExtentStorehost}

// create a 'phantom' extent and assign given inputhost/stores
if _, err = context.mm.CreateExtent(dstUUID, extentUUID, inputhostUUID, storeUUIDs); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsCreatedTimeKafka() {

var nPhantom, nDlq int
for _, x := range gotExtents {
if x.GetStoreUUIDs()[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(x.GetStoreUUIDs()) {
nPhantom++
} else {
s.Equal(extents[nDlq], x.GetExtentUUID(), "Extents not served in time order")
Expand Down
19 changes: 7 additions & 12 deletions services/controllerhost/controllerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ func (s *McpSuite) TestGetOutputHostsMaxOpenExtentsLimit() {
if _, ok := extents[cgx.GetExtentUUID()]; ok {
dlqExtents++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
phantomExtents++
}
}
Expand Down Expand Up @@ -821,8 +820,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
s.Nil(err, "Failed to find extent created by GetOutputHosts()")

// validate all returned extents are 'phantom' extents
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
}

s.True(outputHosts.subset(cgOutputHosts), "invalid outputhost returned")
Expand Down Expand Up @@ -867,8 +865,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -918,8 +915,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -982,8 +978,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -1312,8 +1307,8 @@ func (s *McpSuite) TestKafkaPhantomExtentChecks() {
context := s.mcp.context

// setup a Kafka extent with 'phatom' input/stores
inputUUID := kafkaPhantomInputUUID
storeUUIDs := []string{kafkaPhantomStoreUUID}
inputUUID := common.KafkaPhantomExtentInputhost
storeUUIDs := []string{common.KafkaPhantomExtentStorehost}

kafkaExt := &m.DestinationExtent{
ExtentUUID: common.StringPtr(uuid.New()),
Expand Down