Add server-sent events at GET /api/events for live mutation notifications. Add server mode (normal/read-only/degraded) exposed via /health, /ready, and SSE server.state events. New files: - broker.go: SSE event hub with client management, non-blocking fan-out, ring buffer history for Last-Event-ID replay, heartbeat - servermode.go: mode state machine with periodic MinIO health check and SIGUSR1 read-only toggle - sse_handler.go: HTTP handler using http.Flusher and ResponseController to disable WriteTimeout for long-lived SSE - broker_test.go, servermode_test.go: 13 unit tests Modified: - handlers.go: Server struct gains broker/serverState fields, Health/Ready include mode and sse_clients, write handlers emit item.created/updated/deleted and revision.created events - routes.go: register GET /api/events, add RequireWritable middleware to all 8 editor-gated route groups - middleware.go: RequireWritable returns 503 in read-only mode - csv.go, ods.go: emit bulk item.created events after import - storage.go: add Ping() method for health checks - config.go: add ReadOnly field to ServerConfig - main.go: create broker/state, start background goroutines, SIGUSR1 handler, graceful shutdown sequence Closes #38, closes #39
137 lines
3.2 KiB
Go
137 lines
3.2 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kindredsystems/silo/internal/storage"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// ServerMode represents the operational mode of the server.
|
|
type ServerMode string
|
|
|
|
const (
|
|
ModeNormal ServerMode = "normal"
|
|
ModeReadOnly ServerMode = "read-only"
|
|
ModeDegraded ServerMode = "degraded"
|
|
)
|
|
|
|
const storageCheckInterval = 30 * time.Second
|
|
|
|
// ServerState tracks the server's current operational mode.
|
|
type ServerState struct {
|
|
logger zerolog.Logger
|
|
mu sync.RWMutex
|
|
readOnly bool
|
|
storageOK bool
|
|
storage *storage.Storage
|
|
broker *Broker
|
|
done chan struct{}
|
|
}
|
|
|
|
// NewServerState creates a new server state tracker.
|
|
func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState {
|
|
return &ServerState{
|
|
logger: logger.With().Str("component", "server-state").Logger(),
|
|
storageOK: store != nil, // assume healthy if configured
|
|
storage: store,
|
|
broker: broker,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Mode returns the current effective server mode.
|
|
// Priority: explicit read-only > storage unhealthy (degraded) > normal.
|
|
func (ss *ServerState) Mode() ServerMode {
|
|
ss.mu.RLock()
|
|
defer ss.mu.RUnlock()
|
|
if ss.readOnly {
|
|
return ModeReadOnly
|
|
}
|
|
if ss.storage != nil && !ss.storageOK {
|
|
return ModeDegraded
|
|
}
|
|
return ModeNormal
|
|
}
|
|
|
|
// IsReadOnly returns true if the server should reject writes.
|
|
// Only explicit read-only mode blocks writes; degraded is informational.
|
|
func (ss *ServerState) IsReadOnly() bool {
|
|
ss.mu.RLock()
|
|
defer ss.mu.RUnlock()
|
|
return ss.readOnly
|
|
}
|
|
|
|
// SetReadOnly sets the explicit read-only flag and broadcasts a state change.
|
|
func (ss *ServerState) SetReadOnly(ro bool) {
|
|
ss.mu.Lock()
|
|
old := ss.mode()
|
|
ss.readOnly = ro
|
|
new := ss.mode()
|
|
ss.mu.Unlock()
|
|
|
|
if old != new {
|
|
ss.logger.Info().Str("mode", string(new)).Msg("server mode changed")
|
|
ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(new)}))
|
|
}
|
|
}
|
|
|
|
// ToggleReadOnly flips the read-only flag.
|
|
func (ss *ServerState) ToggleReadOnly() {
|
|
ss.mu.RLock()
|
|
current := ss.readOnly
|
|
ss.mu.RUnlock()
|
|
ss.SetReadOnly(!current)
|
|
}
|
|
|
|
// StartStorageHealthCheck launches a periodic check of MinIO reachability.
|
|
// Updates storageOK and broadcasts server.state on transitions.
|
|
func (ss *ServerState) StartStorageHealthCheck() {
|
|
if ss.storage == nil {
|
|
return
|
|
}
|
|
go func() {
|
|
ticker := time.NewTicker(storageCheckInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
err := ss.storage.Ping(ctx)
|
|
cancel()
|
|
|
|
ss.mu.Lock()
|
|
old := ss.mode()
|
|
ss.storageOK = err == nil
|
|
new := ss.mode()
|
|
ss.mu.Unlock()
|
|
|
|
if old != new {
|
|
ss.logger.Info().Str("mode", string(new)).Err(err).Msg("server mode changed")
|
|
ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(new)}))
|
|
}
|
|
case <-ss.done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Shutdown stops the health check loop.
|
|
func (ss *ServerState) Shutdown() {
|
|
close(ss.done)
|
|
}
|
|
|
|
// mode returns the current mode. Must be called with mu held.
|
|
func (ss *ServerState) mode() ServerMode {
|
|
if ss.readOnly {
|
|
return ModeReadOnly
|
|
}
|
|
if ss.storage != nil && !ss.storageOK {
|
|
return ModeDegraded
|
|
}
|
|
return ModeNormal
|
|
}
|