feat: wire job definitions, DAG/job repos, and background sweepers

This commit is contained in:
Forbes
2026-02-14 13:13:54 -06:00
parent 6becfd82d4
commit 1952dea00c
6 changed files with 102 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
@@ -13,10 +14,12 @@ import (
"github.com/alexedwards/scs/pgxstore"
"github.com/alexedwards/scs/v2"
"github.com/kindredsystems/silo/internal/api"
"github.com/kindredsystems/silo/internal/auth"
"github.com/kindredsystems/silo/internal/config"
"github.com/kindredsystems/silo/internal/db"
"github.com/kindredsystems/silo/internal/jobdef"
"github.com/kindredsystems/silo/internal/schema"
"github.com/kindredsystems/silo/internal/storage"
"github.com/rs/zerolog"
@@ -178,6 +181,44 @@ func main() {
}
}
// Load job definitions (optional — directory may not exist yet)
var jobDefs map[string]*jobdef.Definition
if _, err := os.Stat(cfg.Jobs.Directory); err == nil {
jobDefs, err = jobdef.LoadAll(cfg.Jobs.Directory)
if err != nil {
logger.Fatal().Err(err).Str("directory", cfg.Jobs.Directory).Msg("failed to load job definitions")
}
logger.Info().Int("count", len(jobDefs)).Msg("loaded job definitions")
} else {
jobDefs = make(map[string]*jobdef.Definition)
logger.Info().Str("directory", cfg.Jobs.Directory).Msg("job definitions directory not found, skipping")
}
// Upsert job definitions into database
jobRepo := db.NewJobRepository(database)
for _, def := range jobDefs {
defJSON, _ := json.Marshal(def)
var defMap map[string]any
json.Unmarshal(defJSON, &defMap)
rec := &db.JobDefinitionRecord{
Name: def.Name,
Version: def.Version,
TriggerType: def.Trigger.Type,
ScopeType: def.Scope.Type,
ComputeType: def.Compute.Type,
RunnerTags: def.Runner.Tags,
TimeoutSeconds: def.Timeout,
MaxRetries: def.MaxRetries,
Priority: def.Priority,
Definition: defMap,
Enabled: true,
}
if err := jobRepo.UpsertDefinition(ctx, rec); err != nil {
logger.Fatal().Err(err).Str("name", def.Name).Msg("failed to upsert job definition")
}
}
// Create SSE broker and server state
broker := api.NewBroker(logger)
serverState := api.NewServerState(logger, store, broker)
@@ -190,9 +231,29 @@ func main() {
// Create API server
server := api.NewServer(logger, database, schemas, cfg.Schemas.Directory, store,
authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState)
authService, sessionManager, oidcBackend, &cfg.Auth, broker, serverState,
jobDefs, cfg.Jobs.Directory)
router := api.NewRouter(server, logger)
// Start background sweepers for job/runner timeouts
go func() {
ticker := time.NewTicker(time.Duration(cfg.Jobs.JobTimeoutCheck) * time.Second)
defer ticker.Stop()
for range ticker.C {
if n, err := jobRepo.TimeoutExpiredJobs(ctx); err != nil {
logger.Error().Err(err).Msg("job timeout sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("timed out expired jobs")
}
if n, err := jobRepo.ExpireStaleRunners(ctx, time.Duration(cfg.Jobs.RunnerTimeout)*time.Second); err != nil {
logger.Error().Err(err).Msg("runner expiry sweep failed")
} else if n > 0 {
logger.Info().Int64("count", n).Msg("expired stale runners")
}
}
}()
// Create HTTP server
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
httpServer := &http.Server{