Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTOrc Cleanup: Remove KV stores #10645

Merged
merged 3 commits into from
Jul 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions go/vt/orchestrator/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"vitess.io/vitess/go/vt/orchestrator/external/golib/log"
"vitess.io/vitess/go/vt/orchestrator/external/golib/util"
"vitess.io/vitess/go/vt/orchestrator/inst"
"vitess.io/vitess/go/vt/orchestrator/kv"
"vitess.io/vitess/go/vt/orchestrator/logic"
"vitess.io/vitess/go/vt/orchestrator/process"
"vitess.io/vitess/go/vt/vtctl/reparentutil/promotionrule"
Expand Down Expand Up @@ -183,7 +182,6 @@ func Cli(command string, strict bool, instance string, destination string, owner
if !skipDatabaseCommands && !*config.RuntimeCLIFlags.SkipContinuousRegistration {
process.ContinuousRegistration(string(process.OrchestratorExecutionCliMode), command)
}
kv.InitKVStores()

// begin commands
switch command {
Expand Down Expand Up @@ -949,20 +947,6 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
fmt.Println(lag)
}
case registerCliCommand("submit-primaries-to-kv-stores", "Key-value", `Submit primary of a specific cluster, or all primaries of all clusters to key-value stores`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
log.Debugf("cluster name is <%s>", clusterName)

kvPairs, _, err := logic.SubmitPrimariesToKvStores(clusterName, true)
if err != nil {
log.Fatale(err)
}
for _, kvPair := range kvPairs {
fmt.Printf("%s:%s\n", kvPair.Key, kvPair.Value)
}
}

case registerCliCommand("tags", "tags", `List tags for a given instance`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
Expand Down
19 changes: 0 additions & 19 deletions go/vt/orchestrator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,6 @@ type Configuration struct {
DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps
DiscoveryIgnorePrimaryHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a primary. Usage: pointing your primary temporarily to replicate seom data from external host
DiscoveryIgnoreHostnameFilters []string // Regexp filters to apply to prevent discovering instances of any kind
ConsulAddress string // Address where Consul HTTP api is found. Example: 127.0.0.1:8500
ConsulScheme string // Scheme (http or https) for Consul
ConsulACLToken string // ACL token used to write to Consul KV
ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
KVClusterPrimaryPrefix string // Prefix to use for clusters' primary's entries in KV stores (internal, consul, ZK), default: "mysql/primary"
WebMessage string // If provided, will be shown on all web pages below the title bar
MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas
InstanceDBExecContextTimeoutSeconds int // Timeout on context used while calling ExecContext on instance database
Expand Down Expand Up @@ -379,12 +373,6 @@ func newConfiguration() *Configuration {
OSCIgnoreHostnameFilters: []string{},
URLPrefix: "",
DiscoveryIgnoreReplicaHostnameFilters: []string{},
ConsulAddress: "",
ConsulScheme: "http",
ConsulACLToken: "",
ConsulCrossDataCenterDistribution: false,
ZkAddress: "",
KVClusterPrimaryPrefix: "mysql/primary",
WebMessage: "",
MaxConcurrentReplicaOperations: 5,
InstanceDBExecContextTimeoutSeconds: 30,
Expand Down Expand Up @@ -476,13 +464,6 @@ func (config *Configuration) postReadAdjustments() error {
if config.RaftAdvertise == "" {
config.RaftAdvertise = config.RaftBind
}
if config.KVClusterPrimaryPrefix != "/" {
// "/" remains "/"
// "prefix" turns to "prefix/"
// "some/prefix///" turns to "some/prefix/"
config.KVClusterPrimaryPrefix = strings.TrimRight(config.KVClusterPrimaryPrefix, "/")
config.KVClusterPrimaryPrefix = fmt.Sprintf("%s/", config.KVClusterPrimaryPrefix)
}
if config.HTTPAdvertise != "" {
u, err := url.Parse(config.HTTPAdvertise)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions go/vt/orchestrator/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,14 +801,6 @@ var generateSQLBase = []string{
PRIMARY KEY (hostname,port)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS kv_store (
store_key varchar(255) CHARACTER SET ascii NOT NULL,
store_value text CHARACTER SET utf8 not null,
last_updated timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (store_key)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS cluster_injected_pseudo_gtid (
cluster_name varchar(128) NOT NULL,
Expand Down
21 changes: 0 additions & 21 deletions go/vt/orchestrator/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,23 +1615,6 @@ func (httpAPI *API) UntagAll(params martini.Params, r render.Render, req *http.R
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("%s removed from %+v instances", tag.TagName, len(*untagged)), Details: untagged.GetInstanceKeys()})
}

// SubmitPrimariesToKvStores writes a cluster's primary (or all clusters primaries) to kv stores.
// This should generally only happen once in a lifetime of a cluster. Otherwise KV
// stores are updated via failovers.
func (httpAPI *API) SubmitPrimariesToKvStores(params martini.Params, r render.Render, req *http.Request) {
clusterName, err := getClusterNameIfExists(params)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
kvPairs, submittedCount, err := logic.SubmitPrimariesToKvStores(clusterName, true)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Submitted %d primaries", submittedCount), Details: kvPairs})
}

// Clusters provides list of known primaries
func (httpAPI *API) Primaries(params martini.Params, r render.Render, req *http.Request) {
instances, err := inst.ReadWriteableClustersPrimaries()
Expand Down Expand Up @@ -2884,10 +2867,6 @@ func (httpAPI *API) RegisterRequests(m *martini.ClassicMartini) {
httpAPI.registerAPIRequest(m, "topology-tags/:host/:port", httpAPI.ASCIITopologyTags)
httpAPI.registerAPIRequest(m, "snapshot-topologies", httpAPI.SnapshotTopologies)

// Key-value:
httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores", httpAPI.SubmitPrimariesToKvStores)
httpAPI.registerAPIRequest(m, "submit-primaries-to-kv-stores/:clusterHint", httpAPI.SubmitPrimariesToKvStores)

// Tags:
httpAPI.registerAPIRequest(m, "tagged", httpAPI.Tagged)
httpAPI.registerAPIRequest(m, "tags/:host/:port", httpAPI.Tags)
Expand Down
1 change: 0 additions & 1 deletion go/vt/orchestrator/inst/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var testCoordinates = BinlogCoordinates{LogFile: "mysql-bin.000010", LogPos: 108

func init() {
config.Config.HostnameResolveMethod = "none"
config.Config.KVClusterPrimaryPrefix = "test/primary/"
config.MarkConfigurationLoaded()
log.SetLevel(log.ERROR)
}
Expand Down
39 changes: 0 additions & 39 deletions go/vt/orchestrator/inst/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,12 @@
package inst

import (
"fmt"
"regexp"
"strings"

"vitess.io/vitess/go/vt/orchestrator/config"
"vitess.io/vitess/go/vt/orchestrator/kv"
)

func GetClusterPrimaryKVKey(clusterAlias string) string {
return fmt.Sprintf("%s%s", config.Config.KVClusterPrimaryPrefix, clusterAlias)
}

func getClusterPrimaryKVPair(clusterAlias string, primaryKey *InstanceKey) *kv.KeyValuePair {
if clusterAlias == "" {
return nil
}
if primaryKey == nil {
return nil
}
return kv.NewKVPair(GetClusterPrimaryKVKey(clusterAlias), primaryKey.StringCode())
}

// GetClusterPrimaryKVPairs returns all KV pairs associated with a primary. This includes the
// full identity of the primary as well as a breakdown by hostname, port, ipv4, ipv6
func GetClusterPrimaryKVPairs(clusterAlias string, primaryKey *InstanceKey) (kvPairs [](*kv.KeyValuePair)) {
primaryKVPair := getClusterPrimaryKVPair(clusterAlias, primaryKey)
if primaryKVPair == nil {
return kvPairs
}
kvPairs = append(kvPairs, primaryKVPair)

addPair := func(keySuffix, value string) {
key := fmt.Sprintf("%s/%s", primaryKVPair.Key, keySuffix)
kvPairs = append(kvPairs, kv.NewKVPair(key, value))
}

addPair("hostname", primaryKey.Hostname)
addPair("port", fmt.Sprintf("%d", primaryKey.Port))
if ipv4, ipv6, err := readHostnameIPs(primaryKey.Hostname); err == nil {
addPair("ipv4", ipv4)
addPair("ipv6", ipv6)
}
return kvPairs
}

// mappedClusterNameToAlias attempts to match a cluster with an alias based on
// configured ClusterNameToAlias map
func mappedClusterNameToAlias(clusterName string) string {
Expand Down
86 changes: 0 additions & 86 deletions go/vt/orchestrator/inst/cluster_test.go

This file was deleted.

25 changes: 0 additions & 25 deletions go/vt/orchestrator/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"vitess.io/vitess/go/vt/orchestrator/collection"
"vitess.io/vitess/go/vt/orchestrator/config"
"vitess.io/vitess/go/vt/orchestrator/db"
"vitess.io/vitess/go/vt/orchestrator/kv"
"vitess.io/vitess/go/vt/orchestrator/metrics/query"
"vitess.io/vitess/go/vt/orchestrator/util"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
Expand Down Expand Up @@ -2009,30 +2008,6 @@ func ReadClustersInfo(clusterName string) ([]ClusterInfo, error) {
return clusters, err
}

// Get a listing of KeyValuePair for clusters primaries, for all clusters or for a specific cluster.
func GetPrimariesKVPairs(clusterName string) (kvPairs [](*kv.KeyValuePair), err error) {

clusterAliasMap := make(map[string]string)
clustersInfo, err := ReadClustersInfo(clusterName)
if err != nil {
return kvPairs, err
}
for _, clusterInfo := range clustersInfo {
clusterAliasMap[clusterInfo.ClusterName] = clusterInfo.ClusterAlias
}

primaries, err := ReadWriteableClustersPrimaries()
if err != nil {
return kvPairs, err
}
for _, primary := range primaries {
clusterPairs := GetClusterPrimaryKVPairs(clusterAliasMap[primary.ClusterName], &primary.Key)
kvPairs = append(kvPairs, clusterPairs...)
}

return kvPairs, err
}

// HeuristicallyApplyClusterDomainInstanceAttribute writes down the cluster-domain
// to primary-hostname as a general attribute, by reading current topology and **trusting** it to be correct
func HeuristicallyApplyClusterDomainInstanceAttribute(clusterName string) (instanceKey *InstanceKey, err error) {
Expand Down
1 change: 1 addition & 0 deletions go/vt/orchestrator/inst/keyspace_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inst
import (
"testing"

_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/orchestrator/db"
Expand Down
19 changes: 0 additions & 19 deletions go/vt/orchestrator/inst/resolve_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,3 @@ func writeHostnameIPs(hostname string, ipv4String string, ipv6String string) err
}
return ExecDBWriteFunc(writeFunc)
}

// readUnresolvedHostname reverse-reads hostname resolve. It returns a hostname which matches given pattern and resovles to resolvedHostname,
// or, in the event no such hostname is found, the given resolvedHostname, unchanged.
func readHostnameIPs(hostname string) (ipv4 string, ipv6 string, err error) {
query := `
select
ipv4, ipv6
from
hostname_ips
where
hostname = ?
`
err = db.QueryOrchestrator(query, sqlutils.Args(hostname), func(m sqlutils.RowMap) error {
ipv4 = m.GetString("ipv4")
ipv6 = m.GetString("ipv6")
return nil
})
return ipv4, ipv6, log.Errore(err)
}
Loading