feat: add silorunner binary with job poll/claim/execute lifecycle

This commit is contained in:
Forbes
2026-02-14 13:21:21 -06:00
parent b6ac5133c3
commit ad4224aa8f
2 changed files with 332 additions and 1 deletions

330
cmd/silorunner/main.go Normal file
View File

@@ -0,0 +1,330 @@
// Command silorunner is a compute worker that polls the Silo server for jobs
// and executes them using Headless Create with silo-mod installed.
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/rs/zerolog"
"gopkg.in/yaml.v3"
)
// RunnerConfig holds runner configuration loaded from the YAML config file.
// Environment variables in the file are expanded before parsing (see loadConfig).
type RunnerConfig struct {
	ServerURL    string   `yaml:"server_url"`    // base URL of the Silo server; required
	Token        string   `yaml:"token"`         // bearer token sent on every /api/runner request; required
	Name         string   `yaml:"name"`          // runner name; defaults to "runner-<hostname>" when empty
	Tags         []string `yaml:"tags"`          // capability tags advertised to the server
	PollInterval int      `yaml:"poll_interval"` // seconds between job polls, default 5
	CreatePath   string   `yaml:"create_path"`   // path to Headless Create binary
}
// main loads the runner configuration, announces itself to the Silo server
// via heartbeats, and runs the poll/claim/start/execute loop until the
// process receives SIGINT or SIGTERM.
func main() {
	configPath := flag.String("config", "runner.yaml", "Path to runner config file")
	flag.Parse()

	logger := zerolog.New(os.Stdout).With().Timestamp().Str("component", "silorunner").Logger()

	// Load and validate config, applying defaults for optional fields.
	cfg, err := loadConfig(*configPath)
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to load config")
	}
	if cfg.ServerURL == "" {
		logger.Fatal().Msg("server_url is required")
	}
	if cfg.Token == "" {
		logger.Fatal().Msg("token is required")
	}
	if cfg.Name == "" {
		hostname, _ := os.Hostname() // best-effort; an empty hostname still yields a usable name
		cfg.Name = "runner-" + hostname
	}
	if cfg.PollInterval <= 0 {
		cfg.PollInterval = 5
	}

	logger.Info().
		Str("server", cfg.ServerURL).
		Str("name", cfg.Name).
		Strs("tags", cfg.Tags).
		Int("poll_interval", cfg.PollInterval).
		Msg("starting runner")

	client := &http.Client{Timeout: 30 * time.Second}

	// Graceful shutdown. BUG FIX: previously both the heartbeat goroutine and
	// the poll loop selected on the signal channel directly. signal.Notify
	// delivers the single SIGINT/SIGTERM value to exactly ONE receiver, so
	// whichever goroutine won the race consumed it and the other kept running
	// (the poll loop could hang forever). A dedicated goroutine now receives
	// the signal once and closes `done`, which broadcasts to all listeners.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	done := make(chan struct{})
	go func() {
		<-quit
		close(done)
	}()

	// Heartbeat goroutine: keeps the runner marked alive server-side.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := heartbeat(client, cfg); err != nil {
					logger.Error().Err(err).Msg("heartbeat failed")
				}
			case <-done:
				return
			}
		}
	}()

	// Initial heartbeat so the server sees the runner immediately; a failure
	// here is only a warning because the periodic heartbeat will retry.
	if err := heartbeat(client, cfg); err != nil {
		logger.Warn().Err(err).Msg("initial heartbeat failed")
	}

	// Poll loop: claim -> start -> execute, once per tick.
	ticker := time.NewTicker(time.Duration(cfg.PollInterval) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			job, definition, err := claimJob(client, cfg)
			if err != nil {
				logger.Error().Err(err).Msg("claim failed")
				continue
			}
			if job == nil {
				// No work available this cycle.
				continue
			}
			jobID, _ := job["id"].(string)
			defName, _ := job["definition_name"].(string)
			logger.Info().Str("job_id", jobID).Str("definition", defName).Msg("claimed job")
			// Mark the job as running server-side before executing it.
			if err := startJob(client, cfg, jobID); err != nil {
				logger.Error().Err(err).Str("job_id", jobID).Msg("failed to start job")
				continue
			}
			executeJob(logger, client, cfg, jobID, job, definition)
		case <-done:
			logger.Info().Msg("shutting down")
			return
		}
	}
}
// loadConfig reads the YAML file at path and unmarshals it into a
// RunnerConfig. Environment-variable references in the raw file (e.g.
// ${SILO_TOKEN}) are expanded before parsing.
func loadConfig(path string) (*RunnerConfig, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("reading config: %w", err)
	}
	expanded := []byte(os.ExpandEnv(string(raw)))
	cfg := &RunnerConfig{}
	if err := yaml.Unmarshal(expanded, cfg); err != nil {
		return nil, fmt.Errorf("parsing config: %w", err)
	}
	return cfg, nil
}
// heartbeat POSTs to /api/runner/heartbeat so the server marks this runner
// as alive. It returns an error on transport failure or any non-200 status
// (with the response body included in the error message).
func heartbeat(client *http.Client, cfg *RunnerConfig) error {
	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/heartbeat", nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("heartbeat: %d %s", resp.StatusCode, string(body))
	}
	// Drain the (normally empty) success body so the transport can reuse the
	// underlying TCP/TLS connection for subsequent requests. Best-effort.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// claimJob asks the server for the next available job for this runner.
// It returns (nil, nil, nil) when no job is queued (HTTP 204), the claimed
// job plus its definition on HTTP 200, and an error otherwise.
func claimJob(client *http.Client, cfg *RunnerConfig) (map[string]any, map[string]any, error) {
	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/claim", nil)
	if err != nil {
		return nil, nil, err
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	resp, err := client.Do(req)
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNoContent:
		// Queue is empty; not an error.
		return nil, nil, nil
	case http.StatusOK:
		// Fall through to decode the payload below.
	default:
		body, _ := io.ReadAll(resp.Body)
		return nil, nil, fmt.Errorf("claim: %d %s", resp.StatusCode, string(body))
	}

	var payload struct {
		Job        map[string]any `json:"job"`
		Definition map[string]any `json:"definition"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, nil, fmt.Errorf("decoding claim response: %w", err)
	}
	return payload.Job, payload.Definition, nil
}
// startJob notifies the server that execution of jobID has begun.
// Any non-200 response is returned as an error with the body attached.
func startJob(client *http.Client, cfg *RunnerConfig, jobID string) error {
	endpoint := cfg.ServerURL + "/api/runner/jobs/" + jobID + "/start"
	req, err := http.NewRequest("POST", endpoint, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	body, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("start: %d %s", resp.StatusCode, string(body))
}
// reportProgress PUTs a progress percentage and status message for jobID.
// It is deliberately best-effort: transport errors and non-200 responses are
// ignored so that progress reporting can never fail a running job.
func reportProgress(client *http.Client, cfg *RunnerConfig, jobID string, progress int, message string) {
	body, _ := json.Marshal(map[string]any{
		"progress": progress,
		"message":  message,
	})
	req, err := http.NewRequest("PUT", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/progress", bytes.NewReader(body))
	if err != nil {
		// BUG FIX: the error was previously discarded, and a nil request
		// passed to client.Do panics with a nil-pointer dereference.
		return
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	resp.Body.Close()
}
// completeJob POSTs the final result payload for jobID, marking it finished
// server-side. A non-200 response is returned as an error with the body text.
func completeJob(client *http.Client, cfg *RunnerConfig, jobID string, result map[string]any) error {
	payload, _ := json.Marshal(map[string]any{"result": result})
	endpoint := cfg.ServerURL + "/api/runner/jobs/" + jobID + "/complete"
	req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	respBody, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("complete: %d %s", resp.StatusCode, string(respBody))
}
// failJob marks jobID as failed with the given error message. Best-effort:
// transport errors and response status are ignored, matching the other
// fire-and-forget reporting helpers.
func failJob(client *http.Client, cfg *RunnerConfig, jobID string, errMsg string) {
	body, _ := json.Marshal(map[string]string{"error": errMsg})
	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/fail", bytes.NewReader(body))
	if err != nil {
		// BUG FIX: the error was previously discarded, and a nil request
		// passed to client.Do panics with a nil-pointer dereference.
		return
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	resp.Body.Close()
}
// appendLog sends one log line (level + message) for jobID to the server.
// Best-effort: failures are silently ignored so logging never blocks a job.
func appendLog(client *http.Client, cfg *RunnerConfig, jobID, level, message string) {
	body, _ := json.Marshal(map[string]string{
		"level":   level,
		"message": message,
	})
	req, err := http.NewRequest("POST", cfg.ServerURL+"/api/runner/jobs/"+jobID+"/log", bytes.NewReader(body))
	if err != nil {
		// BUG FIX: the error was previously discarded, and a nil request
		// passed to client.Do panics with a nil-pointer dereference.
		return
	}
	req.Header.Set("Authorization", "Bearer "+cfg.Token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	resp.Body.Close()
}
// executeJob dispatches the job based on its compute command.
// For now, this is a stub that demonstrates the lifecycle.
// Real execution will shell out to Headless Create with silo-mod.
func executeJob(logger zerolog.Logger, client *http.Client, cfg *RunnerConfig, jobID string, job, definition map[string]any) {
	defName, _ := job["definition_name"].(string)

	// Pull the compute command out of the definition, if one is present.
	command := ""
	if definition != nil {
		if compute, _ := definition["compute"].(map[string]any); compute != nil {
			command, _ = compute["command"].(string)
		}
	}

	appendLog(client, cfg, jobID, "info", fmt.Sprintf("starting execution: %s (command: %s)", defName, command))
	reportProgress(client, cfg, jobID, 10, "preparing")

	switch command {
	case "create-validate", "create-export", "create-dag-extract", "create-thumbnail":
		if cfg.CreatePath == "" {
			failJob(client, cfg, jobID, "create_path not configured")
			return
		}
		appendLog(client, cfg, jobID, "info", fmt.Sprintf("would execute: %s --console with silo-mod", cfg.CreatePath))
		reportProgress(client, cfg, jobID, 50, "executing")
		// TODO: Actual Create execution:
		// 1. Download item file from Silo API
		// 2. Shell out: create --console -e "from silo.runner import <entry>; <entry>(...)"
		// 3. Parse output JSON
		// 4. Upload results / sync DAG
		// For now, complete with a placeholder result.
		reportProgress(client, cfg, jobID, 90, "finalizing")
		placeholder := map[string]any{
			"status":  "placeholder",
			"message": "Create execution not yet implemented - runner lifecycle verified",
			"command": command,
		}
		if err := completeJob(client, cfg, jobID, placeholder); err != nil {
			logger.Error().Err(err).Str("job_id", jobID).Msg("failed to complete job")
			return
		}
		logger.Info().Str("job_id", jobID).Msg("job completed (placeholder)")
	default:
		failJob(client, cfg, jobID, fmt.Sprintf("unknown compute command: %s", command))
		logger.Warn().Str("job_id", jobID).Str("command", command).Msg("unknown compute command")
	}
}