Skip to content

Commit

Permalink
Merge pull request #9272 from gyuho/logger
Browse files Browse the repository at this point in the history
clientv3: use "pkg.Logger"
  • Loading branch information
gyuho authored Feb 5, 2018
2 parents 4e662b2 + 7ba860e commit 3903385
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 88 deletions.
1 change: 1 addition & 0 deletions .words
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RPC
RPCs
TODO
WithRequireLeader
args
backoff
blackhole
blackholed
Expand Down
2 changes: 1 addition & 1 deletion clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *Client) autoSync() {
err := c.Sync(ctx)
cancel()
if err != nil && err != c.ctx.Err() {
logger.Println("Auto sync endpoints failed:", err)
lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions clientv3/health_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,26 +158,26 @@ func (b *healthBalancer) pinned() string {

func (b *healthBalancer) hostPortError(hostPort string, err error) {
if b.endpoint(hostPort) == "" {
logger.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
lg.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
return
}

b.unhealthyMu.Lock()
b.unhealthyHostPorts[hostPort] = time.Now()
b.unhealthyMu.Unlock()
logger.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
lg.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
}

func (b *healthBalancer) removeUnhealthy(hostPort, msg string) {
if b.endpoint(hostPort) == "" {
logger.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
lg.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
return
}

b.unhealthyMu.Lock()
delete(b.unhealthyHostPorts, hostPort)
b.unhealthyMu.Unlock()
logger.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
lg.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
}

func (b *healthBalancer) countUnhealthy() (count int) {
Expand All @@ -199,7 +199,7 @@ func (b *healthBalancer) cleanupUnhealthy() {
for k, v := range b.unhealthyHostPorts {
if time.Since(v) > b.healthCheckTimeout {
delete(b.unhealthyHostPorts, k)
logger.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
lg.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
}
}
b.unhealthyMu.Unlock()
Expand Down Expand Up @@ -402,15 +402,15 @@ func (b *healthBalancer) Up(addr grpc.Address) func(error) {
}

if b.pinAddr != "" {
logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
return func(err error) {}
}

// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
logger.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr)
lg.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr)

// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
Expand All @@ -427,7 +427,7 @@ func (b *healthBalancer) Up(addr grpc.Address) func(error) {
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
logger.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
lg.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
}
}

Expand All @@ -454,7 +454,7 @@ func (b *healthBalancer) mayPin(addr grpc.Address) bool {
// 3. grpc-healthcheck still SERVING, thus retry to pin
// instead, return before grpc-healthcheck if failed within healthcheck timeout
if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
return false
}

Expand Down
70 changes: 18 additions & 52 deletions clientv3/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,14 @@ import (
"io/ioutil"
"sync"

"github.com/coreos/etcd/pkg/logger"

"google.golang.org/grpc/grpclog"
)

// Logger is the logger used by client library.
// It implements grpclog.LoggerV2 interface.
type Logger interface {
grpclog.LoggerV2

// Lvl returns logger if logger's verbosity level >= "lvl".
// Otherwise, logger that discards all logs.
Lvl(lvl int) Logger

// to satisfy capnslog

Print(args ...interface{})
Printf(format string, args ...interface{})
Println(args ...interface{})
}

var (
loggerMu sync.RWMutex
logger Logger
lgMu sync.RWMutex
lg logger.Logger
)

type settableLogger struct {
Expand All @@ -49,29 +35,29 @@ type settableLogger struct {

func init() {
// disable client side logs by default
logger = &settableLogger{}
lg = &settableLogger{}
SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
}

// SetLogger sets client-side Logger.
func SetLogger(l grpclog.LoggerV2) {
loggerMu.Lock()
logger = NewLogger(l)
lgMu.Lock()
lg = logger.New(l)
// override grpclog so that any changes happen with locking
grpclog.SetLoggerV2(logger)
loggerMu.Unlock()
grpclog.SetLoggerV2(lg)
lgMu.Unlock()
}

// GetLogger returns the current logger.
func GetLogger() Logger {
loggerMu.RLock()
l := logger
loggerMu.RUnlock()
// GetLogger returns the current logger.Logger.
func GetLogger() logger.Logger {
lgMu.RLock()
l := lg
lgMu.RUnlock()
return l
}

// NewLogger returns a new Logger with grpclog.LoggerV2.
func NewLogger(gl grpclog.LoggerV2) Logger {
// NewLogger returns a new Logger with logger.Logger.
func NewLogger(gl grpclog.LoggerV2) logger.Logger {
return &settableLogger{l: gl}
}

Expand Down Expand Up @@ -104,32 +90,12 @@ func (s *settableLogger) Print(args ...interface{}) { s.get().In
func (s *settableLogger) Printf(format string, args ...interface{}) { s.get().Infof(format, args...) }
func (s *settableLogger) Println(args ...interface{}) { s.get().Infoln(args...) }
func (s *settableLogger) V(l int) bool { return s.get().V(l) }
func (s *settableLogger) Lvl(lvl int) Logger {
func (s *settableLogger) Lvl(lvl int) grpclog.LoggerV2 {
s.mu.RLock()
l := s.l
s.mu.RUnlock()
if l.V(lvl) {
return s
}
return &noLogger{}
return logger.NewDiscardLogger()
}

type noLogger struct{}

func (*noLogger) Info(args ...interface{}) {}
func (*noLogger) Infof(format string, args ...interface{}) {}
func (*noLogger) Infoln(args ...interface{}) {}
func (*noLogger) Warning(args ...interface{}) {}
func (*noLogger) Warningf(format string, args ...interface{}) {}
func (*noLogger) Warningln(args ...interface{}) {}
func (*noLogger) Error(args ...interface{}) {}
func (*noLogger) Errorf(format string, args ...interface{}) {}
func (*noLogger) Errorln(args ...interface{}) {}
func (*noLogger) Fatal(args ...interface{}) {}
func (*noLogger) Fatalf(format string, args ...interface{}) {}
func (*noLogger) Fatalln(args ...interface{}) {}
func (*noLogger) Print(args ...interface{}) {}
func (*noLogger) Printf(format string, args ...interface{}) {}
func (*noLogger) Println(args ...interface{}) {}
func (*noLogger) V(l int) bool { return false }
func (ng *noLogger) Lvl(lvl int) Logger { return ng }
8 changes: 4 additions & 4 deletions clientv3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (c *Client) newRetryWrapper() retryRPCFunc {
if err == nil {
return nil
}
logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
lg.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)

if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
// mark this before endpoint switch is triggered
c.balancer.hostPortError(pinned, err)
c.balancer.next()
logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
lg.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
}

if isStop(err) {
Expand All @@ -120,12 +120,12 @@ func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
if err == nil {
return nil
}
logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
lg.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
// always stop retry on etcd errors other than invalid auth token
if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
gterr := c.getToken(rpcCtx)
if gterr != nil {
logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
lg.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
return err // return the original error for simplicity
}
continue
Expand Down
7 changes: 3 additions & 4 deletions etcdctl/ctlv3/command/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/snapshot"

"github.com/coreos/pkg/capnslog"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -103,7 +102,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}
if debug {
lg = logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
lg = logger.NewPackageLogger("github.com/coreos/etcd", "snapshot")
}
sp := snapshot.NewV3(mustClientFromCmd(cmd), lg)

Expand All @@ -127,7 +126,7 @@ func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}
if debug {
lg = logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
lg = logger.NewPackageLogger("github.com/coreos/etcd", "snapshot")
}
sp := snapshot.NewV3(nil, lg)

Expand Down Expand Up @@ -165,7 +164,7 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}
if debug {
lg = logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
lg = logger.NewPackageLogger("github.com/coreos/etcd", "snapshot")
}
sp := snapshot.NewV3(nil, lg)

Expand Down
9 changes: 7 additions & 2 deletions pkg/logger/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

package logger

import "log"
import (
"log"

// assert that Logger satisfies grpclog.LoggerV2
"google.golang.org/grpc/grpclog"
)

// assert that "discardLogger" satisfy "Logger" interface
var _ Logger = &discardLogger{}

// NewDiscardLogger returns a new Logger that discards everything except "fatal".
Expand All @@ -39,3 +43,4 @@ func (l *discardLogger) Fatalf(format string, args ...interface{}) { log.Fatal
func (l *discardLogger) V(lvl int) bool {
return false
}
func (l *discardLogger) Lvl(lvl int) grpclog.LoggerV2 { return l }
47 changes: 44 additions & 3 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,47 @@ package logger
import "google.golang.org/grpc/grpclog"

// Logger defines logging interface.
// TODO: add "Lvl(lvl int)" for clientv3 loggers.
// TODO: make this settable with "Set" method.
type Logger grpclog.LoggerV2
type Logger interface {
grpclog.LoggerV2

// Lvl returns logger if logger's verbosity level >= "lvl".
// Otherwise, logger that discards everything.
Lvl(lvl int) grpclog.LoggerV2
}

// assert that "defaultLogger" satisfy "Logger" interface
var _ Logger = &defaultLogger{}

// New wraps "grpclog.LoggerV2" that implements "Logger" interface.
//
// For example:
//
// var defaultLogger Logger
// g := grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4)
// defaultLogger = New(g)
//
func New(g grpclog.LoggerV2) Logger { return &defaultLogger{g: g} }

type defaultLogger struct {
g grpclog.LoggerV2
}

func (l *defaultLogger) Info(args ...interface{}) { l.g.Info(args...) }
func (l *defaultLogger) Infoln(args ...interface{}) { l.g.Info(args...) }
func (l *defaultLogger) Infof(format string, args ...interface{}) { l.g.Infof(format, args...) }
func (l *defaultLogger) Warning(args ...interface{}) { l.g.Warning(args...) }
func (l *defaultLogger) Warningln(args ...interface{}) { l.g.Warning(args...) }
func (l *defaultLogger) Warningf(format string, args ...interface{}) { l.g.Warningf(format, args...) }
func (l *defaultLogger) Error(args ...interface{}) { l.g.Error(args...) }
func (l *defaultLogger) Errorln(args ...interface{}) { l.g.Error(args...) }
func (l *defaultLogger) Errorf(format string, args ...interface{}) { l.g.Errorf(format, args...) }
func (l *defaultLogger) Fatal(args ...interface{}) { l.g.Fatal(args...) }
func (l *defaultLogger) Fatalln(args ...interface{}) { l.g.Fatal(args...) }
func (l *defaultLogger) Fatalf(format string, args ...interface{}) { l.g.Fatalf(format, args...) }
func (l *defaultLogger) V(lvl int) bool { return l.g.V(lvl) }
func (l *defaultLogger) Lvl(lvl int) grpclog.LoggerV2 {
if l.g.V(lvl) {
return l
}
return &discardLogger{}
}
8 changes: 5 additions & 3 deletions clientv3/logger_test.go → pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3
package logger_test

import (
"bytes"
"io/ioutil"
"strings"
"testing"

"github.com/coreos/etcd/pkg/logger"

"google.golang.org/grpc/grpclog"
)

func TestLogger(t *testing.T) {
buf := new(bytes.Buffer)

l := NewLogger(grpclog.NewLoggerV2WithVerbosity(buf, buf, buf, 10))
l := logger.New(grpclog.NewLoggerV2WithVerbosity(buf, buf, buf, 10))
l.Infof("hello world!")
if !strings.Contains(buf.String(), "hello world!") {
t.Fatalf("expected 'hello world!', got %q", buf.String())
Expand All @@ -43,7 +45,7 @@ func TestLogger(t *testing.T) {
}
buf.Reset()

l = NewLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
l = logger.New(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
l.Infof("ignore this")
if len(buf.Bytes()) > 0 {
t.Fatalf("unexpected logs %q", buf.String())
Expand Down
Loading

0 comments on commit 3903385

Please sign in to comment.