Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profiling DiceDB #952

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var baseConfig = Config{
KeepAlive: int32(300),
Timeout: int32(300),
MaxConn: int32(0),
ShardCronFrequency: 1 * time.Second,
ShardCronFrequency: 30 * time.Second,
MultiplexerPollTimeout: 100 * time.Millisecond,
MaxClients: int32(20000),
MaxMemory: 0,
Expand Down
81 changes: 42 additions & 39 deletions internal/clientio/iohandler/netconn/netconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"log/slog"
"net"
"os"
"sync"
"syscall"
"time"

Expand All @@ -18,24 +19,31 @@

const (
maxRequestSize = 512 * 1024 // 512 KB
readBufferSize = 4 * 1024 // 4 KB
bufferSize = 4 * 1024 // 4 KB
idleTimeout = 10 * time.Minute
)

var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, bufferSize)
},
}

var (
ErrRequestTooLarge = errors.New("request too large")
ErrIdleTimeout = errors.New("connection idle timeout")
ErrorClosed = errors.New("connection closed")
ErrorConnClosed = errors.New("connection closed")
)

// IOHandler handles I/O operations for a network connection
type IOHandler struct {
fd int
file *os.File
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
logger *slog.Logger
fd int
file *os.File
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
bufferPool sync.Pool
logger *slog.Logger
}

var _ iohandler.IOHandler = (*IOHandler)(nil)
Expand Down Expand Up @@ -69,17 +77,22 @@
fd: clientFD,
file: file,
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
reader: bufio.NewReaderSize(conn, bufferSize),
writer: bufio.NewWriterSize(conn, bufferSize),
bufferPool: sync.Pool{
New: func() interface{} {
return make([]byte, maxRequestSize)
},
},
logger: logger,
}, nil
}

func NewIOHandlerWithConn(conn net.Conn) *IOHandler {
return &IOHandler{
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
reader: bufio.NewReaderSize(conn, bufferSize),
writer: bufio.NewWriterSize(conn, bufferSize),
}
}

Expand All @@ -90,7 +103,8 @@
// ReadRequest reads data from the network connection
func (h *IOHandler) Read(ctx context.Context) ([]byte, error) {
var data []byte
buf := make([]byte, readBufferSize)
buf := bufferPool.Get().([]byte)
defer bufferPool.Put(buf)

Check failure on line 107 in internal/clientio/iohandler/netconn/netconn.go

View workflow job for this annotation

GitHub Actions / lint

SA6002: argument should be pointer-like to avoid allocations (staticcheck)

for {
select {
Expand All @@ -112,12 +126,12 @@
// No more data to read at this time
return data, nil
case errors.Is(err, net.ErrClosed), errors.Is(err, syscall.EPIPE), errors.Is(err, syscall.ECONNRESET):
h.logger.Error("Connection closed", slog.Any("error", err))
h.logger.Info("Connection closed", slog.Any("error", err))
cerr := h.Close()
if cerr != nil {
h.logger.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
}
return nil, ErrorClosed
return nil, ErrorConnClosed
case errors.Is(err, syscall.ETIMEDOUT):
h.logger.Info("Connection idle timeout", slog.Any("error", err))
cerr := h.Close()
Expand Down Expand Up @@ -146,8 +160,6 @@

// WriteResponse writes the response back to the network connection
func (h *IOHandler) Write(ctx context.Context, response interface{}) error {
errChan := make(chan error, 1)

// Process the incoming response by calling the handleResponse function.
// This function checks the response against known RESP formatted values
// and returns the corresponding byte array representation. The result
Expand All @@ -166,32 +178,23 @@
resp = clientio.Encode(response, true)
}

go func(errChan chan error) {
_, err := h.writer.Write(resp)
if err == nil {
err = h.writer.Flush()
}

errChan <- err
}(errChan)

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
if err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
cerr := h.Close()
if cerr != nil {
err = errors.Join(err, cerr)
}
_, err := h.writer.Write(resp)
if err == nil {
err = h.writer.Flush()
}

h.logger.Error("Connection closed", slog.Any("error", err))
return err
if err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
cerr := h.Close()
if cerr != nil {
err = errors.Join(err, cerr)
}

return fmt.Errorf("error writing response: %w", err)
h.logger.Info("Connection closed", slog.Any("error", err)) // Connection closed, logging as info
return nil
}

return fmt.Errorf("error writing response: %w", err)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/shard/shard_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (manager *ShardManager) GetShardInfo(key string) (id ShardID, c chan *ops.S
}

// GetShardCount returns the number of shards managed by this ShardManager.
func (manager *ShardManager) GetShardCount() int8 {
return int8(len(manager.shards))
func (manager *ShardManager) GetShardCount() uint8 {
return uint8(len(manager.shards))
}

// GetShard returns the ShardThread for the given ShardID.
Expand Down
7 changes: 7 additions & 0 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
"github.com/dicedb/dice/internal/clientio/iohandler/netconn"
"log/slog"
"net"
"syscall"
Expand Down Expand Up @@ -80,9 +81,15 @@
default:
data, err := w.ioHandler.Read(ctx)
if err != nil {
if errors.Is(netconn.ErrorConnClosed, err) {

Check failure on line 84 in internal/worker/worker.go

View workflow job for this annotation

GitHub Actions / lint

SA1032: arguments have the wrong order (staticcheck)
w.logger.Debug("Connection closed", slog.String("workerID", w.id))
return nil
}

w.logger.Debug("Read error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
return err
}

cmds, err := w.parser.Parse(data)
if err != nil {
err = w.ioHandler.Write(ctx, err)
Expand Down
52 changes: 52 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"os"
"os/signal"
"runtime"
"runtime/pprof"
"runtime/trace"
"sync"
"syscall"

Expand Down Expand Up @@ -149,6 +151,50 @@
}
}()
} else {
// Start CPU profiling
cpuFile, err := os.Create("cpu.prof")
if err != nil {
logr.Warn("could not create CPU profile: ", err)

Check failure on line 157 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer cpuFile.Close()

if err := pprof.StartCPUProfile(cpuFile); err != nil {
logr.Warn("could not start CPU profile: ", err)

Check failure on line 162 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer pprof.StopCPUProfile()

// Start memory profiling
memFile, err := os.Create("mem.prof")
if err != nil {
logr.Warn("could not create memory profile: ", err)

Check failure on line 169 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer memFile.Close()

// Start block profiling
runtime.SetBlockProfileRate(1)
defer func() {
blockFile, err := os.Create("block.prof")
if err != nil {
logr.Warn("could not create block profile: ", err)

Check failure on line 178 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer blockFile.Close()
if err := pprof.Lookup("block").WriteTo(blockFile, 0); err != nil {
logr.Warn("could not write block profile: ", err)

Check failure on line 182 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
}()

// Start execution trace
traceFile, err := os.Create("trace.out")
if err != nil {
logr.Warn("could not create trace output file: ", err)

Check failure on line 189 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer traceFile.Close()

if err := trace.Start(traceFile); err != nil {
logr.Warn("could not start trace: ", err)

Check failure on line 194 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
defer trace.Stop()

workerManager := worker.NewWorkerManager(config.DiceConfig.Server.MaxClients, shardManager)
// Initialize the RESP Server
respServer := resp.NewServer(shardManager, workerManager, serverErrCh, logr)
Expand Down Expand Up @@ -181,6 +227,12 @@
respServer.Shutdown()
cancel()
}()

// Ensure all profiling data is written before exiting
runtime.GC()
if err := pprof.WriteHeapProfile(memFile); err != nil {
logr.Warn("could not write memory profile: ", err)

Check failure on line 234 in main.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.Logger.Warn arg "err" should be a string or a slog.Attr (possible missing key or value) (govet)
}
}

websocketServer := server.NewWebSocketServer(shardManager, watchChan, logr)
Expand Down
Loading