Skip to content

Commit

Permalink
add package for easily embedding etcd into go based projects (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
charless-splunk authored Mar 12, 2019
1 parent 85180f4 commit 6ce1f71
Show file tree
Hide file tree
Showing 1,369 changed files with 547,578 additions and 2 deletions.
85 changes: 85 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
version: 2.1
executors:

goexecutor:
working_directory: /usr/local/go/src/github.com/signalfx/embedded-etcd
docker:
- image: golang:1.12

jobs:

cache_gobuild: # check out code and install dependencies for downstream jobs
executor: goexecutor
steps:
- run:
name: "go get golang.org/x/lint/golint"
command: go get -u golang.org/x/lint/golint
- run:
name: "go get github.com/alecthomas/gometalinter"
command: go get -u github.com/alecthomas/gometalinter
- run:
name: "install gometalinter linters"
command: gometalinter --install --update
- run:
name: "go get gobuild"
command: go get -u github.com/signalfx/gobuild
- run:
name: "install gobuild"
command: gobuild -verbose install
- save_cache:
key: goexecutor-cache-{{ .Branch }}-{{ .Revision }}
paths:
- /go
- /usr/local/go

build:
executor: goexecutor
steps:
- restore_cache:
key: goexecutor-cache-{{ .Branch }}-{{ .Revision }}
- checkout
- run:
name: "gobuild build"
command: gobuild build

lint:
executor: goexecutor
steps:
- restore_cache:
key: goexecutor-cache-{{ .Branch }}-{{ .Revision }}
- checkout
- run:
name: "gobuild lint"
command: |
export GO111MODULE=off
gobuild lint
- run:
name: "gobuild dupl"
command: |
export GO111MODULE=off
gobuild dupl
test:
executor: goexecutor
steps:
- restore_cache:
key: goexecutor-cache-{{ .Branch }}-{{ .Revision }}
- checkout
- run:
name: "gobuild test"
command: gobuild test

workflows:
build_test:
jobs:
- cache_gobuild
- build:
requires:
- cache_gobuild
- lint:
requires:
- cache_gobuild
- test:
requires:
- cache_gobuild
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# IDE Specific Files
.idea
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# embedded-etcd
Utilities for embedding etcd into go projects
# embetcd [![CircleCI](https://circleci.com/gh/signalfx/embetcd/tree/master.svg?style=svg)](https://circleci.com/gh/signalfx/embetcd/tree/master)
Package with helper functions for for embedding etcd into go projects
84 changes: 84 additions & 0 deletions embetcd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package embetcd

import (
"context"
cli "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"path"
)

// Client wraps around an etcd v3 client and adds some helper functions
type Client struct {
*cli.Client
}

// PutWithKeepAlive puts a key and value with a keep alive returns
// a lease, the keep alive response channel, and an err if one occurrs
func (c *Client) PutWithKeepAlive(ctx context.Context, key string, value string, ttl int64) (lease *cli.LeaseGrantResponse, keepAlive <-chan *cli.LeaseKeepAliveResponse, err error) {
// create a lease for the member key
if err == nil {
// create a new lease with a 5 second ttl
lease, err = c.Grant(context.Background(), ttl)
}

// keep the lease alive if we successfully put the key in
if err == nil {
keepAlive, err = c.KeepAlive(context.Background(), lease.ID)
}

// put in a key for the server
if err == nil {
_, err = c.Put(ctx, key, value, cli.WithLease(lease.ID))
}

return lease, keepAlive, err
}

// Lock accepts an etcd client, context (with cancel), and name and creates a concurrent lock
func (c *Client) Lock(ctx context.Context, name string) (unlock func(context.Context) error, err error) {
var session *concurrency.Session
session, err = concurrency.NewSession(c.Client)

var mutex *concurrency.Mutex
if err == nil {
// create a mutex using the session under /mutex/name
mutex = concurrency.NewMutex(session, path.Join("", "mutex", name))

// lock the mutex and return a function to unlock the mutex
err = mutex.Lock(ctx)
}

// set unlock function
if err == nil {
unlock = func(ctx context.Context) (err error) {
// we need to return the first error we encounter
// but we need to do both operations
errs := make([]error, 2)
errs[0] = mutex.Unlock(ctx)
errs[1] = session.Close()

// return first error
for _, err := range errs {
if err != nil {
return err
}
}

// return nil
return nil
}
}

return unlock, err
}

// NewClient returns a new etcd v3client wrapped with some helper functions
func NewClient(cfg cli.Config) (client *Client, err error) {
var etcdClient *cli.Client

if etcdClient, err = cli.New(cfg); err == nil {
client = &Client{Client: etcdClient}
}

return client, err
}
49 changes: 49 additions & 0 deletions embetcd/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package embetcd

import (
"context"
"fmt"
"net/url"
"time"
)

var (
// ErrNameConflict is an error indicating that the server name is in conflict with an existing member of the cluster
ErrNameConflict = fmt.Errorf("server name is in conflict with an existing cluster member")
// ErrAlreadyRunning is an error indicating that the server is already running
ErrAlreadyRunning = fmt.Errorf("server is already running")
// ErrAlreadyStopped is an error indicating that the server is already stopped
ErrAlreadyStopped = fmt.Errorf("server is already stopped")
// ErrClusterNameConflict is an error indicating that the configured cluster name conflicts with the target cluster
ErrClusterNameConflict = fmt.Errorf("cluster name either does not exist in the cluster under '/_etcd-cluster/name' or is different from this server's cluster name")
)

// WaitForStructChOrErrCh waits for the struct channel, error channel or context to return a value
func WaitForStructChOrErrCh(ctx context.Context, structCh <-chan struct{}, errCh <-chan error) error {
// wait for the server to start or error out
select {
case <-structCh:
return nil
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}

// DurationOrDefault returns the pointed duration or the specified default
func DurationOrDefault(in *time.Duration, def time.Duration) time.Duration {
if in != nil {
return *in
}
return def
}

// URLSToStringSlice converts urls slices to string slices
func URLSToStringSlice(urls []url.URL) []string {
strs := make([]string, 0, len(urls))
for _, u := range urls {
strs = append(strs, u.String())
}
return strs
}
106 changes: 106 additions & 0 deletions embetcd/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package embetcd

import (
"context"
"fmt"
"github.com/signalfx/golib/pointer"
"net/url"
"testing"
"time"
)

func Test_WaitForStructChOrErrCh(t *testing.T) {
closedStructCh := make(chan struct{})
close(closedStructCh)
ErrChWithErr := make(chan error, 1)
ErrChWithErr <- fmt.Errorf("error on error channel")
canceledCtx, cancelFn := context.WithCancel(context.Background())
cancelFn()
type args struct {
readyCh <-chan struct{}
errCh <-chan error
ctx context.Context
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "test readyCh returns",
args: args{
readyCh: closedStructCh,
errCh: make(chan error),
ctx: context.Background(),
},
wantErr: false,
},
{
name: "test errCh returns",
args: args{
readyCh: make(chan struct{}),
errCh: ErrChWithErr,
ctx: context.Background(),
},
wantErr: true,
},
{
name: "test cancelled context returns",
args: args{
readyCh: make(chan struct{}),
errCh: make(chan error),
ctx: canceledCtx,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := WaitForStructChOrErrCh(tt.args.ctx, tt.args.readyCh, tt.args.errCh); (err != nil) != tt.wantErr {
t.Errorf("waitForReady() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func Test_duration(t *testing.T) {
type args struct {
in *time.Duration
def time.Duration
}
tests := []struct {
name string
args args
want time.Duration
}{
{
name: "expect default to be returned when incoming value is nil",
args: args{
def: time.Second * 5,
},
want: time.Second * 5,
},
{
name: "expect default to be returned when incoming value is nil",
args: args{
in: pointer.Duration(time.Second * 1),
def: time.Second * 5,
},
want: time.Second * 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := DurationOrDefault(tt.args.in, tt.args.def); got != tt.want {
t.Errorf("duration() = %v, want %v", got, tt.want)
}
})
}
}

func Test_URLToSTringSlice(t *testing.T) {
urls := URLSToStringSlice([]url.URL{{Scheme: "http", Host: "test1:8080"}})
if len(urls) < 1 || urls[0] != "http://test1:8080" {
t.Errorf("expected 'http://test1:8080', but got %v", urls)
}
}
43 changes: 43 additions & 0 deletions embetcd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package embetcd

import (
"context"
"crypto/tls"
"time"

cli "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
)

// Config is a struct representing etcd config plus additional configurations we need for running etcd with this project
type Config struct {
*embed.Config
ClusterName string
InitialCluster []string
CleanUpInterval *time.Duration
DialTimeout *time.Duration
AutoSyncInterval *time.Duration
StartupGracePeriod *time.Duration
UnhealthyTTL *time.Duration
RemoveMemberTimeout *time.Duration
}

// GetClientFromConfig returns a client with the supplied context from the config
func (c *Config) GetClientFromConfig(ctx context.Context) (*Client, error) {
if ctx == nil {
ctx = context.Background()
}
// create a client to the existing cluster
return NewClient(cli.Config{
Endpoints: c.InitialCluster,
DialTimeout: DurationOrDefault(c.DialTimeout, DefaultDialTimeout),
TLS: &tls.Config{InsecureSkipVerify: true}, // insecure for now
AutoSyncInterval: DurationOrDefault(c.AutoSyncInterval, DefaultAutoSyncInterval),
Context: ctx, // pass in the context so the temp client closes with a cancelled context
})
}

// NewConfig returns a new config object with defaults provided by etcd embed
func NewConfig() *Config {
return &Config{Config: embed.NewConfig()}
}
Loading

0 comments on commit 6ce1f71

Please sign in to comment.