From 1952dea00c14addefa9ace08a2a2fc1775ab409f Mon Sep 17 00:00:00 2001 From: Forbes Date: Sat, 14 Feb 2026 13:13:54 -0600 Subject: [PATCH] feat: wire job definitions, DAG/job repos, and background sweepers --- cmd/silod/main.go | 63 +++++++++++++++++++++++++++++- internal/api/auth_handlers_test.go | 2 + internal/api/bom_handlers_test.go | 2 + internal/api/csv_handlers_test.go | 2 + internal/api/handlers.go | 13 ++++++ internal/config/config.go | 21 ++++++++++ 6 files changed, 102 insertions(+), 1 deletion(-) diff --git a/cmd/silod/main.go b/cmd/silod/main.go index 5aee49a..7530160 100644 --- a/cmd/silod/main.go +++ b/cmd/silod/main.go @@ -3,6 +3,7 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "net/http" @@ -13,10 +14,12 @@ import ( "github.com/alexedwards/scs/pgxstore" "github.com/alexedwards/scs/v2" + "github.com/kindredsystems/silo/internal/api" "github.com/kindredsystems/silo/internal/auth" "github.com/kindredsystems/silo/internal/config" "github.com/kindredsystems/silo/internal/db" + "github.com/kindredsystems/silo/internal/jobdef" "github.com/kindredsystems/silo/internal/schema" "github.com/kindredsystems/silo/internal/storage" "github.com/rs/zerolog" @@ -178,6 +181,44 @@ func main() { } } + // Load job definitions (optional — directory may not exist yet) + var jobDefs map[string]*jobdef.Definition + if _, err := os.Stat(cfg.Jobs.Directory); err == nil { + jobDefs, err = jobdef.LoadAll(cfg.Jobs.Directory) + if err != nil { + logger.Fatal().Err(err).Str("directory", cfg.Jobs.Directory).Msg("failed to load job definitions") + } + logger.Info().Int("count", len(jobDefs)).Msg("loaded job definitions") + } else { + jobDefs = make(map[string]*jobdef.Definition) + logger.Info().Str("directory", cfg.Jobs.Directory).Msg("job definitions directory not found, skipping") + } + + // Upsert job definitions into database + jobRepo := db.NewJobRepository(database) + for _, def := range jobDefs { + defJSON, _ := json.Marshal(def) + var defMap map[string]any + json.Unmarshal(defJSON, &defMap) + + rec := &db.JobDefinitionRecord{ + Name: def.Name, + Version: def.Version, + TriggerType: def.Trigger.Type, + ScopeType: def.Scope.Type, + ComputeType: def.Compute.Type, + RunnerTags: def.Runner.Tags, + TimeoutSeconds: def.Timeout, + MaxRetries: def.MaxRetries, + Priority: def.Priority, + Definition: defMap, + Enabled: true, + } + if err := jobRepo.UpsertDefinition(ctx, rec); err != nil { + logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to upsert job definition") + } + } + // Create SSE broker and server state broker := api.NewBroker(logger) serverState := api.NewServerState(logger, store, broker) @@ -190,9 +231,29 @@ func main() { // Create API server server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store, - authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState) + authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState, + jobDefs, cfg.Jobs.Directory) router := api.NewRouter(server, logger) + // Start background sweepers for job/runner timeouts + go func() { + ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second) + defer ticker.Stop() + for range ticker.C { + if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil { + logger.Error().Err(err).Msg("job timeout sweep failed") + } else if n > 0 { + logger.Info().Int64("count", n).Msg("timed out expired jobs") + } + + if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil { + logger.Error().Err(err).Msg("runner expiry sweep failed") + } else if n > 0 { + logger.Info().Int64("count", n).Msg("expired stale runners") + } + } + }() + // Create HTTP server addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port) httpServer := &http.Server{ diff --git a/internal/api/auth_handlers_test.go b/internal/api/auth_handlers_test.go index 323a437..85e3a17 100644 --- a/internal/api/auth_handlers_test.go +++ b/internal/api/auth_handlers_test.go @@ -38,6 +38,8 @@ func newAuthTestServer(t *testing.T) *Server { nil, // authConfig broker, state, + nil, // jobDefs + "", // jobDefsDir ) } diff --git a/internal/api/bom_handlers_test.go b/internal/api/bom_handlers_test.go index d928351..d63ca31 100644 --- a/internal/api/bom_handlers_test.go +++ b/internal/api/bom_handlers_test.go @@ -35,6 +35,8 @@ func newTestServer(t *testing.T) *Server { nil, // authConfig (nil = dev mode) broker, state, + nil, // jobDefs + "", // jobDefsDir ) } diff --git a/internal/api/csv_handlers_test.go b/internal/api/csv_handlers_test.go index 9dd703b..85edb10 100644 --- a/internal/api/csv_handlers_test.go +++ b/internal/api/csv_handlers_test.go @@ -64,6 +64,8 @@ func newTestServerWithSchemas(t *testing.T) *Server { nil, // authConfig broker, state, + nil, // jobDefs + "", // jobDefsDir ) } diff --git a/internal/api/handlers.go b/internal/api/handlers.go index 0f2ac90..c1fd419 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -18,6 +18,7 @@ import ( "github.com/kindredsystems/silo/internal/auth" "github.com/kindredsystems/silo/internal/config" "github.com/kindredsystems/silo/internal/db" + "github.com/kindredsystems/silo/internal/jobdef" "github.com/kindredsystems/silo/internal/partnum" "github.com/kindredsystems/silo/internal/schema" "github.com/kindredsystems/silo/internal/storage" @@ -43,6 +44,10 @@ type Server struct { itemFiles *db.ItemFileRepository broker *Broker serverState *ServerState + dag *db.DAGRepository + jobs *db.JobRepository + jobDefs map[string]*jobdef.Definition + jobDefsDir string } // NewServer creates a new API server. @@ -58,11 +63,15 @@ func NewServer( authCfg *config.AuthConfig, broker *Broker, state *ServerState, + jobDefs map[string]*jobdef.Definition, + jobDefsDir string, ) *Server { items := db.NewItemRepository(database) projects := db.NewProjectRepository(database) relationships := db.NewRelationshipRepository(database) itemFiles := db.NewItemFileRepository(database) + dag := db.NewDAGRepository(database) + jobs := db.NewJobRepository(database) seqStore := &dbSequenceStore{db: database, schemas: schemas} partgen := partnum.NewGenerator(schemas, seqStore) @@ -83,6 +92,10 @@ func NewServer( itemFiles: itemFiles, broker: broker, serverState: state, + dag: dag, + jobs: jobs, + jobDefs: jobDefs, + jobDefsDir: jobDefsDir, } } diff --git a/internal/config/config.go b/internal/config/config.go index d8399f6..4681c80 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,7 @@ type Config struct { FreeCAD FreeCADConfig `yaml:"freecad"` Odoo OdooConfig `yaml:"odoo"` Auth AuthConfig `yaml:"auth"` + Jobs JobsConfig `yaml:"jobs"` } // AuthConfig holds authentication and authorization settings. @@ -111,6 +112,14 @@ type FreeCADConfig struct { Executable string `yaml:"executable"` } +// JobsConfig holds worker/runner system settings. +type JobsConfig struct { + Directory string `yaml:"directory"` // default /etc/silo/jobdefs + RunnerTimeout int `yaml:"runner_timeout"` // seconds, default 90 + JobTimeoutCheck int `yaml:"job_timeout_check"` // seconds, default 30 + DefaultPriority int `yaml:"default_priority"` // default 100 +} + // OdooConfig holds Odoo ERP integration settings. type OdooConfig struct { Enabled bool `yaml:"enabled"` @@ -157,6 +166,18 @@ func Load(path string) (*Config, error) { if cfg.FreeCAD.URIScheme == "" { cfg.FreeCAD.URIScheme = "silo" } + if cfg.Jobs.Directory == "" { + cfg.Jobs.Directory = "/etc/silo/jobdefs" + } + if cfg.Jobs.RunnerTimeout == 0 { + cfg.Jobs.RunnerTimeout = 90 + } + if cfg.Jobs.JobTimeoutCheck == 0 { + cfg.Jobs.JobTimeoutCheck = 30 + } + if cfg.Jobs.DefaultPriority == 0 { + cfg.Jobs.DefaultPriority = 100 + } // Override with environment variables if v := os.Getenv("SILO_DB_HOST"); v != "" {