diff --git a/Makefile b/Makefile
index c7bb968..845ce78 100644
--- a/Makefile
+++ b/Makefile
@@ -11,6 +11,7 @@ build: web-build
 	go build -o silo ./cmd/silo
 	go build -o silod ./cmd/silod
+	go build -o silorunner ./cmd/silorunner
 
 # Run the API server locally
 run:
 
@@ -30,7 +31,7 @@ test-integration:
 
 # Clean build artifacts
 clean:
-	rm -f silo silod
+	rm -f silo silod silorunner
 	rm -f *.out
 	rm -rf web/dist
 
diff --git a/cmd/silorunner/main.go b/cmd/silorunner/main.go
new file mode 100644
index 0000000..2f4364d
--- /dev/null
+++ b/cmd/silorunner/main.go
@@ -0,0 +1,343 @@
+// Command silorunner is a compute worker that polls the Silo server for jobs
+// and executes them using Headless Create with silo-mod installed.
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/rs/zerolog"
+	"gopkg.in/yaml.v3"
+)
+
+// RunnerConfig holds runner configuration.
+type RunnerConfig struct {
+	ServerURL    string   `yaml:"server_url"`
+	Token        string   `yaml:"token"`
+	Name         string   `yaml:"name"`
+	Tags         []string `yaml:"tags"`
+	PollInterval int      `yaml:"poll_interval"` // seconds, default 5
+	CreatePath   string   `yaml:"create_path"`   // path to Headless Create binary
+}
+
+func main() {
+	configPath := flag.String("config", "runner.yaml", "Path to runner config file")
+	flag.Parse()
+
+	logger := zerolog.New(os.Stdout).With().Timestamp().Str("component", "silorunner").Logger()
+
+	// Load config
+	cfg, err := loadConfig(*configPath)
+	if err != nil {
+		logger.Fatal().Err(err).Msg("failed to load config")
+	}
+
+	if cfg.ServerURL == "" {
+		logger.Fatal().Msg("server_url is required")
+	}
+	if cfg.Token == "" {
+		logger.Fatal().Msg("token is required")
+	}
+	if cfg.Name == "" {
+		hostname, _ := os.Hostname()
+		cfg.Name = "runner-" + hostname
+	}
+	if cfg.PollInterval <= 0 {
+		cfg.PollInterval = 5
+	}
+
+	logger.Info().
+		Str("server", cfg.ServerURL).
+		Str("name", cfg.Name).
+		Strs("tags", cfg.Tags).
+		Int("poll_interval", cfg.PollInterval).
+		Msg("starting runner")
+
+	client := &http.Client{Timeout: 30 * time.Second}
+
+	// Graceful shutdown. A delivered OS signal is received by only ONE
+	// channel reader, so the heartbeat goroutine must not share quit with
+	// the poll loop below; it watches its own stop channel instead.
+	quit := make(chan os.Signal, 1)
+	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+	stop := make(chan struct{})
+
+	// Heartbeat goroutine
+	go func() {
+		ticker := time.NewTicker(30 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				if err := heartbeat(client, cfg); err != nil {
+					logger.Error().Err(err).Msg("heartbeat failed")
+				}
+			case <-stop:
+				return
+			}
+		}
+	}()
+
+	// Initial heartbeat
+	if err := heartbeat(client, cfg); err != nil {
+		logger.Warn().Err(err).Msg("initial heartbeat failed")
+	}
+
+	// Poll loop
+	ticker := time.NewTicker(time.Duration(cfg.PollInterval) * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			job, definition, err := claimJob(client, cfg)
+			if err != nil {
+				logger.Error().Err(err).Msg("claim failed")
+				continue
+			}
+			if job == nil {
+				continue
+			}
+
+			jobID, _ := job["id"].(string)
+			defName, _ := job["definition_name"].(string)
+			logger.Info().Str("job_id", jobID).Str("definition", defName).Msg("claimed job")
+
+			// Start the job
+			if err := startJob(client, cfg, jobID); err != nil {
+				logger.Error().Err(err).Str("job_id", jobID).Msg("failed to start job")
+				continue
+			}
+
+			// Execute the job
+			executeJob(logger, client, cfg, jobID, job, definition)
+
+		case <-quit:
+			close(stop)
+			logger.Info().Msg("shutting down")
+			return
+		}
+	}
+}
+
+func loadConfig(path string) (*RunnerConfig, error) {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("reading config: %w", err)
+	}
+	data = []byte(os.ExpandEnv(string(data)))
+
+	var cfg RunnerConfig
+	if err := yaml.Unmarshal(data, &cfg); err != nil {
+		return nil, fmt.Errorf("parsing config: %w", err)
+	}
+	return &cfg, nil
+}
+
+func heartbeat(client *http.Client, cfg *RunnerConfig) error {
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/heartbeat", nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("heartbeat: %d %s", resp.StatusCode, string(body))
+	}
+	return nil
+}
+
+func claimJob(client *http.Client, cfg *RunnerConfig) (map[string]any, map[string]any, error) {
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/claim", nil)
+	if err != nil {
+		return nil, nil, err
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusNoContent {
+		return nil, nil, nil // No jobs available
+	}
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return nil, nil, fmt.Errorf("claim: %d %s", resp.StatusCode, string(body))
+	}
+
+	var result struct {
+		Job        map[string]any `json:"job"`
+		Definition map[string]any `json:"definition"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		return nil, nil, fmt.Errorf("decoding claim response: %w", err)
+	}
+
+	return result.Job, result.Definition, nil
+}
+
+func startJob(client *http.Client, cfg *RunnerConfig, jobID string) error {
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/start", nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("start: %d %s", resp.StatusCode, string(body))
+	}
+	return nil
+}
+
+func reportProgress(client *http.Client, cfg *RunnerConfig, jobID string, progress int, message string) {
+	body, _ := json.Marshal(map[string]any{
+		"progress": progress,
+		"message":  message,
+	})
+	req, err := http.NewRequest("PUT", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/progress", bytes.NewReader(body))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return
+	}
+	resp.Body.Close()
+}
+
+func completeJob(client *http.Client, cfg *RunnerConfig, jobID string, result map[string]any) error {
+	body, _ := json.Marshal(map[string]any{"result": result})
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/complete", bytes.NewReader(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("complete: %d %s", resp.StatusCode, string(respBody))
+	}
+	return nil
+}
+
+func failJob(client *http.Client, cfg *RunnerConfig, jobID string, errMsg string) {
+	body, _ := json.Marshal(map[string]string{"error": errMsg})
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/fail", bytes.NewReader(body))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return
+	}
+	resp.Body.Close()
+}
+
+func appendLog(client *http.Client, cfg *RunnerConfig, jobID, level, message string) {
+	body, _ := json.Marshal(map[string]string{
+		"level":   level,
+		"message": message,
+	})
+	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/log", bytes.NewReader(body))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Authorization", "Bearer "+cfg.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return
+	}
+	resp.Body.Close()
+}
+
+// executeJob dispatches the job based on its compute command.
+// For now, this is a stub that demonstrates the lifecycle.
+// Real execution will shell out to Headless Create with silo-mod.
+func executeJob(logger zerolog.Logger, client *http.Client, cfg *RunnerConfig, jobID string, job, definition map[string]any) {
+	defName, _ := job["definition_name"].(string)
+
+	// Extract compute config from definition
+	var command string
+	if definition != nil {
+		if compute, ok := definition["compute"].(map[string]any); ok {
+			command, _ = compute["command"].(string)
+		}
+	}
+
+	appendLog(client, cfg, jobID, "info", fmt.Sprintf("starting execution: %s (command: %s)", defName, command))
+	reportProgress(client, cfg, jobID, 10, "preparing")
+
+	switch command {
+	case "create-validate", "create-export", "create-dag-extract", "create-thumbnail":
+		if cfg.CreatePath == "" {
+			failJob(client, cfg, jobID, "create_path not configured")
+			return
+		}
+
+		appendLog(client, cfg, jobID, "info", fmt.Sprintf("would execute: %s --console with silo-mod", cfg.CreatePath))
+		reportProgress(client, cfg, jobID, 50, "executing")
+
+		// TODO: Actual Create execution:
+		// 1. Download item file from Silo API
+		// 2. Shell out: create --console -e "from silo.runner import run; run(...)"
+		// 3. Parse output JSON
+		// 4. Upload results / sync DAG
+		// For now, complete with a placeholder result.
+
+		reportProgress(client, cfg, jobID, 90, "finalizing")
+
+		if err := completeJob(client, cfg, jobID, map[string]any{
+			"status":  "placeholder",
+			"message": "Create execution not yet implemented - runner lifecycle verified",
+			"command": command,
+		}); err != nil {
+			logger.Error().Err(err).Str("job_id", jobID).Msg("failed to complete job")
+		} else {
+			logger.Info().Str("job_id", jobID).Msg("job completed (placeholder)")
+		}
+
+	default:
+		failJob(client, cfg, jobID, fmt.Sprintf("unknown compute command: %s", command))
+		logger.Warn().Str("job_id", jobID).Str("command", command).Msg("unknown compute command")
+	}
+}