diff --git a/config/config.go b/config/config.go index 11409af2e..dcf6e6d6a 100644 --- a/config/config.go +++ b/config/config.go @@ -28,7 +28,6 @@ type DiceDBConfig struct { NumShards int `mapstructure:"num-shards" default:"-1" description:"number of shards to create. defaults to number of cores"` EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"` - WALEngine string `mapstructure:"wal-engine" default:"aof" description:"wal engine to use, values: sqlite, aof"` WALDir string `mapstructure:"wal-dir" default:"/var/log/dicedb" description:"the directory to store WAL segments"` WALMode string `mapstructure:"wal-mode" default:"buffered" description:"wal mode to use, values: buffered, unbuffered"` WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"` diff --git a/dicedb.conf b/dicedb.conf deleted file mode 100644 index af4759337..000000000 --- a/dicedb.conf +++ /dev/null @@ -1,74 +0,0 @@ -# Configuration file for Dicedb - -# Version -version = "0.1.0" - -# Async Server Configuration -async_server.addr = "0.0.0.0" -async_server.port = 7379 -async_server.keepalive = 300 -async_server.timeout = 300 -async_server.max_conn = 0 - -# HTTP Configuration -http.enabled = false -http.port = 8082 - -# WebSocket Configuration -websocket.enabled = false -websocket.port = 8379 -websocket.max_write_response_retries = 3 -websocket.write_response_timeout = 10s - -# Performance Configuration -performance.watch_chan_buf_size = 20000 -performance.shard_cron_frequency = 1s -performance.multiplexer_poll_timeout = 100ms -performance.max_clients = 20000 -performance.store_map_init_size = 1024000 -performance.adhoc_req_chan_buf_size = 20 -performance.enable_profiling = false -performance.enable_watch = false -performance.num_shards = -1 - -# Memory Configuration -memory.max_memory = 0 -memory.eviction_policy = "allkeys-lfu" -memory.eviction_ratio = 0.9 -memory.keys_limit = 200000000 -memory.lfu_log_factor = 10 - -# Persistence Configuration -persistence.enabled = false -persistence.aof_file = "./dice-master.aof" -persistence.persistence_enabled = true -persistence.write_aof_on_cleanup = false -persistence.wal-dir = "./" -persistence.restore-wal = false -persistence.wal-engine = "aof" - -# Logging Configuration -logging.log_level = "info" - -# Authentication Configuration -auth.username = "dice" -auth.password = "" - -# Network Configuration -network.io_buffer_length = 512 -network.io_buffer_length_max = 51200 - -# WAL Configuration -LogDir = "tmp/dicedb-wal" -Enabled = "true" -WalMode = "buffered" -WriteMode = "default" -BufferSizeMB = 1 -RotationMode = "segemnt-size" -MaxSegmentSizeMB = 16 -MaxSegmentRotationTime = 60s -BufferSyncInterval = 200ms -RetentionMode = "num-segments" -MaxSegmentCount = 10 -MaxSegmentRetentionDuration = 600s -RecoveryMode = "strict" \ No newline at end of file diff --git a/internal/commandhandler/cmd_meta.go b/internal/commandhandler/cmd_meta.go index 1d50604f4..d0928fd61 100644 --- a/internal/commandhandler/cmd_meta.go +++ b/internal/commandhandler/cmd_meta.go @@ -226,342 +226,456 @@ type CmdMeta struct { // ensures that any required information is retrieved and processed in advance. Use this when set // preProcessingReq = true. preProcessResponse func(h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) error + + // ReadOnly indicates whether the command modifies the database state. + // If true, the command only reads data and doesn't need to be logged in WAL. + ReadOnly bool } var CommandsMeta = map[string]CmdMeta{ // Single-shard commands. CmdSet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdExpire: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdExpireAt: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdExpireTime: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdGet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdGetSet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdGetEx: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdGetDel: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdSadd: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdSrem: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdScard: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdSmembers: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHExists: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHKeys: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHVals: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONArrAppend: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONArrInsert: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONArrTrim: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONArrLen: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONArrPop: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONClear: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONSet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONObjKeys: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONDel: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdDel: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdExists: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdPersist: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdTypeOf: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONForget: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONGet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONStrlen: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONObjlen: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONNumIncrBY: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONNumMultBy: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONType: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONToggle: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdJSONDebug: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONResp: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdGetRange: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdPFAdd: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdPFCount: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdTTL: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdPTTL: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHLen: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHStrLen: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHScan: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHIncrBy: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHIncrByFloat: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHRandField: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdSetBit: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdGetBit: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdBitCount: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdBitField: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdBitPos: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdBitFieldRO: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdLrange: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdLinsert: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdLPush: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdRPush: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdLPop: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdRPop: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdLLEN: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCMSQuery: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCMSInfo: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCMSIncrBy: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdCMSInitByDim: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdCMSInitByProb: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdCMSMerge: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHSet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHGet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdHSetnx: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHDel: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHMSet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdHMGet: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, // Sorted set commands CmdZAdd: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdZCount: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdZRank: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdZRange: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdZCard: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdZRem: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdAppend: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdIncr: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdIncrBy: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdDecr: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdDecrBy: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdIncrByFloat: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdZPopMin: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdZPopMax: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, // Bloom Filter CmdBFAdd: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdBFInfo: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdBFExists: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdBFReserve: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdDump: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdRestore: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, // geoCommands CmdGeoAdd: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: false, }, CmdGeoDist: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdGeoPos: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdGeoHash: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdClient: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdLatency: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdObject: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommand: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandCount: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandHelp: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandInfo: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandList: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandDocs: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandGetKeys: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdCommandGetKeysFlags: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, CmdJSONArrIndex: { - CmdType: SingleShard, + CmdType: SingleShard, + ReadOnly: true, }, // Multi-shard commands. @@ -571,6 +685,7 @@ var CommandsMeta = map[string]CmdMeta{ preProcessResponse: preProcessRename, decomposeCommand: (*BaseCommandHandler).decomposeRename, composeResponse: composeRename, + ReadOnly: false, }, CmdCopy: { @@ -579,6 +694,7 @@ var CommandsMeta = map[string]CmdMeta{ preProcessResponse: customProcessCopy, decomposeCommand: (*BaseCommandHandler).decomposeCopy, composeResponse: composeCopy, + ReadOnly: false, }, CmdPFMerge: { @@ -587,92 +703,112 @@ var CommandsMeta = map[string]CmdMeta{ preProcessResponse: preProcessPFMerge, decomposeCommand: (*BaseCommandHandler).decomposePFMerge, composeResponse: composePFMerge, + ReadOnly: false, }, CmdMset: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeMSet, composeResponse: composeMSet, + ReadOnly: false, }, CmdMget: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeMGet, composeResponse: composeMGet, + ReadOnly: true, }, CmdSInter: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeSInter, composeResponse: composeSInter, + ReadOnly: true, }, CmdSDiff: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeSDiff, composeResponse: composeSDiff, + ReadOnly: true, }, CmdJSONMget: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeJSONMget, composeResponse: composeJSONMget, + ReadOnly: true, }, CmdTouch: { CmdType: MultiShard, decomposeCommand: (*BaseCommandHandler).decomposeTouch, composeResponse: composeTouch, + ReadOnly: false, }, CmdDBSize: { CmdType: AllShard, decomposeCommand: (*BaseCommandHandler).decomposeDBSize, composeResponse: composeDBSize, + ReadOnly: true, }, CmdKeys: { CmdType: AllShard, decomposeCommand: (*BaseCommandHandler).decomposeKeys, composeResponse: composeKeys, + ReadOnly: true, }, CmdFlushDB: { CmdType: AllShard, decomposeCommand: (*BaseCommandHandler).decomposeFlushDB, composeResponse: composeFlushDB, + ReadOnly: false, }, // Custom commands. CmdAbort: { - CmdType: Custom, + CmdType: Custom, + ReadOnly: true, }, CmdAuth: { - CmdType: Custom, + CmdType: Custom, + ReadOnly: true, }, CmdEcho: { - CmdType: Custom, + CmdType: Custom, + ReadOnly: true, }, CmdPing: { - CmdType: Custom, + CmdType: Custom, + ReadOnly: true, }, // Watch commands CmdGetWatch: { - CmdType: Watch, + CmdType: Watch, + ReadOnly: true, }, CmdZRangeWatch: { - CmdType: Watch, + CmdType: Watch, + ReadOnly: true, }, CmdPFCountWatch: { - CmdType: Watch, + CmdType: Watch, + ReadOnly: true, }, // Unwatch commands CmdGetUnWatch: { - CmdType: Unwatch, + CmdType: Unwatch, + ReadOnly: true, }, CmdZRangeUnWatch: { - CmdType: Unwatch, + CmdType: Unwatch, + ReadOnly: true, }, CmdPFCountUnWatch: { - CmdType: Unwatch, + CmdType: Unwatch, + ReadOnly: true, }, } diff --git a/internal/commandhandler/commandhandler.go b/internal/commandhandler/commandhandler.go index 82e1c0551..b9fec8c31 100644 --- a/internal/commandhandler/commandhandler.go +++ b/internal/commandhandler/commandhandler.go @@ -10,6 +10,7 @@ import ( "log/slog" "net" "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -148,7 +149,7 @@ func (h *BaseCommandHandler) executeCommandHandler(execCtx context.Context, gec } } - resp, err := h.executeCommand(execCtx, commands[0], isWatchNotification) + resp, err := h.ExecuteCommand(execCtx, commands[0], isWatchNotification, true) // log error and send to global error channel if it's a connection error if err != nil { @@ -162,7 +163,7 @@ func (h *BaseCommandHandler) executeCommandHandler(execCtx context.Context, gec return resp, err } -func (h *BaseCommandHandler) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, isWatchNotification bool) (interface{}, error) { +func (h *BaseCommandHandler) ExecuteCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, isWatchNotification, shouldLog bool) (interface{}, error) { // Break down the single command into multiple commands if multisharding is supported. // The length of cmdList helps determine how many shards to wait for responses. cmdList := make([]*cmd.DiceDBCmd, 0) @@ -244,6 +245,15 @@ func (h *BaseCommandHandler) executeCommand(ctx context.Context, diceDBCmd *cmd. return h.handleCommandUnwatch(cmdList) } + // Log command to WAL before execution if it's not a read-only command + if !meta.ReadOnly && h.wl != nil && shouldLog { + // Convert command to bytes for WAL logging + cmdBytes := []byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " "))) + if err := h.wl.LogCommand(cmdBytes); err != nil { + return nil, fmt.Errorf("failed to log command to WAL: %w", err) + } + } + // Scatter the broken-down commands to the appropriate shards. if err := h.scatter(ctx, cmdList, meta.CmdType); err != nil { return nil, err @@ -555,3 +565,32 @@ func (h *BaseCommandHandler) RespAuth(args []string) interface{} { return clientio.OK } + +func NewWALReplayHandler(ctx context.Context, shardManager *shard.ShardManager) (*BaseCommandHandler, error) { + // Create channels for the replay handler + responseChan := make(chan *ops.StoreResponse) + preprocessingChan := make(chan *ops.StoreResponse) + watchSubscriptionChan := make(chan watchmanager.WatchSubscription) + globalErrorChan := make(chan error) + ioThreadReadChan := make(chan []byte) + ioThreadWriteChan := make(chan interface{}) + ioThreadErrChan := make(chan error) + + // Create a new command handler for WAL replay + replayHandler := NewCommandHandler( + "wal-replay", + responseChan, + preprocessingChan, + watchSubscriptionChan, + nil, // No parser needed for replay + shardManager, + globalErrorChan, + ioThreadReadChan, + ioThreadWriteChan, + ioThreadErrChan, + nil, // No WAL needed for replay handler + ) + + shardManager.RegisterCommandHandler(replayHandler.ID(), responseChan, preprocessingChan) + return replayHandler, nil +} diff --git a/internal/wal/wal.go b/internal/wal/wal.go index de75e11b9..a1d9df1c2 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -4,19 +4,17 @@ package wal import ( - "fmt" "log/slog" sync "sync" "time" - - "github.com/dicedb/dice/internal/cmd" ) type AbstractWAL interface { LogCommand([]byte) error Close() error Init(t time.Time) error - ForEachCommand(f func(c cmd.DiceDBCmd) error) error + Replay(c func(*WALEntry) error) error + ForEachCommand(e *WALEntry, c func(*WALEntry) error) error } var ( @@ -62,14 +60,3 @@ func ShutdownBG() { close(stopCh) ticker.Stop() } - -func ReplayWAL(wl AbstractWAL) { - err := wl.ForEachCommand(func(c cmd.DiceDBCmd) error { - fmt.Println("replaying", c.Cmd, c.Args) - return nil - }) - - if err != nil { - slog.Warn("error replaying WAL", slog.Any("error", err)) - } -} diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index 0b7abef9b..e581f073e 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -14,15 +14,18 @@ import ( "log/slog" "os" "path/filepath" + "sort" + "strconv" + "strings" sync "sync" "time" "github.com/dicedb/dice/config" - "github.com/dicedb/dice/internal/cmd" ) const ( segmentPrefix = "seg-" + segmentSuffix = ".wal" defaultVersion = "v0.0.1" RotationModeTime = "time" RetentionModeTime = "time" @@ -83,7 +86,7 @@ func (wal *AOF) Init(t time.Time) error { } // Get the list of log segment files in the directory - files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*")) + files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*"+segmentSuffix)) if err != nil { return nil } @@ -97,7 +100,7 @@ func (wal *AOF) Init(t time.Time) error { wal.currentSegmentIndex = 0 wal.oldestSegmentIndex = 0 wal.byteOffset = 0 - newFile, err := os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+"0"+segmentSuffix), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } @@ -201,7 +204,7 @@ func (wal *AOF) rotateLog() error { wal.oldestSegmentIndex++ } - newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", wal.currentSegmentIndex)), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.currentSegmentIndex)+segmentSuffix), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Fatalf("failed opening file: %s", err) } @@ -215,7 +218,7 @@ func (wal *AOF) rotateLog() error { } func (wal *AOF) deleteOldestSegment() error { - oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) + oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)+segmentSuffix) // TODO: checkpoint before deleting the file @@ -255,13 +258,12 @@ func (wal *AOF) keepSyncingBuffer() { for { select { case <-wal.bufferSyncTicker.C: - wal.mu.Lock() err := wal.Sync() wal.mu.Unlock() if err != nil { - log.Printf("Error while performing sync: %v", err) + slog.Error("failed to sync buffer", slog.String("error", err.Error())) } case <-wal.ctx.Done(): @@ -274,12 +276,11 @@ func (wal *AOF) rotateSegmentPeriodically() { for { select { case <-wal.segmentRotationTicker.C: - wal.mu.Lock() err := wal.rotateLog() wal.mu.Unlock() if err != nil { - log.Printf("Error while performing sync: %v", err) + slog.Error("failed to rotate segment", slog.String("error", err.Error())) } case <-wal.ctx.Done(): @@ -292,12 +293,11 @@ func (wal *AOF) deleteSegmentPeriodically() { for { select { case <-wal.segmentRetentionTicker.C: - wal.mu.Lock() err := wal.deleteOldestSegment() wal.mu.Unlock() if err != nil { - log.Printf("Error while deleting segment: %v", err) + slog.Error("failed to delete segment", slog.String("error", err.Error())) } case <-wal.ctx.Done(): return @@ -305,7 +305,82 @@ func (wal *AOF) deleteSegmentPeriodically() { } } -func (wal *AOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { - // TODO: implement this method +func (wal *AOF) segmentFiles() ([]string, error) { + // Get all segment files matching the pattern + files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*"+segmentSuffix)) + if err != nil { + return nil, err + } + + // Sort files by numeric suffix + sort.Slice(files, func(i, j int) bool { + parseSuffix := func(name string) int64 { + num, _ := strconv.ParseInt( + strings.TrimPrefix(strings.TrimSuffix(filepath.Base(name), segmentSuffix), segmentPrefix), 10, 64) + return num + } + return parseSuffix(files[i]) < parseSuffix(files[j]) + }) + + return files, nil +} + +func (wal *AOF) Replay(callback func(*WALEntry) error) error { + // Get list of segment files sorted by timestamp + segments, err := wal.segmentFiles() + if err != nil { + return fmt.Errorf("error getting wal-segment files: %w", err) + } + + // Process each segment file in order + for _, segment := range segments { + file, err := os.Open(segment) + if err != nil { + return fmt.Errorf("error opening wal-segment file %s: %w", segment, err) + } + + reader := bufio.NewReader(file) + for { + // Read entry size + var entrySize int32 + if err := binary.Read(reader, binary.LittleEndian, &entrySize); err != nil { + if err == io.EOF { + break + } + file.Close() + return fmt.Errorf("error reading wal entry size: %w", err) + } + + // Read entry data + entryData := make([]byte, entrySize) + if _, err := io.ReadFull(reader, entryData); err != nil { + file.Close() + return fmt.Errorf("error reading wal entry data: %w", err) + } + + // Unmarshal entry + var entry WALEntry + MustUnmarshal(entryData, &entry) + + // Call provided replay function with parsed command + if err := wal.ForEachCommand(&entry, callback); err != nil { + file.Close() + return fmt.Errorf("error replaying command: %w", err) + } + } + file.Close() + } + return nil } + +func (wal *AOF) ForEachCommand(entry *WALEntry, callback func(*WALEntry) error) error { + // Validate CRC + expectedCRC := crc32.ChecksumIEEE(append(entry.Data, byte(entry.LogSequenceNumber))) + if entry.Crc32 != expectedCRC { + return fmt.Errorf("checksum mismatch for log sequence %d: expected %d, got %d", + entry.LogSequenceNumber, expectedCRC, entry.Crc32) + } + + return callback(entry) +} diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go index 31d917217..fa7415154 100644 --- a/internal/wal/wal_null.go +++ b/internal/wal/wal_null.go @@ -5,8 +5,6 @@ package wal import ( "time" - - "github.com/dicedb/dice/internal/cmd" ) type WALNull struct { @@ -29,6 +27,10 @@ func (w *WALNull) Close() error { return nil } -func (w *WALNull) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { +func (w *WALNull) ForEachCommand(entry *WALEntry, callback func(*WALEntry) error) error { + return nil +} + +func (w *WALNull) Replay(callback func(*WALEntry) error) error { return nil } diff --git a/internal/wal/wal_sqlite.go b/internal/wal/wal_sqlite.go deleted file mode 100644 index cc7199b3a..000000000 --- a/internal/wal/wal_sqlite.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright (c) 2022-present, DiceDB contributors -// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information. - -package wal - -import ( - "database/sql" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - sync "sync" - "time" - - "log/slog" - - "github.com/dicedb/dice/internal/cmd" - _ "github.com/mattn/go-sqlite3" -) - -type WALSQLite struct { - logDir string - curDB *sql.DB - mu sync.Mutex -} - -func NewSQLiteWAL(logDir string) (*WALSQLite, error) { - return &WALSQLite{ - logDir: logDir, - }, nil -} - -func (w *WALSQLite) Init(t time.Time) error { - slog.Debug("initializing WAL at", slog.Any("log-dir", w.logDir)) - if err := os.MkdirAll(w.logDir, os.ModePerm); err != nil { - return fmt.Errorf("failed to create log directory: %w", err) - } - - timestamp := t.Format("20060102_1504") - path := filepath.Join(w.logDir, fmt.Sprintf("wal_%s.sqlite3", timestamp)) - - db, err := sql.Open("sqlite3", path) - if err != nil { - return err - } - - _, err = db.Exec("PRAGMA journal_mode=WAL;") - if err != nil { - return err - } - - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS wal ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - command TEXT NOT NULL - );`) - if err != nil { - return err - } - - w.curDB = db - return nil -} - -func (w *WALSQLite) LogCommand(c *cmd.DiceDBCmd) { - w.mu.Lock() - defer w.mu.Unlock() - - if _, err := w.curDB.Exec("INSERT INTO wal (command) VALUES (?)", c.Repr()); err != nil { - slog.Error("failed to log command in WAL", slog.Any("error", err)) - } else { - slog.Debug("logged command in WAL", slog.Any("command", c.Repr())) - } -} - -func (w *WALSQLite) Close() error { - return w.curDB.Close() -} - -func (w *WALSQLite) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { - files, err := os.ReadDir(w.logDir) - if err != nil { - return fmt.Errorf("failed to read log directory: %v", err) - } - - var walFiles []os.DirEntry - - for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".sqlite3" { - walFiles = append(walFiles, file) - } - } - - if len(walFiles) == 0 { - return fmt.Errorf("no valid WAL files found in log directory") - } - - // Sort files by timestamp in ascending order - sort.Slice(walFiles, func(i, j int) bool { - timestampStrI := walFiles[i].Name()[4:17] - timestampStrJ := walFiles[j].Name()[4:17] - timestampI, errI := time.Parse("20060102_1504", timestampStrI) - timestampJ, errJ := time.Parse("20060102_1504", timestampStrJ) - if errI != nil || errJ != nil { - return false - } - return timestampI.Before(timestampJ) - }) - - for _, file := range walFiles { - filePath := filepath.Join(w.logDir, file.Name()) - - slog.Debug("loading WAL", slog.Any("file", filePath)) - - db, err := sql.Open("sqlite3", filePath) - if err != nil { - return fmt.Errorf("failed to open WAL file %s: %v", file.Name(), err) - } - - rows, err := db.Query("SELECT command FROM wal") - if err != nil { - return fmt.Errorf("failed to query WAL file %s: %v", file.Name(), err) - } - - for rows.Next() { - var command string - if err := rows.Scan(&command); err != nil { - return fmt.Errorf("failed to scan WAL file %s: %v", file.Name(), err) - } - - tokens := strings.Split(command, " ") - if err := f(cmd.DiceDBCmd{ - Cmd: tokens[0], - Args: tokens[1:], - }); err != nil { - return err - } - } - - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to iterate WAL file %s: %v", file.Name(), err) - } - - if err := db.Close(); err != nil { - return fmt.Errorf("failed to close WAL file %s: %v", file.Name(), err) - } - } - - return nil -} diff --git a/server/main.go b/server/main.go index 0c356d6e8..ef8092067 100644 --- a/server/main.go +++ b/server/main.go @@ -14,11 +14,13 @@ import ( "runtime" "runtime/pprof" "runtime/trace" + "strings" "sync" "syscall" "time" "github.com/dicedb/dice/internal/auth" + "github.com/dicedb/dice/internal/cmd" "github.com/dicedb/dice/internal/logger" "github.com/dicedb/dice/internal/server/httpws" @@ -91,21 +93,14 @@ func Start() { wl, _ = wal.NewNullWAL() if config.Config.EnableWAL { - if config.Config.WALEngine == "aof" { - _wl, err := wal.NewAOFWAL(config.Config.WALDir) - if err != nil { - slog.Warn("could not create WAL with", slog.String("wal-engine", config.Config.WALEngine), slog.Any("error", err)) - sigs <- syscall.SIGKILL - cancel() - return - } - wl = _wl - } else { - slog.Error("unsupported WAL engine", slog.String("engine", config.Config.WALEngine)) + _wl, err := wal.NewAOFWAL(config.Config.WALDir) + if err != nil { + slog.Warn("could not create WAL at", slog.String("wal-dir", config.Config.WALDir), slog.Any("error", err)) sigs <- syscall.SIGKILL cancel() return } + wl = _wl if err := wl.Init(time.Now()); err != nil { slog.Error("could not initialize WAL", slog.Any("error", err)) @@ -114,12 +109,6 @@ func Start() { } slog.Debug("WAL initialization complete") - - if config.Config.EnableWAL { - slog.Info("initializing wal restoration. this may take a while...") - wal.ReplayWAL(wl) - slog.Info("in-memory state restored. process complete") - } } if config.Config.EnableWatch { @@ -183,6 +172,35 @@ func Start() { go runServer(ctx, &serverWg, websocketServer, serverErrCh) } + // Recovery from WAL logs + if config.Config.EnableWAL { + slog.Info("restoring database from WAL") + replayHandler, err := commandhandler.NewWALReplayHandler(ctx, shardManager) + + if err != nil { + slog.Error("error getting WAL replay handler", slog.Any("error", err)) + sigs <- syscall.SIGKILL + cancel() + return + } + callback := func(entry *wal.WALEntry) error { + command := strings.Split(string(entry.Data), " ") + cmdTemp := cmd.DiceDBCmd{ + Cmd: command[0], + Args: command[1:], + } + _, err := replayHandler.ExecuteCommand(context.Background(), &cmdTemp, false, false) + if err != nil { + return fmt.Errorf("error handling WAL replay: %w", err) + } + return nil + } + if err := wl.Replay(callback); err != nil { + slog.Error("error restoring from WAL", slog.Any("error", err)) + } + slog.Info("database restored from WAL") + } + wg.Add(1) go func() { defer wg.Done()