Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

common-ize phantom-extent checks #154

Merged
merged 2 commits into from
Apr 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,10 @@ const (

// InputHostForRemoteExtent is a special (and fake) input host ID for remote extent
InputHostForRemoteExtent = "88888888-8888-8888-8888-888888888888"

// KafkaPhantomExtentInputhost is placeholder/phantom inputhost uuid used for Kafka extents
KafkaPhantomExtentInputhost = "00000000-0000-0000-0000-000000000000"

// KafkaPhantomExtentStorehost is placeholder/phantom storehost uuid used for Kafka extents
KafkaPhantomExtentStorehost = "00000000-0000-0000-0000-000000000000"
)
14 changes: 14 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,17 @@ func SetupSignalHandler(sig os.Signal, hostPort string, endpoint string, timeout
handleFunc(sig, hostPort, endpoint, timeout)
}()
}

// IsKafkaPhantomInput determines whether the given inputhost-uuid for
// an extent indicates that this is a Kafka 'phantom' extent.
func IsKafkaPhantomInput(inputUUID string) bool {

return inputUUID == KafkaPhantomExtentInputhost
}

// AreKafkaPhantomStores determines whether the given list of storehost-uuids
// for an extent indicates that this is a Kafka 'phantom' extent.
func AreKafkaPhantomStores(storeUUIDs []string) bool {

return len(storeUUIDs) == 1 && storeUUIDs[0] == KafkaPhantomExtentStorehost
}
6 changes: 3 additions & 3 deletions services/controllerhost/api_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func isUUIDLengthValid(uuid string) bool {
func isInputHealthy(context *Context, extent *m.DestinationExtent) bool {

// if this is a Kafka phantom extent, then assume "input" is healthy
if extent.GetInputHostUUID() == kafkaPhantomInputUUID {
if common.IsKafkaPhantomInput(extent.GetInputHostUUID()) {
return true
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func isAnyStoreHealthy(context *Context, storeIDs []string) bool {

// special-case Kafka phantom extents that do not really have a physical
// store (in Cherami) and use a placeholder 'phantom' store instead.
if len(storeIDs) == 1 && storeIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeIDs) {
return true
}

Expand All @@ -138,7 +138,7 @@ func areExtentStoresHealthy(context *Context, extent *m.DestinationExtent) bool

// special-case Kafka phantom extents that do not really have a physical
// store (in Cherami) and use a placeholder 'phantom' store instead.
if len(storeIDs) == 1 && storeIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeIDs) {
return true
}

Expand Down
10 changes: 3 additions & 7 deletions services/controllerhost/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ var (
// We can keep serving stale entries for up to an hour,
// when we cannot refresh the cache (say, due to cassandra failure)
outputCacheTTL = 5 * time.Second

// Define phantom store/inputhost for extents belonging to Kafka destinations
kafkaPhantomStoreUUID = "00000000-0000-0000-0000-000000000000"
kafkaPhantomInputUUID = "00000000-0000-0000-0000-000000000000"
)

type cgExtentsByCategory struct {
Expand Down Expand Up @@ -141,7 +137,7 @@ func hostInfoMapToSlice(hosts map[string]*common.HostInfo) ([]string, []string)
func pickOutputHostForStoreHosts(context *Context, storeUUIDs []string) (*common.HostInfo, error) {

// special-case kafka phantom extents, that don't use Cherami stores
if len(storeUUIDs) == 1 && storeUUIDs[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(storeUUIDs) {
return context.placement.PickOutputHost(nil)
}

Expand Down Expand Up @@ -459,8 +455,8 @@ func createDestExtent(context *Context, dstDesc *shared.DestinationDescription,
func createKafkaPhantomExtent(context *Context, dstUUID string, m3Scope int) (ext *m.DestinationExtent, err error) {

extentUUID := uuid.New()
inputhostUUID := kafkaPhantomInputUUID
storeUUIDs := []string{kafkaPhantomStoreUUID}
inputhostUUID := common.KafkaPhantomExtentInputhost
storeUUIDs := []string{common.KafkaPhantomExtentStorehost}

// create a 'phantom' extent and assign given inputhost/stores
if _, err = context.mm.CreateExtent(dstUUID, extentUUID, inputhostUUID, storeUUIDs); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsCreatedTimeKafka() {

var nPhantom, nDlq int
for _, x := range gotExtents {
if x.GetStoreUUIDs()[0] == kafkaPhantomStoreUUID {
if common.AreKafkaPhantomStores(x.GetStoreUUIDs()) {
nPhantom++
} else {
s.Equal(extents[nDlq], x.GetExtentUUID(), "Extents not served in time order")
Expand Down
21 changes: 8 additions & 13 deletions services/controllerhost/controllerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ func (s *McpSuite) TestGetOutputHostsMaxOpenExtentsLimit() {
if _, ok := extents[cgx.GetExtentUUID()]; ok {
dlqExtents++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
phantomExtents++
}
}
Expand Down Expand Up @@ -821,8 +820,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
s.Nil(err, "Failed to find extent created by GetOutputHosts()")

// validate all returned extents are 'phantom' extents
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
}

s.True(outputHosts.subset(cgOutputHosts), "invalid outputhost returned")
Expand Down Expand Up @@ -867,8 +865,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -918,8 +915,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -982,8 +978,7 @@ func (s *McpSuite) TestGetOutputHostsKafka() {
if dlqExtents.contains(extentUUID) {
nDlq++
} else {
s.Equal(1, len(cgx.GetStoreUUIDs()), "Expected one phantom store")
s.Equal(kafkaPhantomStoreUUID, cgx.GetStoreUUIDs()[0], "Expected phantom store")
s.True(common.AreKafkaPhantomStores(cgx.GetStoreUUIDs()), "expected phantom stores")
nPhantom++
}
}
Expand Down Expand Up @@ -1311,9 +1306,9 @@ func (s *McpSuite) TestKafkaPhantomExtentChecks() {

context := s.mcp.context

// setup a Kafka extent with 'phatom' input/stores
inputUUID := kafkaPhantomInputUUID
storeUUIDs := []string{kafkaPhantomStoreUUID}
// setup a Kafka extent with 'phantom' input/stores
inputUUID := common.KafkaPhantomExtentInputhost
storeUUIDs := []string{common.KafkaPhantomExtentStorehost}

kafkaExt := &m.DestinationExtent{
ExtentUUID: common.StringPtr(uuid.New()),
Expand Down