feat: add SSE endpoint and server mode system
Add server-sent events at GET /api/events for live mutation notifications. Add server mode (normal/read-only/degraded) exposed via /health, /ready, and SSE server.state events. New files: - broker.go: SSE event hub with client management, non-blocking fan-out, ring buffer history for Last-Event-ID replay, heartbeat - servermode.go: mode state machine with periodic MinIO health check and SIGUSR1 read-only toggle - sse_handler.go: HTTP handler using http.Flusher and ResponseController to disable WriteTimeout for long-lived SSE - broker_test.go, servermode_test.go: 13 unit tests Modified: - handlers.go: Server struct gains broker/serverState fields, Health/Ready include mode and sse_clients, write handlers emit item.created/updated/deleted and revision.created events - routes.go: register GET /api/events, add RequireWritable middleware to all 8 editor-gated route groups - middleware.go: RequireWritable returns 503 in read-only mode - csv.go, ods.go: emit bulk item.created events after import - storage.go: add Ping() method for health checks - config.go: add ReadOnly field to ServerConfig - main.go: create broker/state, start background goroutines, SIGUSR1 handler, graceful shutdown sequence Closes #38, closes #39
This commit is contained in:
168
internal/api/broker.go
Normal file
168
internal/api/broker.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Event represents a server-sent event.
|
||||
type Event struct {
|
||||
ID uint64
|
||||
Type string
|
||||
Data string
|
||||
}
|
||||
|
||||
// sseClient represents a single connected SSE consumer.
|
||||
type sseClient struct {
|
||||
ch chan Event
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
const (
|
||||
clientChanSize = 64
|
||||
historySize = 256
|
||||
heartbeatInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// Broker manages SSE client connections and event fan-out.
|
||||
type Broker struct {
|
||||
logger zerolog.Logger
|
||||
mu sync.RWMutex
|
||||
clients map[*sseClient]struct{}
|
||||
eventID atomic.Uint64
|
||||
|
||||
historyMu sync.RWMutex
|
||||
history []Event
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewBroker creates a new SSE broker.
|
||||
func NewBroker(logger zerolog.Logger) *Broker {
|
||||
return &Broker{
|
||||
logger: logger.With().Str("component", "sse-broker").Logger(),
|
||||
clients: make(map[*sseClient]struct{}),
|
||||
history: make([]Event, 0, historySize),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe adds a new client and returns it. The caller must call Unsubscribe when done.
|
||||
func (b *Broker) Subscribe() *sseClient {
|
||||
c := &sseClient{
|
||||
ch: make(chan Event, clientChanSize),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
b.mu.Lock()
|
||||
b.clients[c] = struct{}{}
|
||||
count := len(b.clients)
|
||||
b.mu.Unlock()
|
||||
b.logger.Info().Int("clients", count).Msg("client connected")
|
||||
return c
|
||||
}
|
||||
|
||||
// Unsubscribe removes a client and closes its channel.
|
||||
func (b *Broker) Unsubscribe(c *sseClient) {
|
||||
b.mu.Lock()
|
||||
if _, ok := b.clients[c]; ok {
|
||||
delete(b.clients, c)
|
||||
close(c.closed)
|
||||
}
|
||||
count := len(b.clients)
|
||||
b.mu.Unlock()
|
||||
b.logger.Info().Int("clients", count).Msg("client disconnected")
|
||||
}
|
||||
|
||||
// Publish sends an event to all connected clients. Non-blocking per client:
|
||||
// if a client's channel is full, the event is dropped for that client.
|
||||
func (b *Broker) Publish(eventType string, data string) {
|
||||
ev := Event{
|
||||
ID: b.eventID.Add(1),
|
||||
Type: eventType,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
// Append to history ring buffer.
|
||||
b.historyMu.Lock()
|
||||
if len(b.history) >= historySize {
|
||||
b.history = b.history[1:]
|
||||
}
|
||||
b.history = append(b.history, ev)
|
||||
b.historyMu.Unlock()
|
||||
|
||||
// Fan out to all clients.
|
||||
b.mu.RLock()
|
||||
for c := range b.clients {
|
||||
select {
|
||||
case c.ch <- ev:
|
||||
default:
|
||||
b.logger.Warn().Uint64("event_id", ev.ID).Str("type", eventType).Msg("dropped event for slow client")
|
||||
}
|
||||
}
|
||||
b.mu.RUnlock()
|
||||
}
|
||||
|
||||
// ClientCount returns the number of connected SSE clients.
|
||||
func (b *Broker) ClientCount() int {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
return len(b.clients)
|
||||
}
|
||||
|
||||
// EventsSince returns events with IDs greater than lastID, for Last-Event-ID replay.
|
||||
func (b *Broker) EventsSince(lastID uint64) []Event {
|
||||
b.historyMu.RLock()
|
||||
defer b.historyMu.RUnlock()
|
||||
|
||||
var result []Event
|
||||
for _, ev := range b.history {
|
||||
if ev.ID > lastID {
|
||||
result = append(result, ev)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// StartHeartbeat launches a goroutine that publishes a heartbeat every 30s.
|
||||
func (b *Broker) StartHeartbeat() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(heartbeatInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
b.Publish("heartbeat", "{}")
|
||||
case <-b.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Shutdown closes all client connections and stops the heartbeat.
|
||||
func (b *Broker) Shutdown() {
|
||||
close(b.done)
|
||||
|
||||
b.mu.Lock()
|
||||
for c := range b.clients {
|
||||
delete(b.clients, c)
|
||||
close(c.closed)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
b.logger.Info().Msg("broker shut down")
|
||||
}
|
||||
|
||||
// mustMarshal serializes v to JSON. Panics on error (should only be used with
|
||||
// known-good types like structs and maps).
|
||||
func mustMarshal(v any) string {
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
panic("api: failed to marshal SSE event data: " + err.Error())
|
||||
}
|
||||
return string(data)
|
||||
}
|
||||
Reference in New Issue
Block a user