Files
silo/internal/api/broker.go
Forbes 3d7302f383 feat: add SSE endpoint and server mode system
Add server-sent events at GET /api/events for live mutation
notifications. Add server mode (normal/read-only/degraded) exposed
via /health, /ready, and SSE server.state events.

New files:
- broker.go: SSE event hub with client management, non-blocking
  fan-out, ring buffer history for Last-Event-ID replay, heartbeat
- servermode.go: mode state machine with periodic MinIO health
  check and SIGUSR1 read-only toggle
- sse_handler.go: HTTP handler using http.Flusher and
  ResponseController to disable WriteTimeout for long-lived SSE
- broker_test.go, servermode_test.go: 13 unit tests

Modified:
- handlers.go: Server struct gains broker/serverState fields,
  Health/Ready include mode and sse_clients, write handlers
  emit item.created/updated/deleted and revision.created events
- routes.go: register GET /api/events, add RequireWritable
  middleware to all 8 editor-gated route groups
- middleware.go: RequireWritable returns 503 in read-only mode
- csv.go, ods.go: emit bulk item.created events after import
- storage.go: add Ping() method for health checks
- config.go: add ReadOnly field to ServerConfig
- main.go: create broker/state, start background goroutines,
  SIGUSR1 handler, graceful shutdown sequence

Closes #38, closes #39
2026-02-08 15:59:23 -06:00

169 lines
3.6 KiB
Go

package api
import (
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// Event represents a server-sent event.
// Events are created by Broker.Publish, which fills in all three fields.
type Event struct {
// ID is the sequence number assigned by Broker.Publish from an incrementing
// counter; Broker.EventsSince compares it against a client's last-seen ID.
ID uint64
// Type is the event type string passed to Publish (for example "heartbeat").
Type string
// Data is the event payload as a string (the heartbeat uses "{}").
Data string
}
// sseClient represents a single connected SSE consumer.
// Instances are created by Broker.Subscribe and removed from the broker by
// Broker.Unsubscribe or Broker.Shutdown.
type sseClient struct {
// ch carries events fanned out by Broker.Publish. It is buffered with
// clientChanSize slots; Publish never blocks on it and drops the event for
// this client when the buffer is full. Nothing in this file closes ch.
ch chan Event
// closed is closed when the broker removes the client (Unsubscribe or
// Shutdown).
closed chan struct{}
}
// Tuning constants for the SSE broker.
const (
// clientChanSize is the buffer capacity of each client's event channel.
clientChanSize = 64
// historySize is the maximum number of past events retained for replay
// through EventsSince.
historySize = 256
// heartbeatInterval is the period between "heartbeat" events published by
// the goroutine started in StartHeartbeat.
heartbeatInterval = 30 * time.Second
)
// Broker manages SSE client connections and event fan-out.
// Construct it with NewBroker; the zero value has nil maps and channels.
type Broker struct {
// logger is the broker's component-scoped logger.
logger zerolog.Logger
// mu guards clients.
mu sync.RWMutex
// clients is the set of currently subscribed clients.
clients map[*sseClient]struct{}
// eventID is the counter from which Publish draws event IDs; the first
// published event gets ID 1.
eventID atomic.Uint64
// historyMu guards history.
historyMu sync.RWMutex
// history holds the most recently published events (at most historySize),
// in the order they were appended, for Last-Event-ID replay.
history []Event
// done is closed by Shutdown and stops the heartbeat goroutine.
done chan struct{}
}
// NewBroker builds a ready-to-use SSE broker. The supplied logger is tagged
// with component="sse-broker"; the client set starts empty and the history
// slice is pre-sized to hold historySize events.
func NewBroker(logger zerolog.Logger) *Broker {
	b := new(Broker)
	b.logger = logger.With().Str("component", "sse-broker").Logger()
	b.clients = make(map[*sseClient]struct{})
	b.history = make([]Event, 0, historySize)
	b.done = make(chan struct{})
	return b
}
// Subscribe registers a fresh client with the broker and returns it. The
// client's event channel is buffered with clientChanSize slots. Callers are
// responsible for calling Unsubscribe once they are finished with the client.
func (b *Broker) Subscribe() *sseClient {
	client := new(sseClient)
	client.ch = make(chan Event, clientChanSize)
	client.closed = make(chan struct{})

	// Register under the write lock and capture the resulting client count so
	// the log line below runs without holding the lock.
	total := func() int {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.clients[client] = struct{}{}
		return len(b.clients)
	}()

	b.logger.Info().Int("clients", total).Msg("client connected")
	return client
}
// Unsubscribe drops c from the broker's client set and closes c.closed.
// If c is not currently registered (for example it was already removed),
// nothing is closed, which keeps repeated calls from double-closing the
// channel. The disconnect log line is emitted in either case.
func (b *Broker) Unsubscribe(c *sseClient) {
	remaining := func() int {
		b.mu.Lock()
		defer b.mu.Unlock()
		if _, registered := b.clients[c]; registered {
			close(c.closed)
			delete(b.clients, c)
		}
		return len(b.clients)
	}()

	b.logger.Info().Int("clients", remaining).Msg("client disconnected")
}
// Publish records an event in the replay history and sends it to all
// connected clients.
//
// eventType and data become the event's Type and Data; the ID is the next
// value of the broker's event counter. Delivery is non-blocking per client:
// if a client's channel is full, the event is dropped for that client and a
// warning is logged.
func (b *Broker) Publish(eventType string, data string) {
	// Allocate the ID and append to history inside one critical section.
	// Taking the ID before acquiring the lock would let two concurrent
	// publishers append in the opposite order to their IDs, leaving history
	// out of sequence for EventsSince replay.
	b.historyMu.Lock()
	ev := Event{
		ID:   b.eventID.Add(1),
		Type: eventType,
		Data: data,
	}
	// Keep at most historySize events: drop the oldest before appending.
	if len(b.history) >= historySize {
		b.history = b.history[1:]
	}
	b.history = append(b.history, ev)
	b.historyMu.Unlock()

	// Fan out to all clients without blocking on any of them.
	b.mu.RLock()
	for c := range b.clients {
		select {
		case c.ch <- ev:
		default:
			b.logger.Warn().Uint64("event_id", ev.ID).Str("type", eventType).Msg("dropped event for slow client")
		}
	}
	b.mu.RUnlock()
}
// ClientCount reports how many SSE clients are currently registered with the
// broker. The count is read under the clients read lock.
func (b *Broker) ClientCount() int {
	b.mu.RLock()
	n := len(b.clients)
	b.mu.RUnlock()
	return n
}
// EventsSince collects the retained events whose ID is strictly greater than
// lastID, in the order they appear in the history, for Last-Event-ID replay.
// It returns nil when no retained event qualifies.
func (b *Broker) EventsSince(lastID uint64) []Event {
	b.historyMu.RLock()
	defer b.historyMu.RUnlock()

	var missed []Event
	for i := range b.history {
		if b.history[i].ID <= lastID {
			// The client has already seen this event.
			continue
		}
		missed = append(missed, b.history[i])
	}
	return missed
}
// StartHeartbeat spawns a background goroutine that publishes a "heartbeat"
// event with payload "{}" once per heartbeatInterval. The goroutine exits
// when the broker's done channel is closed (see Shutdown).
func (b *Broker) StartHeartbeat() {
	beat := func() {
		tick := time.NewTicker(heartbeatInterval)
		defer tick.Stop()

		for {
			select {
			case <-b.done:
				return
			case <-tick.C:
				b.Publish("heartbeat", "{}")
			}
		}
	}
	go beat()
}
// Shutdown stops the heartbeat goroutine and disconnects every client by
// closing the broker's done channel and each client's closed channel.
//
// Shutdown is idempotent and safe to call concurrently: only the first call
// has any effect. Without this guard a second call would panic, because
// closing an already-closed channel panics.
func (b *Broker) Shutdown() {
	b.mu.Lock()
	// The done check and close both happen under b.mu, so concurrent callers
	// are serialized and exactly one of them performs the close.
	select {
	case <-b.done:
		b.mu.Unlock()
		return
	default:
	}
	close(b.done)

	// Removing each client from the set before releasing the lock ensures a
	// later Unsubscribe finds it absent and does not close c.closed again.
	for c := range b.clients {
		delete(b.clients, c)
		close(c.closed)
	}
	b.mu.Unlock()

	b.logger.Info().Msg("broker shut down")
}
// mustMarshal encodes v as JSON and returns the result as a string. It
// panics if encoding fails, so it is intended only for values that are known
// to be encodable, such as plain structs and maps.
func mustMarshal(v any) string {
	encoded, err := json.Marshal(v)
	if err == nil {
		return string(encoded)
	}
	panic("api: failed to marshal SSE event data: " + err.Error())
}