From 81c20ef601e5b34486377d8dc47a6e0f99d050d8 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 7 Jul 2022 16:02:50 +0530 Subject: [PATCH 1/2] feat: remove kv_store from vtorc codebase Signed-off-by: Manan Gupta --- go/vt/orchestrator/app/cli.go | 16 -- go/vt/orchestrator/config/config.go | 19 --- go/vt/orchestrator/db/generate_base.go | 8 - go/vt/orchestrator/http/api.go | 21 --- go/vt/orchestrator/inst/binlog_test.go | 1 - go/vt/orchestrator/inst/cluster.go | 39 ----- go/vt/orchestrator/inst/cluster_test.go | 86 ---------- go/vt/orchestrator/inst/instance_dao.go | 25 --- go/vt/orchestrator/inst/resolve_dao.go | 19 --- go/vt/orchestrator/kv/consul.go | 157 ------------------ go/vt/orchestrator/kv/internal.go | 67 -------- go/vt/orchestrator/kv/kv.go | 101 ----------- go/vt/orchestrator/kv/zk.go | 81 --------- go/vt/orchestrator/logic/command_applier.go | 12 -- go/vt/orchestrator/logic/orchestrator.go | 50 ------ go/vt/orchestrator/logic/topology_recovery.go | 23 --- 16 files changed, 725 deletions(-) delete mode 100644 go/vt/orchestrator/inst/cluster_test.go delete mode 100644 go/vt/orchestrator/kv/consul.go delete mode 100644 go/vt/orchestrator/kv/internal.go delete mode 100644 go/vt/orchestrator/kv/kv.go delete mode 100644 go/vt/orchestrator/kv/zk.go diff --git a/go/vt/orchestrator/app/cli.go b/go/vt/orchestrator/app/cli.go index e268e4cbbcf..99eefa27ae4 100644 --- a/go/vt/orchestrator/app/cli.go +++ b/go/vt/orchestrator/app/cli.go @@ -30,7 +30,6 @@ import ( "vitess.io/vitess/go/vt/orchestrator/external/golib/log" "vitess.io/vitess/go/vt/orchestrator/external/golib/util" "vitess.io/vitess/go/vt/orchestrator/inst" - "vitess.io/vitess/go/vt/orchestrator/kv" "vitess.io/vitess/go/vt/orchestrator/logic" "vitess.io/vitess/go/vt/orchestrator/process" "vitess.io/vitess/go/vt/vtctl/reparentutil/promotionrule" @@ -183,7 +182,6 @@ func Cli(command string, strict bool, instance string, destination string, owner if !skipDatabaseCommands && !*config.RuntimeCLIFlags.SkipContinuousRegistration { process.ContinuousRegistration(string(process.OrchestratorExecutionCliMode), command) } - kv.InitKVStores() // begin commands switch command { @@ -949,20 +947,6 @@ func Cli(command string, strict bool, instance string, destination string, owner } fmt.Println(lag) } - case registerCliCommand("submit-primaries-to-kv-stores", "Key-value", `Submit primary of a specific cluster, or all primaries of all clusters to key-value stores`): - { - clusterName := getClusterName(clusterAlias, instanceKey) - log.Debugf("cluster name is <%s>", clusterName) - - kvPairs, _, err := logic.SubmitPrimariesToKvStores(clusterName, true) - if err != nil { - log.Fatale(err) - } - for _, kvPair := range kvPairs { - fmt.Printf("%s:%s\n", kvPair.Key, kvPair.Value) - } - } - case registerCliCommand("tags", "tags", `List tags for a given instance`): { instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey) diff --git a/go/vt/orchestrator/config/config.go b/go/vt/orchestrator/config/config.go index 26b7e66de16..c69ee363d3a 100644 --- a/go/vt/orchestrator/config/config.go +++ b/go/vt/orchestrator/config/config.go @@ -222,12 +222,6 @@ type Configuration struct { DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps DiscoveryIgnorePrimaryHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a primary. Usage: pointing your primary temporarily to replicate seom data from external host DiscoveryIgnoreHostnameFilters []string // Regexp filters to apply to prevent discovering instances of any kind - ConsulAddress string // Address where Consul HTTP api is found. Example: 127.0.0.1:8500 - ConsulScheme string // Scheme (http or https) for Consul - ConsulACLToken string // ACL token used to write to Consul KV - ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs - ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c - KVClusterPrimaryPrefix string // Prefix to use for clusters' primary's entries in KV stores (internal, consul, ZK), default: "mysql/primary" WebMessage string // If provided, will be shown on all web pages below the title bar MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas InstanceDBExecContextTimeoutSeconds int // Timeout on context used while calling ExecContext on instance database @@ -379,12 +373,6 @@ func newConfiguration() *Configuration { OSCIgnoreHostnameFilters: []string{}, URLPrefix: "", DiscoveryIgnoreReplicaHostnameFilters: []string{}, - ConsulAddress: "", - ConsulScheme: "http", - ConsulACLToken: "", - ConsulCrossDataCenterDistribution: false, - ZkAddress: "", - KVClusterPrimaryPrefix: "mysql/primary", WebMessage: "", MaxConcurrentReplicaOperations: 5, InstanceDBExecContextTimeoutSeconds: 30, @@ -476,13 +464,6 @@ func (config *Configuration) postReadAdjustments() error { if config.RaftAdvertise == "" { config.RaftAdvertise = config.RaftBind } - if config.KVClusterPrimaryPrefix != "/" { - // "/" remains "/" - // "prefix" turns to "prefix/" - // "some/prefix///" turns to "some/prefix/" - config.KVClusterPrimaryPrefix = strings.TrimRight(config.KVClusterPrimaryPrefix, "/") - config.KVClusterPrimaryPrefix = fmt.Sprintf("%s/", config.KVClusterPrimaryPrefix) - } if config.HTTPAdvertise != "" { u, err := url.Parse(config.HTTPAdvertise) if err != nil { diff --git a/go/vt/orchestrator/db/generate_base.go b/go/vt/orchestrator/db/generate_base.go index 3833f401b1d..b0a9848bde3 100644 --- a/go/vt/orchestrator/db/generate_base.go +++ b/go/vt/orchestrator/db/generate_base.go @@ -801,14 +801,6 @@ var generateSQLBase = []string{ PRIMARY KEY (hostname,port) ) ENGINE=InnoDB DEFAULT CHARSET=ascii `, - ` - CREATE TABLE IF NOT EXISTS kv_store ( - store_key varchar(255) CHARACTER SET ascii NOT NULL, - store_value text CHARACTER SET utf8 not null, - last_updated timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (store_key) - ) ENGINE=InnoDB DEFAULT CHARSET=ascii - `, ` CREATE TABLE IF NOT EXISTS cluster_injected_pseudo_gtid ( cluster_name varchar(128) NOT NULL, diff --git a/go/vt/orchestrator/http/api.go b/go/vt/orchestrator/http/api.go index 24edeea8882..9d31f7c3f37 100644 --- a/go/vt/orchestrator/http/api.go +++ b/go/vt/orchestrator/http/api.go @@ -1615,23 +1615,6 @@ func (httpAPI *API) UntagAll(params martini.Params, r render.Render, req *http.R Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("%s removed from %+v instances", tag.TagName, len(*untagged)), Details: untagged.GetInstanceKeys()}) } -// SubmitPrimariesToKvStores writes a cluster's primary (or all clusters primaries) to kv stores. -// This should generally only happen once in a lifetime of a cluster. Otherwise KV -// stores are updated via failovers. -func (httpAPI *API) SubmitPrimariesToKvStores(params martini.Params, r render.Render, req *http.Request) { - clusterName, err := getClusterNameIfExists(params) - if err != nil { - Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)}) - return - } - kvPairs, submittedCount, err := logic.SubmitPrimariesToKvStores(clusterName, true) - if err != nil { - Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)}) - return - } - Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Submitted %d primaries", submittedCount), Details: kvPairs}) -} - // Clusters provides list of known primaries func (httpAPI *API) Primaries(params martini.Params, r render.Render, req *http.Request) { instances, err := inst.ReadWriteableClustersPrimaries() @@ -2884,10 +2867,6 @@ func (httpAPI *API) RegisterRequests(m *martini.ClassicMartini) { httpAPI.registerAPIRequest(m, "topology-tags/:host/:port", httpAPI.ASCIITopologyTags) httpAPI.registerAPIRequest(m, "snapshot-topologies", httpAPI.SnapshotTopologies) - // Key-value: - httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores", httpAPI.SubmitPrimariesToKvStores) - httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores/:clusterHint", httpAPI.SubmitPrimariesToKvStores) - // Tags: httpAPI.registerAPIRequest(m, "tagged", httpAPI.Tagged) httpAPI.registerAPIRequest(m, "tags/:host/:port", httpAPI.Tags) diff --git a/go/vt/orchestrator/inst/binlog_test.go b/go/vt/orchestrator/inst/binlog_test.go index 82a232a682e..0e3d829aed4 100644 --- a/go/vt/orchestrator/inst/binlog_test.go +++ b/go/vt/orchestrator/inst/binlog_test.go @@ -12,7 +12,6 @@ var testCoordinates = BinlogCoordinates{LogFile: "mysql-bin.000010", LogPos: 108 func init() { config.Config.HostnameResolveMethod = "none" - config.Config.KVClusterPrimaryPrefix = "test/primary/" config.MarkConfigurationLoaded() log.SetLevel(log.ERROR) } diff --git a/go/vt/orchestrator/inst/cluster.go b/go/vt/orchestrator/inst/cluster.go index f4041e801ca..6e2abfaf7e8 100644 --- a/go/vt/orchestrator/inst/cluster.go +++ b/go/vt/orchestrator/inst/cluster.go @@ -17,51 +17,12 @@ package inst import ( - "fmt" "regexp" "strings" "vitess.io/vitess/go/vt/orchestrator/config" - "vitess.io/vitess/go/vt/orchestrator/kv" ) -func GetClusterPrimaryKVKey(clusterAlias string) string { - return fmt.Sprintf("%s%s", config.Config.KVClusterPrimaryPrefix, clusterAlias) -} - -func getClusterPrimaryKVPair(clusterAlias string, primaryKey *InstanceKey) *kv.KeyValuePair { - if clusterAlias == "" { - return nil - } - if primaryKey == nil { - return nil - } - return kv.NewKVPair(GetClusterPrimaryKVKey(clusterAlias), primaryKey.StringCode()) -} - -// GetClusterPrimaryKVPairs returns all KV pairs associated with a primary. This includes the -// full identity of the primary as well as a breakdown by hostname, port, ipv4, ipv6 -func GetClusterPrimaryKVPairs(clusterAlias string, primaryKey *InstanceKey) (kvPairs [](*kv.KeyValuePair)) { - primaryKVPair := getClusterPrimaryKVPair(clusterAlias, primaryKey) - if primaryKVPair == nil { - return kvPairs - } - kvPairs = append(kvPairs, primaryKVPair) - - addPair := func(keySuffix, value string) { - key := fmt.Sprintf("%s/%s", primaryKVPair.Key, keySuffix) - kvPairs = append(kvPairs, kv.NewKVPair(key, value)) - } - - addPair("hostname", primaryKey.Hostname) - addPair("port", fmt.Sprintf("%d", primaryKey.Port)) - if ipv4, ipv6, err := readHostnameIPs(primaryKey.Hostname); err == nil { - addPair("ipv4", ipv4) - addPair("ipv6", ipv6) - } - return kvPairs -} - // mappedClusterNameToAlias attempts to match a cluster with an alias based on // configured ClusterNameToAlias map func mappedClusterNameToAlias(clusterName string) string { diff --git a/go/vt/orchestrator/inst/cluster_test.go b/go/vt/orchestrator/inst/cluster_test.go deleted file mode 100644 index 597c5d472e7..00000000000 --- a/go/vt/orchestrator/inst/cluster_test.go +++ /dev/null @@ -1,86 +0,0 @@ -/* - Copyright 2014 Outbrain Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package inst - -import ( - "fmt" - - "testing" - - _ "github.com/mattn/go-sqlite3" - - "vitess.io/vitess/go/vt/orchestrator/config" - "vitess.io/vitess/go/vt/orchestrator/external/golib/log" - test "vitess.io/vitess/go/vt/orchestrator/external/golib/tests" -) - -var primaryKey = InstanceKey{Hostname: "host1", Port: 3306} - -func init() { - config.Config.HostnameResolveMethod = "none" - config.Config.KVClusterPrimaryPrefix = "test/primary/" - config.MarkConfigurationLoaded() - log.SetLevel(log.ERROR) -} - -func TestGetClusterPrimaryKVKey(t *testing.T) { - kvKey := GetClusterPrimaryKVKey("foo") - test.S(t).ExpectEquals(kvKey, "test/primary/foo") -} - -func TestGetClusterPrimaryKVPair(t *testing.T) { - { - kvPair := getClusterPrimaryKVPair("myalias", &primaryKey) - test.S(t).ExpectNotNil(kvPair) - test.S(t).ExpectEquals(kvPair.Key, "test/primary/myalias") - test.S(t).ExpectEquals(kvPair.Value, primaryKey.StringCode()) - } - { - kvPair := getClusterPrimaryKVPair("", &primaryKey) - test.S(t).ExpectTrue(kvPair == nil) - } - { - kvPair := getClusterPrimaryKVPair("myalias", nil) - test.S(t).ExpectTrue(kvPair == nil) - } -} - -func TestGetClusterPrimaryKVPairs(t *testing.T) { - kvPairs := GetClusterPrimaryKVPairs("myalias", &primaryKey) - test.S(t).ExpectTrue(len(kvPairs) >= 2) - - { - kvPair := kvPairs[0] - test.S(t).ExpectEquals(kvPair.Key, "test/primary/myalias") - test.S(t).ExpectEquals(kvPair.Value, primaryKey.StringCode()) - } - { - kvPair := kvPairs[1] - test.S(t).ExpectEquals(kvPair.Key, "test/primary/myalias/hostname") - test.S(t).ExpectEquals(kvPair.Value, primaryKey.Hostname) - } - { - kvPair := kvPairs[2] - test.S(t).ExpectEquals(kvPair.Key, "test/primary/myalias/port") - test.S(t).ExpectEquals(kvPair.Value, fmt.Sprintf("%d", primaryKey.Port)) - } -} - -func TestGetClusterPrimaryKVPairs2(t *testing.T) { - kvPairs := GetClusterPrimaryKVPairs("", &primaryKey) - test.S(t).ExpectEquals(len(kvPairs), 0) -} diff --git a/go/vt/orchestrator/inst/instance_dao.go b/go/vt/orchestrator/inst/instance_dao.go index d12ff3e90fb..0e3825fcd13 100644 --- a/go/vt/orchestrator/inst/instance_dao.go +++ b/go/vt/orchestrator/inst/instance_dao.go @@ -50,7 +50,6 @@ import ( "vitess.io/vitess/go/vt/orchestrator/collection" "vitess.io/vitess/go/vt/orchestrator/config" "vitess.io/vitess/go/vt/orchestrator/db" - "vitess.io/vitess/go/vt/orchestrator/kv" "vitess.io/vitess/go/vt/orchestrator/metrics/query" "vitess.io/vitess/go/vt/orchestrator/util" "vitess.io/vitess/go/vt/vtctl/reparentutil" @@ -2009,30 +2008,6 @@ func ReadClustersInfo(clusterName string) ([]ClusterInfo, error) { return clusters, err } -// Get a listing of KeyValuePair for clusters primaries, for all clusters or for a specific cluster. -func GetPrimariesKVPairs(clusterName string) (kvPairs [](*kv.KeyValuePair), err error) { - - clusterAliasMap := make(map[string]string) - clustersInfo, err := ReadClustersInfo(clusterName) - if err != nil { - return kvPairs, err - } - for _, clusterInfo := range clustersInfo { - clusterAliasMap[clusterInfo.ClusterName] = clusterInfo.ClusterAlias - } - - primaries, err := ReadWriteableClustersPrimaries() - if err != nil { - return kvPairs, err - } - for _, primary := range primaries { - clusterPairs := GetClusterPrimaryKVPairs(clusterAliasMap[primary.ClusterName], &primary.Key) - kvPairs = append(kvPairs, clusterPairs...) - } - - return kvPairs, err -} - // HeuristicallyApplyClusterDomainInstanceAttribute writes down the cluster-domain // to primary-hostname as a general attribute, by reading current topology and **trusting** it to be correct func HeuristicallyApplyClusterDomainInstanceAttribute(clusterName string) (instanceKey *InstanceKey, err error) { diff --git a/go/vt/orchestrator/inst/resolve_dao.go b/go/vt/orchestrator/inst/resolve_dao.go index 0082afc2245..998e1571c58 100644 --- a/go/vt/orchestrator/inst/resolve_dao.go +++ b/go/vt/orchestrator/inst/resolve_dao.go @@ -319,22 +319,3 @@ func writeHostnameIPs(hostname string, ipv4String string, ipv6String string) err } return ExecDBWriteFunc(writeFunc) } - -// readUnresolvedHostname reverse-reads hostname resolve. It returns a hostname which matches given pattern and resovles to resolvedHostname, -// or, in the event no such hostname is found, the given resolvedHostname, unchanged. -func readHostnameIPs(hostname string) (ipv4 string, ipv6 string, err error) { - query := ` - select - ipv4, ipv6 - from - hostname_ips - where - hostname = ? - ` - err = db.QueryOrchestrator(query, sqlutils.Args(hostname), func(m sqlutils.RowMap) error { - ipv4 = m.GetString("ipv4") - ipv6 = m.GetString("ipv6") - return nil - }) - return ipv4, ipv6, log.Errore(err) -} diff --git a/go/vt/orchestrator/kv/consul.go b/go/vt/orchestrator/kv/consul.go deleted file mode 100644 index 9ba73d9b0ed..00000000000 --- a/go/vt/orchestrator/kv/consul.go +++ /dev/null @@ -1,157 +0,0 @@ -/* - Copyright 2017 Shlomi Noach, GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package kv - -import ( - "crypto/tls" - "fmt" - "net/http" - "sync" - "sync/atomic" - - "vitess.io/vitess/go/vt/orchestrator/config" - - consulapi "github.com/hashicorp/consul/api" - "github.com/patrickmn/go-cache" - - "vitess.io/vitess/go/vt/orchestrator/external/golib/log" -) - -// A Consul store based on config's `ConsulAddress`, `ConsulScheme`, and `ConsulKVPrefix` -type consulStore struct { - client *consulapi.Client - kvCache *cache.Cache - distributionReentry int64 -} - -// NewConsulStore creates a new consul store. It is possible that the client for this store is nil, -// which is the case if no consul config is provided. -func NewConsulStore() KeyValueStore { - store := &consulStore{ - kvCache: cache.New(cache.NoExpiration, cache.DefaultExpiration), - } - - if config.Config.ConsulAddress != "" { - consulConfig := consulapi.DefaultConfig() - consulConfig.Address = config.Config.ConsulAddress - consulConfig.Scheme = config.Config.ConsulScheme - if config.Config.ConsulScheme == "https" { - consulConfig.HttpClient = &http.Client{ - Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, - } - } - // ConsulAclToken defaults to "" - consulConfig.Token = config.Config.ConsulACLToken - if client, err := consulapi.NewClient(consulConfig); err != nil { - log.Errore(err) - } else { - store.client = client - } - } - return store -} - -func (cs *consulStore) PutKeyValue(key string, value string) (err error) { - if cs.client == nil { - return nil - } - pair := &consulapi.KVPair{Key: key, Value: []byte(value)} - _, err = cs.client.KV().Put(pair, nil) - return err -} - -func (cs *consulStore) GetKeyValue(key string) (value string, found bool, err error) { - if cs.client == nil { - return value, found, nil - } - pair, _, err := cs.client.KV().Get(key, nil) - if err != nil { - return value, found, err - } - if pair == nil { - return "", false, err - } - return string(pair.Value), true, nil -} - -func (cs *consulStore) DistributePairs(kvPairs [](*KeyValuePair)) (err error) { - // This function is non re-entrant (it can only be running once at any point in time) - if atomic.CompareAndSwapInt64(&cs.distributionReentry, 0, 1) { - defer atomic.StoreInt64(&cs.distributionReentry, 0) - } else { - return - } - - if !config.Config.ConsulCrossDataCenterDistribution { - return nil - } - - datacenters, err := cs.client.Catalog().Datacenters() - if err != nil { - return err - } - log.Debugf("consulStore.DistributePairs(): distributing %d pairs to %d datacenters", len(kvPairs), len(datacenters)) - consulPairs := [](*consulapi.KVPair){} - for _, kvPair := range kvPairs { - consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)}) - } - var wg sync.WaitGroup - for _, datacenter := range datacenters { - datacenter := datacenter - wg.Add(1) - go func() { - defer wg.Done() - - writeOptions := &consulapi.WriteOptions{Datacenter: datacenter} - queryOptions := &consulapi.QueryOptions{Datacenter: datacenter} - skipped := 0 - existing := 0 - written := 0 - failed := 0 - - for _, consulPair := range consulPairs { - val := string(consulPair.Value) - kcCacheKey := fmt.Sprintf("%s;%s", datacenter, consulPair.Key) - - if value, found := cs.kvCache.Get(kcCacheKey); found && val == value { - skipped++ - continue - } - if pair, _, err := cs.client.KV().Get(consulPair.Key, queryOptions); err == nil && pair != nil { - if val == string(pair.Value) { - existing++ - cs.kvCache.SetDefault(kcCacheKey, val) - continue - } - } - - if _, e := cs.client.KV().Put(consulPair, writeOptions); e != nil { - log.Errorf("consulStore.DistributePairs(): failed %s", kcCacheKey) - failed++ - err = e - } else { - log.Debugf("consulStore.DistributePairs(): written %s=%s", kcCacheKey, val) - written++ - cs.kvCache.SetDefault(kcCacheKey, val) - } - } - log.Debugf("consulStore.DistributePairs(): datacenter: %s; skipped: %d, existing: %d, written: %d, failed: %d", datacenter, skipped, existing, written, failed) - }() - } - wg.Wait() - return err -} diff --git a/go/vt/orchestrator/kv/internal.go b/go/vt/orchestrator/kv/internal.go deleted file mode 100644 index 906de60b93b..00000000000 --- a/go/vt/orchestrator/kv/internal.go +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright 2017 Shlomi Noach, GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package kv - -import ( - "vitess.io/vitess/go/vt/orchestrator/db" - "vitess.io/vitess/go/vt/orchestrator/external/golib/log" - "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" -) - -// Internal key-value store, based on relational backend -type internalKVStore struct { -} - -func NewInternalKVStore() KeyValueStore { - return &internalKVStore{} -} - -func (kvStore *internalKVStore) PutKeyValue(key string, value string) (err error) { - _, err = db.ExecOrchestrator(` - replace - into kv_store ( - store_key, store_value, last_updated - ) values ( - ?, ?, now() - ) - `, key, value, - ) - return log.Errore(err) -} - -func (kvStore *internalKVStore) GetKeyValue(key string) (value string, found bool, err error) { - query := ` - select - store_value - from - kv_store - where - store_key = ? - ` - - err = db.QueryOrchestrator(query, sqlutils.Args(key), func(m sqlutils.RowMap) error { - value = m.GetString("store_value") - found = true - return nil - }) - - return value, found, log.Errore(err) -} - -func (kvStore *internalKVStore) DistributePairs(kvPairs [](*KeyValuePair)) (err error) { - return nil -} diff --git a/go/vt/orchestrator/kv/kv.go b/go/vt/orchestrator/kv/kv.go deleted file mode 100644 index e303f7c8581..00000000000 --- a/go/vt/orchestrator/kv/kv.go +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright 2017 Shlomi Noach, GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package kv - -import ( - "fmt" - "sync" -) - -type KeyValuePair struct { - Key string - Value string -} - -func NewKVPair(key string, value string) *KeyValuePair { - return &KeyValuePair{Key: key, Value: value} -} - -func (kvPair *KeyValuePair) String() string { - return fmt.Sprintf("%s:%s", kvPair.Key, kvPair.Value) -} - -type KeyValueStore interface { - PutKeyValue(key string, value string) (err error) - GetKeyValue(key string) (value string, found bool, err error) - DistributePairs(kvPairs [](*KeyValuePair)) (err error) -} - -var kvMutex sync.Mutex -var kvInitOnce sync.Once -var kvStores []KeyValueStore - -// InitKVStores initializes the KV stores (duh), once in the lifetime of this app. -// Configuration reload does not affect a running instance. -func InitKVStores() { - kvMutex.Lock() - defer kvMutex.Unlock() - - kvInitOnce.Do(func() { - kvStores = []KeyValueStore{ - NewInternalKVStore(), - NewConsulStore(), - NewZkStore(), - } - }) -} - -func getKVStores() (stores []KeyValueStore) { - kvMutex.Lock() - defer kvMutex.Unlock() - - stores = kvStores - return stores -} - -func GetValue(key string) (value string, found bool, err error) { - for _, store := range getKVStores() { - // It's really only the first (internal) that matters here - return store.GetKeyValue(key) - } - return value, found, err -} - -func PutValue(key string, value string) (err error) { - for _, store := range getKVStores() { - if err := store.PutKeyValue(key, value); err != nil { - return err - } - } - return nil -} - -func PutKVPair(kvPair *KeyValuePair) (err error) { - if kvPair == nil { - return nil - } - return PutValue(kvPair.Key, kvPair.Value) -} - -func DistributePairs(kvPairs [](*KeyValuePair)) (err error) { - for _, store := range getKVStores() { - if err := store.DistributePairs(kvPairs); err != nil { - return err - } - } - return nil -} diff --git a/go/vt/orchestrator/kv/zk.go b/go/vt/orchestrator/kv/zk.go deleted file mode 100644 index 78556a3b175..00000000000 --- a/go/vt/orchestrator/kv/zk.go +++ /dev/null @@ -1,81 +0,0 @@ -/* - Copyright 2017 Shlomi Noach, GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package kv - -import ( - "fmt" - "math/rand" - "strings" - "time" - - zkconstants "github.com/samuel/go-zookeeper/zk" - - "vitess.io/vitess/go/vt/orchestrator/config" - "vitess.io/vitess/go/vt/orchestrator/external/zk" -) - -// Internal key-value store, based on relational backend -type zkStore struct { - zook *zk.ZooKeeper -} - -func normalizeKey(key string) (normalizedKey string) { - normalizedKey = strings.TrimLeft(key, "/") - normalizedKey = fmt.Sprintf("/%s", normalizedKey) - return normalizedKey -} - -func NewZkStore() KeyValueStore { - store := &zkStore{} - - if config.Config.ZkAddress != "" { - rand.Seed(time.Now().UnixNano()) - - serversArray := strings.Split(config.Config.ZkAddress, ",") - zook := zk.NewZooKeeper() - zook.SetServers(serversArray) - store.zook = zook - } - return store -} - -func (zk *zkStore) PutKeyValue(key string, value string) (err error) { - if zk.zook == nil { - return nil - } - - if _, err = zk.zook.Set(normalizeKey(key), []byte(value)); err == zkconstants.ErrNoNode { - aclstr := "" - _, err = zk.zook.Create(normalizeKey(key), []byte(value), aclstr, true) - } - return err -} - -func (zk *zkStore) GetKeyValue(key string) (value string, found bool, err error) { - if zk.zook == nil { - return value, false, nil - } - result, err := zk.zook.Get(normalizeKey(key)) - if err != nil { - return value, false, err - } - return string(result), true, nil -} - -func (zk *zkStore) DistributePairs(kvPairs [](*KeyValuePair)) (err error) { - return nil -} diff --git a/go/vt/orchestrator/logic/command_applier.go b/go/vt/orchestrator/logic/command_applier.go index 6eade05a6cd..2271be174ad 100644 --- a/go/vt/orchestrator/logic/command_applier.go +++ b/go/vt/orchestrator/logic/command_applier.go @@ -20,7 +20,6 @@ import ( "encoding/json" "vitess.io/vitess/go/vt/orchestrator/inst" - "vitess.io/vitess/go/vt/orchestrator/kv" "vitess.io/vitess/go/vt/orchestrator/external/golib/log" ) @@ -72,8 +71,6 @@ func (applier *CommandApplier) ApplyCommand(op string, value []byte) any { return applier.disableGlobalRecoveries(value) case "enable-global-recoveries": return applier.enableGlobalRecoveries(value) - case "put-key-value": - return applier.putKeyValue(value) case "put-instance-tag": return applier.putInstanceTag(value) case "delete-instance-tag": @@ -234,15 +231,6 @@ func (applier *CommandApplier) enableGlobalRecoveries(value []byte) any { return err } -func (applier *CommandApplier) putKeyValue(value []byte) any { - kvPair := kv.KeyValuePair{} - if err := json.Unmarshal(value, &kvPair); err != nil { - return log.Errore(err) - } - err := kv.PutKVPair(&kvPair) - return err -} - func (applier *CommandApplier) putInstanceTag(value []byte) any { instanceTag := inst.InstanceTag{} if err := json.Unmarshal(value, &instanceTag); err != nil { diff --git a/go/vt/orchestrator/logic/orchestrator.go b/go/vt/orchestrator/logic/orchestrator.go index d3e8efd6390..0933ea03380 100644 --- a/go/vt/orchestrator/logic/orchestrator.go +++ b/go/vt/orchestrator/logic/orchestrator.go @@ -33,7 +33,6 @@ import ( "vitess.io/vitess/go/vt/orchestrator/discovery" "vitess.io/vitess/go/vt/orchestrator/external/golib/log" "vitess.io/vitess/go/vt/orchestrator/inst" - "vitess.io/vitess/go/vt/orchestrator/kv" ometrics "vitess.io/vitess/go/vt/orchestrator/metrics" "vitess.io/vitess/go/vt/orchestrator/process" "vitess.io/vitess/go/vt/orchestrator/util" @@ -63,7 +62,6 @@ var discoveryMetrics = collection.CreateOrReturnCollection(discoveryMetricsName) var isElectedNode int64 var recentDiscoveryOperationKeys *cache.Cache -var kvFoundCache = cache.New(10*time.Minute, time.Minute) func init() { snapshotDiscoveryKeys = make(chan inst.InstanceKey, 10) @@ -324,49 +322,6 @@ func onHealthTick() { } } -// SubmitPrimariesToKvStores records a cluster's primary (or all clusters primaries) to kv stores. -// This should generally only happen once in a lifetime of a cluster. Otherwise KV -// stores are updated via failovers. -func SubmitPrimariesToKvStores(clusterName string, force bool) (kvPairs [](*kv.KeyValuePair), submittedCount int, err error) { - kvPairs, err = inst.GetPrimariesKVPairs(clusterName) - log.Debugf("kv.SubmitPrimariesToKvStores, clusterName: %s, force: %+v: numPairs: %+v", clusterName, force, len(kvPairs)) - if err != nil { - return kvPairs, submittedCount, log.Errore(err) - } - var selectedError error - var submitKvPairs [](*kv.KeyValuePair) - for _, kvPair := range kvPairs { - if !force { - // !force: Called periodically to auto-populate KV - // We'd like to avoid some overhead. - if _, found := kvFoundCache.Get(kvPair.Key); found { - // Let's not overload database with queries. Let's not overload raft with events. - continue - } - v, found, err := kv.GetValue(kvPair.Key) - if err == nil && found && v == kvPair.Value { - // Already has the right value. - kvFoundCache.Set(kvPair.Key, true, cache.DefaultExpiration) - continue - } - } - submitKvPairs = append(submitKvPairs, kvPair) - } - log.Debugf("kv.SubmitPrimariesToKvStores: submitKvPairs: %+v", len(submitKvPairs)) - for _, kvPair := range submitKvPairs { - err := kv.PutKVPair(kvPair) - if err == nil { - submittedCount++ - } else { - selectedError = err - } - } - if err := kv.DistributePairs(kvPairs); err != nil { - log.Errore(err) - } - return kvPairs, submittedCount, log.Errore(selectedError) -} - func injectSeeds(seedOnce *sync.Once) { seedOnce.Do(func() { for _, seed := range config.Config.DiscoverySeeds { @@ -412,7 +367,6 @@ func ContinuousDiscovery() { go ometrics.InitMetrics() go acceptSignals() - go kv.InitKVStores() if *config.RuntimeCLIFlags.GrabElection { process.GrabElection() @@ -462,10 +416,6 @@ func ContinuousDiscovery() { go ExpireFailureDetectionHistory() go ExpireTopologyRecoveryHistory() go ExpireTopologyRecoveryStepsHistory() - - if runCheckAndRecoverOperationsTimeRipe() && IsLeader() { - go SubmitPrimariesToKvStores("", false) - } } else { // Take this opportunity to refresh yourself go inst.LoadHostnameResolveCache() diff --git a/go/vt/orchestrator/logic/topology_recovery.go b/go/vt/orchestrator/logic/topology_recovery.go index 530b5257957..0c636b375ab 100644 --- a/go/vt/orchestrator/logic/topology_recovery.go +++ b/go/vt/orchestrator/logic/topology_recovery.go @@ -40,7 +40,6 @@ import ( "vitess.io/vitess/go/vt/orchestrator/config" "vitess.io/vitess/go/vt/orchestrator/external/golib/log" "vitess.io/vitess/go/vt/orchestrator/inst" - "vitess.io/vitess/go/vt/orchestrator/kv" ometrics "vitess.io/vitess/go/vt/orchestrator/metrics" "vitess.io/vitess/go/vt/orchestrator/os" "vitess.io/vitess/go/vt/orchestrator/process" @@ -725,17 +724,6 @@ func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re // Success! AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadPrimary: successfully promoted %+v", promotedReplica.Key)) - kvPairs := inst.GetClusterPrimaryKVPairs(analysisEntry.ClusterDetails.ClusterAlias, &promotedReplica.Key) - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Writing KV %+v", kvPairs)) - for _, kvPair := range kvPairs { - err := kv.PutKVPair(kvPair) - log.Errore(err) - } - { - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs)) - err := kv.DistributePairs(kvPairs) - log.Errore(err) - } if config.Config.PrimaryFailoverDetachReplicaPrimaryHost { postponedFunction := func() error { AuditTopologyRecovery(topologyRecovery, "- RecoverDeadPrimary: detaching primary host on promoted primary") @@ -1702,17 +1690,6 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re // Success! AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%+v: successfully promoted %+v", analysisEntry.Analysis, promotedReplica.Key)) - kvPairs := inst.GetClusterPrimaryKVPairs(analysisEntry.ClusterDetails.ClusterAlias, &promotedReplica.Key) - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Writing KV %+v", kvPairs)) - for _, kvPair := range kvPairs { - err := kv.PutKVPair(kvPair) - log.Errore(err) - } - { - AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs)) - err := kv.DistributePairs(kvPairs) - log.Errore(err) - } func() error { before := analysisEntry.AnalyzedInstanceKey.StringCode() after := promotedReplica.Key.StringCode() From 66b8f921add5a0ee92c1dd681d7cf0b22452b0b6 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 7 Jul 2022 16:49:33 +0530 Subject: [PATCH 2/2] test: add import for side-effects for fixing tests Signed-off-by: Manan Gupta --- go/vt/orchestrator/inst/keyspace_dao_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/orchestrator/inst/keyspace_dao_test.go b/go/vt/orchestrator/inst/keyspace_dao_test.go index 78aece75e27..8b55a9793e1 100644 --- a/go/vt/orchestrator/inst/keyspace_dao_test.go +++ b/go/vt/orchestrator/inst/keyspace_dao_test.go @@ -19,6 +19,7 @@ package inst import ( "testing" + _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/orchestrator/db"