Skip to content

Commit

Permalink
✨ Add SSE endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
wesen committed Feb 26, 2025
1 parent 53050d5 commit c6aeeb0
Show file tree
Hide file tree
Showing 25 changed files with 1,461 additions and 813 deletions.
44 changes: 43 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -1284,4 +1284,46 @@ Created refactoring plan for SSE implementation:
- Designed modular event system with dedicated packages
- Planned separation of Watermill integration
- Created detailed implementation steps for SSE handler
- Added comprehensive testing and documentation plan
- Added comprehensive testing and documentation plan

## SSE/Watermill Refactoring
Refactored the SSE and event system to be more modular and maintainable.

- Created new `pkg/events` package with event types and interfaces
- Added Watermill-based event manager implementation
- Created dedicated SSE handler package with proper connection management
- Updated UI server to use new event system and SSE handler

## Server-Sent Events (SSE) Support for UI Updates
Added SSE support to enable real-time UI updates through server-sent events. This allows components to be updated individually without full page reloads.

- Added SSE extension script to base template
- Added SSE connection and swap targets to page template
- Added individual component swap targets for granular updates
- Wrapped components in div containers with unique IDs for targeted updates

## HTMX SSE Extension Update
Fixed SSE extension integration to use the correct attributes and script:

- Updated SSE extension script to use htmx-ext-sse@2.2.2 package
- Changed SSE attributes to use sse-* prefix (from hx-sse-*)
- Fixed SSE connection syntax to use proper format
- Added SRI integrity hashes for security

# Changelog

## HTMX SSE Extension Update
Fixed SSE extension integration to use the correct attributes and script:

- Updated SSE extension script to use htmx-ext-sse@2.2.2 package
- Changed SSE attributes to use sse-* prefix (from hx-sse-*)
- Fixed SSE connection syntax to use proper format
- Added SRI integrity hashes for security

## Server-Sent Events (SSE) Support for UI Updates
Added SSE support to enable real-time UI updates through server-sent events. This allows components to be updated individually without full page reloads.

- Added SSE extension script to base template
- Added SSE connection and swap targets to page template
- Added individual component swap targets for granular updates
- Wrapped components in div containers with unique IDs for targeted updates
106 changes: 56 additions & 50 deletions cmd/ui-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/go-go-golems/clay/pkg/watcher"
"github.com/go-go-golems/go-go-mcp/pkg/events"
"github.com/go-go-golems/go-go-mcp/pkg/server/sse"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
)

Expand All @@ -28,26 +31,39 @@ type Server struct {
mu sync.RWMutex
publisher message.Publisher
subscriber message.Subscriber
events events.EventManager
sseHandler *sse.SSEHandler
}

type UIDefinition struct {
Components []map[string]interface{} `yaml:"components"`
}

func NewServer(dir string) *Server {
func NewServer(dir string) (*Server, error) {
// Initialize watermill publisher/subscriber
publisher := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)

// Initialize event manager with logger
eventManager, err := events.NewWatermillEventManager(&log.Logger)
if err != nil {
return nil, fmt.Errorf("failed to create event manager: %w", err)
}

// Create SSE handler
sseHandler := sse.NewSSEHandler(eventManager, &log.Logger)

s := &Server{
dir: dir,
pages: make(map[string]UIDefinition),
routes: make(map[string]http.HandlerFunc),
mux: http.NewServeMux(),
publisher: publisher,
subscriber: publisher,
events: eventManager,
sseHandler: sseHandler,
}

// Create a watcher for the pages directory
Expand All @@ -62,7 +78,7 @@ func NewServer(dir string) *Server {
s.watcher = w

// Set up SSE endpoint
s.mux.HandleFunc("/sse", s.handleSSE())
s.mux.Handle("/sse", sseHandler)

// Set up static file handler
s.mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static"))))
Expand All @@ -79,7 +95,7 @@ func NewServer(dir string) *Server {
s.handleIndex()(w, r)
})

return s
return s, nil
}

func (s *Server) Start(ctx context.Context, port int) error {
Expand All @@ -88,45 +104,55 @@ func (s *Server) Start(ctx context.Context, port int) error {
return fmt.Errorf("failed to load pages: %w", err)
}

g, ctx := errgroup.WithContext(ctx)

// Start the file watcher
go func() {
g.Go(func() error {
log.Debug().Msg("Starting file watcher")
if err := s.watcher.Run(ctx); err != nil && err != context.Canceled {
log.Error().Err(err).Msg("Watcher error")
return err
}
}()
return nil
})

srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: s.mux,
}

// Start server in a goroutine
serverErr := make(chan error, 1)
go func() {
// Start server
g.Go(func() error {
log.Info().Str("addr", srv.Addr).Msg("Starting server")
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverErr <- fmt.Errorf("server error: %w", err)
return fmt.Errorf("server error: %w", err)
}
close(serverErr)
}()

// Wait for either context cancellation or server error
select {
case err := <-serverErr:
return err
case <-ctx.Done():
return nil
})

// Wait for shutdown
g.Go(func() error {
<-ctx.Done()
log.Info().Msg("Server shutdown initiated")

// Graceful shutdown with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

// Close event manager
if err := s.events.Close(); err != nil {
log.Error().Err(err).Msg("Failed to close event manager")
}

if err := srv.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("server shutdown error: %w", err)
}

log.Info().Msg("Server shutdown completed")
return nil
}
})

return g.Wait()
}

func (s *Server) Handler() http.Handler {
Expand Down Expand Up @@ -193,6 +219,12 @@ func (s *Server) loadPage(path string) error {
s.routes[urlPath] = s.handlePage(relPath)
s.mu.Unlock()

// Publish page reload event
event := events.NewPageReloadEvent(relPath)
if err := s.events.Publish(relPath, event); err != nil {
log.Error().Err(err).Str("path", relPath).Msg("Failed to publish page reload event")
}

log.Info().Str("path", relPath).Str("url", urlPath).Msg("Loaded page")
return nil
}
Expand Down Expand Up @@ -226,6 +258,12 @@ func (s *Server) handleFileRemove(path string) error {
delete(s.routes, urlPath)
s.mu.Unlock()

// Publish page reload event
event := events.NewPageReloadEvent(relPath)
if err := s.events.Publish(relPath, event); err != nil {
log.Error().Err(err).Str("path", relPath).Msg("Failed to publish page reload event")
}

log.Info().Str("path", relPath).Str("url", urlPath).Msg("Removed page")
return nil
}
Expand Down Expand Up @@ -283,35 +321,3 @@ func (s *Server) handlePage(name string) http.HandlerFunc {
}
}
}

func (s *Server) handleSSE() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

// Get page ID from query
pageID := r.URL.Query().Get("page")
if pageID == "" {
http.Error(w, "page parameter is required", http.StatusBadRequest)
return
}

// Subscribe to page-specific topic
messages, err := s.subscriber.Subscribe(r.Context(), "ui-updates."+pageID)
if err != nil {
http.Error(w, "Failed to subscribe to events", http.StatusInternalServerError)
return
}

// Stream messages
for msg := range messages {
// Format SSE message
fmt.Fprintf(w, "event: %s\n", msg.Metadata["event-type"])
fmt.Fprintf(w, "data: %s\n\n", msg.Payload)
w.(http.Flusher).Flush()
}
}
}
20 changes: 7 additions & 13 deletions cmd/ui-server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"

Expand Down Expand Up @@ -56,20 +55,15 @@ func (c *StartCommand) Run(ctx context.Context, parsedLayers *layers.ParsedLayer
return err
}

// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Create a context that can be cancelled by signals
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// Handle graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Info().Msg("Shutting down server...")
cancel()
}()
server, err := NewServer(s.Directory)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

server := NewServer(s.Directory)
if err := server.Start(ctx, s.Port); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("server error: %w", err)
}
Expand Down
Loading

0 comments on commit c6aeeb0

Please sign in to comment.