Merge pull request 'feat: SSE endpoint and server mode system (#38, #39)' (#40) from feat-38-39-sse-server-mode into main

Reviewed-on: #40
This commit was merged in pull request #40.
This commit is contained in:
2026-02-08 22:00:11 +00:00
15 changed files with 747 additions and 14 deletions

View File

@@ -178,9 +178,19 @@ func main() {
} }
} }
// Create SSE broker and server state
broker := api.NewBroker(logger)
serverState := api.NewServerState(logger, store, broker)
if cfg.Server.ReadOnly {
serverState.SetReadOnly(true)
logger.Warn().Msg("server started in read-only mode")
}
broker.StartHeartbeat()
serverState.StartStorageHealthCheck()
// Create API server // Create API server
server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store, server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store,
authService, sessionManager, oidcBackend, &cfg.Auth) authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState)
router := api.NewRouter(server, logger) router := api.NewRouter(server, logger)
// Create HTTP server // Create HTTP server
@@ -201,6 +211,16 @@ func main() {
} }
}() }()
// SIGUSR1: toggle read-only mode
usr1 := make(chan os.Signal, 1)
signal.Notify(usr1, syscall.SIGUSR1)
go func() {
for range usr1 {
serverState.ToggleReadOnly()
logger.Info().Str("mode", string(serverState.Mode())).Msg("read-only mode toggled via SIGUSR1")
}
}()
// Wait for interrupt signal // Wait for interrupt signal
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
@@ -208,7 +228,10 @@ func main() {
logger.Info().Msg("shutting down server") logger.Info().Msg("shutting down server")
// Graceful shutdown with timeout // Graceful shutdown: close SSE connections first, then HTTP server
broker.Shutdown()
serverState.Shutdown()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()

View File

@@ -21,6 +21,8 @@ func newTestServer(t *testing.T) *Server {
t.Helper() t.Helper()
pool := testutil.MustConnectTestPool(t) pool := testutil.MustConnectTestPool(t)
database := db.NewFromPool(pool) database := db.NewFromPool(pool)
broker := NewBroker(zerolog.Nop())
state := NewServerState(zerolog.Nop(), nil, broker)
return NewServer( return NewServer(
zerolog.Nop(), zerolog.Nop(),
database, database,
@@ -31,6 +33,8 @@ func newTestServer(t *testing.T) *Server {
nil, // sessionManager nil, // sessionManager
nil, // oidcBackend nil, // oidcBackend
nil, // authConfig (nil = dev mode) nil, // authConfig (nil = dev mode)
broker,
state,
) )
} }

168
internal/api/broker.go Normal file
View File

@@ -0,0 +1,168 @@
package api
import (
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// Event represents a server-sent event.
type Event struct {
	ID   uint64 // monotonically increasing, assigned by the broker at publish time
	Type string // SSE event name, e.g. "item.created" or "heartbeat"
	Data string // payload, already serialized (JSON in all call sites here)
}

// sseClient represents a single connected SSE consumer.
type sseClient struct {
	ch     chan Event    // buffered delivery channel; full => events dropped for this client
	closed chan struct{} // closed by Unsubscribe/Shutdown to end the client's stream loop
}

const (
	clientChanSize    = 64               // per-client event buffer before drops begin
	historySize       = 256              // replay window for Last-Event-ID reconnects
	heartbeatInterval = 30 * time.Second // keep-alive cadence for idle connections
)
// Broker manages SSE client connections and event fan-out.
type Broker struct {
	logger zerolog.Logger

	mu      sync.RWMutex              // guards clients
	clients map[*sseClient]struct{}   // set of currently connected consumers

	eventID atomic.Uint64 // last assigned event ID; incremented on each Publish

	historyMu sync.RWMutex // guards history; separate lock so replay never blocks fan-out
	history   []Event      // bounded FIFO of recent events for Last-Event-ID replay

	done chan struct{} // closed by Shutdown to stop the heartbeat goroutine
}
// NewBroker creates a new SSE broker.
func NewBroker(logger zerolog.Logger) *Broker {
	b := &Broker{
		clients: make(map[*sseClient]struct{}),
		history: make([]Event, 0, historySize),
		done:    make(chan struct{}),
	}
	b.logger = logger.With().Str("component", "sse-broker").Logger()
	return b
}
// Subscribe adds a new client and returns it. The caller must call Unsubscribe when done.
func (b *Broker) Subscribe() *sseClient {
	client := &sseClient{
		ch:     make(chan Event, clientChanSize),
		closed: make(chan struct{}),
	}
	b.mu.Lock()
	b.clients[client] = struct{}{}
	n := len(b.clients)
	b.mu.Unlock()
	b.logger.Info().Int("clients", n).Msg("client connected")
	return client
}
// Unsubscribe removes a client and closes its closed channel, signaling the
// client's stream loop to exit.
//
// Safe to call more than once or after Shutdown: the channel close and the
// "client disconnected" log only happen when the client was actually still
// registered. The original logged a disconnect unconditionally, so a deferred
// Unsubscribe following Shutdown emitted a spurious duplicate log entry.
func (b *Broker) Unsubscribe(c *sseClient) {
	b.mu.Lock()
	_, registered := b.clients[c]
	if registered {
		delete(b.clients, c)
		close(c.closed)
	}
	count := len(b.clients)
	b.mu.Unlock()
	if registered {
		b.logger.Info().Int("clients", count).Msg("client disconnected")
	}
}
// Publish sends an event to all connected clients. Non-blocking per client:
// if a client's channel is full, the event is dropped for that client.
func (b *Broker) Publish(eventType string, data string) {
	event := Event{
		ID:   b.eventID.Add(1),
		Type: eventType,
		Data: data,
	}

	// Record in the bounded replay history, evicting the oldest entry when full.
	b.historyMu.Lock()
	if len(b.history) >= historySize {
		b.history = b.history[1:]
	}
	b.history = append(b.history, event)
	b.historyMu.Unlock()

	// Best-effort fan-out: never block the publisher on a slow consumer.
	b.mu.RLock()
	defer b.mu.RUnlock()
	for client := range b.clients {
		select {
		case client.ch <- event:
		default:
			b.logger.Warn().Uint64("event_id", event.ID).Str("type", eventType).Msg("dropped event for slow client")
		}
	}
}
// ClientCount returns the number of connected SSE clients.
func (b *Broker) ClientCount() int {
	b.mu.RLock()
	n := len(b.clients)
	b.mu.RUnlock()
	return n
}
// EventsSince returns events with IDs greater than lastID, for Last-Event-ID replay.
func (b *Broker) EventsSince(lastID uint64) []Event {
	b.historyMu.RLock()
	defer b.historyMu.RUnlock()
	var missed []Event
	for _, ev := range b.history {
		if ev.ID <= lastID {
			continue
		}
		missed = append(missed, ev)
	}
	return missed
}
// StartHeartbeat launches a goroutine that publishes a heartbeat every 30s.
// The goroutine exits when Shutdown closes b.done.
func (b *Broker) StartHeartbeat() {
	go func() {
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-b.done:
				return
			case <-ticker.C:
				b.Publish("heartbeat", "{}")
			}
		}
	}()
}
// Shutdown closes all client connections and stops the heartbeat.
//
// Idempotent: the original unconditionally close(b.done)'d, so a second call
// (e.g. from two independent shutdown paths) panicked on the already-closed
// channel. The close is now guarded under the client lock, which also makes
// the check safe against concurrent Shutdown calls.
func (b *Broker) Shutdown() {
	b.mu.Lock()
	select {
	case <-b.done:
		// Already shut down; nothing to do.
		b.mu.Unlock()
		return
	default:
	}
	close(b.done)
	for c := range b.clients {
		delete(b.clients, c)
		close(c.closed)
	}
	b.mu.Unlock()
	b.logger.Info().Msg("broker shut down")
}
// mustMarshal serializes v to JSON. Panics on error (should only be used with
// known-good types like structs and maps).
func mustMarshal(v any) string {
	data, err := json.Marshal(v)
	if err == nil {
		return string(data)
	}
	panic("api: failed to marshal SSE event data: " + err.Error())
}

147
internal/api/broker_test.go Normal file
View File

@@ -0,0 +1,147 @@
package api
import (
"testing"
"time"
"github.com/rs/zerolog"
)
// TestBrokerSubscribeUnsubscribe verifies the client count tracks connects and disconnects.
func TestBrokerSubscribeUnsubscribe(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	client := broker.Subscribe()
	if got := broker.ClientCount(); got != 1 {
		t.Fatalf("expected 1 client, got %d", got)
	}
	broker.Unsubscribe(client)
	if got := broker.ClientCount(); got != 0 {
		t.Fatalf("expected 0 clients, got %d", got)
	}
}
// TestBrokerPublish verifies a published event reaches a subscribed client intact.
func TestBrokerPublish(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	client := broker.Subscribe()
	defer broker.Unsubscribe(client)

	broker.Publish("item.created", `{"part_number":"F01-0001"}`)

	select {
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for event")
	case ev := <-client.ch:
		switch {
		case ev.Type != "item.created":
			t.Fatalf("expected type item.created, got %s", ev.Type)
		case ev.ID != 1:
			t.Fatalf("expected ID 1, got %d", ev.ID)
		case ev.Data != `{"part_number":"F01-0001"}`:
			t.Fatalf("unexpected data: %s", ev.Data)
		}
	}
}
// TestBrokerPublishDropsSlow verifies overflow events are dropped rather than blocking.
func TestBrokerPublishDropsSlow(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	client := broker.Subscribe()
	defer broker.Unsubscribe(client)

	// Publish more events than the client channel can hold.
	total := clientChanSize + 10
	for i := 0; i < total; i++ {
		broker.Publish("heartbeat", "{}")
	}

	// Exactly clientChanSize events buffer; the overflow is dropped.
	if buffered := len(client.ch); buffered != clientChanSize {
		t.Fatalf("expected %d buffered events, got %d", clientChanSize, buffered)
	}
}
// TestBrokerEventsSince verifies Last-Event-ID replay returns only newer events, in order.
func TestBrokerEventsSince(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	broker.Publish("item.created", `{"pn":"A"}`)
	broker.Publish("item.updated", `{"pn":"B"}`)
	broker.Publish("item.deleted", `{"pn":"C"}`)

	// Everything after event ID 1.
	replay := broker.EventsSince(1)
	if len(replay) != 2 {
		t.Fatalf("expected 2 events, got %d", len(replay))
	}
	if replay[0].Type != "item.updated" {
		t.Fatalf("expected item.updated, got %s", replay[0].Type)
	}
	if replay[1].Type != "item.deleted" {
		t.Fatalf("expected item.deleted, got %s", replay[1].Type)
	}

	// Nothing newer than the last published ID.
	replay = broker.EventsSince(3)
	if len(replay) != 0 {
		t.Fatalf("expected 0 events, got %d", len(replay))
	}
}
// TestBrokerClientCount verifies the count across multiple subscribers.
func TestBrokerClientCount(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	first := broker.Subscribe()
	second := broker.Subscribe()
	third := broker.Subscribe()

	if n := broker.ClientCount(); n != 3 {
		t.Fatalf("expected 3 clients, got %d", n)
	}

	broker.Unsubscribe(second)
	if n := broker.ClientCount(); n != 2 {
		t.Fatalf("expected 2 clients, got %d", n)
	}

	broker.Unsubscribe(first)
	broker.Unsubscribe(third)
	if n := broker.ClientCount(); n != 0 {
		t.Fatalf("expected 0 clients, got %d", n)
	}
}
// TestBrokerShutdown verifies Shutdown signals clients and empties the registry.
func TestBrokerShutdown(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	client := broker.Subscribe()

	broker.Shutdown()

	// Shutdown must close every connected client's closed channel.
	select {
	case <-client.closed:
	case <-time.After(time.Second):
		t.Fatal("client closed channel not closed after shutdown")
	}
	if n := broker.ClientCount(); n != 0 {
		t.Fatalf("expected 0 clients after shutdown, got %d", n)
	}
}
// TestBrokerMonotonicIDs verifies event IDs strictly increase across publishes.
func TestBrokerMonotonicIDs(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	for _, name := range []string{"a", "b", "c"} {
		broker.Publish(name, "{}")
	}

	events := broker.EventsSince(0)
	if len(events) != 3 {
		t.Fatalf("expected 3 events, got %d", len(events))
	}
	prev := events[0].ID
	for _, ev := range events[1:] {
		if ev.ID <= prev {
			t.Fatalf("event IDs not monotonic: %d <= %d", ev.ID, prev)
		}
		prev = ev.ID
	}
}

View File

@@ -427,6 +427,13 @@ func (s *Server) HandleImportCSV(w http.ResponseWriter, r *http.Request) {
Msg("CSV import completed") Msg("CSV import completed")
writeJSON(w, http.StatusOK, result) writeJSON(w, http.StatusOK, result)
if !dryRun && result.SuccessCount > 0 {
s.broker.Publish("item.created", mustMarshal(map[string]any{
"bulk": true,
"count": result.SuccessCount,
"items": result.CreatedItems,
}))
}
} }
// HandleCSVTemplate returns an empty CSV template with headers. // HandleCSVTemplate returns an empty CSV template with headers.

View File

@@ -40,6 +40,8 @@ type Server struct {
oidc *auth.OIDCBackend oidc *auth.OIDCBackend
authConfig *config.AuthConfig authConfig *config.AuthConfig
itemFiles *db.ItemFileRepository itemFiles *db.ItemFileRepository
broker *Broker
serverState *ServerState
} }
// NewServer creates a new API server. // NewServer creates a new API server.
@@ -53,6 +55,8 @@ func NewServer(
sessionManager *scs.SessionManager, sessionManager *scs.SessionManager,
oidcBackend *auth.OIDCBackend, oidcBackend *auth.OIDCBackend,
authCfg *config.AuthConfig, authCfg *config.AuthConfig,
broker *Broker,
state *ServerState,
) *Server { ) *Server {
items := db.NewItemRepository(database) items := db.NewItemRepository(database)
projects := db.NewProjectRepository(database) projects := db.NewProjectRepository(database)
@@ -76,6 +80,8 @@ func NewServer(
oidc: oidcBackend, oidc: oidcBackend,
authConfig: authCfg, authConfig: authCfg,
itemFiles: itemFiles, itemFiles: itemFiles,
broker: broker,
serverState: state,
} }
} }
@@ -112,7 +118,10 @@ func writeError(w http.ResponseWriter, status int, err string, message string) {
// HandleHealth returns basic health status. // HandleHealth returns basic health status.
func (s *Server) HandleHealth(w http.ResponseWriter, r *http.Request) { func (s *Server) HandleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{
"status": "ok",
"mode": string(s.serverState.Mode()),
})
} }
// HandleReady checks database and storage connectivity. // HandleReady checks database and storage connectivity.
@@ -125,11 +134,21 @@ func (s *Server) HandleReady(w http.ResponseWriter, r *http.Request) {
return return
} }
// Storage check would go here if we had a ping method storageStatus := "ok"
writeJSON(w, http.StatusOK, map[string]string{ if s.storage != nil {
"status": "ready", if err := s.storage.Ping(ctx); err != nil {
"database": "ok", storageStatus = "unavailable"
"storage": "ok", }
} else {
storageStatus = "not_configured"
}
writeJSON(w, http.StatusOK, map[string]any{
"status": "ready",
"database": "ok",
"storage": storageStatus,
"mode": string(s.serverState.Mode()),
"sse_clients": s.broker.ClientCount(),
}) })
} }
@@ -441,7 +460,9 @@ func (s *Server) HandleCreateItem(w http.ResponseWriter, r *http.Request) {
} }
} }
writeJSON(w, http.StatusCreated, itemToResponse(item)) resp := itemToResponse(item)
writeJSON(w, http.StatusCreated, resp)
s.broker.Publish("item.created", mustMarshal(resp))
} }
// HandleGetItem retrieves an item by part number. // HandleGetItem retrieves an item by part number.
@@ -565,7 +586,9 @@ func (s *Server) HandleUpdateItem(w http.ResponseWriter, r *http.Request) {
// Get updated item (use new part number if changed) // Get updated item (use new part number if changed)
item, _ = s.items.GetByPartNumber(ctx, fields.PartNumber) item, _ = s.items.GetByPartNumber(ctx, fields.PartNumber)
writeJSON(w, http.StatusOK, itemToResponse(item)) resp := itemToResponse(item)
writeJSON(w, http.StatusOK, resp)
s.broker.Publish("item.updated", mustMarshal(resp))
} }
// HandleDeleteItem permanently deletes an item. // HandleDeleteItem permanently deletes an item.
@@ -601,6 +624,7 @@ func (s *Server) HandleDeleteItem(w http.ResponseWriter, r *http.Request) {
} }
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
s.broker.Publish("item.deleted", mustMarshal(map[string]string{"part_number": partNumber}))
} }
// Revision handlers // Revision handlers
@@ -877,6 +901,11 @@ func (s *Server) HandleRollbackRevision(w http.ResponseWriter, r *http.Request)
Msg("rollback revision created") Msg("rollback revision created")
writeJSON(w, http.StatusCreated, revisionToResponse(newRev)) writeJSON(w, http.StatusCreated, revisionToResponse(newRev))
s.broker.Publish("revision.created", mustMarshal(map[string]any{
"part_number": partNumber,
"revision_number": newRev.RevisionNumber,
"rollback_from": revNum,
}))
} }
// Part number generation // Part number generation
@@ -1199,6 +1228,10 @@ func (s *Server) HandleCreateRevision(w http.ResponseWriter, r *http.Request) {
} }
writeJSON(w, http.StatusCreated, revisionToResponse(rev)) writeJSON(w, http.StatusCreated, revisionToResponse(rev))
s.broker.Publish("revision.created", mustMarshal(map[string]any{
"part_number": partNumber,
"revision_number": rev.RevisionNumber,
}))
} }
// HandleUploadFile uploads a file and creates a new revision. // HandleUploadFile uploads a file and creates a new revision.

View File

@@ -136,6 +136,18 @@ func (s *Server) RequireRole(minimum string) func(http.Handler) http.Handler {
} }
} }
// RequireWritable rejects requests with 503 when the server is in read-only mode.
func (s *Server) RequireWritable(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.serverState.IsReadOnly() {
writeError(w, http.StatusServiceUnavailable, "server_read_only",
"Server is in read-only mode")
return
}
next.ServeHTTP(w, r)
})
}
func extractBearerToken(r *http.Request) string { func extractBearerToken(r *http.Request) string {
h := r.Header.Get("Authorization") h := r.Header.Get("Authorization")
if strings.HasPrefix(h, "Bearer ") { if strings.HasPrefix(h, "Bearer ") {

View File

@@ -468,6 +468,13 @@ func (s *Server) HandleImportODS(w http.ResponseWriter, r *http.Request) {
Msg("ODS import completed") Msg("ODS import completed")
writeJSON(w, http.StatusOK, result) writeJSON(w, http.StatusOK, result)
if !dryRun && result.SuccessCount > 0 {
s.broker.Publish("item.created", mustMarshal(map[string]any{
"bulk": true,
"count": result.SuccessCount,
"items": result.CreatedItems,
}))
}
} }
// HandleExportBOMODS exports the expanded BOM as a formatted ODS file. // HandleExportBOMODS exports the expanded BOM as a formatted ODS file.

View File

@@ -64,6 +64,9 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Route("/api", func(r chi.Router) { r.Route("/api", func(r chi.Router) {
r.Use(server.RequireAuth) r.Use(server.RequireAuth)
// SSE event stream (viewer+)
r.Get("/events", server.HandleEvents)
// Auth endpoints // Auth endpoints
r.Get("/auth/me", server.HandleGetCurrentUser) r.Get("/auth/me", server.HandleGetCurrentUser)
r.Route("/auth/tokens", func(r chi.Router) { r.Route("/auth/tokens", func(r chi.Router) {
@@ -74,6 +77,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
// Presigned uploads (editor) // Presigned uploads (editor)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Post("/uploads/presign", server.HandlePresignUpload) r.Post("/uploads/presign", server.HandlePresignUpload)
}) })
@@ -85,6 +89,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Get("/{name}/properties", server.HandleGetPropertySchema) r.Get("/{name}/properties", server.HandleGetPropertySchema)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Route("/{name}/segments/{segment}/values", func(r chi.Router) { r.Route("/{name}/segments/{segment}/values", func(r chi.Router) {
r.Post("/", server.HandleAddSchemaValue) r.Post("/", server.HandleAddSchemaValue)
@@ -102,6 +107,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Get("/{code}/sheet.ods", server.HandleProjectSheetODS) r.Get("/{code}/sheet.ods", server.HandleProjectSheetODS)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Post("/", server.HandleCreateProject) r.Post("/", server.HandleCreateProject)
r.Put("/{code}", server.HandleUpdateProject) r.Put("/{code}", server.HandleUpdateProject)
@@ -119,6 +125,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Get("/template.ods", server.HandleODSTemplate) r.Get("/template.ods", server.HandleODSTemplate)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Post("/", server.HandleCreateItem) r.Post("/", server.HandleCreateItem)
r.Post("/import", server.HandleImportCSV) r.Post("/import", server.HandleImportCSV)
@@ -143,6 +150,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Get("/bom/export.ods", server.HandleExportBOMODS) r.Get("/bom/export.ods", server.HandleExportBOMODS)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Put("/", server.HandleUpdateItem) r.Put("/", server.HandleUpdateItem)
r.Delete("/", server.HandleDeleteItem) r.Delete("/", server.HandleDeleteItem)
@@ -175,6 +183,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
r.Get("/sync-log", server.HandleGetOdooSyncLog) r.Get("/sync-log", server.HandleGetOdooSyncLog)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Put("/config", server.HandleUpdateOdooConfig) r.Put("/config", server.HandleUpdateOdooConfig)
r.Post("/test-connection", server.HandleTestOdooConnection) r.Post("/test-connection", server.HandleTestOdooConnection)
@@ -185,12 +194,14 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
// Sheets (editor) // Sheets (editor)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Post("/sheets/diff", server.HandleSheetDiff) r.Post("/sheets/diff", server.HandleSheetDiff)
}) })
// Part number generation (editor) // Part number generation (editor)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(server.RequireWritable)
r.Use(server.RequireRole(auth.RoleEditor)) r.Use(server.RequireRole(auth.RoleEditor))
r.Post("/generate-part-number", server.HandleGeneratePartNumber) r.Post("/generate-part-number", server.HandleGeneratePartNumber)
}) })

136
internal/api/servermode.go Normal file
View File

@@ -0,0 +1,136 @@
package api
import (
"context"
"sync"
"time"
"github.com/kindredsystems/silo/internal/storage"
"github.com/rs/zerolog"
)
// ServerMode represents the operational mode of the server.
type ServerMode string

const (
	// ModeNormal: reads and writes are both allowed.
	ModeNormal ServerMode = "normal"
	// ModeReadOnly: operator-requested (config flag or SIGUSR1); write
	// endpoints are rejected via IsReadOnly.
	ModeReadOnly ServerMode = "read-only"
	// ModeDegraded: storage health check failing; informational only —
	// writes are still allowed (see IsReadOnly).
	ModeDegraded ServerMode = "degraded"
)

// storageCheckInterval is how often the background health check pings storage.
const storageCheckInterval = 30 * time.Second
// ServerState tracks the server's current operational mode.
type ServerState struct {
	logger zerolog.Logger

	mu        sync.RWMutex
	readOnly  bool // operator-set flag; true => writes rejected
	storageOK bool // last health-check result; false (with storage set) => degraded

	storage *storage.Storage // nil when no object storage is configured
	broker  *Broker          // receives "server.state" events on mode transitions
	done    chan struct{}    // closed by Shutdown to stop the health-check loop
}
// NewServerState creates a new server state tracker.
func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState {
	ss := &ServerState{
		logger:  logger.With().Str("component", "server-state").Logger(),
		storage: store,
		broker:  broker,
		done:    make(chan struct{}),
	}
	// Assume storage is healthy at startup whenever it is configured at all.
	ss.storageOK = store != nil
	return ss
}
// Mode returns the current effective server mode.
// Priority: explicit read-only > storage unhealthy (degraded) > normal.
func (ss *ServerState) Mode() ServerMode {
	ss.mu.RLock()
	defer ss.mu.RUnlock()
	// Delegate to the lock-held helper so the priority logic lives in one place.
	return ss.mode()
}
// IsReadOnly returns true if the server should reject writes.
// Only explicit read-only mode blocks writes; degraded is informational.
func (ss *ServerState) IsReadOnly() bool {
	ss.mu.RLock()
	ro := ss.readOnly
	ss.mu.RUnlock()
	return ro
}
// SetReadOnly sets the explicit read-only flag and broadcasts a state change.
// The broadcast fires only on an actual mode transition, so redundant calls
// (e.g. SetReadOnly(true) twice in a row) publish nothing.
//
// Renamed the locals: the original shadowed the predeclared identifier `new`,
// which trips predeclared-shadowing linters and reads poorly.
func (ss *ServerState) SetReadOnly(ro bool) {
	ss.mu.Lock()
	before := ss.mode()
	ss.readOnly = ro
	after := ss.mode()
	ss.mu.Unlock()
	// Publish outside the lock so a slow broker cannot block state changes.
	if before != after {
		ss.logger.Info().Str("mode", string(after)).Msg("server mode changed")
		ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(after)}))
	}
}
// ToggleReadOnly flips the read-only flag atomically and broadcasts the
// resulting mode change.
//
// The original read the flag under RLock and then called SetReadOnly(!current)
// — a check-then-act race: two concurrent toggles (e.g. rapid SIGUSR1
// deliveries) could both observe the same value and apply the same state
// instead of two flips. The flip now happens under a single write lock.
func (ss *ServerState) ToggleReadOnly() {
	ss.mu.Lock()
	before := ss.mode()
	ss.readOnly = !ss.readOnly
	after := ss.mode()
	ss.mu.Unlock()
	// Broadcast outside the lock, only on a real transition.
	if before != after {
		ss.logger.Info().Str("mode", string(after)).Msg("server mode changed")
		ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(after)}))
	}
}
// StartStorageHealthCheck launches a periodic check of MinIO reachability.
// Updates storageOK and broadcasts server.state on transitions.
// No-op when storage is not configured; the goroutine exits when Shutdown
// closes ss.done.
func (ss *ServerState) StartStorageHealthCheck() {
	if ss.storage == nil {
		return
	}
	go func() {
		ticker := time.NewTicker(storageCheckInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Bound each ping so a hung storage backend cannot stall the loop.
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				err := ss.storage.Ping(ctx)
				cancel()
				// Capture the effective mode before and after recording the
				// result so we only log/broadcast on an actual transition.
				ss.mu.Lock()
				old := ss.mode()
				ss.storageOK = err == nil
				new := ss.mode()
				ss.mu.Unlock()
				if old != new {
					ss.logger.Info().Str("mode", string(new)).Err(err).Msg("server mode changed")
					ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(new)}))
				}
			case <-ss.done:
				return
			}
		}
	}()
}
// Shutdown stops the health check loop.
//
// Idempotent: the original unconditional close(ss.done) panicked if Shutdown
// was called twice, so the close is now guarded under the state lock.
func (ss *ServerState) Shutdown() {
	ss.mu.Lock()
	defer ss.mu.Unlock()
	select {
	case <-ss.done:
		// Already shut down; nothing to do.
	default:
		close(ss.done)
	}
}
// mode returns the current mode. Must be called with mu held.
func (ss *ServerState) mode() ServerMode {
	switch {
	case ss.readOnly:
		return ModeReadOnly
	case ss.storage != nil && !ss.storageOK:
		return ModeDegraded
	default:
		return ModeNormal
	}
}

View File

@@ -0,0 +1,118 @@
package api
import (
"testing"
"time"
"github.com/rs/zerolog"
)
// TestServerStateModeNormal verifies the freshly created state reports normal mode.
func TestServerStateModeNormal(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	state := NewServerState(zerolog.Nop(), nil, broker)
	if mode := state.Mode(); mode != ModeNormal {
		t.Fatalf("expected normal, got %s", mode)
	}
	if state.IsReadOnly() {
		t.Fatal("expected not read-only")
	}
}
// TestServerStateModeReadOnly verifies SetReadOnly switches the mode both ways.
func TestServerStateModeReadOnly(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	state := NewServerState(zerolog.Nop(), nil, broker)

	state.SetReadOnly(true)
	if mode := state.Mode(); mode != ModeReadOnly {
		t.Fatalf("expected read-only, got %s", mode)
	}
	if !state.IsReadOnly() {
		t.Fatal("expected read-only")
	}

	state.SetReadOnly(false)
	if mode := state.Mode(); mode != ModeNormal {
		t.Fatalf("expected normal after clearing read-only, got %s", mode)
	}
}
// TestServerStateModeDegraded verifies that an unhealthy storage flag alone
// does not report degraded mode when no storage is configured: mode() only
// returns degraded when storage != nil AND storageOK is false. (Constructing
// a real *storage.Storage needs a live MinIO client, so the degraded-positive
// path is not exercised here.)
//
// The original contained a dead, empty Lock/Unlock critical section and
// contradictory leftover comments from an abandoned approach; this keeps the
// actual assertion and drops the dead code.
func TestServerStateModeDegraded(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	ss := NewServerState(zerolog.Nop(), nil, broker)
	// Force the health flag false; with nil storage the mode must stay normal.
	ss.mu.Lock()
	ss.storageOK = false
	ss.mu.Unlock()
	if ss.Mode() != ModeNormal {
		t.Fatalf("expected normal (no storage configured), got %s", ss.Mode())
	}
}
// TestServerStateToggleReadOnly verifies two toggles return the state to writable.
func TestServerStateToggleReadOnly(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	state := NewServerState(zerolog.Nop(), nil, broker)

	state.ToggleReadOnly()
	if !state.IsReadOnly() {
		t.Fatal("expected read-only after toggle")
	}

	state.ToggleReadOnly()
	if state.IsReadOnly() {
		t.Fatal("expected not read-only after second toggle")
	}
}
// TestServerStateBroadcastsOnTransition verifies server.state events fire only
// on real mode transitions, not on redundant sets.
func TestServerStateBroadcastsOnTransition(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	client := broker.Subscribe()
	defer broker.Unsubscribe(client)
	state := NewServerState(zerolog.Nop(), nil, broker)

	// A real transition must emit a server.state event.
	state.SetReadOnly(true)
	select {
	case ev := <-client.ch:
		if ev.Type != "server.state" {
			t.Fatalf("expected server.state event, got %s", ev.Type)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for server.state event")
	}

	// Setting the same value again is a no-op and must stay silent.
	state.SetReadOnly(true)
	select {
	case ev := <-client.ch:
		t.Fatalf("unexpected event on no-op SetReadOnly: %+v", ev)
	case <-time.After(50 * time.Millisecond):
		// No event arrived — the expected outcome.
	}
}
// TestServerStateReadOnlyOverridesDegraded verifies read-only wins when both
// conditions hold at once.
func TestServerStateReadOnlyOverridesDegraded(t *testing.T) {
	broker := NewBroker(zerolog.Nop())
	ss := NewServerState(zerolog.Nop(), nil, broker)

	// Force both conditions simultaneously; read-only has priority in Mode().
	ss.mu.Lock()
	ss.readOnly = true
	ss.storageOK = false
	ss.mu.Unlock()

	if mode := ss.Mode(); mode != ModeReadOnly {
		t.Fatalf("expected read-only to override degraded, got %s", mode)
	}
}

View File

@@ -0,0 +1,61 @@
package api
import (
"fmt"
"net/http"
"strconv"
"time"
)
// HandleEvents serves the SSE event stream.
// GET /api/events (requires auth, viewer+)
//
// Blocks for the lifetime of the connection, forwarding broker events until
// the client disconnects or the broker shuts down. Write errors from
// fmt.Fprintf are deliberately unchecked: a broken connection surfaces as
// r.Context() cancellation on the next loop iteration.
func (s *Server) HandleEvents(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		writeError(w, http.StatusInternalServerError, "streaming_unsupported", "Streaming not supported")
		return
	}
	// Disable the write deadline for this long-lived connection.
	// The server's WriteTimeout (15s) would otherwise kill it.
	rc := http.NewResponseController(w)
	if err := rc.SetWriteDeadline(time.Time{}); err != nil {
		s.logger.Warn().Err(err).Msg("failed to disable write deadline for SSE")
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering
	client := s.broker.Subscribe()
	defer s.broker.Unsubscribe(client)
	// Replay missed events if Last-Event-ID is present.
	// NOTE(review): the client is subscribed before the history snapshot is
	// taken, so an event published in between may be delivered twice (once
	// from replay, once live). Presumably consumers dedupe on event ID —
	// confirm with the frontend.
	if lastIDStr := r.Header.Get("Last-Event-ID"); lastIDStr != "" {
		if lastID, err := strconv.ParseUint(lastIDStr, 10, 64); err == nil {
			for _, ev := range s.broker.EventsSince(lastID) {
				fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
			}
			flusher.Flush()
		}
	}
	// Send initial server.state event (no id: it is synthetic, not part of the
	// broker's numbered history).
	fmt.Fprintf(w, "event: server.state\ndata: %s\n\n",
		mustMarshal(map[string]string{"mode": string(s.serverState.Mode())}))
	flusher.Flush()
	// Stream loop: forward broker events until the client goes away or the
	// broker shuts down (client.closed is closed by Unsubscribe/Shutdown).
	for {
		select {
		case ev := <-client.ch:
			fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data)
			flusher.Flush()
		case <-client.closed:
			return
		case <-r.Context().Done():
			return
		}
	}
}

View File

@@ -72,9 +72,10 @@ type CORSConfig struct {
// ServerConfig holds HTTP server settings. // ServerConfig holds HTTP server settings.
type ServerConfig struct { type ServerConfig struct {
Host string `yaml:"host"` Host string `yaml:"host"`
Port int `yaml:"port"` Port int `yaml:"port"`
BaseURL string `yaml:"base_url"` BaseURL string `yaml:"base_url"`
ReadOnly bool `yaml:"read_only"`
} }
// DatabaseConfig holds PostgreSQL connection settings. // DatabaseConfig holds PostgreSQL connection settings.

View File

@@ -112,6 +112,12 @@ func (s *Storage) Delete(ctx context.Context, key string) error {
return nil return nil
} }
// Ping checks if the storage backend is reachable by verifying the bucket exists.
func (s *Storage) Ping(ctx context.Context) error {
_, err := s.client.BucketExists(ctx, s.bucket)
return err
}
// Bucket returns the bucket name. // Bucket returns the bucket name.
func (s *Storage) Bucket() string { func (s *Storage) Bucket() string {
return s.bucket return s.bucket

View File

@@ -1 +0,0 @@
This document has moved to [docs/SPECIFICATION.md](docs/SPECIFICATION.md).