Skip to content

Commit

Permalink
Fix bug in handling early exit
Browse files Browse the repository at this point in the history
Move left-over functions
Add comments
  • Loading branch information
mostafa committed Dec 26, 2024
1 parent b78b541 commit e1ffb20
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 49 deletions.
68 changes: 61 additions & 7 deletions cmd/gatewayd_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"runtime"
"strconv"
"sync/atomic"
"time"

"github.com/NYTimes/gziphandler"
Expand Down Expand Up @@ -42,6 +44,19 @@ import (
"google.golang.org/grpc/credentials"
)

var _ io.Writer = &cobraCmdWriter{}

type cobraCmdWriter struct {
*cobra.Command
}

func (c *cobraCmdWriter) Write(p []byte) (int, error) {
c.Print(string(p))
return len(p), nil
}

var UsageReportURL = "localhost:59091"

type GatewayDApp struct {
EnableTracing bool
EnableSentry bool
Expand All @@ -67,6 +82,30 @@ type GatewayDApp struct {
servers map[string]*network.Server
healthCheckScheduler *gocron.Scheduler
stopChan chan struct{}
ranStopGracefully *atomic.Bool
}

// NewGatewayDApp creates a new GatewayDApp instance.
func NewGatewayDApp(cmd *cobra.Command) *GatewayDApp {
app := GatewayDApp{
loggers: make(map[string]zerolog.Logger),
pools: make(map[string]map[string]*pool.Pool),
clients: make(map[string]map[string]*config.Client),
proxies: make(map[string]map[string]*network.Proxy),
servers: make(map[string]*network.Server),
healthCheckScheduler: gocron.NewScheduler(time.UTC),
stopChan: make(chan struct{}),
ranStopGracefully: &atomic.Bool{},
}
app.EnableTracing, _ = cmd.Flags().GetBool("enable-tracing")
app.EnableSentry, _ = cmd.Flags().GetBool("enable-sentry")
app.EnableUsageReport, _ = cmd.Flags().GetBool("enable-usage-report")
app.EnableLinting, _ = cmd.Flags().GetBool("enable-linting")
app.DevMode, _ = cmd.Flags().GetBool("dev")
app.CollectorURL, _ = cmd.Flags().GetString("collector-url")
app.GlobalConfigFile, _ = cmd.Flags().GetString("config")
app.PluginConfigFile, _ = cmd.Flags().GetString("plugin-config")
return &app
}

// loadConfig loads global and plugin configuration.
Expand Down Expand Up @@ -973,6 +1012,11 @@ func (app *GatewayDApp) startServers(

// stopGracefully stops the server gracefully.
func (app *GatewayDApp) stopGracefully(runCtx context.Context, sig os.Signal) {
// Only allow one call to this function.
if !app.ranStopGracefully.CompareAndSwap(false, true) {
return
}

_, span := otel.Tracer(config.TracerName).Start(runCtx, "Shutdown server")
currentSignal := "unknown"
if sig != nil {
Expand Down Expand Up @@ -1027,12 +1071,15 @@ func (app *GatewayDApp) stopGracefully(runCtx context.Context, sig os.Signal) {
span.AddEvent("Stopped metrics server")
}
}
earlyExit := false
for name, server := range app.servers {
logger.Info().Str("name", name).Msg("Stopping server")
server.Shutdown()
span.AddEvent("Stopped server")
if server.IsRunning() {
logger.Info().Str("name", name).Msg("Stopping server")
server.Shutdown()
span.AddEvent("Stopped server")
earlyExit = true
}
}
logger.Info().Msg("Stopped all servers")
if app.pluginRegistry != nil {
app.pluginRegistry.Shutdown()
logger.Info().Msg("Stopped plugin registry")
Expand All @@ -1052,9 +1099,16 @@ func (app *GatewayDApp) stopGracefully(runCtx context.Context, sig os.Signal) {
span.AddEvent("Stopped gRPC Server")
}

// Close the stop channel to notify the other goroutines to stop.
app.stopChan <- struct{}{}
close(app.stopChan)
logger.Info().Msg("GatewayD is shutdown")

// If the code never reaches the point where the app.stopChan is used,
// it means that the server was never started. This is a manual shutdown
// by the app, so we don't need to send a signal to the other goroutines.
if earlyExit {
// Close the stop channel to notify the other goroutines to stop.
app.stopChan <- struct{}{}
close(app.stopChan)
}
}

// handleSignals handles the signals and stops the server gracefully.
Expand Down
47 changes: 5 additions & 42 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,23 @@ package cmd

import (
"context"
"io"
"log"
"os"
"syscall"
"time"

"github.com/gatewayd-io/gatewayd/config"
gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/gatewayd-io/gatewayd/raft"
"github.com/gatewayd-io/gatewayd/tracing"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"golang.org/x/exp/maps"
)

var _ io.Writer = &cobraCmdWriter{}

type cobraCmdWriter struct {
*cobra.Command
}

func (c *cobraCmdWriter) Write(p []byte) (int, error) {
c.Print(string(p))
return len(p), nil
}

var (
UsageReportURL = "localhost:59091"
testMode bool
testApp *GatewayDApp
testMode bool
testApp *GatewayDApp
)

// EnableTestMode enables test mode and returns the previous value.
Expand All @@ -52,7 +34,7 @@ var runCmd = &cobra.Command{
Use: "run",
Short: "Run a GatewayD instance",
Run: func(cmd *cobra.Command, _ []string) {
app := NewGatewayDInstance(cmd)
app := NewGatewayDApp(cmd)

// If test mode is enabled, we need to access the app instance from the test,
// so we can stop the server gracefully.
Expand Down Expand Up @@ -206,6 +188,7 @@ var runCmd = &cobra.Command{

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Create pools and clients")

// Create pools and clients.
if err := app.createPoolAndClients(runCtx, span); err != nil {
logger.Error().Err(err).Msg("Failed to create pools and clients")
span.RecordError(err)
Expand All @@ -225,6 +208,7 @@ var runCmd = &cobra.Command{
_, span = otel.Tracer(config.TracerName).Start(runCtx, "Create Raft Node")
defer span.End()

// Create the Raft node.
raftNode, originalErr := raft.NewRaftNode(logger, app.conf.Global.Raft)
if originalErr != nil {
logger.Error().Err(originalErr).Msg("Failed to start raft node")
Expand Down Expand Up @@ -276,24 +260,3 @@ func init() {
runCmd.Flags().Bool("lint", true, "Enable linting of configuration files")
runCmd.Flags().Bool("metrics-merger", true, "Enable metrics merger")
}

func NewGatewayDInstance(cmd *cobra.Command) *GatewayDApp {
app := GatewayDApp{
loggers: make(map[string]zerolog.Logger),
pools: make(map[string]map[string]*pool.Pool),
clients: make(map[string]map[string]*config.Client),
proxies: make(map[string]map[string]*network.Proxy),
servers: make(map[string]*network.Server),
healthCheckScheduler: gocron.NewScheduler(time.UTC),
stopChan: make(chan struct{}),
}
app.EnableTracing, _ = cmd.Flags().GetBool("enable-tracing")
app.EnableSentry, _ = cmd.Flags().GetBool("enable-sentry")
app.EnableUsageReport, _ = cmd.Flags().GetBool("enable-usage-report")
app.EnableLinting, _ = cmd.Flags().GetBool("enable-linting")
app.DevMode, _ = cmd.Flags().GetBool("dev")
app.CollectorURL, _ = cmd.Flags().GetString("collector-url")
app.GlobalConfigFile, _ = cmd.Flags().GetString("config")
app.PluginConfigFile, _ = cmd.Flags().GetString("plugin-config")
return &app
}

0 comments on commit e1ffb20

Please sign in to comment.