Files
silo/internal/api/servermode.go
Forbes 3d7302f383 feat: add SSE endpoint and server mode system
Add server-sent events at GET /api/events for live mutation
notifications. Add server mode (normal/read-only/degraded) exposed
via /health, /ready, and SSE server.state events.

New files:
- broker.go: SSE event hub with client management, non-blocking
  fan-out, ring buffer history for Last-Event-ID replay, heartbeat
- servermode.go: mode state machine with periodic MinIO health
  check and SIGUSR1 read-only toggle
- sse_handler.go: HTTP handler using http.Flusher and
  ResponseController to disable WriteTimeout for long-lived SSE
- broker_test.go, servermode_test.go: 13 unit tests

Modified:
- handlers.go: Server struct gains broker/serverState fields,
  Health/Ready include mode and sse_clients, write handlers
  emit item.created/updated/deleted and revision.created events
- routes.go: register GET /api/events, add RequireWritable
  middleware to all 8 editor-gated route groups
- middleware.go: RequireWritable returns 503 in read-only mode
- csv.go, ods.go: emit bulk item.created events after import
- storage.go: add Ping() method for health checks
- config.go: add ReadOnly field to ServerConfig
- main.go: create broker/state, start background goroutines,
  SIGUSR1 handler, graceful shutdown sequence

Closes #38, closes #39
2026-02-08 15:59:23 -06:00

137 lines
3.2 KiB
Go

package api
import (
"context"
"sync"
"time"
"github.com/kindredsystems/silo/internal/storage"
"github.com/rs/zerolog"
)
// ServerMode represents the operational mode of the server.
type ServerMode string
const (
ModeNormal ServerMode = "normal"
ModeReadOnly ServerMode = "read-only"
ModeDegraded ServerMode = "degraded"
)
const storageCheckInterval = 30 * time.Second
// ServerState tracks the server's current operational mode.
type ServerState struct {
logger zerolog.Logger
mu sync.RWMutex
readOnly bool
storageOK bool
storage *storage.Storage
broker *Broker
done chan struct{}
}
// NewServerState creates a new server state tracker.
func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState {
return &ServerState{
logger: logger.With().Str("component", "server-state").Logger(),
storageOK: store != nil, // assume healthy if configured
storage: store,
broker: broker,
done: make(chan struct{}),
}
}
// Mode returns the current effective server mode.
// Priority: explicit read-only > storage unhealthy (degraded) > normal.
func (ss *ServerState) Mode() ServerMode {
ss.mu.RLock()
defer ss.mu.RUnlock()
if ss.readOnly {
return ModeReadOnly
}
if ss.storage != nil && !ss.storageOK {
return ModeDegraded
}
return ModeNormal
}
// IsReadOnly returns true if the server should reject writes.
// Only explicit read-only mode blocks writes; degraded is informational.
func (ss *ServerState) IsReadOnly() bool {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.readOnly
}
// SetReadOnly sets the explicit read-only flag and broadcasts a state change.
func (ss *ServerState) SetReadOnly(ro bool) {
ss.mu.Lock()
old := ss.mode()
ss.readOnly = ro
new := ss.mode()
ss.mu.Unlock()
if old != new {
ss.logger.Info().Str("mode", string(new)).Msg("server mode changed")
ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(new)}))
}
}
// ToggleReadOnly flips the read-only flag.
func (ss *ServerState) ToggleReadOnly() {
ss.mu.RLock()
current := ss.readOnly
ss.mu.RUnlock()
ss.SetReadOnly(!current)
}
// StartStorageHealthCheck launches a periodic check of MinIO reachability.
// Updates storageOK and broadcasts server.state on transitions.
func (ss *ServerState) StartStorageHealthCheck() {
if ss.storage == nil {
return
}
go func() {
ticker := time.NewTicker(storageCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := ss.storage.Ping(ctx)
cancel()
ss.mu.Lock()
old := ss.mode()
ss.storageOK = err == nil
new := ss.mode()
ss.mu.Unlock()
if old != new {
ss.logger.Info().Str("mode", string(new)).Err(err).Msg("server mode changed")
ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(new)}))
}
case <-ss.done:
return
}
}
}()
}
// Shutdown stops the health check loop.
func (ss *ServerState) Shutdown() {
close(ss.done)
}
// mode returns the current mode. Must be called with mu held.
func (ss *ServerState) mode() ServerMode {
if ss.readOnly {
return ModeReadOnly
}
if ss.storage != nil && !ss.storageOK {
return ModeDegraded
}
return ModeNormal
}