Files
silo/internal/api/sse_handler.go
Forbes e7da3ee94d feat(sse): per-connection filtering with user and workstation context
- Extend sseClient with userID, workstationID, and item filter set
- Update Subscribe() to accept userID and workstationID params
- Add WatchItem/UnwatchItem/IsWatchingItem methods on sseClient
- Add PublishToItem, PublishToWorkstation, PublishToUser targeted delivery
- Targeted events get IDs but skip history ring buffer (real-time only)
- Update HandleEvents to pass auth user ID and workstation_id query param
- Touch workstation last_seen on SSE connect
- Existing Publish() broadcast unchanged; all current callers unaffected
- Add 5 new tests for targeted delivery and item watch lifecycle

Closes #162
2026-03-01 10:04:01 -06:00

77 lines
2.1 KiB
Go

package api
import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/kindredsystems/silo/internal/auth"
)
// HandleEvents serves the SSE event stream.
// GET /api/events (requires auth, viewer+)
func (s *Server) HandleEvents(w http.ResponseWriter, r *http.Request) {
	f, canFlush := w.(http.Flusher)
	if !canFlush {
		writeError(w, http.StatusInternalServerError, "streaming_unsupported", "Streaming not supported")
		return
	}

	// This connection is long-lived: clear the per-request deadlines so the
	// server's ReadTimeout/WriteTimeout (15s) cannot tear it down mid-stream.
	ctrl := http.NewResponseController(w)
	if err := ctrl.SetReadDeadline(time.Time{}); err != nil {
		s.logger.Warn().Err(err).Msg("failed to disable read deadline for SSE")
	}
	if err := ctrl.SetWriteDeadline(time.Time{}); err != nil {
		s.logger.Warn().Err(err).Msg("failed to disable write deadline for SSE")
	}

	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	h.Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering

	// Resolve the authenticated user (if any) and the optional workstation
	// the client claims to be connecting from.
	var userID string
	if user := auth.UserFromContext(r.Context()); user != nil {
		userID = user.ID
	}
	wsID := r.URL.Query().Get("workstation_id")

	client := s.broker.Subscribe(userID, wsID)
	defer s.broker.Unsubscribe(client)

	// Mark the workstation as recently seen on connect.
	if wsID != "" {
		s.workstations.Touch(r.Context(), wsID)
	}

	// Replay missed events when the client reconnects with Last-Event-ID.
	if raw := r.Header.Get("Last-Event-ID"); raw != "" {
		if lastID, err := strconv.ParseUint(raw, 10, 64); err == nil {
			for _, ev := range s.broker.EventsSince(lastID) {
				fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
			}
			f.Flush()
		}
	}

	// Send initial server.state event.
	fmt.Fprintf(w, "event: server.state\ndata: %s\n\n",
		mustMarshal(map[string]string{"mode": string(s.serverState.Mode())}))
	f.Flush()

	// Stream loop: forward broker events until the subscription is closed
	// or the client disconnects (request context is canceled).
	for {
		select {
		case ev := <-client.ch:
			fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
			f.Flush()
		case <-client.closed:
			return
		case <-r.Context().Done():
			return
		}
	}
}