feat(modules): wire registry into server startup

Add modules.Registry and config.Config fields to Server struct.
Create registry in main.go, load state from YAML+DB, log all
module states at startup.

Conditionally start job/runner sweeper goroutines only when the
jobs module is enabled.

Update all 5 test files to pass registry to NewServer.

Ref #95, #96
This commit is contained in:
Forbes
2026-02-14 14:00:24 -06:00
parent 3adc155b14
commit 4fd4013360
7 changed files with 57 additions and 23 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/kindredsystems/silo/internal/config"
"github.com/kindredsystems/silo/internal/db"
"github.com/kindredsystems/silo/internal/jobdef"
"github.com/kindredsystems/silo/internal/modules"
"github.com/kindredsystems/silo/internal/schema"
"github.com/kindredsystems/silo/internal/storage"
"github.com/rs/zerolog"
@@ -219,6 +220,16 @@ func main() {
}
}
// Initialize module registry
registry := modules.NewRegistry()
if err := modules.LoadState(registry, cfg, database.Pool()); err != nil {
logger.Fatal().Err(err).Msg("failed to load module state")
}
for _, m := range registry.All() {
logger.Info().Str("module", m.ID).Bool("enabled", registry.IsEnabled(m.ID)).
Bool("required", m.Required).Msg("module")
}
// Create SSE broker and server state
broker := api.NewBroker(logger)
serverState := api.NewServerState(logger, store, broker)
@@ -232,27 +243,30 @@ func main() {
// Create API server
server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store,
authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState,
jobDefs, cfg.Jobs.Directory)
jobDefs, cfg.Jobs.Directory, registry, cfg)
router := api.NewRouter(server, logger)
// Start background sweepers for job/runner timeouts
go func() {
ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second)
defer ticker.Stop()
for range ticker.C {
if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil {
logger.Error().Err(err).Msg("job timeout sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("timed out expired jobs")
}
// Start background sweepers for job/runner timeouts (only when jobs module enabled)
if registry.IsEnabled(modules.Jobs) {
go func() {
ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second)
defer ticker.Stop()
for range ticker.C {
if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil {
logger.Error().Err(err).Msg("job timeout sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("timed out expired jobs")
}
if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil {
logger.Error().Err(err).Msg("runner expiry sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("expired stale runners")
if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil {
logger.Error().Err(err).Msg("runner expiry sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("expired stale runners")
}
}
}
}()
}()
logger.Info().Msg("job/runner sweepers started")
}
// Create HTTP server
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)