Skip to content

Commit

Permalink
✨ Enhance SSE client with proper endpoint and session handling
Browse files Browse the repository at this point in the history
  • Loading branch information
wesen committed Jan 21, 2025
1 parent a26a10f commit c60363f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
10 changes: 9 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,12 @@ Improved the SSE server's client management to better handle multiple clients an
- Improved session management to handle multiple clients per session
- Added client metadata tracking (creation time, remote address, user agent)
- Fixed race conditions in client management
- Better error handling for invalid sessions
- Better error handling for invalid sessions

# SSE Client Endpoint Handling

Enhanced SSE client to properly handle endpoint events and session management:
- Added proper endpoint event handling and waiting
- Added session ID extraction and storage
- Improved initialization flow to wait for endpoint event
- Added better error handling for endpoint event timeout
52 changes: 42 additions & 10 deletions pkg/client/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"

"github.com/go-go-golems/go-go-mcp/pkg/protocol"
Expand All @@ -24,6 +25,8 @@ type SSETransport struct {
closeOnce sync.Once
logger zerolog.Logger
initialized bool
sessionID string
endpoint string
}

// NewSSETransport creates a new SSE transport
Expand Down Expand Up @@ -64,12 +67,12 @@ func (t *SSETransport) Send(ctx context.Context, request *protocol.Request) (*pr
}

t.logger.Debug().
Str("url", t.baseURL+"/messages").
Str("url", t.endpoint).
RawJSON("request", reqBody).
Msg("Sending HTTP POST request")

// Create a new request with context
req, err := http.NewRequestWithContext(ctx, "POST", t.baseURL+"/messages", bytes.NewReader(reqBody))
req, err := http.NewRequestWithContext(ctx, "POST", t.endpoint, bytes.NewReader(reqBody))
if err != nil {
t.logger.Error().Err(err).Msg("Failed to create HTTP request")
return nil, fmt.Errorf("failed to create request: %w", err)
Expand Down Expand Up @@ -129,6 +132,9 @@ func (t *SSETransport) initializeSSE(ctx context.Context) error {
// Create a new context with cancellation for the subscription
subCtx, cancel := context.WithCancel(ctx)

// Channel to wait for endpoint event
endpointCh := make(chan string, 1)

go func() {
defer cancel()

Expand All @@ -139,7 +145,25 @@ func (t *SSETransport) initializeSSE(ctx context.Context) error {
RawJSON("data", msg.Data).
Msg("Received SSE event")

// Forward events to the events channel
// Handle endpoint event
if string(msg.Event) == "endpoint" {
endpoint := string(msg.Data)
t.logger.Debug().
Str("endpoint", endpoint).
Msg("Received endpoint event")

// Parse endpoint URL and extract session ID
if strings.Contains(endpoint, "sessionId=") {
t.mu.Lock()
t.endpoint = t.baseURL + endpoint
t.sessionID = strings.Split(strings.Split(endpoint, "sessionId=")[1], "&")[0]
t.mu.Unlock()
endpointCh <- endpoint
return
}
}

// Forward other events to the events channel
select {
case t.events <- msg:
t.logger.Debug().Msg("Forwarded event to channel")
Expand All @@ -157,13 +181,21 @@ func (t *SSETransport) initializeSSE(ctx context.Context) error {
}
}()

t.mu.Lock()
t.initialized = true
t.mu.Unlock()

t.logger.Debug().Msg("SSE initialization successful")

return nil
// Wait for endpoint event or context cancellation
select {
case <-endpointCh:
t.mu.Lock()
t.initialized = true
t.mu.Unlock()
t.logger.Debug().
Str("endpoint", t.endpoint).
Str("sessionId", t.sessionID).
Msg("SSE initialization successful")
return nil
case <-ctx.Done():
t.logger.Error().Msg("Context cancelled while waiting for endpoint event")
return ctx.Err()
}
}

// Close closes the transport
Expand Down

0 comments on commit c60363f

Please sign in to comment.