Skip to content

Commit

Permalink
✨ Add SSE endpoint with Watermill integration
Browse files Browse the repository at this point in the history
  • Loading branch information
wesen committed Feb 22, 2025
1 parent 65daf68 commit 0f3d145
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 11 deletions.
8 changes: 7 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -1272,4 +1272,10 @@ Improved form data rendering in the console log:

Added Watermill dependencies for implementing SSE support:
- Added github.com/ThreeDotsLabs/watermill v1.4.4
- Added github.com/ThreeDotsLabs/watermill/pubsub/gochannel for local pub/sub
- Added github.com/ThreeDotsLabs/watermill/pubsub/gochannel for local pub/sub

Added SSE endpoint to UI server:
- Added /sse endpoint for real-time updates
- Implemented page-specific event channels using Watermill
- Added CORS support for cross-origin requests
- Added proper error handling and connection management
68 changes: 58 additions & 10 deletions cmd/ui-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,43 @@ import (
"sync"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/go-go-golems/clay/pkg/watcher"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
)

type Server struct {
dir string
pages map[string]UIDefinition
routes map[string]http.HandlerFunc
watcher *watcher.Watcher
mux *http.ServeMux
mu sync.RWMutex
dir string
pages map[string]UIDefinition
routes map[string]http.HandlerFunc
watcher *watcher.Watcher
mux *http.ServeMux
mu sync.RWMutex
publisher message.Publisher
subscriber message.Subscriber
}

type UIDefinition struct {
Components []map[string]interface{} `yaml:"components"`
}

func NewServer(dir string) *Server {
// Initialize watermill publisher/subscriber
publisher := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)

s := &Server{
dir: dir,
pages: make(map[string]UIDefinition),
routes: make(map[string]http.HandlerFunc),
mux: http.NewServeMux(),
dir: dir,
pages: make(map[string]UIDefinition),
routes: make(map[string]http.HandlerFunc),
mux: http.NewServeMux(),
publisher: publisher,
subscriber: publisher,
}

// Create a watcher for the pages directory
Expand All @@ -48,6 +61,9 @@ func NewServer(dir string) *Server {

s.watcher = w

// Set up SSE endpoint
s.mux.HandleFunc("/sse", s.handleSSE())

// Set up static file handler
s.mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static"))))

Expand Down Expand Up @@ -267,3 +283,35 @@ func (s *Server) handlePage(name string) http.HandlerFunc {
}
}
}

func (s *Server) handleSSE() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

// Get page ID from query
pageID := r.URL.Query().Get("page")
if pageID == "" {
http.Error(w, "page parameter is required", http.StatusBadRequest)
return
}

// Subscribe to page-specific topic
messages, err := s.subscriber.Subscribe(r.Context(), "ui-updates."+pageID)
if err != nil {
http.Error(w, "Failed to subscribe to events", http.StatusInternalServerError)
return
}

// Stream messages
for msg := range messages {
// Format SSE message
fmt.Fprintf(w, "event: %s\n", msg.Metadata["event-type"])
fmt.Fprintf(w, "data: %s\n\n", msg.Payload)
w.(http.Flusher).Flush()
}
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.23.3

require (
github.com/JohannesKaufmann/html-to-markdown v1.6.0
github.com/ThreeDotsLabs/watermill v1.3.7
github.com/a-h/templ v0.3.833
github.com/go-go-golems/clay v0.1.31
github.com/go-go-golems/geppetto v0.4.37
Expand Down Expand Up @@ -70,6 +71,7 @@ require (
github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kopoli/go-terminal-size v0.0.0-20170219200355-5c97524c8b54 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuN
github.com/PuerkitoBio/goquery v1.9.2/go.mod h1:GHPCaP0ODyyxqcNoFGYlAprUFH81NuRPd0GX3Zu2Mvk=
github.com/PuerkitoBio/goquery v1.10.1 h1:Y8JGYUkXWTGRB6Ars3+j3kN0xg1YqqlwvdTV8WTFQcU=
github.com/PuerkitoBio/goquery v1.10.1/go.mod h1:IYiHrOMps66ag56LEH7QYDDupKXyo5A8qrjIx3ZtujY=
github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o=
github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/a-h/templ v0.3.833 h1:L/KOk/0VvVTBegtE0fp2RJQiBm7/52Zxv5fqlEHiQUU=
github.com/a-h/templ v0.3.833/go.mod h1:cAu4AiZhtJfBjMY0HASlyzvkrtjnHWPeEsyGK2YYmfk=
github.com/adrg/frontmatter v0.2.0 h1:/DgnNe82o03riBd1S+ZDjd43wAmC6W35q67NHeLkPd4=
Expand Down Expand Up @@ -96,12 +98,17 @@ github.com/go-openapi/strfmt v0.23.0/go.mod h1:NrtIpfKtWIygRkKVsxh7XQMDQW5HKQl6S
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8=
github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
Expand Down Expand Up @@ -136,6 +143,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand Down

0 comments on commit 0f3d145

Please sign in to comment.