Skip to content

Commit

Permalink
VTOrc Cleanup: Remove KV stores (vitessio#10645)
Browse files Browse the repository at this point in the history
* feat: remove kv_store from vtorc codebase

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: add import for side-effects for fixing tests

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 authored and timvaillancourt committed Aug 16, 2023
1 parent e08e3b1 commit a7c9793
Show file tree
Hide file tree
Showing 17 changed files with 1 addition and 725 deletions.
16 changes: 0 additions & 16 deletions go/vt/orchestrator/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"vitess.io/vitess/go/vt/orchestrator/external/golib/log"
"vitess.io/vitess/go/vt/orchestrator/external/golib/util"
"vitess.io/vitess/go/vt/orchestrator/inst"
"vitess.io/vitess/go/vt/orchestrator/kv"
"vitess.io/vitess/go/vt/orchestrator/logic"
"vitess.io/vitess/go/vt/orchestrator/process"
"vitess.io/vitess/go/vt/vtctl/reparentutil/promotionrule"
Expand Down Expand Up @@ -183,7 +182,6 @@ func Cli(command string, strict bool, instance string, destination string, owner
if !skipDatabaseCommands && !*config.RuntimeCLIFlags.SkipContinuousRegistration {
process.ContinuousRegistration(string(process.OrchestratorExecutionCliMode), command)
}
kv.InitKVStores()

// begin commands
switch command {
Expand Down Expand Up @@ -949,20 +947,6 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
fmt.Println(lag)
}
case registerCliCommand("submit-primaries-to-kv-stores", "Key-value", `Submit primary of a specific cluster, or all primaries of all clusters to key-value stores`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
log.Debugf("cluster name is <%s>", clusterName)

kvPairs, _, err := logic.SubmitPrimariesToKvStores(clusterName, true)
if err != nil {
log.Fatale(err)
}
for _, kvPair := range kvPairs {
fmt.Printf("%s:%s\n", kvPair.Key, kvPair.Value)
}
}

case registerCliCommand("tags", "tags", `List tags for a given instance`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
Expand Down
19 changes: 0 additions & 19 deletions go/vt/orchestrator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,6 @@ type Configuration struct {
DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps
DiscoveryIgnorePrimaryHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a primary. Usage: pointing your primary temporarily to replicate seom data from external host
DiscoveryIgnoreHostnameFilters []string // Regexp filters to apply to prevent discovering instances of any kind
ConsulAddress string // Address where Consul HTTP api is found. Example: 127.0.0.1:8500
ConsulScheme string // Scheme (http or https) for Consul
ConsulACLToken string // ACL token used to write to Consul KV
ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
KVClusterPrimaryPrefix string // Prefix to use for clusters' primary's entries in KV stores (internal, consul, ZK), default: "mysql/primary"
WebMessage string // If provided, will be shown on all web pages below the title bar
MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas
InstanceDBExecContextTimeoutSeconds int // Timeout on context used while calling ExecContext on instance database
Expand Down Expand Up @@ -379,12 +373,6 @@ func newConfiguration() *Configuration {
OSCIgnoreHostnameFilters: []string{},
URLPrefix: "",
DiscoveryIgnoreReplicaHostnameFilters: []string{},
ConsulAddress: "",
ConsulScheme: "http",
ConsulACLToken: "",
ConsulCrossDataCenterDistribution: false,
ZkAddress: "",
KVClusterPrimaryPrefix: "mysql/primary",
WebMessage: "",
MaxConcurrentReplicaOperations: 5,
InstanceDBExecContextTimeoutSeconds: 30,
Expand Down Expand Up @@ -476,13 +464,6 @@ func (config *Configuration) postReadAdjustments() error {
if config.RaftAdvertise == "" {
config.RaftAdvertise = config.RaftBind
}
if config.KVClusterPrimaryPrefix != "/" {
// "/" remains "/"
// "prefix" turns to "prefix/"
// "some/prefix///" turns to "some/prefix/"
config.KVClusterPrimaryPrefix = strings.TrimRight(config.KVClusterPrimaryPrefix, "/")
config.KVClusterPrimaryPrefix = fmt.Sprintf("%s/", config.KVClusterPrimaryPrefix)
}
if config.HTTPAdvertise != "" {
u, err := url.Parse(config.HTTPAdvertise)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions go/vt/orchestrator/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,14 +801,6 @@ var generateSQLBase = []string{
PRIMARY KEY (hostname,port)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS kv_store (
store_key varchar(255) CHARACTER SET ascii NOT NULL,
store_value text CHARACTER SET utf8 not null,
last_updated timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (store_key)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS cluster_injected_pseudo_gtid (
cluster_name varchar(128) NOT NULL,
Expand Down
21 changes: 0 additions & 21 deletions go/vt/orchestrator/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,23 +1615,6 @@ func (httpAPI *API) UntagAll(params martini.Params, r render.Render, req *http.R
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("%s removed from %+v instances", tag.TagName, len(*untagged)), Details: untagged.GetInstanceKeys()})
}

// SubmitPrimariesToKvStores writes a cluster's primary (or all clusters primaries) to kv stores.
// This should generally only happen once in a lifetime of a cluster. Otherwise KV
// stores are updated via failovers.
func (httpAPI *API) SubmitPrimariesToKvStores(params martini.Params, r render.Render, req *http.Request) {
clusterName, err := getClusterNameIfExists(params)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
kvPairs, submittedCount, err := logic.SubmitPrimariesToKvStores(clusterName, true)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Submitted %d primaries", submittedCount), Details: kvPairs})
}

// Clusters provides list of known primaries
func (httpAPI *API) Primaries(params martini.Params, r render.Render, req *http.Request) {
instances, err := inst.ReadWriteableClustersPrimaries()
Expand Down Expand Up @@ -2884,10 +2867,6 @@ func (httpAPI *API) RegisterRequests(m *martini.ClassicMartini) {
httpAPI.registerAPIRequest(m, "topology-tags/:host/:port", httpAPI.ASCIITopologyTags)
httpAPI.registerAPIRequest(m, "snapshot-topologies", httpAPI.SnapshotTopologies)

// Key-value:
httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores", httpAPI.SubmitPrimariesToKvStores)
httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores/:clusterHint", httpAPI.SubmitPrimariesToKvStores)

// Tags:
httpAPI.registerAPIRequest(m, "tagged", httpAPI.Tagged)
httpAPI.registerAPIRequest(m, "tags/:host/:port", httpAPI.Tags)
Expand Down
1 change: 0 additions & 1 deletion go/vt/orchestrator/inst/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var testCoordinates = BinlogCoordinates{LogFile: "mysql-bin.000010", LogPos: 108

func init() {
config.Config.HostnameResolveMethod = "none"
config.Config.KVClusterPrimaryPrefix = "test/primary/"
config.MarkConfigurationLoaded()
log.SetLevel(log.ERROR)
}
Expand Down
39 changes: 0 additions & 39 deletions go/vt/orchestrator/inst/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,12 @@
package inst

import (
"fmt"
"regexp"
"strings"

"vitess.io/vitess/go/vt/orchestrator/config"
"vitess.io/vitess/go/vt/orchestrator/kv"
)

func GetClusterPrimaryKVKey(clusterAlias string) string {
return fmt.Sprintf("%s%s", config.Config.KVClusterPrimaryPrefix, clusterAlias)
}

func getClusterPrimaryKVPair(clusterAlias string, primaryKey *InstanceKey) *kv.KeyValuePair {
if clusterAlias == "" {
return nil
}
if primaryKey == nil {
return nil
}
return kv.NewKVPair(GetClusterPrimaryKVKey(clusterAlias), primaryKey.StringCode())
}

// GetClusterPrimaryKVPairs returns all KV pairs associated with a primary. This includes the
// full identity of the primary as well as a breakdown by hostname, port, ipv4, ipv6
func GetClusterPrimaryKVPairs(clusterAlias string, primaryKey *InstanceKey) (kvPairs [](*kv.KeyValuePair)) {
primaryKVPair := getClusterPrimaryKVPair(clusterAlias, primaryKey)
if primaryKVPair == nil {
return kvPairs
}
kvPairs = append(kvPairs, primaryKVPair)

addPair := func(keySuffix, value string) {
key := fmt.Sprintf("%s/%s", primaryKVPair.Key, keySuffix)
kvPairs = append(kvPairs, kv.NewKVPair(key, value))
}

addPair("hostname", primaryKey.Hostname)
addPair("port", fmt.Sprintf("%d", primaryKey.Port))
if ipv4, ipv6, err := readHostnameIPs(primaryKey.Hostname); err == nil {
addPair("ipv4", ipv4)
addPair("ipv6", ipv6)
}
return kvPairs
}

// mappedClusterNameToAlias attempts to match a cluster with an alias based on
// configured ClusterNameToAlias map
func mappedClusterNameToAlias(clusterName string) string {
Expand Down
86 changes: 0 additions & 86 deletions go/vt/orchestrator/inst/cluster_test.go

This file was deleted.

25 changes: 0 additions & 25 deletions go/vt/orchestrator/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"vitess.io/vitess/go/vt/orchestrator/collection"
"vitess.io/vitess/go/vt/orchestrator/config"
"vitess.io/vitess/go/vt/orchestrator/db"
"vitess.io/vitess/go/vt/orchestrator/kv"
"vitess.io/vitess/go/vt/orchestrator/metrics/query"
"vitess.io/vitess/go/vt/orchestrator/util"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
Expand Down Expand Up @@ -2058,30 +2057,6 @@ func ReadClustersInfo(clusterName string) ([]ClusterInfo, error) {
return clusters, err
}

// Get a listing of KeyValuePair for clusters primaries, for all clusters or for a specific cluster.
func GetPrimariesKVPairs(clusterName string) (kvPairs [](*kv.KeyValuePair), err error) {

clusterAliasMap := make(map[string]string)
clustersInfo, err := ReadClustersInfo(clusterName)
if err != nil {
return kvPairs, err
}
for _, clusterInfo := range clustersInfo {
clusterAliasMap[clusterInfo.ClusterName] = clusterInfo.ClusterAlias
}

primaries, err := ReadWriteableClustersPrimaries()
if err != nil {
return kvPairs, err
}
for _, primary := range primaries {
clusterPairs := GetClusterPrimaryKVPairs(clusterAliasMap[primary.ClusterName], &primary.Key)
kvPairs = append(kvPairs, clusterPairs...)
}

return kvPairs, err
}

// HeuristicallyApplyClusterDomainInstanceAttribute writes down the cluster-domain
// to primary-hostname as a general attribute, by reading current topology and **trusting** it to be correct
func HeuristicallyApplyClusterDomainInstanceAttribute(clusterName string) (instanceKey *InstanceKey, err error) {
Expand Down
1 change: 1 addition & 0 deletions go/vt/orchestrator/inst/keyspace_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inst
import (
"testing"

_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
_ "modernc.org/sqlite"

Expand Down
19 changes: 0 additions & 19 deletions go/vt/orchestrator/inst/resolve_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,3 @@ func writeHostnameIPs(hostname string, ipv4String string, ipv6String string) err
}
return ExecDBWriteFunc(writeFunc)
}

// readUnresolvedHostname reverse-reads hostname resolve. It returns a hostname which matches given pattern and resovles to resolvedHostname,
// or, in the event no such hostname is found, the given resolvedHostname, unchanged.
func readHostnameIPs(hostname string) (ipv4 string, ipv6 string, err error) {
query := `
select
ipv4, ipv6
from
hostname_ips
where
hostname = ?
`
err = db.QueryOrchestrator(query, sqlutils.Args(hostname), func(m sqlutils.RowMap) error {
ipv4 = m.GetString("ipv4")
ipv6 = m.GetString("ipv6")
return nil
})
return ipv4, ipv6, log.Errore(err)
}
Loading

0 comments on commit a7c9793

Please sign in to comment.