Skip to content

Commit

Permalink
Remove explicit usage of etcd v2 (api and storage) (#13791)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Aug 22, 2023
1 parent a97f468 commit e6df5d8
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 21 deletions.
9 changes: 9 additions & 0 deletions changelog/18.0/18.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- **[Major Changes](#major-changes)**
- **[Breaking Changes](#breaking-changes)**
- [Local examples now use etcd v3 storage and API](#local-examples-etcd-v3)
- **[New command line flags and behavior](#new-flag)**
- [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers)
- [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag)
Expand All @@ -28,6 +29,14 @@

### <a id="breaking-changes"/>Breaking Changes

#### <a id="local-examples-etcd-v3"/>Local examples now use etcd v3 storage and API
In previous releases the [local examples](https://github.com/vitessio/vitess/tree/main/examples/local) were
explicitly using etcd v2 storage (`etcd --enable-v2=true`) and API (`ETCDCTL_API=2`) mode. We have now
removed this legacy etcd usage and instead use the new (default) etcd v3 storage and API. Please see
[PR #13791](https://github.com/vitessio/vitess/pull/13791) for additional info. If you are using the local
examples in any sort of long-term non-testing capacity, then you will need to explicitly use the v2 storage
and API mode or [migrate your existing data from v2 to v3](https://etcd.io/docs/v3.5/tutorials/how-to-migrate/).

### <a id="new-flag"/>New command line flags and behavior

#### <a id="new-flag-toggle-ers"/>VTOrc flag `--allow-emergency-reparent`
Expand Down
9 changes: 1 addition & 8 deletions examples/common/scripts/etcd-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,15 @@
source "$(dirname "${BASH_SOURCE[0]:-$0}")/../env.sh"

cell=${CELL:-'test'}
export ETCDCTL_API=2

# Check that etcd is not already running
curl "http://${ETCD_SERVER}" > /dev/null 2>&1 && fail "etcd is already running. Exiting."

etcd --enable-v2=true --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
etcd --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
PID=$!
echo $PID > "${VTDATAROOT}/tmp/etcd.pid"
sleep 5

echo "add /vitess/global"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/global &

echo "add /vitess/$cell"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/$cell &

# And also add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
Expand Down
55 changes: 46 additions & 9 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package cluster

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
Expand All @@ -27,7 +29,10 @@ import (
"syscall"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/vt/log"
vtopo "vitess.io/vitess/go/vt/topo"
)

// TopoProcess is a generic handle for a running Topo service .
Expand All @@ -44,6 +49,7 @@ type TopoProcess struct {
VerifyURL string
PeerURL string
ZKPorts string
Client interface{}

proc *exec.Cmd
exit chan error
Expand All @@ -57,10 +63,9 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster)
case "consul":
return topo.SetupConsul(cluster)
default:
// We still rely on the etcd v2 API for things like mkdir.
// If this ENV var is not set then some tests may fail with etcd 3.4+
// where the v2 API is disabled by default in both the client and server.
os.Setenv("ETCDCTL_API", "2")
// Override any inherited ETCDCTL_API env value to
// ensure that we use the v3 API and storage.
os.Setenv("ETCDCTL_API", "3")
return topo.SetupEtcd()
}
}
Expand All @@ -77,7 +82,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
"--initial-advertise-peer-urls", topo.PeerURL,
"--listen-peer-urls", topo.PeerURL,
"--initial-cluster", fmt.Sprintf("%s=%s", topo.Name, topo.PeerURL),
"--enable-v2=true",
)

err = createDirectory(topo.DataDirectory, 0700)
Expand Down Expand Up @@ -109,6 +113,14 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
timeout := time.Now().Add(60 * time.Second)
for time.Now().Before(timeout) {
if topo.IsHealthy() {
cli, cerr := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))},
DialTimeout: 5 * time.Second,
})
if cerr != nil {
return err
}
topo.Client = cli
return
}
select {
Expand All @@ -125,7 +137,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
// SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) {

host, err := os.Hostname()
if err != nil {
return
Expand Down Expand Up @@ -171,7 +182,6 @@ type PortsInfo struct {
// SetupConsul spawns a new consul service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port)

_ = os.MkdirAll(topo.LogDirectory, os.ModePerm)
Expand Down Expand Up @@ -247,8 +257,16 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
return fmt.Errorf("process '%s' timed out after 60s (err: %s)", topo.Binary, <-topo.exit)
}

// TearDown shutdowns the running topo service
// TearDown shutdowns the running topo service.
func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
_ = cli.Close()
default:
log.Errorf("Unknown topo client type %T", cli)
}
}

if topoFlavor == "zk2" {
cmd := "shutdown"
Expand Down Expand Up @@ -324,6 +342,9 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
url := topo.VerifyURL + directory
payload := strings.NewReader(`{"dir":"true"}`)
if command == "mkdir" {
if *topoFlavor == "etcd2" { // No need to create the empty prefix keys in v3
return nil
}
req, _ := http.NewRequest("PUT", url, payload)
req.Header.Add("content-type", "application/json")
resp, err := http.DefaultClient.Do(req)
Expand All @@ -332,6 +353,22 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
}
return err
} else if command == "rmdir" {
if *topoFlavor == "etcd2" {
if topo.Client == nil {
return fmt.Errorf("etcd client is not initialized")
}
cli, ok := topo.Client.(*clientv3.Client)
if !ok {
return fmt.Errorf("etcd client is invalid")
}
ctx, cancel := context.WithTimeout(context.Background(), vtopo.RemoteOperationTimeout)
defer cancel()
_, err = cli.Delete(ctx, directory, clientv3.WithPrefix())
if err != nil {
return err
}
return nil
}
req, _ := http.NewRequest("DELETE", url+"?dir=true", payload)
resp, err := http.DefaultClient.Do(req)
if err == nil {
Expand Down Expand Up @@ -366,7 +403,7 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
topo.ListenClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port)
topo.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port))
topo.LogDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port), "logs")
topo.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", topo.Host, topo.Port)
topo.VerifyURL = fmt.Sprintf("http://%s:%d/health", topo.Host, topo.Port)
topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return topo
}
39 changes: 35 additions & 4 deletions go/test/endtoend/clustertest/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ package clustertest

import (
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestEtcdServer(t *testing.T) {
defer cluster.PanicHandler(t)
etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdURL, "generic etcd url")
testURL(t, etcdURL+"/vitess/global", "vitess global key")
testURL(t, etcdURL+"/vitess/zone1", "vitess zone1 key")

// Confirm the basic etcd cluster health.
etcdHealthURL := fmt.Sprintf("http://%s:%d/health", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdHealthURL, "generic etcd health url")

// Confirm that we have a working topo server by looking for some
// expected keys.
etcdClientOptions := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithKeysOnly(),
clientv3.WithLimit(1),
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(clusterInstance.TopoProcess.Host, fmt.Sprintf("%d", clusterInstance.TopoProcess.Port))},
DialTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer cli.Close()
keyPrefixes := []string{
// At a minimum, this prefix confirms that we have a functioning
// global topo server with a valid cell from the test env.
fmt.Sprintf("/vitess/global/cells/%s", cell),
}
for _, keyPrefix := range keyPrefixes {
res, err := cli.Get(cli.Ctx(), keyPrefix, etcdClientOptions...)
require.NoError(t, err)
require.NotNil(t, res)
// Confirm that we have at least one key matching the prefix.
require.Greaterf(t, len(res.Kvs), 0, "no keys found matching prefix: %s", keyPrefix)
}
}

0 comments on commit e6df5d8

Please sign in to comment.