// Command silod is the Silo HTTP API server. package main import ( "context" "encoding/json" "flag" "fmt" "net/http" "os" "os/signal" "syscall" "time" "github.com/alexedwards/scs/pgxstore" "github.com/alexedwards/scs/v2" "github.com/kindredsystems/silo/internal/api" "github.com/kindredsystems/silo/internal/auth" "github.com/kindredsystems/silo/internal/config" "github.com/kindredsystems/silo/internal/db" "github.com/kindredsystems/silo/internal/jobdef" "github.com/kindredsystems/silo/internal/modules" "github.com/kindredsystems/silo/internal/schema" "github.com/kindredsystems/silo/internal/storage" "github.com/kindredsystems/silo/internal/workflow" "github.com/rs/zerolog" ) func main() { // Parse flags configPath := flag.String("config", "config.yaml", "Path to configuration file") flag.Parse() // Setup logger logger := zerolog.New(os.Stdout).With().Timestamp().Logger() // Load configuration cfg, err := config.Load(*configPath) if err != nil { logger.Fatal().Err(err).Msg("failed to load configuration") } logger.Info(). Str("host", cfg.Server.Host). Int("port", cfg.Server.Port). Str("database", cfg.Database.Host). Str("storage", cfg.Storage.Endpoint). Msg("starting silo server") // Connect to database ctx := context.Background() database, err := db.Connect(ctx, db.Config{ Host: cfg.Database.Host, Port: cfg.Database.Port, Name: cfg.Database.Name, User: cfg.Database.User, Password: cfg.Database.Password, SSLMode: cfg.Database.SSLMode, MaxConnections: cfg.Database.MaxConnections, }) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } defer database.Close() logger.Info().Msg("connected to database") // Connect to storage (optional - may be externally managed) var store storage.FileStore switch cfg.Storage.Backend { case "minio", "": if cfg.Storage.Endpoint != "" { s, connErr := storage.Connect(ctx, storage.Config{ Endpoint: cfg.Storage.Endpoint, AccessKey: cfg.Storage.AccessKey, SecretKey: cfg.Storage.SecretKey, Bucket: cfg.Storage.Bucket, UseSSL: cfg.Storage.UseSSL, Region: cfg.Storage.Region, }) if connErr != nil { logger.Warn().Err(connErr).Msg("failed to connect to storage - file operations disabled") } else { store = s logger.Info().Msg("connected to storage") } } else { logger.Info().Msg("storage not configured - file operations disabled") } case "filesystem": if cfg.Storage.Filesystem.RootDir == "" { logger.Fatal().Msg("storage.filesystem.root_dir is required when backend is \"filesystem\"") } s, fsErr := storage.NewFilesystemStore(cfg.Storage.Filesystem.RootDir) if fsErr != nil { logger.Fatal().Err(fsErr).Msg("failed to initialize filesystem storage") } store = s logger.Info().Str("root", cfg.Storage.Filesystem.RootDir).Msg("connected to filesystem storage") default: logger.Fatal().Str("backend", cfg.Storage.Backend).Msg("unknown storage backend") } // Load schemas schemas, err := schema.LoadAll(cfg.Schemas.Directory) if err != nil { logger.Fatal().Err(err).Str("directory", cfg.Schemas.Directory).Msg("failed to load schemas") } logger.Info().Int("count", len(schemas)).Msg("loaded schemas") // Initialize authentication userRepo := db.NewUserRepository(database) tokenRepo := db.NewTokenRepository(database) // Session manager (PostgreSQL-backed via scs + pgxstore) sessionManager := scs.New() sessionManager.Store = pgxstore.New(database.Pool()) sessionManager.Lifetime = 24 * time.Hour sessionManager.Cookie.Name = "silo_session" sessionManager.Cookie.HttpOnly = true sessionManager.Cookie.Secure = cfg.Auth.Enabled // Secure cookies when auth is active sessionManager.Cookie.SameSite = http.SameSiteLaxMode // Build auth backends from config var backends []auth.Backend if cfg.Auth.Local.Enabled { backends = append(backends, auth.NewLocalBackend(userRepo)) logger.Info().Msg("auth backend: local") } if cfg.Auth.LDAP.Enabled { backends = append(backends, auth.NewLDAPBackend(auth.LDAPConfig{ URL: cfg.Auth.LDAP.URL, BaseDN: cfg.Auth.LDAP.BaseDN, UserSearchDN: cfg.Auth.LDAP.UserSearchDN, BindDN: cfg.Auth.LDAP.BindDN, BindPassword: cfg.Auth.LDAP.BindPassword, UserAttr: cfg.Auth.LDAP.UserAttr, EmailAttr: cfg.Auth.LDAP.EmailAttr, DisplayAttr: cfg.Auth.LDAP.DisplayAttr, GroupAttr: cfg.Auth.LDAP.GroupAttr, RoleMapping: cfg.Auth.LDAP.RoleMapping, TLSSkipVerify: cfg.Auth.LDAP.TLSSkipVerify, })) logger.Info().Str("url", cfg.Auth.LDAP.URL).Msg("auth backend: ldap") } authService := auth.NewService(logger, userRepo, tokenRepo, backends...) // OIDC backend (separate from the Backend interface since it uses redirect flow) var oidcBackend *auth.OIDCBackend if cfg.Auth.OIDC.Enabled { oidcBackend, err = auth.NewOIDCBackend(ctx, auth.OIDCConfig{ IssuerURL: cfg.Auth.OIDC.IssuerURL, ClientID: cfg.Auth.OIDC.ClientID, ClientSecret: cfg.Auth.OIDC.ClientSecret, RedirectURL: cfg.Auth.OIDC.RedirectURL, Scopes: cfg.Auth.OIDC.Scopes, AdminRole: cfg.Auth.OIDC.AdminRole, EditorRole: cfg.Auth.OIDC.EditorRole, DefaultRole: cfg.Auth.OIDC.DefaultRole, }) if err != nil { logger.Fatal().Err(err).Msg("failed to initialize OIDC backend") } logger.Info().Str("issuer", cfg.Auth.OIDC.IssuerURL).Msg("auth backend: oidc") } if cfg.Auth.Enabled { logger.Info().Msg("authentication enabled") } else { logger.Warn().Msg("authentication disabled - all routes are open") } // Seed default admin account (idempotent — skips if user already exists) if u := cfg.Auth.Local.DefaultAdminUsername; u != "" { if p := cfg.Auth.Local.DefaultAdminPassword; p != "" { existing, err := userRepo.GetByUsername(ctx, u) if err != nil { logger.Error().Err(err).Msg("failed to check for default admin user") } else if existing != nil { logger.Debug().Str("username", u).Msg("default admin user already exists, skipping") } else { hash, err := auth.HashPassword(p) if err != nil { logger.Fatal().Err(err).Msg("failed to hash default admin password") } adminUser := &db.User{ Username: u, DisplayName: "Administrator", Role: auth.RoleAdmin, AuthSource: "local", } if err := userRepo.Create(ctx, adminUser, hash); err != nil { logger.Fatal().Err(err).Msg("failed to create default admin user") } logger.Info().Str("username", u).Msg("default admin user created") } } } // Load job definitions (optional — directory may not exist yet) var jobDefs map[string]*jobdef.Definition if _, err := os.Stat(cfg.Jobs.Directory); err == nil { jobDefs, err = jobdef.LoadAll(cfg.Jobs.Directory) if err != nil { logger.Fatal().Err(err).Str("directory", cfg.Jobs.Directory).Msg("failed to load job definitions") } logger.Info().Int("count", len(jobDefs)).Msg("loaded job definitions") } else { jobDefs = make(map[string]*jobdef.Definition) logger.Info().Str("directory", cfg.Jobs.Directory).Msg("job definitions directory not found, skipping") } // Upsert job definitions into database jobRepo := db.NewJobRepository(database) for _, def := range jobDefs { defJSON, _ := json.Marshal(def) var defMap map[string]any json.Unmarshal(defJSON, &defMap) rec := &db.JobDefinitionRecord{ Name: def.Name, Version: def.Version, TriggerType: def.Trigger.Type, ScopeType: def.Scope.Type, ComputeType: def.Compute.Type, RunnerTags: def.Runner.Tags, TimeoutSeconds: def.Timeout, MaxRetries: def.MaxRetries, Priority: def.Priority, Definition: defMap, Enabled: true, } if err := jobRepo.UpsertDefinition(ctx, rec); err != nil { logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to upsert job definition") } } // Load approval workflow definitions (optional — directory may not exist yet) var workflows map[string]*workflow.Workflow if _, err := os.Stat(cfg.Workflows.Directory); err == nil { workflows, err = workflow.LoadAll(cfg.Workflows.Directory) if err != nil { logger.Fatal().Err(err).Str("directory", cfg.Workflows.Directory).Msg("failed to load workflow definitions") } logger.Info().Int("count", len(workflows)).Msg("loaded workflow definitions") } else { workflows = make(map[string]*workflow.Workflow) logger.Info().Str("directory", cfg.Workflows.Directory).Msg("workflows directory not found, skipping") } // Initialize module registry registry := modules.NewRegistry() if err := modules.LoadState(registry, cfg, database.Pool()); err != nil { logger.Fatal().Err(err).Msg("failed to load module state") } for _, m := range registry.All() { logger.Info().Str("module", m.ID).Bool("enabled", registry.IsEnabled(m.ID)). Bool("required", m.Required).Msg("module") } // Create SSE broker and server state broker := api.NewBroker(logger) serverState := api.NewServerState(logger, store, broker) if cfg.Server.ReadOnly { serverState.SetReadOnly(true) logger.Warn().Msg("server started in read-only mode") } broker.StartHeartbeat() serverState.StartStorageHealthCheck() // Create API server server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store, authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState, jobDefs, cfg.Jobs.Directory, registry, cfg, workflows) router := api.NewRouter(server, logger) // Start background sweepers for job/runner timeouts (only when jobs module enabled) if registry.IsEnabled(modules.Jobs) { go func() { ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second) defer ticker.Stop() for range ticker.C { if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil { logger.Error().Err(err).Msg("job timeout sweep failed") } else if n > 0 { logger.Info().Int64("count", n).Msg("timed out expired jobs") } if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil { logger.Error().Err(err).Msg("runner expiry sweep failed") } else if n > 0 { logger.Info().Int64("count", n).Msg("expired stale runners") } } }() logger.Info().Msg("job/runner sweepers started") } // Create HTTP server addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port) httpServer := &http.Server{ Addr: addr, Handler: router, ReadTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, } // Start server in goroutine go func() { logger.Info().Str("addr", addr).Msg("listening") if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Fatal().Err(err).Msg("server error") } }() // SIGUSR1: toggle read-only mode usr1 := make(chan os.Signal, 1) signal.Notify(usr1, syscall.SIGUSR1) go func() { for range usr1 { serverState.ToggleReadOnly() logger.Info().Str("mode", string(serverState.Mode())).Msg("read-only mode toggled via SIGUSR1") } }() // Wait for interrupt signal quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit logger.Info().Msg("shutting down server") // Graceful shutdown: close SSE connections first, then HTTP server broker.Shutdown() serverState.Shutdown() shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := httpServer.Shutdown(shutdownCtx); err != nil { logger.Fatal().Err(err).Msg("server forced to shutdown") } logger.Info().Msg("server stopped") }