Skip to content

Commit

Permalink
📝 Add [SERVER] tag to command server logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wesen committed Jan 21, 2025
1 parent 18cfeac commit 3ce7ee9
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 11 deletions.
11 changes: 10 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,13 @@ Refactored the server to use context.Context for better control over server life
- Updated SSE server to use context for connection handling and shutdown
- Updated stdio server to handle context cancellation
- Added context with timeout for graceful shutdown in main program
- Improved error handling with context cancellation
- Improved error handling with context cancellation

# Enhanced Server Logging

Improved server logging when running as a command:

- Added [SERVER] tag prefix to all server log messages
- Configured stderr output to be forwarded to client
- Preserved log level and timestamp formatting
- Improved log message readability for debugging
1 change: 1 addition & 0 deletions cmd/mcp-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Available transports:
if err := srv.Stop(shutdownCtx); err != nil {
logger.Error().Err(err).Msg("Error during shutdown")
return err

}
logger.Info().Msg("Server stopped gracefully")
return nil
Expand Down
45 changes: 40 additions & 5 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"fmt"
"sync"

"github.com/go-go-golems/go-go-mcp/pkg"
Expand Down Expand Up @@ -61,19 +62,43 @@ func (s *Server) GetRegistry() *pkg.ProviderRegistry {
// StartStdio starts the server with stdio transport
func (s *Server) StartStdio(ctx context.Context) error {
s.mu.Lock()
stdioServer := stdio.NewServer(s.logger, s.promptService, s.resourceService, s.toolService, s.initializeService)
s.logger.Debug().Msg("Creating stdio transport")
// Create a new logger for the stdio server that preserves the log level and other settings
stdioLogger := s.logger.With().Logger()
stdioServer := stdio.NewServer(stdioLogger, s.promptService, s.resourceService, s.toolService, s.initializeService)
s.transport = stdioServer
s.mu.Unlock()
return stdioServer.Start(ctx)

s.logger.Debug().Msg("Starting stdio transport")
err := stdioServer.Start(ctx)
if err != nil {
s.logger.Debug().
Err(err).
Msg("Stdio transport stopped with error")
return err
}
s.logger.Debug().Msg("Stdio transport stopped cleanly")
return nil
}

// StartSSE starts the server with SSE transport on the specified port
func (s *Server) StartSSE(ctx context.Context, port int) error {
s.mu.Lock()
s.logger.Debug().Int("port", port).Msg("Creating SSE transport")
sseServer := NewSSEServer(s.logger, s.registry, port)
s.transport = sseServer
s.mu.Unlock()
return sseServer.Start(ctx)

s.logger.Debug().Int("port", port).Msg("Starting SSE transport")
err := sseServer.Start(ctx)
if err != nil {
s.logger.Debug().
Err(err).
Msg("SSE transport stopped with error")
return err
}
s.logger.Debug().Msg("SSE transport stopped cleanly")
return nil
}

// Stop gracefully stops the server
Expand All @@ -82,9 +107,19 @@ func (s *Server) Stop(ctx context.Context) error {
defer s.mu.Unlock()

if s.transport == nil {
s.logger.Debug().Msg("No transport to stop")
return nil
}

s.logger.Info().Msg("Stopping server")
return s.transport.Stop(ctx)
s.logger.Info().Msg("Stopping server transport")
err := s.transport.Stop(ctx)
if err != nil {
s.logger.Error().
Err(err).
Msg("Error stopping transport")
return fmt.Errorf("error stopping transport: %w", err)
}

s.logger.Debug().Msg("Transport stopped successfully")
return nil
}
69 changes: 64 additions & 5 deletions pkg/server/transports/stdio/stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/go-go-golems/go-go-mcp/pkg/protocol"
"github.com/go-go-golems/go-go-mcp/pkg/services"
Expand All @@ -30,10 +31,22 @@ func NewServer(logger zerolog.Logger, ps services.PromptService, rs services.Res
scanner := bufio.NewScanner(os.Stdin)
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer

// Create a ConsoleWriter that writes to stderr with a SERVER tag
consoleWriter := zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.RFC3339,
FormatMessage: func(i interface{}) string {
return fmt.Sprintf("[SERVER] %s", i)
},
}

// Create a new logger that writes to the tagged stderr
taggedLogger := logger.Output(consoleWriter)

return &Server{
scanner: scanner,
writer: json.NewEncoder(os.Stdout),
logger: logger,
logger: taggedLogger,
promptService: ps,
resourceService: rs,
toolService: ts,
Expand All @@ -50,12 +63,18 @@ func (s *Server) Start(ctx context.Context) error {
for s.scanner.Scan() {
select {
case <-s.done:
s.logger.Debug().Msg("Received done signal, stopping stdio server")
return nil
case <-ctx.Done():
s.logger.Debug().
Err(ctx.Err()).
Msg("Context cancelled, stopping stdio server")
return ctx.Err()
default:
line := s.scanner.Text()
s.logger.Debug().Str("line", line).Msg("Received line")
s.logger.Debug().
Str("line", line).
Msg("Received line")
if err := s.handleMessage(line); err != nil {
s.logger.Error().Err(err).Msg("Error handling message")
// Continue processing messages even if one fails
Expand All @@ -64,38 +83,78 @@ func (s *Server) Start(ctx context.Context) error {
}

if err := s.scanner.Err(); err != nil {
s.logger.Error().
Err(err).
Msg("Scanner error")
return fmt.Errorf("scanner error: %w", err)
}

s.logger.Debug().Msg("Scanner reached EOF")
return io.EOF
}

// Stop gracefully stops the stdio server
func (s *Server) Stop(ctx context.Context) error {
s.logger.Info().Msg("Stopping stdio server")
close(s.done)
return nil

select {
case <-s.done:
s.logger.Debug().Msg("Server already stopped")
return nil
default:
s.logger.Debug().Msg("Closing done channel to signal shutdown")
close(s.done)
}

// Wait for context to be done or timeout
select {
case <-ctx.Done():
s.logger.Debug().
Err(ctx.Err()).
Msg("Stop context cancelled before clean shutdown")
return ctx.Err()
case <-time.After(100 * time.Millisecond): // Give a small grace period for cleanup
s.logger.Debug().Msg("Stop completed successfully")
return nil
}
}

// handleMessage processes a single message
func (s *Server) handleMessage(message string) error {
s.logger.Debug().Str("message", message).Msg("Received message")
s.logger.Debug().
Str("message", message).
Msg("Processing message")

// Parse the base message structure
var request protocol.Request
if err := json.Unmarshal([]byte(message), &request); err != nil {
s.logger.Error().
Err(err).
Str("message", message).
Msg("Failed to parse message as JSON-RPC request")
return s.sendError(nil, -32700, "Parse error", err)
}

// Validate JSON-RPC version
if request.JSONRPC != "2.0" {
s.logger.Error().
Str("version", request.JSONRPC).
Msg("Invalid JSON-RPC version")
return s.sendError(&request.ID, -32600, "Invalid Request", fmt.Errorf("invalid JSON-RPC version"))
}

// Handle requests vs notifications based on ID presence
if len(request.ID) > 0 {
s.logger.Debug().
RawJSON("id", request.ID).
Str("method", request.Method).
Msg("Handling request")
return s.handleRequest(request)
}

s.logger.Debug().
Str("method", request.Method).
Msg("Handling notification")
return s.handleNotification(request)
}

Expand Down

0 comments on commit 3ce7ee9

Please sign in to comment.