Add modules.Registry and config.Config fields to Server struct. Create registry in main.go, load state from YAML+DB, log all module states at startup. Conditionally start job/runner sweeper goroutines only when the jobs module is enabled. Update all 5 test files to pass registry to NewServer. Ref #95, #96
319 lines
10 KiB
Go
319 lines
10 KiB
Go
// Command silod is the Silo HTTP API server.
|
|
package main
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/alexedwards/scs/pgxstore"
	"github.com/alexedwards/scs/v2"

	"github.com/kindredsystems/silo/internal/api"
	"github.com/kindredsystems/silo/internal/auth"
	"github.com/kindredsystems/silo/internal/config"
	"github.com/kindredsystems/silo/internal/db"
	"github.com/kindredsystems/silo/internal/jobdef"
	"github.com/kindredsystems/silo/internal/modules"
	"github.com/kindredsystems/silo/internal/schema"
	"github.com/kindredsystems/silo/internal/storage"
	"github.com/rs/zerolog"
)
|
|
|
|
func main() {
|
|
// Parse flags
|
|
configPath := flag.String("config", "config.yaml", "Path to configuration file")
|
|
flag.Parse()
|
|
|
|
// Setup logger
|
|
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
|
|
|
// Load configuration
|
|
cfg, err := config.Load(*configPath)
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to load configuration")
|
|
}
|
|
|
|
logger.Info().
|
|
Str("host", cfg.Server.Host).
|
|
Int("port", cfg.Server.Port).
|
|
Str("database", cfg.Database.Host).
|
|
Str("storage", cfg.Storage.Endpoint).
|
|
Msg("starting silo server")
|
|
|
|
// Connect to database
|
|
ctx := context.Background()
|
|
database, err := db.Connect(ctx, db.Config{
|
|
Host: cfg.Database.Host,
|
|
Port: cfg.Database.Port,
|
|
Name: cfg.Database.Name,
|
|
User: cfg.Database.User,
|
|
Password: cfg.Database.Password,
|
|
SSLMode: cfg.Database.SSLMode,
|
|
MaxConnections: cfg.Database.MaxConnections,
|
|
})
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to connect to database")
|
|
}
|
|
defer database.Close()
|
|
logger.Info().Msg("connected to database")
|
|
|
|
// Connect to storage (optional - may be externally managed)
|
|
var store *storage.Storage
|
|
if cfg.Storage.Endpoint != "" {
|
|
store, err = storage.Connect(ctx, storage.Config{
|
|
Endpoint: cfg.Storage.Endpoint,
|
|
AccessKey: cfg.Storage.AccessKey,
|
|
SecretKey: cfg.Storage.SecretKey,
|
|
Bucket: cfg.Storage.Bucket,
|
|
UseSSL: cfg.Storage.UseSSL,
|
|
Region: cfg.Storage.Region,
|
|
})
|
|
if err != nil {
|
|
logger.Warn().Err(err).Msg("failed to connect to storage - file operations disabled")
|
|
store = nil
|
|
} else {
|
|
logger.Info().Msg("connected to storage")
|
|
}
|
|
} else {
|
|
logger.Info().Msg("storage not configured - file operations disabled")
|
|
}
|
|
|
|
// Load schemas
|
|
schemas, err := schema.LoadAll(cfg.Schemas.Directory)
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Str("directory", cfg.Schemas.Directory).Msg("failed to load schemas")
|
|
}
|
|
logger.Info().Int("count", len(schemas)).Msg("loaded schemas")
|
|
|
|
// Initialize authentication
|
|
userRepo := db.NewUserRepository(database)
|
|
tokenRepo := db.NewTokenRepository(database)
|
|
|
|
// Session manager (PostgreSQL-backed via scs + pgxstore)
|
|
sessionManager := scs.New()
|
|
sessionManager.Store = pgxstore.New(database.Pool())
|
|
sessionManager.Lifetime = 24 * time.Hour
|
|
sessionManager.Cookie.Name = "silo_session"
|
|
sessionManager.Cookie.HttpOnly = true
|
|
sessionManager.Cookie.Secure = cfg.Auth.Enabled // Secure cookies when auth is active
|
|
sessionManager.Cookie.SameSite = http.SameSiteLaxMode
|
|
|
|
// Build auth backends from config
|
|
var backends []auth.Backend
|
|
if cfg.Auth.Local.Enabled {
|
|
backends = append(backends, auth.NewLocalBackend(userRepo))
|
|
logger.Info().Msg("auth backend: local")
|
|
}
|
|
if cfg.Auth.LDAP.Enabled {
|
|
backends = append(backends, auth.NewLDAPBackend(auth.LDAPConfig{
|
|
URL: cfg.Auth.LDAP.URL,
|
|
BaseDN: cfg.Auth.LDAP.BaseDN,
|
|
UserSearchDN: cfg.Auth.LDAP.UserSearchDN,
|
|
BindDN: cfg.Auth.LDAP.BindDN,
|
|
BindPassword: cfg.Auth.LDAP.BindPassword,
|
|
UserAttr: cfg.Auth.LDAP.UserAttr,
|
|
EmailAttr: cfg.Auth.LDAP.EmailAttr,
|
|
DisplayAttr: cfg.Auth.LDAP.DisplayAttr,
|
|
GroupAttr: cfg.Auth.LDAP.GroupAttr,
|
|
RoleMapping: cfg.Auth.LDAP.RoleMapping,
|
|
TLSSkipVerify: cfg.Auth.LDAP.TLSSkipVerify,
|
|
}))
|
|
logger.Info().Str("url", cfg.Auth.LDAP.URL).Msg("auth backend: ldap")
|
|
}
|
|
|
|
authService := auth.NewService(logger, userRepo, tokenRepo, backends...)
|
|
|
|
// OIDC backend (separate from the Backend interface since it uses redirect flow)
|
|
var oidcBackend *auth.OIDCBackend
|
|
if cfg.Auth.OIDC.Enabled {
|
|
oidcBackend, err = auth.NewOIDCBackend(ctx, auth.OIDCConfig{
|
|
IssuerURL: cfg.Auth.OIDC.IssuerURL,
|
|
ClientID: cfg.Auth.OIDC.ClientID,
|
|
ClientSecret: cfg.Auth.OIDC.ClientSecret,
|
|
RedirectURL: cfg.Auth.OIDC.RedirectURL,
|
|
Scopes: cfg.Auth.OIDC.Scopes,
|
|
AdminRole: cfg.Auth.OIDC.AdminRole,
|
|
EditorRole: cfg.Auth.OIDC.EditorRole,
|
|
DefaultRole: cfg.Auth.OIDC.DefaultRole,
|
|
})
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to initialize OIDC backend")
|
|
}
|
|
logger.Info().Str("issuer", cfg.Auth.OIDC.IssuerURL).Msg("auth backend: oidc")
|
|
}
|
|
|
|
if cfg.Auth.Enabled {
|
|
logger.Info().Msg("authentication enabled")
|
|
} else {
|
|
logger.Warn().Msg("authentication disabled - all routes are open")
|
|
}
|
|
|
|
// Seed default admin account (idempotent — skips if user already exists)
|
|
if u := cfg.Auth.Local.DefaultAdminUsername; u != "" {
|
|
if p := cfg.Auth.Local.DefaultAdminPassword; p != "" {
|
|
existing, err := userRepo.GetByUsername(ctx, u)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("failed to check for default admin user")
|
|
} else if existing != nil {
|
|
logger.Debug().Str("username", u).Msg("default admin user already exists, skipping")
|
|
} else {
|
|
hash, err := auth.HashPassword(p)
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to hash default admin password")
|
|
}
|
|
adminUser := &db.User{
|
|
Username: u,
|
|
DisplayName: "Administrator",
|
|
Role: auth.RoleAdmin,
|
|
AuthSource: "local",
|
|
}
|
|
if err := userRepo.Create(ctx, adminUser, hash); err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to create default admin user")
|
|
}
|
|
logger.Info().Str("username", u).Msg("default admin user created")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Load job definitions (optional — directory may not exist yet)
|
|
var jobDefs map[string]*jobdef.Definition
|
|
if _, err := os.Stat(cfg.Jobs.Directory); err == nil {
|
|
jobDefs, err = jobdef.LoadAll(cfg.Jobs.Directory)
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Str("directory", cfg.Jobs.Directory).Msg("failed to load job definitions")
|
|
}
|
|
logger.Info().Int("count", len(jobDefs)).Msg("loaded job definitions")
|
|
} else {
|
|
jobDefs = make(map[string]*jobdef.Definition)
|
|
logger.Info().Str("directory", cfg.Jobs.Directory).Msg("job definitions directory not found, skipping")
|
|
}
|
|
|
|
// Upsert job definitions into database
|
|
jobRepo := db.NewJobRepository(database)
|
|
for _, def := range jobDefs {
|
|
defJSON, _ := json.Marshal(def)
|
|
var defMap map[string]any
|
|
json.Unmarshal(defJSON, &defMap)
|
|
|
|
rec := &db.JobDefinitionRecord{
|
|
Name: def.Name,
|
|
Version: def.Version,
|
|
TriggerType: def.Trigger.Type,
|
|
ScopeType: def.Scope.Type,
|
|
ComputeType: def.Compute.Type,
|
|
RunnerTags: def.Runner.Tags,
|
|
TimeoutSeconds: def.Timeout,
|
|
MaxRetries: def.MaxRetries,
|
|
Priority: def.Priority,
|
|
Definition: defMap,
|
|
Enabled: true,
|
|
}
|
|
if err := jobRepo.UpsertDefinition(ctx, rec); err != nil {
|
|
logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to upsert job definition")
|
|
}
|
|
}
|
|
|
|
// Initialize module registry
|
|
registry := modules.NewRegistry()
|
|
if err := modules.LoadState(registry, cfg, database.Pool()); err != nil {
|
|
logger.Fatal().Err(err).Msg("failed to load module state")
|
|
}
|
|
for _, m := range registry.All() {
|
|
logger.Info().Str("module", m.ID).Bool("enabled", registry.IsEnabled(m.ID)).
|
|
Bool("required", m.Required).Msg("module")
|
|
}
|
|
|
|
// Create SSE broker and server state
|
|
broker := api.NewBroker(logger)
|
|
serverState := api.NewServerState(logger, store, broker)
|
|
if cfg.Server.ReadOnly {
|
|
serverState.SetReadOnly(true)
|
|
logger.Warn().Msg("server started in read-only mode")
|
|
}
|
|
broker.StartHeartbeat()
|
|
serverState.StartStorageHealthCheck()
|
|
|
|
// Create API server
|
|
server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store,
|
|
authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState,
|
|
jobDefs, cfg.Jobs.Directory, registry, cfg)
|
|
router := api.NewRouter(server, logger)
|
|
|
|
// Start background sweepers for job/runner timeouts (only when jobs module enabled)
|
|
if registry.IsEnabled(modules.Jobs) {
|
|
go func() {
|
|
ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil {
|
|
logger.Error().Err(err).Msg("job timeout sweep failed")
|
|
} else if n > 0 {
|
|
logger.Info().Int64("count", n).Msg("timed out expired jobs")
|
|
}
|
|
|
|
if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil {
|
|
logger.Error().Err(err).Msg("runner expiry sweep failed")
|
|
} else if n > 0 {
|
|
logger.Info().Int64("count", n).Msg("expired stale runners")
|
|
}
|
|
}
|
|
}()
|
|
logger.Info().Msg("job/runner sweepers started")
|
|
}
|
|
|
|
// Create HTTP server
|
|
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
|
|
httpServer := &http.Server{
|
|
Addr: addr,
|
|
Handler: router,
|
|
ReadTimeout: 15 * time.Second,
|
|
WriteTimeout: 15 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
// Start server in goroutine
|
|
go func() {
|
|
logger.Info().Str("addr", addr).Msg("listening")
|
|
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.Fatal().Err(err).Msg("server error")
|
|
}
|
|
}()
|
|
|
|
// SIGUSR1: toggle read-only mode
|
|
usr1 := make(chan os.Signal, 1)
|
|
signal.Notify(usr1, syscall.SIGUSR1)
|
|
go func() {
|
|
for range usr1 {
|
|
serverState.ToggleReadOnly()
|
|
logger.Info().Str("mode", string(serverState.Mode())).Msg("read-only mode toggled via SIGUSR1")
|
|
}
|
|
}()
|
|
|
|
// Wait for interrupt signal
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
|
|
logger.Info().Msg("shutting down server")
|
|
|
|
// Graceful shutdown: close SSE connections first, then HTTP server
|
|
broker.Shutdown()
|
|
serverState.Shutdown()
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
if err := httpServer.Shutdown(shutdownCtx); err != nil {
|
|
logger.Fatal().Err(err).Msg("server forced to shutdown")
|
|
}
|
|
|
|
logger.Info().Msg("server stopped")
|
|
}
|