Skip to content

Commit

Permalink
feat: experimental memory logger and logs api for WebUI
Browse files Browse the repository at this point in the history
  • Loading branch information
yusing committed Jan 19, 2025
1 parent 1adba05 commit 0fad7b3
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 3 deletions.
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package main

import (
"encoding/json"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/yusing/go-proxy/internal"
v1 "github.com/yusing/go-proxy/internal/api/v1"
"github.com/yusing/go-proxy/internal/api/v1/auth"
"github.com/yusing/go-proxy/internal/api/v1/query"
"github.com/yusing/go-proxy/internal/common"
Expand All @@ -24,6 +26,12 @@ import (
var rawLogger = log.New(os.Stdout, "", 0)

func main() {
var out io.Writer = os.Stdout
if common.EnableLogStreaming {
out = io.MultiWriter(out, v1.MemLogger())
}
logging.InitLogger(out)

args := common.GetArgs()

switch args.Command {
Expand Down
1 change: 1 addition & 0 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewHandler(cfg config.ConfigInstance) http.Handler {
mux.HandleFunc("GET", "/v1/stats", useCfg(cfg, v1.Stats))
mux.HandleFunc("GET", "/v1/stats/ws", useCfg(cfg, v1.StatsWS))
mux.HandleFunc("GET", "/v1/health/ws", useCfg(cfg, v1.HealthWS))
mux.HandleFunc("GET", "/v1/logs/ws", useCfg(cfg, v1.LogsWS()))
mux.HandleFunc("GET", "/v1/favicon/{alias}", auth.RequireAuth(favicon.GetFavIcon))
return mux
}
Expand Down
161 changes: 161 additions & 0 deletions internal/api/v1/mem_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package v1

import (
"bytes"
"context"
"io"
"net/http"
"sync"
"time"

"github.com/coder/websocket"
"github.com/yusing/go-proxy/internal/api/v1/utils"
"github.com/yusing/go-proxy/internal/common"
config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
F "github.com/yusing/go-proxy/internal/utils/functional"
)

type logEntryRange struct {
Start, End int
}

type memLogger struct {
bytes.Buffer
sync.Mutex
connChans F.Map[chan *logEntryRange, struct{}]
}

const (
maxMemLogSize = 16 * 1024
truncateSize = maxMemLogSize / 2
initialWriteChunkSize = 4 * 1024
)

var memLoggerInstance = &memLogger{
connChans: F.NewMapOf[chan *logEntryRange, struct{}](),
}

func init() {
if !common.EnableLogStreaming {
return
}
memLoggerInstance.Grow(maxMemLogSize)

if common.DebugMemLogger {
ticker := time.NewTicker(1 * time.Second)

go func() {
defer ticker.Stop()

for {
select {
case <-task.RootContextCanceled():
return
case <-ticker.C:
logging.Info().Msgf("mem logger size: %d, active conns: %d",
memLoggerInstance.Len(),
memLoggerInstance.connChans.Size())
}
}
}()
}
}

func LogsWS() func(config config.ConfigInstance, w http.ResponseWriter, r *http.Request) {
return memLoggerInstance.ServeHTTP
}

func MemLogger() io.Writer {
return memLoggerInstance
}

func (m *memLogger) Write(p []byte) (n int, err error) {
m.Lock()

if m.Len() > maxMemLogSize {
m.Truncate(truncateSize)
}

pos := m.Buffer.Len()
n = len(p)
_, err = m.Buffer.Write(p)
if err != nil {
m.Unlock()
return
}

if m.connChans.Size() > 0 {
m.Unlock()
timeout := time.NewTimer(1 * time.Second)
defer timeout.Stop()

m.connChans.Range(func(ch chan *logEntryRange, _ struct{}) bool {
select {
case ch <- &logEntryRange{pos, pos + n}:
return true
case <-timeout.C:
logging.Warn().Msg("mem logger: timeout logging to channel")
return false
}
})
return
}

m.Unlock()
return
}

func (m *memLogger) ServeHTTP(config config.ConfigInstance, w http.ResponseWriter, r *http.Request) {
conn, err := utils.InitiateWS(config, w, r)
if err != nil {
utils.HandleErr(w, r, err)
return
}

logCh := make(chan *logEntryRange)
m.connChans.Store(logCh, struct{}{})

/* trunk-ignore(golangci-lint/errcheck) */
defer func() {
_ = conn.CloseNow()
m.connChans.Delete(logCh)
close(logCh)
}()

if err := m.wsInitial(r.Context(), conn); err != nil {
utils.HandleErr(w, r, err)
return
}

m.wsStreamLog(r.Context(), conn, logCh)
}

func (m *memLogger) writeBytes(ctx context.Context, conn *websocket.Conn, b []byte) error {
return conn.Write(ctx, websocket.MessageText, b)
}

func (m *memLogger) wsInitial(ctx context.Context, conn *websocket.Conn) error {
m.Lock()
defer m.Unlock()

return m.writeBytes(ctx, conn, m.Buffer.Bytes())
}

func (m *memLogger) wsStreamLog(ctx context.Context, conn *websocket.Conn, ch <-chan *logEntryRange) {
for {
select {
case <-ctx.Done():
return
case logRange := <-ch:
m.Lock()
msg := m.Buffer.Bytes()[logRange.Start:logRange.End]
err := m.writeBytes(ctx, conn, msg)
m.Unlock()
if err != nil {
return
}
}
}
}
3 changes: 3 additions & 0 deletions internal/common/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (
IsTrace = GetEnvBool("TRACE", false) && IsDebug
IsProduction = !IsTest && !IsDebug

EnableLogStreaming = GetEnvBool("LOG_STREAMING", true)
DebugMemLogger = GetEnvBool("DEBUG_MEM_LOGGER", false) && EnableLogStreaming

ProxyHTTPAddr,
ProxyHTTPHost,
ProxyHTTPPort,
Expand Down
6 changes: 3 additions & 3 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package logging

import (
"os"
"io"
"strings"

"github.com/rs/zerolog"
Expand All @@ -12,7 +12,7 @@ import (

var logger zerolog.Logger

func init() {
func InitLogger(out io.Writer) {
var timeFmt string
var level zerolog.Level
var exclude []string
Expand All @@ -35,7 +35,7 @@ func init() {

logger = zerolog.New(
zerolog.ConsoleWriter{
Out: os.Stderr,
Out: out,
TimeFormat: timeFmt,
FieldsExclude: exclude,
FormatMessage: func(msgI interface{}) string { // pad spaces for each line
Expand Down

0 comments on commit 0fad7b3

Please sign in to comment.