From 0fad7b3411db50ebbb7ace4a81662d13fb305042 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 19 Jan 2025 13:45:16 +0800 Subject: [PATCH] feat: experimental memory logger and logs api for WebUI --- cmd/main.go | 8 ++ internal/api/handler.go | 1 + internal/api/v1/mem_logger.go | 161 ++++++++++++++++++++++++++++++++++ internal/common/env.go | 3 + internal/logging/logging.go | 6 +- 5 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 internal/api/v1/mem_logger.go diff --git a/cmd/main.go b/cmd/main.go index c1a88eba..e2417472 100755 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "io" "log" "os" "os/signal" @@ -9,6 +10,7 @@ import ( "time" "github.com/yusing/go-proxy/internal" + v1 "github.com/yusing/go-proxy/internal/api/v1" "github.com/yusing/go-proxy/internal/api/v1/auth" "github.com/yusing/go-proxy/internal/api/v1/query" "github.com/yusing/go-proxy/internal/common" @@ -24,6 +26,12 @@ import ( var rawLogger = log.New(os.Stdout, "", 0) func main() { + var out io.Writer = os.Stdout + if common.EnableLogStreaming { + out = io.MultiWriter(out, v1.MemLogger()) + } + logging.InitLogger(out) + args := common.GetArgs() switch args.Command { diff --git a/internal/api/handler.go b/internal/api/handler.go index 71ed616d..5e121616 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -38,6 +38,7 @@ func NewHandler(cfg config.ConfigInstance) http.Handler { mux.HandleFunc("GET", "/v1/stats", useCfg(cfg, v1.Stats)) mux.HandleFunc("GET", "/v1/stats/ws", useCfg(cfg, v1.StatsWS)) mux.HandleFunc("GET", "/v1/health/ws", useCfg(cfg, v1.HealthWS)) + mux.HandleFunc("GET", "/v1/logs/ws", useCfg(cfg, v1.LogsWS())) mux.HandleFunc("GET", "/v1/favicon/{alias}", auth.RequireAuth(favicon.GetFavIcon)) return mux } diff --git a/internal/api/v1/mem_logger.go b/internal/api/v1/mem_logger.go new file mode 100644 index 00000000..434690af --- /dev/null +++ b/internal/api/v1/mem_logger.go @@ -0,0 +1,161 @@ +package v1 + +import ( + "bytes" + "context" + "io" + "net/http" + "sync" + "time" + + "github.com/coder/websocket" + "github.com/yusing/go-proxy/internal/api/v1/utils" + "github.com/yusing/go-proxy/internal/common" + config "github.com/yusing/go-proxy/internal/config/types" + "github.com/yusing/go-proxy/internal/logging" + "github.com/yusing/go-proxy/internal/task" + F "github.com/yusing/go-proxy/internal/utils/functional" +) + +type logEntryRange struct { + Start, End int +} + +type memLogger struct { + bytes.Buffer + sync.Mutex + connChans F.Map[chan *logEntryRange, struct{}] +} + +const ( + maxMemLogSize = 16 * 1024 + truncateSize = maxMemLogSize / 2 + initialWriteChunkSize = 4 * 1024 +) + +var memLoggerInstance = &memLogger{ + connChans: F.NewMapOf[chan *logEntryRange, struct{}](), +} + +func init() { + if !common.EnableLogStreaming { + return + } + memLoggerInstance.Grow(maxMemLogSize) + + if common.DebugMemLogger { + ticker := time.NewTicker(1 * time.Second) + + go func() { + defer ticker.Stop() + + for { + select { + case <-task.RootContextCanceled(): + return + case <-ticker.C: + logging.Info().Msgf("mem logger size: %d, active conns: %d", + memLoggerInstance.Len(), + memLoggerInstance.connChans.Size()) + } + } + }() + } +} + +func LogsWS() func(config config.ConfigInstance, w http.ResponseWriter, r *http.Request) { + return memLoggerInstance.ServeHTTP +} + +func MemLogger() io.Writer { + return memLoggerInstance +} + +func (m *memLogger) Write(p []byte) (n int, err error) { + m.Lock() + + if m.Len() > maxMemLogSize { + m.Truncate(truncateSize) + } + + pos := m.Buffer.Len() + n = len(p) + _, err = m.Buffer.Write(p) + if err != nil { + m.Unlock() + return + } + + if m.connChans.Size() > 0 { + m.Unlock() + timeout := time.NewTimer(1 * time.Second) + defer timeout.Stop() + + m.connChans.Range(func(ch chan *logEntryRange, _ struct{}) bool { + select { + case ch <- &logEntryRange{pos, pos + n}: + return true + case <-timeout.C: + logging.Warn().Msg("mem logger: timeout logging to channel") + return false + } + }) + return + } + + m.Unlock() + return +} + +func (m *memLogger) ServeHTTP(config config.ConfigInstance, w http.ResponseWriter, r *http.Request) { + conn, err := utils.InitiateWS(config, w, r) + if err != nil { + utils.HandleErr(w, r, err) + return + } + + logCh := make(chan *logEntryRange) + m.connChans.Store(logCh, struct{}{}) + + /* trunk-ignore(golangci-lint/errcheck) */ + defer func() { + _ = conn.CloseNow() + m.connChans.Delete(logCh) + close(logCh) + }() + + if err := m.wsInitial(r.Context(), conn); err != nil { + utils.HandleErr(w, r, err) + return + } + + m.wsStreamLog(r.Context(), conn, logCh) +} + +func (m *memLogger) writeBytes(ctx context.Context, conn *websocket.Conn, b []byte) error { + return conn.Write(ctx, websocket.MessageText, b) +} + +func (m *memLogger) wsInitial(ctx context.Context, conn *websocket.Conn) error { + m.Lock() + defer m.Unlock() + + return m.writeBytes(ctx, conn, m.Buffer.Bytes()) +} + +func (m *memLogger) wsStreamLog(ctx context.Context, conn *websocket.Conn, ch <-chan *logEntryRange) { + for { + select { + case <-ctx.Done(): + return + case logRange := <-ch: + m.Lock() + msg := m.Buffer.Bytes()[logRange.Start:logRange.End] + err := m.writeBytes(ctx, conn, msg) + m.Unlock() + if err != nil { + return + } + } + } +} diff --git a/internal/common/env.go b/internal/common/env.go index b8f9a09a..0cc17553 100644 --- a/internal/common/env.go +++ b/internal/common/env.go @@ -19,6 +19,9 @@ var ( IsTrace = GetEnvBool("TRACE", false) && IsDebug IsProduction = !IsTest && !IsDebug + EnableLogStreaming = GetEnvBool("LOG_STREAMING", true) + DebugMemLogger = GetEnvBool("DEBUG_MEM_LOGGER", false) && EnableLogStreaming + ProxyHTTPAddr, ProxyHTTPHost, ProxyHTTPPort, diff --git a/internal/logging/logging.go b/internal/logging/logging.go index dee54dde..6ee9f649 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -2,7 +2,7 @@ package logging import ( - "os" + "io" "strings" "github.com/rs/zerolog" @@ -12,7 +12,7 @@ import ( var logger zerolog.Logger -func init() { +func InitLogger(out io.Writer) { var timeFmt string var level zerolog.Level var exclude []string @@ -35,7 +35,7 @@ func init() { logger = zerolog.New( zerolog.ConsoleWriter{ - Out: os.Stderr, + Out: out, TimeFormat: timeFmt, FieldsExclude: exclude, FormatMessage: func(msgI interface{}) string { // pad spaces for each line