diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index ec642dd0..5c28fb24 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -189,6 +189,30 @@ type CassandraMetadataService struct { // interface implementation check var _ m.TChanMetadataService = (*CassandraMetadataService)(nil) +func parseConsistency(cfgCons string) (lowCons gocql.Consistency, midCons gocql.Consistency, highCons gocql.Consistency) { + + // use a minimum default consistency of 'One' + lowCons, midCons, highCons = gocql.One, gocql.One, gocql.One + + switch cons := strings.Split(cfgCons, ","); len(cons) { + case 3: + lowCons = gocql.ParseConsistency(strings.TrimSpace(cons[2])) + fallthrough + + case 2: + midCons = gocql.ParseConsistency(strings.TrimSpace(cons[1])) + if len(cons) == 2 { + lowCons = midCons + } + fallthrough + + case 1: + highCons = gocql.ParseConsistency(strings.TrimSpace(cons[0])) + } + + return +} + // NewCassandraMetadataService creates an instance of TChanMetadataServiceClient backed up by Cassandra. func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Logger) (*CassandraMetadataService, error) { @@ -196,19 +220,10 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo log = bark.NewLoggerFromLogrus(logrus.StandardLogger()) } - // use a minimum consistency of 'Two', so that reads from a recently added (non-current) cassandra - // host, does not result in inconsistent data. - lowCons, midCons := gocql.Two, gocql.Two - highCons := gocql.ParseConsistency(cfg.GetConsistency()) - - if highCons == gocql.One { - - envType := configure.NewCommonConfigure().GetEnvironment() - if envType == configure.EnvProduction { - log.Panic("Highest consistency level of ONE should only be used in TestEnvironment") - } + lowCons, midCons, highCons := parseConsistency(cfg.GetConsistency()) - lowCons, midCons = gocql.One, gocql.One + if highCons == gocql.One && configure.NewCommonConfigure().GetEnvironment() == configure.EnvProduction { + log.Panic("Highest consistency level of ONE should only be used in TestEnvironment") } clusterName := cfg.GetClusterName()