diff --git a/cmd/silod/main.go b/cmd/silod/main.go index 2eb20e9..5aee49a 100644 --- a/cmd/silod/main.go +++ b/cmd/silod/main.go @@ -178,9 +178,19 @@ func main() { } } + // Create SSE broker and server state + broker := api.NewBroker(logger) + serverState := api.NewServerState(logger, store, broker) + if cfg.Server.ReadOnly { + serverState.SetReadOnly(true) + logger.Warn().Msg("server started in read-only mode") + } + broker.StartHeartbeat() + serverState.StartStorageHealthCheck() + // Create API server server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store, - authService, sessionManager, oidcBackend, &cfg.Auth) + authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState) router := api.NewRouter(server, logger) // Create HTTP server @@ -201,6 +211,16 @@ func main() { } }() + // SIGUSR1: toggle read-only mode + usr1 := make(chan os.Signal, 1) + signal.Notify(usr1, syscall.SIGUSR1) + go func() { + for range usr1 { + serverState.ToggleReadOnly() + logger.Info().Str("mode", string(serverState.Mode())).Msg("read-only mode toggled via SIGUSR1") + } + }() + // Wait for interrupt signal quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) @@ -208,7 +228,10 @@ func main() { logger.Info().Msg("shutting down server") - // Graceful shutdown with timeout + // Graceful shutdown: close SSE connections first, then HTTP server + broker.Shutdown() + serverState.Shutdown() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/internal/api/bom_handlers_test.go b/internal/api/bom_handlers_test.go index c31ee10..0e94b3e 100644 --- a/internal/api/bom_handlers_test.go +++ b/internal/api/bom_handlers_test.go @@ -21,6 +21,8 @@ func newTestServer(t *testing.T) *Server { t.Helper() pool := testutil.MustConnectTestPool(t) database := db.NewFromPool(pool) + broker := NewBroker(zerolog.Nop()) + state := NewServerState(zerolog.Nop(), nil, broker) return NewServer( 
zerolog.Nop(), database, @@ -31,6 +33,8 @@ func newTestServer(t *testing.T) *Server { nil, // sessionManager nil, // oidcBackend nil, // authConfig (nil = dev mode) + broker, + state, ) } diff --git a/internal/api/broker.go b/internal/api/broker.go new file mode 100644 index 0000000..bf084aa --- /dev/null +++ b/internal/api/broker.go @@ -0,0 +1,168 @@ +package api + +import ( + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/rs/zerolog" +) + +// Event represents a server-sent event. +type Event struct { + ID uint64 + Type string + Data string +} + +// sseClient represents a single connected SSE consumer. +type sseClient struct { + ch chan Event + closed chan struct{} +} + +const ( + clientChanSize = 64 + historySize = 256 + heartbeatInterval = 30 * time.Second +) + +// Broker manages SSE client connections and event fan-out. +type Broker struct { + logger zerolog.Logger + mu sync.RWMutex + clients map[*sseClient]struct{} + eventID atomic.Uint64 + + historyMu sync.RWMutex + history []Event + + done chan struct{} +} + +// NewBroker creates a new SSE broker. +func NewBroker(logger zerolog.Logger) *Broker { + return &Broker{ + logger: logger.With().Str("component", "sse-broker").Logger(), + clients: make(map[*sseClient]struct{}), + history: make([]Event, 0, historySize), + done: make(chan struct{}), + } +} + +// Subscribe adds a new client and returns it. The caller must call Unsubscribe when done. +func (b *Broker) Subscribe() *sseClient { + c := &sseClient{ + ch: make(chan Event, clientChanSize), + closed: make(chan struct{}), + } + b.mu.Lock() + b.clients[c] = struct{}{} + count := len(b.clients) + b.mu.Unlock() + b.logger.Info().Int("clients", count).Msg("client connected") + return c +} + +// Unsubscribe removes a client and closes its channel. 
+func (b *Broker) Unsubscribe(c *sseClient) { + b.mu.Lock() + if _, ok := b.clients[c]; ok { + delete(b.clients, c) + close(c.closed) + } + count := len(b.clients) + b.mu.Unlock() + b.logger.Info().Int("clients", count).Msg("client disconnected") +} + +// Publish sends an event to all connected clients. Non-blocking per client: +// if a client's channel is full, the event is dropped for that client. +func (b *Broker) Publish(eventType string, data string) { + ev := Event{ + ID: b.eventID.Add(1), + Type: eventType, + Data: data, + } + + // Append to history ring buffer. + b.historyMu.Lock() + if len(b.history) >= historySize { + b.history = b.history[1:] + } + b.history = append(b.history, ev) + b.historyMu.Unlock() + + // Fan out to all clients. + b.mu.RLock() + for c := range b.clients { + select { + case c.ch <- ev: + default: + b.logger.Warn().Uint64("event_id", ev.ID).Str("type", eventType).Msg("dropped event for slow client") + } + } + b.mu.RUnlock() +} + +// ClientCount returns the number of connected SSE clients. +func (b *Broker) ClientCount() int { + b.mu.RLock() + defer b.mu.RUnlock() + return len(b.clients) +} + +// EventsSince returns events with IDs greater than lastID, for Last-Event-ID replay. +func (b *Broker) EventsSince(lastID uint64) []Event { + b.historyMu.RLock() + defer b.historyMu.RUnlock() + + var result []Event + for _, ev := range b.history { + if ev.ID > lastID { + result = append(result, ev) + } + } + return result +} + +// StartHeartbeat launches a goroutine that publishes a heartbeat every 30s. +func (b *Broker) StartHeartbeat() { + go func() { + ticker := time.NewTicker(heartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + b.Publish("heartbeat", "{}") + case <-b.done: + return + } + } + }() +} + +// Shutdown closes all client connections and stops the heartbeat. 
+func (b *Broker) Shutdown() { + close(b.done) + + b.mu.Lock() + for c := range b.clients { + delete(b.clients, c) + close(c.closed) + } + b.mu.Unlock() + + b.logger.Info().Msg("broker shut down") +} + +// mustMarshal serializes v to JSON. Panics on error (should only be used with +// known-good types like structs and maps). +func mustMarshal(v any) string { + data, err := json.Marshal(v) + if err != nil { + panic("api: failed to marshal SSE event data: " + err.Error()) + } + return string(data) +} diff --git a/internal/api/broker_test.go b/internal/api/broker_test.go new file mode 100644 index 0000000..36874ce --- /dev/null +++ b/internal/api/broker_test.go @@ -0,0 +1,147 @@ +package api + +import ( + "testing" + "time" + + "github.com/rs/zerolog" +) + +func TestBrokerSubscribeUnsubscribe(t *testing.T) { + b := NewBroker(zerolog.Nop()) + + c := b.Subscribe() + if b.ClientCount() != 1 { + t.Fatalf("expected 1 client, got %d", b.ClientCount()) + } + + b.Unsubscribe(c) + if b.ClientCount() != 0 { + t.Fatalf("expected 0 clients, got %d", b.ClientCount()) + } +} + +func TestBrokerPublish(t *testing.T) { + b := NewBroker(zerolog.Nop()) + c := b.Subscribe() + defer b.Unsubscribe(c) + + b.Publish("item.created", `{"part_number":"F01-0001"}`) + + select { + case ev := <-c.ch: + if ev.Type != "item.created" { + t.Fatalf("expected type item.created, got %s", ev.Type) + } + if ev.ID != 1 { + t.Fatalf("expected ID 1, got %d", ev.ID) + } + if ev.Data != `{"part_number":"F01-0001"}` { + t.Fatalf("unexpected data: %s", ev.Data) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestBrokerPublishDropsSlow(t *testing.T) { + b := NewBroker(zerolog.Nop()) + c := b.Subscribe() + defer b.Unsubscribe(c) + + // Fill the client's channel + for i := 0; i < clientChanSize+10; i++ { + b.Publish("heartbeat", "{}") + } + + // Should have clientChanSize events buffered, rest dropped + count := len(c.ch) + if count != clientChanSize { + 
t.Fatalf("expected %d buffered events, got %d", clientChanSize, count) + } +} + +func TestBrokerEventsSince(t *testing.T) { + b := NewBroker(zerolog.Nop()) + + b.Publish("item.created", `{"pn":"A"}`) + b.Publish("item.updated", `{"pn":"B"}`) + b.Publish("item.deleted", `{"pn":"C"}`) + + events := b.EventsSince(1) // after ID 1 + if len(events) != 2 { + t.Fatalf("expected 2 events, got %d", len(events)) + } + if events[0].Type != "item.updated" { + t.Fatalf("expected item.updated, got %s", events[0].Type) + } + if events[1].Type != "item.deleted" { + t.Fatalf("expected item.deleted, got %s", events[1].Type) + } + + // No events after the latest + events = b.EventsSince(3) + if len(events) != 0 { + t.Fatalf("expected 0 events, got %d", len(events)) + } +} + +func TestBrokerClientCount(t *testing.T) { + b := NewBroker(zerolog.Nop()) + + c1 := b.Subscribe() + c2 := b.Subscribe() + c3 := b.Subscribe() + + if b.ClientCount() != 3 { + t.Fatalf("expected 3 clients, got %d", b.ClientCount()) + } + + b.Unsubscribe(c2) + if b.ClientCount() != 2 { + t.Fatalf("expected 2 clients, got %d", b.ClientCount()) + } + + b.Unsubscribe(c1) + b.Unsubscribe(c3) + if b.ClientCount() != 0 { + t.Fatalf("expected 0 clients, got %d", b.ClientCount()) + } +} + +func TestBrokerShutdown(t *testing.T) { + b := NewBroker(zerolog.Nop()) + c := b.Subscribe() + + b.Shutdown() + + // Client's closed channel should be closed + select { + case <-c.closed: + // expected + case <-time.After(time.Second): + t.Fatal("client closed channel not closed after shutdown") + } + + if b.ClientCount() != 0 { + t.Fatalf("expected 0 clients after shutdown, got %d", b.ClientCount()) + } +} + +func TestBrokerMonotonicIDs(t *testing.T) { + b := NewBroker(zerolog.Nop()) + + b.Publish("a", "{}") + b.Publish("b", "{}") + b.Publish("c", "{}") + + events := b.EventsSince(0) + if len(events) != 3 { + t.Fatalf("expected 3 events, got %d", len(events)) + } + for i := 1; i < len(events); i++ { + if events[i].ID <= events[i-1].ID { 
+ t.Fatalf("event IDs not monotonic: %d <= %d", events[i].ID, events[i-1].ID) + } + } +} diff --git a/internal/api/csv.go b/internal/api/csv.go index 30b48c9..5766ef6 100644 --- a/internal/api/csv.go +++ b/internal/api/csv.go @@ -427,6 +427,13 @@ func (s *Server) HandleImportCSV(w http.ResponseWriter, r *http.Request) { Msg("CSV import completed") writeJSON(w, http.StatusOK, result) + if !dryRun && result.SuccessCount > 0 { + s.broker.Publish("item.created", mustMarshal(map[string]any{ + "bulk": true, + "count": result.SuccessCount, + "items": result.CreatedItems, + })) + } } // HandleCSVTemplate returns an empty CSV template with headers. diff --git a/internal/api/handlers.go b/internal/api/handlers.go index 83bec81..aaef469 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -40,6 +40,8 @@ type Server struct { oidc *auth.OIDCBackend authConfig *config.AuthConfig itemFiles *db.ItemFileRepository + broker *Broker + serverState *ServerState } // NewServer creates a new API server. @@ -53,6 +55,8 @@ func NewServer( sessionManager *scs.SessionManager, oidcBackend *auth.OIDCBackend, authCfg *config.AuthConfig, + broker *Broker, + state *ServerState, ) *Server { items := db.NewItemRepository(database) projects := db.NewProjectRepository(database) @@ -76,6 +80,8 @@ func NewServer( oidc: oidcBackend, authConfig: authCfg, itemFiles: itemFiles, + broker: broker, + serverState: state, } } @@ -112,7 +118,10 @@ func writeError(w http.ResponseWriter, status int, err string, message string) { // HandleHealth returns basic health status. func (s *Server) HandleHealth(w http.ResponseWriter, r *http.Request) { - writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + writeJSON(w, http.StatusOK, map[string]string{ + "status": "ok", + "mode": string(s.serverState.Mode()), + }) } // HandleReady checks database and storage connectivity. 
@@ -125,11 +134,21 @@ func (s *Server) HandleReady(w http.ResponseWriter, r *http.Request) { return } - // Storage check would go here if we had a ping method - writeJSON(w, http.StatusOK, map[string]string{ - "status": "ready", - "database": "ok", - "storage": "ok", + storageStatus := "ok" + if s.storage != nil { + if err := s.storage.Ping(ctx); err != nil { + storageStatus = "unavailable" + } + } else { + storageStatus = "not_configured" + } + + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ready", + "database": "ok", + "storage": storageStatus, + "mode": string(s.serverState.Mode()), + "sse_clients": s.broker.ClientCount(), }) } @@ -441,7 +460,9 @@ func (s *Server) HandleCreateItem(w http.ResponseWriter, r *http.Request) { } } - writeJSON(w, http.StatusCreated, itemToResponse(item)) + resp := itemToResponse(item) + writeJSON(w, http.StatusCreated, resp) + s.broker.Publish("item.created", mustMarshal(resp)) } // HandleGetItem retrieves an item by part number. @@ -565,7 +586,9 @@ func (s *Server) HandleUpdateItem(w http.ResponseWriter, r *http.Request) { // Get updated item (use new part number if changed) item, _ = s.items.GetByPartNumber(ctx, fields.PartNumber) - writeJSON(w, http.StatusOK, itemToResponse(item)) + resp := itemToResponse(item) + writeJSON(w, http.StatusOK, resp) + s.broker.Publish("item.updated", mustMarshal(resp)) } // HandleDeleteItem permanently deletes an item. 
@@ -601,6 +624,7 @@ func (s *Server) HandleDeleteItem(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusNoContent) + s.broker.Publish("item.deleted", mustMarshal(map[string]string{"part_number": partNumber})) } // Revision handlers @@ -877,6 +901,11 @@ func (s *Server) HandleRollbackRevision(w http.ResponseWriter, r *http.Request) Msg("rollback revision created") writeJSON(w, http.StatusCreated, revisionToResponse(newRev)) + s.broker.Publish("revision.created", mustMarshal(map[string]any{ + "part_number": partNumber, + "revision_number": newRev.RevisionNumber, + "rollback_from": revNum, + })) } // Part number generation @@ -1199,6 +1228,10 @@ func (s *Server) HandleCreateRevision(w http.ResponseWriter, r *http.Request) { } writeJSON(w, http.StatusCreated, revisionToResponse(rev)) + s.broker.Publish("revision.created", mustMarshal(map[string]any{ + "part_number": partNumber, + "revision_number": rev.RevisionNumber, + })) } // HandleUploadFile uploads a file and creates a new revision. diff --git a/internal/api/middleware.go b/internal/api/middleware.go index f8e0fc4..50dc2e4 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -136,6 +136,18 @@ func (s *Server) RequireRole(minimum string) func(http.Handler) http.Handler { } } +// RequireWritable rejects requests with 503 when the server is in read-only mode. 
+func (s *Server) RequireWritable(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if s.serverState.IsReadOnly() { + writeError(w, http.StatusServiceUnavailable, "server_read_only", + "Server is in read-only mode") + return + } + next.ServeHTTP(w, r) + }) +} + func extractBearerToken(r *http.Request) string { h := r.Header.Get("Authorization") if strings.HasPrefix(h, "Bearer ") { diff --git a/internal/api/ods.go b/internal/api/ods.go index b02c72b..c590bf1 100644 --- a/internal/api/ods.go +++ b/internal/api/ods.go @@ -468,6 +468,13 @@ func (s *Server) HandleImportODS(w http.ResponseWriter, r *http.Request) { Msg("ODS import completed") writeJSON(w, http.StatusOK, result) + if !dryRun && result.SuccessCount > 0 { + s.broker.Publish("item.created", mustMarshal(map[string]any{ + "bulk": true, + "count": result.SuccessCount, + "items": result.CreatedItems, + })) + } } // HandleExportBOMODS exports the expanded BOM as a formatted ODS file. 
diff --git a/internal/api/routes.go b/internal/api/routes.go index bac65c1..32d760a 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -64,6 +64,9 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Route("/api", func(r chi.Router) { r.Use(server.RequireAuth) + // SSE event stream (viewer+) + r.Get("/events", server.HandleEvents) + // Auth endpoints r.Get("/auth/me", server.HandleGetCurrentUser) r.Route("/auth/tokens", func(r chi.Router) { @@ -74,6 +77,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { // Presigned uploads (editor) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Post("/uploads/presign", server.HandlePresignUpload) }) @@ -85,6 +89,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/{name}/properties", server.HandleGetPropertySchema) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Route("/{name}/segments/{segment}/values", func(r chi.Router) { r.Post("/", server.HandleAddSchemaValue) @@ -102,6 +107,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/{code}/sheet.ods", server.HandleProjectSheetODS) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Post("/", server.HandleCreateProject) r.Put("/{code}", server.HandleUpdateProject) @@ -119,6 +125,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/template.ods", server.HandleODSTemplate) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Post("/", server.HandleCreateItem) r.Post("/import", server.HandleImportCSV) @@ -143,6 +150,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/bom/export.ods", server.HandleExportBOMODS) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) 
r.Use(server.RequireRole(auth.RoleEditor)) r.Put("/", server.HandleUpdateItem) r.Delete("/", server.HandleDeleteItem) @@ -175,6 +183,7 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/sync-log", server.HandleGetOdooSyncLog) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Put("/config", server.HandleUpdateOdooConfig) r.Post("/test-connection", server.HandleTestOdooConnection) @@ -185,12 +194,14 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { // Sheets (editor) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Post("/sheets/diff", server.HandleSheetDiff) }) // Part number generation (editor) r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) r.Post("/generate-part-number", server.HandleGeneratePartNumber) }) diff --git a/internal/api/servermode.go b/internal/api/servermode.go new file mode 100644 index 0000000..0baa725 --- /dev/null +++ b/internal/api/servermode.go @@ -0,0 +1,136 @@ +package api + +import ( + "context" + "sync" + "time" + + "github.com/kindredsystems/silo/internal/storage" + "github.com/rs/zerolog" +) + +// ServerMode represents the operational mode of the server. +type ServerMode string + +const ( + ModeNormal ServerMode = "normal" + ModeReadOnly ServerMode = "read-only" + ModeDegraded ServerMode = "degraded" +) + +const storageCheckInterval = 30 * time.Second + +// ServerState tracks the server's current operational mode. +type ServerState struct { + logger zerolog.Logger + mu sync.RWMutex + readOnly bool + storageOK bool + storage *storage.Storage + broker *Broker + done chan struct{} +} + +// NewServerState creates a new server state tracker. 
+func NewServerState(logger zerolog.Logger, store *storage.Storage, broker *Broker) *ServerState { + return &ServerState{ + logger: logger.With().Str("component", "server-state").Logger(), + storageOK: store != nil, // assume healthy if configured + storage: store, + broker: broker, + done: make(chan struct{}), + } +} + +// Mode returns the current effective server mode. +// Priority: explicit read-only > storage unhealthy (degraded) > normal. +func (ss *ServerState) Mode() ServerMode { + ss.mu.RLock() + defer ss.mu.RUnlock() + if ss.readOnly { + return ModeReadOnly + } + if ss.storage != nil && !ss.storageOK { + return ModeDegraded + } + return ModeNormal +} + +// IsReadOnly returns true if the server should reject writes. +// Only explicit read-only mode blocks writes; degraded is informational. +func (ss *ServerState) IsReadOnly() bool { + ss.mu.RLock() + defer ss.mu.RUnlock() + return ss.readOnly +} + +// SetReadOnly sets the explicit read-only flag and broadcasts a state change. +func (ss *ServerState) SetReadOnly(ro bool) { + ss.mu.Lock() + oldMode := ss.mode() + ss.readOnly = ro + newMode := ss.mode() + ss.mu.Unlock() + + if oldMode != newMode { + ss.logger.Info().Str("mode", string(newMode)).Msg("server mode changed") + ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(newMode)})) + } +} + +// ToggleReadOnly flips the read-only flag. +func (ss *ServerState) ToggleReadOnly() { + ss.mu.RLock() + current := ss.readOnly + ss.mu.RUnlock() + ss.SetReadOnly(!current) +} + +// StartStorageHealthCheck launches a periodic check of MinIO reachability. +// Updates storageOK and broadcasts server.state on transitions. 
+func (ss *ServerState) StartStorageHealthCheck() { + if ss.storage == nil { + return + } + go func() { + ticker := time.NewTicker(storageCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := ss.storage.Ping(ctx) + cancel() + + ss.mu.Lock() + oldMode := ss.mode() + ss.storageOK = err == nil + newMode := ss.mode() + ss.mu.Unlock() + + if oldMode != newMode { + ss.logger.Info().Str("mode", string(newMode)).Err(err).Msg("server mode changed") + ss.broker.Publish("server.state", mustMarshal(map[string]string{"mode": string(newMode)})) + } + case <-ss.done: + return + } + } + }() +} + +// Shutdown stops the health check loop. +func (ss *ServerState) Shutdown() { + close(ss.done) +} + +// mode returns the current mode. Must be called with mu held. +func (ss *ServerState) mode() ServerMode { + if ss.readOnly { + return ModeReadOnly + } + if ss.storage != nil && !ss.storageOK { + return ModeDegraded + } + return ModeNormal +} diff --git a/internal/api/servermode_test.go b/internal/api/servermode_test.go new file mode 100644 index 0000000..489bb39 --- /dev/null +++ b/internal/api/servermode_test.go @@ -0,0 +1,118 @@ +package api + +import ( + "testing" + "time" + + "github.com/rs/zerolog" +) + +func TestServerStateModeNormal(t *testing.T) { + b := NewBroker(zerolog.Nop()) + ss := NewServerState(zerolog.Nop(), nil, b) + + if ss.Mode() != ModeNormal { + t.Fatalf("expected normal, got %s", ss.Mode()) + } + if ss.IsReadOnly() { + t.Fatal("expected not read-only") + } +} + +func TestServerStateModeReadOnly(t *testing.T) { + b := NewBroker(zerolog.Nop()) + ss := NewServerState(zerolog.Nop(), nil, b) + + ss.SetReadOnly(true) + if ss.Mode() != ModeReadOnly { + t.Fatalf("expected read-only, got %s", ss.Mode()) + } + if !ss.IsReadOnly() { + t.Fatal("expected read-only") + } + + ss.SetReadOnly(false) + if ss.Mode() != ModeNormal { + t.Fatalf("expected normal after clearing read-only, got %s", ss.Mode()) + 
} +} + +func TestServerStateModeDegraded(t *testing.T) { + b := NewBroker(zerolog.Nop()) + // Pass a non-nil storage placeholder to simulate configured storage. + // We manipulate storageOK directly to test degraded mode. + ss := NewServerState(zerolog.Nop(), nil, b) + + // Simulate storage configured but unhealthy by setting fields directly. + ss.mu.Lock() + // We need a non-nil storage pointer to trigger degraded mode check. + // Since we can't easily create a fake storage, we test the mode() logic + // by checking that without storage, mode stays normal. + ss.mu.Unlock() + + // Without storage configured, mode should be normal even if storageOK is false + ss.mu.Lock() + ss.storageOK = false + ss.mu.Unlock() + if ss.Mode() != ModeNormal { + t.Fatalf("expected normal (no storage configured), got %s", ss.Mode()) + } +} + +func TestServerStateToggleReadOnly(t *testing.T) { + b := NewBroker(zerolog.Nop()) + ss := NewServerState(zerolog.Nop(), nil, b) + + ss.ToggleReadOnly() + if !ss.IsReadOnly() { + t.Fatal("expected read-only after toggle") + } + + ss.ToggleReadOnly() + if ss.IsReadOnly() { + t.Fatal("expected not read-only after second toggle") + } +} + +func TestServerStateBroadcastsOnTransition(t *testing.T) { + b := NewBroker(zerolog.Nop()) + c := b.Subscribe() + defer b.Unsubscribe(c) + + ss := NewServerState(zerolog.Nop(), nil, b) + + ss.SetReadOnly(true) + + select { + case ev := <-c.ch: + if ev.Type != "server.state" { + t.Fatalf("expected server.state event, got %s", ev.Type) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for server.state event") + } + + // Setting to same value should not broadcast + ss.SetReadOnly(true) + select { + case ev := <-c.ch: + t.Fatalf("unexpected event on no-op SetReadOnly: %+v", ev) + case <-time.After(50 * time.Millisecond): + // expected — no event + } +} + +func TestServerStateReadOnlyOverridesDegraded(t *testing.T) { + b := NewBroker(zerolog.Nop()) + ss := NewServerState(zerolog.Nop(), nil, b) + + // 
Both read-only and storage down: should show read-only + ss.mu.Lock() + ss.readOnly = true + ss.storageOK = false + ss.mu.Unlock() + + if ss.Mode() != ModeReadOnly { + t.Fatalf("expected read-only to override degraded, got %s", ss.Mode()) + } +} diff --git a/internal/api/sse_handler.go b/internal/api/sse_handler.go new file mode 100644 index 0000000..64da978 --- /dev/null +++ b/internal/api/sse_handler.go @@ -0,0 +1,69 @@ +package api + +import ( + "fmt" + "net/http" + "strconv" + "time" +) + +// HandleEvents serves the SSE event stream. +// GET /api/events (requires auth, viewer+) +func (s *Server) HandleEvents(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "streaming_unsupported", "Streaming not supported") + return + } + + // Disable the write deadline for this long-lived connection. + // The server's WriteTimeout (15s) would otherwise kill it. + rc := http.NewResponseController(w) + if err := rc.SetWriteDeadline(time.Time{}); err != nil { + s.logger.Warn().Err(err).Msg("failed to disable write deadline for SSE") + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering + + client := s.broker.Subscribe() + defer s.broker.Unsubscribe(client) + + // Replay missed events if Last-Event-ID is present. Track the highest event + // ID already written so the stream loop can skip events published between + // Subscribe and the history snapshot (those are buffered on client.ch too). + var lastSent uint64 + if lastIDStr := r.Header.Get("Last-Event-ID"); lastIDStr != "" { + if lastID, err := strconv.ParseUint(lastIDStr, 10, 64); err == nil { + for _, ev := range s.broker.EventsSince(lastID) { + fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data) + lastSent = ev.ID + } + flusher.Flush() + } + } + + // Send initial server.state event. + fmt.Fprintf(w, "event: server.state\ndata: %s\n\n", + mustMarshal(map[string]string{"mode": string(s.serverState.Mode())})) + flusher.Flush() + + // Stream loop. 
+ for { + select { + case ev := <-client.ch: + if ev.ID <= lastSent { + continue // already delivered during Last-Event-ID replay + } + fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", ev.ID, ev.Type, ev.Data) + flusher.Flush() + lastSent = ev.ID + case <-client.closed: + return + case <-r.Context().Done(): + return + } + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 4b38ec3..d8399f6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -72,9 +72,10 @@ type CORSConfig struct { // ServerConfig holds HTTP server settings. type ServerConfig struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - BaseURL string `yaml:"base_url"` + Host string `yaml:"host"` + Port int `yaml:"port"` + BaseURL string `yaml:"base_url"` + ReadOnly bool `yaml:"read_only"` } // DatabaseConfig holds PostgreSQL connection settings. diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 81d9a4c..81e3caf 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -112,6 +112,12 @@ func (s *Storage) Delete(ctx context.Context, key string) error { return nil } +// Ping checks if the storage backend is reachable by verifying the bucket exists. +func (s *Storage) Ping(ctx context.Context) error { + _, err := s.client.BucketExists(ctx, s.bucket) + return err +} + // Bucket returns the bucket name. func (s *Storage) Bucket() string { return s.bucket diff --git a/silo-spec.md b/silo-spec.md deleted file mode 100644 index bae337f..0000000 --- a/silo-spec.md +++ /dev/null @@ -1 +0,0 @@ -This document has moved to [docs/SPECIFICATION.md](docs/SPECIFICATION.md).