Skip to content

Commit

Permalink
Merge pull request #19139 from joshuazh-x/fix-embed-close-deadlock
Browse files Browse the repository at this point in the history
Avoid deadlock in etcd.Close when stopping during bootstrapping
  • Loading branch information
ahrtr authored Jan 9, 2025
2 parents 3d562c3 + f9ce13e commit 75f2ae1
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 3 deletions.
7 changes: 6 additions & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Etcd struct {
errc chan error

closeOnce sync.Once
wg sync.WaitGroup
}

type peerListener struct {
Expand All @@ -109,7 +110,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if !serving {
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.serversC)
sctx.close()
}
}
e.Close()
Expand Down Expand Up @@ -456,6 +457,7 @@ func (e *Etcd) Close() {
}
}
if e.errc != nil {
e.wg.Wait()
close(e.errc)
}
}
Expand Down Expand Up @@ -870,6 +872,9 @@ func (e *Etcd) serveMetrics() (err error) {
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
Expand Down
13 changes: 11 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"net/http"
"strings"
"sync"

gw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/soheilhy/cmux"
Expand Down Expand Up @@ -66,6 +67,7 @@ type serveCtx struct {
userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once
}

type servers struct {
Expand Down Expand Up @@ -102,6 +104,9 @@ func (sctx *serveCtx) serve(
) (err error) {
logger := defaultLog.New(io.Discard, "etcdhttp", 0)

// Make sure serversC is closed even if we prematurely exit the function.
defer sctx.close()

select {
case <-s.StoppingNotify():
return errors.New("server is stopping")
Expand All @@ -121,8 +126,6 @@ func (sctx *serveCtx) serve(
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)

// Make sure serversC is closed even if we prematurely exit the function.
defer close(sctx.serversC)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
// GRPC gateway connects to grpc server via connection provided by grpc dial.
Expand Down Expand Up @@ -521,3 +524,9 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

func (sctx *serveCtx) close() {
sctx.closeOnce.Do(func() {
close(sctx.serversC)
})
}
1 change: 1 addition & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
ClientUrls: s.attributes.ClientURLs,
},
}
// gofail: var beforePublishing struct{}
lg := s.Logger()
for {
select {
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/embed/embed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,45 @@ func TestEmbedEtcdAutoCompactionRetentionRetained(t *testing.T) {
assert.Equal(t, durationToCompare, autoCompactionRetention)
e.Close()
}

func TestEmbedEtcdStopDuringBootstrapping(t *testing.T) {
integration2.BeforeTest(t, integration2.WithFailpoint("beforePublishing", `sleep("2s")`))

done := make(chan struct{})
go func() {
defer close(done)

cfg := embed.NewConfig()
urls := newEmbedURLs(false, 2)
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
cfg.Dir = filepath.Join(t.TempDir(), "embed-etcd")

e, err := embed.StartEtcd(cfg)
if err != nil {
t.Errorf("Failed to start etcd, got error %v", err)
}
defer e.Close()

go func() {
time.Sleep(time.Second)
e.Server.Stop()
t.Log("Stopped server during bootstrapping")
}()

select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-e.Server.StopNotify():
t.Log("Server is stopped")
case <-time.After(20 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Error("Server took too long to start!")
}
}()

select {
case <-done:
case <-time.After(10 * time.Second):
t.Error("timeout in bootstrapping etcd")
}
}

0 comments on commit 75f2ae1

Please sign in to comment.