Files
silo/cmd/silod/main.go
Forbes 88d1ab1f97 refactor(storage): remove MinIO backend, filesystem-only storage
Remove the MinIO/S3 storage backend entirely. The filesystem backend is
fully implemented and already used in production; any remaining MinIO
deployments should migrate beforehand using the migrate-storage tool from
a prior release (the tool itself is deleted by this commit).

Changes:
- Delete MinIO client implementation (internal/storage/storage.go)
- Delete migrate-storage tool (cmd/migrate-storage, scripts/migrate-storage.sh)
- Remove MinIO service, volumes, and env vars from all Docker Compose files
- Simplify StorageConfig: remove Endpoint, AccessKey, SecretKey, Bucket,
  UseSSL, Region fields; add SILO_STORAGE_ROOT_DIR env override
- Change all SQL COALESCE defaults from 'minio' to 'filesystem'
- Add migration 020 to update column defaults to 'filesystem'
- Remove minio-go/v7 dependency (go mod tidy)
- Update all config examples, setup scripts, docs, and tests
2026-02-19 14:36:22 -06:00

324 lines
11 KiB
Go

// Command silod is the Silo HTTP API server.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/alexedwards/scs/pgxstore"
"github.com/alexedwards/scs/v2"
"github.com/kindredsystems/silo/internal/api"
"github.com/kindredsystems/silo/internal/auth"
"github.com/kindredsystems/silo/internal/config"
"github.com/kindredsystems/silo/internal/db"
"github.com/kindredsystems/silo/internal/jobdef"
"github.com/kindredsystems/silo/internal/modules"
"github.com/kindredsystems/silo/internal/schema"
"github.com/kindredsystems/silo/internal/storage"
"github.com/kindredsystems/silo/internal/workflow"
"github.com/rs/zerolog"
)
// main wires together configuration, database, filesystem storage, auth
// backends, schemas, job/workflow definitions, and the HTTP API server,
// then blocks until SIGINT/SIGTERM triggers a graceful shutdown.
func main() {
	// Parse flags.
	configPath := flag.String("config", "config.yaml", "Path to configuration file")
	flag.Parse()

	// Structured JSON logger on stdout.
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Load configuration; nothing can proceed without it.
	cfg, err := config.Load(*configPath)
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to load configuration")
	}
	logger.Info().
		Str("host", cfg.Server.Host).
		Int("port", cfg.Server.Port).
		Str("database", cfg.Database.Host).
		Msg("starting silo server")

	// Connect to the database.
	ctx := context.Background()
	database, err := db.Connect(ctx, db.Config{
		Host:           cfg.Database.Host,
		Port:           cfg.Database.Port,
		Name:           cfg.Database.Name,
		User:           cfg.Database.User,
		Password:       cfg.Database.Password,
		SSLMode:        cfg.Database.SSLMode,
		MaxConnections: cfg.Database.MaxConnections,
	})
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to connect to database")
	}
	defer database.Close()
	logger.Info().Msg("connected to database")

	// Connect to storage (optional — requires root_dir to be set).
	var store storage.FileStore
	if cfg.Storage.Filesystem.RootDir != "" {
		s, fsErr := storage.NewFilesystemStore(cfg.Storage.Filesystem.RootDir)
		if fsErr != nil {
			logger.Fatal().Err(fsErr).Msg("failed to initialize filesystem storage")
		}
		store = s
		logger.Info().Str("root", cfg.Storage.Filesystem.RootDir).Msg("connected to filesystem storage")
	} else {
		logger.Info().Msg("storage not configured - file operations disabled")
	}

	// Load entity schemas from disk; these are required.
	schemas, err := schema.LoadAll(cfg.Schemas.Directory)
	if err != nil {
		logger.Fatal().Err(err).Str("directory", cfg.Schemas.Directory).Msg("failed to load schemas")
	}
	logger.Info().Int("count", len(schemas)).Msg("loaded schemas")

	// Initialize authentication repositories.
	userRepo := db.NewUserRepository(database)
	tokenRepo := db.NewTokenRepository(database)

	// Session manager (PostgreSQL-backed via scs + pgxstore).
	sessionManager := scs.New()
	sessionManager.Store = pgxstore.New(database.Pool())
	sessionManager.Lifetime = 24 * time.Hour
	sessionManager.Cookie.Name = "silo_session"
	sessionManager.Cookie.HttpOnly = true
	sessionManager.Cookie.Secure = cfg.Auth.Enabled // Secure cookies when auth is active
	sessionManager.Cookie.SameSite = http.SameSiteLaxMode

	// Build auth backends from config.
	var backends []auth.Backend
	if cfg.Auth.Local.Enabled {
		backends = append(backends, auth.NewLocalBackend(userRepo))
		logger.Info().Msg("auth backend: local")
	}
	if cfg.Auth.LDAP.Enabled {
		backends = append(backends, auth.NewLDAPBackend(auth.LDAPConfig{
			URL:           cfg.Auth.LDAP.URL,
			BaseDN:        cfg.Auth.LDAP.BaseDN,
			UserSearchDN:  cfg.Auth.LDAP.UserSearchDN,
			BindDN:        cfg.Auth.LDAP.BindDN,
			BindPassword:  cfg.Auth.LDAP.BindPassword,
			UserAttr:      cfg.Auth.LDAP.UserAttr,
			EmailAttr:     cfg.Auth.LDAP.EmailAttr,
			DisplayAttr:   cfg.Auth.LDAP.DisplayAttr,
			GroupAttr:     cfg.Auth.LDAP.GroupAttr,
			RoleMapping:   cfg.Auth.LDAP.RoleMapping,
			TLSSkipVerify: cfg.Auth.LDAP.TLSSkipVerify,
		}))
		logger.Info().Str("url", cfg.Auth.LDAP.URL).Msg("auth backend: ldap")
	}
	authService := auth.NewService(logger, userRepo, tokenRepo, backends...)

	// OIDC backend (separate from the Backend interface since it uses redirect flow).
	var oidcBackend *auth.OIDCBackend
	if cfg.Auth.OIDC.Enabled {
		oidcBackend, err = auth.NewOIDCBackend(ctx, auth.OIDCConfig{
			IssuerURL:    cfg.Auth.OIDC.IssuerURL,
			ClientID:     cfg.Auth.OIDC.ClientID,
			ClientSecret: cfg.Auth.OIDC.ClientSecret,
			RedirectURL:  cfg.Auth.OIDC.RedirectURL,
			Scopes:       cfg.Auth.OIDC.Scopes,
			AdminRole:    cfg.Auth.OIDC.AdminRole,
			EditorRole:   cfg.Auth.OIDC.EditorRole,
			DefaultRole:  cfg.Auth.OIDC.DefaultRole,
		})
		if err != nil {
			logger.Fatal().Err(err).Msg("failed to initialize OIDC backend")
		}
		logger.Info().Str("issuer", cfg.Auth.OIDC.IssuerURL).Msg("auth backend: oidc")
	}
	if cfg.Auth.Enabled {
		logger.Info().Msg("authentication enabled")
	} else {
		logger.Warn().Msg("authentication disabled - all routes are open")
	}

	// Seed default admin account (idempotent — skips if user already exists).
	if u := cfg.Auth.Local.DefaultAdminUsername; u != "" {
		if p := cfg.Auth.Local.DefaultAdminPassword; p != "" {
			existing, lookupErr := userRepo.GetByUsername(ctx, u)
			switch {
			case lookupErr != nil:
				// Non-fatal: the server can still run; the admin can be created manually.
				logger.Error().Err(lookupErr).Msg("failed to check for default admin user")
			case existing != nil:
				logger.Debug().Str("username", u).Msg("default admin user already exists, skipping")
			default:
				hash, hashErr := auth.HashPassword(p)
				if hashErr != nil {
					logger.Fatal().Err(hashErr).Msg("failed to hash default admin password")
				}
				adminUser := &db.User{
					Username:    u,
					DisplayName: "Administrator",
					Role:        auth.RoleAdmin,
					AuthSource:  "local",
				}
				if err := userRepo.Create(ctx, adminUser, hash); err != nil {
					logger.Fatal().Err(err).Msg("failed to create default admin user")
				}
				logger.Info().Str("username", u).Msg("default admin user created")
			}
		}
	}

	// Load job definitions (optional — directory may not exist yet).
	var jobDefs map[string]*jobdef.Definition
	if _, statErr := os.Stat(cfg.Jobs.Directory); statErr == nil {
		jobDefs, err = jobdef.LoadAll(cfg.Jobs.Directory)
		if err != nil {
			logger.Fatal().Err(err).Str("directory", cfg.Jobs.Directory).Msg("failed to load job definitions")
		}
		logger.Info().Int("count", len(jobDefs)).Msg("loaded job definitions")
	} else {
		jobDefs = make(map[string]*jobdef.Definition)
		logger.Info().Str("directory", cfg.Jobs.Directory).Msg("job definitions directory not found, skipping")
	}

	// Upsert job definitions into the database.
	jobRepo := db.NewJobRepository(database)
	for _, def := range jobDefs {
		// Round-trip through JSON to produce a generic map for the JSONB column.
		// FIX: marshal/unmarshal errors were previously discarded, which could
		// silently upsert an empty or partial definition payload.
		defJSON, marshalErr := json.Marshal(def)
		if marshalErr != nil {
			logger.Fatal().Err(marshalErr).Str("name", def.Name).Msg("failed to marshal job definition")
		}
		var defMap map[string]any
		if err := json.Unmarshal(defJSON, &defMap); err != nil {
			logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to decode job definition")
		}
		rec := &db.JobDefinitionRecord{
			Name:           def.Name,
			Version:        def.Version,
			TriggerType:    def.Trigger.Type,
			ScopeType:      def.Scope.Type,
			ComputeType:    def.Compute.Type,
			RunnerTags:     def.Runner.Tags,
			TimeoutSeconds: def.Timeout,
			MaxRetries:     def.MaxRetries,
			Priority:       def.Priority,
			Definition:     defMap,
			Enabled:        true,
		}
		if err := jobRepo.UpsertDefinition(ctx, rec); err != nil {
			logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to upsert job definition")
		}
	}

	// Load approval workflow definitions (optional — directory may not exist yet).
	var workflows map[string]*workflow.Workflow
	if _, statErr := os.Stat(cfg.Workflows.Directory); statErr == nil {
		workflows, err = workflow.LoadAll(cfg.Workflows.Directory)
		if err != nil {
			logger.Fatal().Err(err).Str("directory", cfg.Workflows.Directory).Msg("failed to load workflow definitions")
		}
		logger.Info().Int("count", len(workflows)).Msg("loaded workflow definitions")
	} else {
		workflows = make(map[string]*workflow.Workflow)
		logger.Info().Str("directory", cfg.Workflows.Directory).Msg("workflows directory not found, skipping")
	}

	// Initialize module registry and log each module's status.
	registry := modules.NewRegistry()
	if err := modules.LoadState(registry, cfg, database.Pool()); err != nil {
		logger.Fatal().Err(err).Msg("failed to load module state")
	}
	for _, m := range registry.All() {
		logger.Info().Str("module", m.ID).Bool("enabled", registry.IsEnabled(m.ID)).
			Bool("required", m.Required).Msg("module")
	}

	// Create SSE broker and server state.
	broker := api.NewBroker(logger)
	serverState := api.NewServerState(logger, store, broker)
	if cfg.Server.ReadOnly {
		serverState.SetReadOnly(true)
		logger.Warn().Msg("server started in read-only mode")
	}
	broker.StartHeartbeat()
	serverState.StartStorageHealthCheck()

	// Create API server and router.
	server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store,
		authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState,
		jobDefs, cfg.Jobs.Directory, registry, cfg, workflows)
	router := api.NewRouter(server, logger)

	// Start background sweepers for job/runner timeouts (only when jobs module enabled).
	if registry.IsEnabled(modules.Jobs) {
		go func() {
			ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second)
			defer ticker.Stop()
			for range ticker.C {
				if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil {
					logger.Error().Err(err).Msg("job timeout sweep failed")
				} else if n > 0 {
					logger.Info().Int64("count", n).Msg("timed out expired jobs")
				}
				if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil {
					logger.Error().Err(err).Msg("runner expiry sweep failed")
				} else if n > 0 {
					logger.Info().Int64("count", n).Msg("expired stale runners")
				}
			}
		}()
		logger.Info().Msg("job/runner sweepers started")
	}

	// Create HTTP server with explicit timeouts.
	addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
	httpServer := &http.Server{
		Addr:         addr,
		Handler:      router,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 15 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	// Start the server in a goroutine so main can block on signals below.
	go func() {
		logger.Info().Str("addr", addr).Msg("listening")
		if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Fatal().Err(err).Msg("server error")
		}
	}()

	// SIGUSR1: toggle read-only mode at runtime.
	usr1 := make(chan os.Signal, 1)
	signal.Notify(usr1, syscall.SIGUSR1)
	go func() {
		for range usr1 {
			serverState.ToggleReadOnly()
			logger.Info().Str("mode", string(serverState.Mode())).Msg("read-only mode toggled via SIGUSR1")
		}
	}()

	// Wait for interrupt signal.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	logger.Info().Msg("shutting down server")

	// Graceful shutdown: close SSE connections first, then HTTP server.
	broker.Shutdown()
	serverState.Shutdown()
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := httpServer.Shutdown(shutdownCtx); err != nil {
		logger.Fatal().Err(err).Msg("server forced to shutdown")
	}
	logger.Info().Msg("server stopped")
}