The server's ReadTimeout (15s) was closing SSE connections shortly after they were established, causing a rapid connect/disconnect loop. The handler already disabled WriteTimeout but not ReadTimeout.
65 lines
1.8 KiB
Go
65 lines
1.8 KiB
Go
package api
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// HandleEvents serves the SSE event stream.
|
|
// GET /api/events (requires auth, viewer+)
|
|
func (s *Server) HandleEvents(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
writeError(w, http.StatusInternalServerError, "streaming_unsupported", "Streaming not supported")
|
|
return
|
|
}
|
|
|
|
// Disable read and write deadlines for this long-lived connection.
|
|
// The server's ReadTimeout/WriteTimeout (15s) would otherwise kill it.
|
|
rc := http.NewResponseController(w)
|
|
if err := rc.SetReadDeadline(time.Time{}); err != nil {
|
|
s.logger.Warn().Err(err).Msg("failed to disable read deadline for SSE")
|
|
}
|
|
if err := rc.SetWriteDeadline(time.Time{}); err != nil {
|
|
s.logger.Warn().Err(err).Msg("failed to disable write deadline for SSE")
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering
|
|
|
|
client := s.broker.Subscribe()
|
|
defer s.broker.Unsubscribe(client)
|
|
|
|
// Replay missed events if Last-Event-ID is present.
|
|
if lastIDStr := r.Header.Get("Last-Event-ID"); lastIDStr != "" {
|
|
if lastID, err := strconv.ParseUint(lastIDStr, 10, 64); err == nil {
|
|
for _, ev := range s.broker.EventsSince(lastID) {
|
|
fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
|
|
}
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
|
|
// Send initial server.state event.
|
|
fmt.Fprintf(w, "event: server.state\ndata: %s\n\n",
|
|
mustMarshal(map[string]string{"mode": string(s.serverState.Mode())}))
|
|
flusher.Flush()
|
|
|
|
// Stream loop.
|
|
for {
|
|
select {
|
|
case ev := <-client.ch:
|
|
fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
|
|
flusher.Flush()
|
|
case <-client.closed:
|
|
return
|
|
case <-r.Context().Done():
|
|
return
|
|
}
|
|
}
|
|
}
|