Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rotation support for members. #49

Merged
merged 22 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working on agency is happy check
  • Loading branch information
ewoutp committed Mar 20, 2018
commit 0635fd41b119d036884a9258c63d4192fc3e40b0
2 changes: 1 addition & 1 deletion examples/simple-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: "example-simple-cluster"
spec:
mode: cluster
image: arangodb/arangodb:3.2.9
image: arangodb/arangodb:3.3.4
tls:
altNames: ["kube-01", "kube-02", "kube-03"]
coordinators:
Expand Down
26 changes: 24 additions & 2 deletions pkg/deployment/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"fmt"

driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// ActionContext provides methods to the Action implementations
Expand All @@ -44,6 +46,8 @@ type ActionContext interface {
GetDatabaseClient(ctx context.Context) (driver.Client, error)
// GetServerClient returns a cached client for a specific server.
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]arangod.Agency, error)
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down Expand Up @@ -101,6 +105,24 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr
return c, nil
}

// GetAgencyClients builds one agency client for each agent member listed in
// the deployment status.
// The first failure to obtain a server client, or to wrap it in an agency
// client, aborts the call: a nil slice and the masked error are returned.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
	agents := ac.deployment.status.Members.Agents
	clients := make([]arangod.Agency, 0, len(agents))
	for _, agent := range agents {
		serverClient, err := ac.GetServerClient(ctx, api.ServerGroupAgents, agent.ID)
		if err != nil {
			return nil, maskAny(err)
		}
		agencyClient, err := arangod.NewAgencyClient(serverClient)
		if err != nil {
			return nil, maskAny(err)
		}
		clients = append(clients, agencyClient)
	}
	return clients, nil
}

// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down
77 changes: 71 additions & 6 deletions pkg/deployment/action_wait_for_member_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ package deployment

import (
"context"
"sync"
"time"

driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)

// NewWaitForMemberUpAction creates a new Action that implements the given
Expand All @@ -40,6 +44,10 @@ func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx A
}
}

const (
// maxAgentResponseTime is the timeout (10 seconds) for a single
// request to an agent.
maxAgentResponseTime = time.Second * 10
)

// actionWaitForMemberUp implements a WaitForMemberUp action.
type actionWaitForMemberUp struct {
log zerolog.Logger
Expand Down Expand Up @@ -91,19 +99,76 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool,
return true, nil
}

// agentStatus records the outcome of probing a single agent.
type agentStatus struct {
// IsLeader is true when the agent answered the probe read itself
// (instead of redirecting to another agent).
IsLeader bool
// LeaderEndpoint is the leader endpoint as seen by this agent: its own
// endpoint when it is the leader, otherwise the location it redirected to.
LeaderEndpoint string
// IsResponding is true when the agent gave a valid answer, either as
// leader or as follower.
IsResponding bool
}

// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
clients, err := a.actionCtx.GetAgencyClients(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
log.Debug().Err(err).Msg("Failed to create agency clients")
return false, maskAny(err)
}
if _, err := c.Version(ctx); err != nil {
log.Debug().Err(err).Msg("Failed to get version")
return false, maskAny(err)

wg := sync.WaitGroup{}
invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
statuses := make([]agentStatus, len(clients))
for i, c := range clients {
wg.Add(1)
go func(i int, c arangod.Agency) {
defer wg.Done()
var trash interface{}
lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
defer cancel()
if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) {
// We got a valid read from the leader
statuses[i].IsLeader = true
statuses[i].LeaderEndpoint = c.Endpoint()
statuses[i].IsResponding = true
} else {
if location, ok := arangod.IsNotLeader(err); ok {
// Valid response from a follower
statuses[i].IsLeader = false
statuses[i].LeaderEndpoint = location
statuses[i].IsResponding = true
} else {
// Unexpected / invalid response
statuses[i].IsResponding = false
}
}
}(i, c)
}
wg.Wait()

// Check the results
noLeaders := 0
for i, status := range statuses {
if !status.IsResponding {
log.Debug().Msg("Not all agents are responding")
return false, nil
}
if status.IsLeader {
noLeaders++
}
if i > 0 {
// Compare leader endpoint with previous
prev := statuses[i-1].LeaderEndpoint
if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) {
log.Debug().Msg("Not all agents report the same leader endpoint")
return false, nil
}
}
}
if noLeaders != 1 {
log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders")
return false, nil
}

return true, nil
}

Expand Down
146 changes: 146 additions & 0 deletions pkg/util/arangod/agency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package arangod

import (
"context"
"encoding/json"
"fmt"
"strings"

driver "github.com/arangodb/go-driver"
"github.com/pkg/errors"
)

// Agency provides the API implemented by the ArangoDB agency.
type Agency interface {
// ReadKey reads the value of a given key in the agency.
// The key is given as its individual path elements; the result is
// decoded into value.
ReadKey(ctx context.Context, key []string, value interface{}) error
// Endpoint returns the endpoint of this agent connection.
Endpoint() string
}

// NewAgencyClient wraps the connection of the given client in an Agency.
// A client whose connection has more than one endpoint is rejected with
// an error.
func NewAgencyClient(c driver.Client) (Agency, error) {
	if endpoints := c.Connection().Endpoints(); len(endpoints) > 1 {
		return nil, maskAny(fmt.Errorf("Got multiple endpoints"))
	}
	result := &agency{
		conn: c.Connection(),
	}
	return result, nil
}

// agency implements Agency on top of a single driver connection.
type agency struct {
// conn is the connection used for all agency requests.
conn driver.Connection
}

// ReadKey reads the value of a given key in the agency.
//
// The key elements are joined into a single "/"-separated path that is
// posted to "_api/agency/read". The response must be an array holding exactly
// one element. For an empty key that element is decoded into value directly;
// otherwise the element is unwrapped one object level per key element and the
// remaining data is decoded into value.
//
// A 307 response is returned as a NotLeaderError carrying the "Location"
// header. A missing key element yields an error caused by KeyNotFoundError.
func (a *agency) ReadKey(ctx context.Context, key []string, value interface{}) error {
	req, err := a.conn.NewRequest("POST", "_api/agency/read")
	if err != nil {
		return maskAny(err)
	}
	// The body is a nested array containing only the full key.
	body := [][]string{{createFullKey(key)}}
	if req, err = req.SetBody(body); err != nil {
		return maskAny(err)
	}
	resp, err := a.conn.Do(ctx, req)
	if err != nil {
		return maskAny(err)
	}
	if resp.StatusCode() == 307 {
		// Not leader
		return NotLeaderError{Leader: resp.Header("Location")}
	}
	if err := resp.CheckStatus(200, 201, 202); err != nil {
		return maskAny(err)
	}
	elems, err := resp.ParseArrayBody()
	if err != nil {
		return maskAny(err)
	}
	if count := len(elems); count != 1 {
		return maskAny(fmt.Errorf("Expected 1 element, got %d", count))
	}
	root := elems[0]

	// An empty key addresses the whole element: decode it as is.
	if len(key) == 0 {
		if err := root.ParseBody("", &value); err != nil {
			return maskAny(err)
		}
		return nil
	}

	// Descend through the wrapping objects, one level per key element.
	var object map[string]interface{}
	if err := root.ParseBody("", &object); err != nil {
		return maskAny(err)
	}
	var node interface{}
	for depth, part := range key {
		if depth > 0 {
			nested, isObject := node.(map[string]interface{})
			if !isObject {
				return maskAny(fmt.Errorf("Data is not an object at key %s", key[:depth+1]))
			}
			object = nested
		}
		child, exists := object[part]
		if !exists {
			return errors.Wrapf(KeyNotFoundError, "Missing data at key %s", key[:depth+1])
		}
		node = child
	}

	// Round-trip through JSON to convert the generic data into value.
	encoded, err := json.Marshal(node)
	if err != nil {
		return maskAny(err)
	}
	if err := json.Unmarshal(encoded, &value); err != nil {
		return maskAny(err)
	}
	return nil
}

// Endpoint returns the first endpoint of the underlying connection,
// or an empty string when the connection has no endpoints.
func (a *agency) Endpoint() string {
	if endpoints := a.conn.Endpoints(); len(endpoints) > 0 {
		return endpoints[0]
	}
	return ""
}

// createFullKey turns the given key elements into a single path in which
// every element is separated by "/" and that starts with a leading "/".
// An empty key yields "/".
func createFullKey(key []string) string {
	var path strings.Builder
	path.WriteByte('/')
	for i, part := range key {
		if i > 0 {
			path.WriteByte('/')
		}
		path.WriteString(part)
	}
	return path.String()
}
42 changes: 42 additions & 0 deletions pkg/util/arangod/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package arangod

import "net/url"

// IsSameEndpoint returns true when the 2 given endpoints
// refer to the same server.
//
// Identical strings are always considered the same. Otherwise both endpoints
// are parsed as URLs and only their hostnames are compared; scheme, port and
// path are ignored. When an endpoint cannot be parsed, or does not contain a
// hostname (for example a scheme-less "host:port"), there is nothing reliable
// to compare and false is returned.
func IsSameEndpoint(a, b string) bool {
	if a == b {
		return true
	}
	ua, err := url.Parse(a)
	if err != nil {
		return false
	}
	ub, err := url.Parse(b)
	if err != nil {
		return false
	}
	hostA, hostB := ua.Hostname(), ub.Hostname()
	if hostA == "" || hostB == "" {
		// Two missing hostnames must not be mistaken for a match.
		return false
	}
	return hostA == hostB
}
27 changes: 27 additions & 0 deletions pkg/util/arangod/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,32 @@ package arangod
import "github.com/pkg/errors"

var (
// KeyNotFoundError is the sentinel error that identifies a missing key.
// Use IsKeyNotFound to test for it.
KeyNotFoundError = errors.New("Key not found")

// maskAny is an alias for errors.WithStack, used to wrap errors
// before they are returned.
maskAny = errors.WithStack
)

// IsKeyNotFound reports whether the root cause of the given error
// is KeyNotFoundError.
func IsKeyNotFound(err error) bool {
	cause := errors.Cause(err)
	return cause == KeyNotFoundError
}

// NotLeaderError indicates the response of an agent when it is
// not the leader of the agency.
type NotLeaderError struct {
	Leader string // Endpoint of the current leader
}

// Error implements error.
func (e NotLeaderError) Error() string {
	return "not the leader"
}

// IsNotLeader returns true if the given error is (or is caused by) a NotLeaderError.
// When it is, the leader endpoint carried by that error is returned as well.
//
// The error chain is followed through every error that exposes a
// `Cause() error` method, so a NotLeaderError that has been wrapped
// is still recognized. A nil error yields ("", false).
func IsNotLeader(err error) (string, bool) {
	type causer interface {
		Cause() error
	}
	for err != nil {
		if nlErr, ok := err.(NotLeaderError); ok {
			return nlErr.Leader, true
		}
		wrapped, ok := err.(causer)
		if !ok {
			break
		}
		err = wrapped.Cause()
	}
	return "", false
}