Files
silo/internal/api/runner_handlers.go

386 lines
11 KiB
Go

package api
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/kindredsystems/silo/internal/auth"
"github.com/kindredsystems/silo/internal/db"
"github.com/kindredsystems/silo/internal/jobdef"
)
// HandleRunnerHeartbeat acknowledges a heartbeat from an authenticated runner.
//
// It replies 401 when the request context carries no runner identity and
// 200 with {"status": "ok"} otherwise. The handler itself persists nothing.
func (s *Server) HandleRunnerHeartbeat(w http.ResponseWriter, r *http.Request) {
	if auth.RunnerFromContext(r.Context()) == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	// No write happens here: the heartbeat timestamp has already been
	// refreshed by the RequireRunnerAuth middleware.
	ack := map[string]string{"status": "ok"}
	writeJSON(w, http.StatusOK, ack)
}
// HandleRunnerClaim claims the next available job matching the runner's tags.
//
// Responses:
//   - 401 when the request context carries no runner identity.
//   - 500 when the claim itself fails (the error is logged).
//   - 204 when no job is available.
//   - 200 with {"job": ..., "definition": ...} when a job was claimed.
//
// The definition lookup is best-effort: if it fails or finds nothing, the job
// is still returned with a null "definition". A lookup error is logged so the
// failure is visible to operators instead of being silently dropped.
// A "job.claimed" event is published for every successful claim.
func (s *Server) HandleRunnerClaim(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	job, err := s.jobs.ClaimJob(ctx, runner.ID, runner.Tags)
	if err != nil {
		s.logger.Error().Err(err).Msg("failed to claim job")
		writeError(w, http.StatusInternalServerError, "internal_error", "Failed to claim job")
		return
	}
	if job == nil {
		// Nothing to hand out right now.
		writeJSON(w, http.StatusNoContent, nil)
		return
	}

	// Look up the full definition to send to the runner.
	var defPayload map[string]any
	if job.JobDefinitionID != nil {
		rec, err := s.jobs.GetDefinitionByID(ctx, *job.JobDefinitionID)
		switch {
		case err != nil:
			// The job is already claimed, so keep going without the
			// definition rather than failing the request; just record why.
			s.logger.Error().Err(err).Msg("failed to load job definition for claimed job")
		case rec != nil:
			defPayload = rec.Definition
		}
	}

	s.broker.Publish("job.claimed", mustMarshal(map[string]any{
		"job_id":    job.ID,
		"runner_id": runner.ID,
		"runner":    runner.Name,
	}))

	writeJSON(w, http.StatusOK, map[string]any{
		"job":        job,
		"definition": defPayload,
	})
}
// HandleRunnerStartJob moves the job named by the "jobID" URL parameter into
// the running state on behalf of the authenticated runner.
//
// It replies 401 without a runner identity, 400 ("start_failed", carrying the
// repository's error text) when StartJob returns an error, and 200 with
// {"status": "running"} on success.
func (s *Server) HandleRunnerStartJob(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	id := chi.URLParam(r, "jobID")
	startErr := s.jobs.StartJob(ctx, id, runner.ID)
	if startErr != nil {
		writeError(w, http.StatusBadRequest, "start_failed", startErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]string{"status": "running"})
}
// HandleRunnerUpdateProgress records a progress report for the job named by
// the "jobID" URL parameter.
//
// The JSON body carries an integer "progress" and an optional "message".
// It replies 401 without a runner identity, 400 ("invalid_body") when the body
// is not valid JSON, 400 ("update_failed", carrying the repository's error
// text) when UpdateProgress returns an error, and 200 with {"status": "ok"}
// on success. A "job.progress" event is published after a successful update.
func (s *Server) HandleRunnerUpdateProgress(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	// Wire format of the request body.
	type progressReport struct {
		Progress int    `json:"progress"`
		Message  string `json:"message,omitempty"`
	}

	var report progressReport
	if decodeErr := json.NewDecoder(r.Body).Decode(&report); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	id := chi.URLParam(r, "jobID")
	if updateErr := s.jobs.UpdateProgress(ctx, id, runner.ID, report.Progress, report.Message); updateErr != nil {
		writeError(w, http.StatusBadRequest, "update_failed", updateErr.Error())
		return
	}

	event := map[string]any{
		"job_id":   id,
		"progress": report.Progress,
		"message":  report.Message,
	}
	s.broker.Publish("job.progress", mustMarshal(event))

	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// HandleRunnerCompleteJob marks the job named by the "jobID" URL parameter as
// completed, storing the optional "result" object from the JSON body.
//
// It replies 401 without a runner identity, 400 ("invalid_body") when the body
// is not valid JSON, 400 ("complete_failed", carrying the repository's error
// text) when CompleteJob returns an error, and 200 with
// {"status": "completed"} on success. A "job.completed" event is published
// after a successful completion.
//
// NOTE(review): "result" is optional, but a completely empty request body
// still fails decoding and is answered with 400 — confirm runners always send
// at least "{}".
func (s *Server) HandleRunnerCompleteJob(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	// Wire format of the request body.
	type completion struct {
		Result map[string]any `json:"result,omitempty"`
	}

	var body completion
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	id := chi.URLParam(r, "jobID")
	if completeErr := s.jobs.CompleteJob(ctx, id, runner.ID, body.Result); completeErr != nil {
		writeError(w, http.StatusBadRequest, "complete_failed", completeErr.Error())
		return
	}

	event := map[string]any{
		"job_id":    id,
		"runner_id": runner.ID,
	}
	s.broker.Publish("job.completed", mustMarshal(event))

	writeJSON(w, http.StatusOK, map[string]string{"status": "completed"})
}
// HandleRunnerFailJob marks the job named by the "jobID" URL parameter as
// failed, recording the "error" string from the JSON body.
//
// It replies 401 without a runner identity, 400 ("invalid_body") when the body
// is not valid JSON, 400 ("fail_failed", carrying the repository's error text)
// when FailJob returns an error, and 200 with {"status": "failed"} on success.
// A "job.failed" event that includes the reported error is published after a
// successful update.
func (s *Server) HandleRunnerFailJob(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	// Wire format of the request body.
	type failure struct {
		Error string `json:"error"`
	}

	var body failure
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	id := chi.URLParam(r, "jobID")
	if failErr := s.jobs.FailJob(ctx, id, runner.ID, body.Error); failErr != nil {
		writeError(w, http.StatusBadRequest, "fail_failed", failErr.Error())
		return
	}

	event := map[string]any{
		"job_id":    id,
		"runner_id": runner.ID,
		"error":     body.Error,
	}
	s.broker.Publish("job.failed", mustMarshal(event))

	writeJSON(w, http.StatusOK, map[string]string{"status": "failed"})
}
// HandleRunnerAppendLog stores one log entry against the job named by the
// "jobID" URL parameter.
//
// The JSON body carries "level", "message" and optional "metadata"; an empty
// level is stored as "info". It replies 401 without a runner identity, 400
// ("invalid_body") when the body is not valid JSON, 500 when AppendLog fails
// (the error is logged), and 201 with the stored entry on success.
//
// NOTE(review): unlike the other job handlers, runner.ID is not passed to the
// repository here, so this handler does not itself check that the job belongs
// to the calling runner — confirm that is enforced elsewhere or intended.
func (s *Server) HandleRunnerAppendLog(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	if auth.RunnerFromContext(ctx) == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	// Wire format of the request body.
	type logLine struct {
		Level    string         `json:"level"`
		Message  string         `json:"message"`
		Metadata map[string]any `json:"metadata,omitempty"`
	}

	var line logLine
	if decodeErr := json.NewDecoder(r.Body).Decode(&line); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	// Entries without an explicit level are recorded as informational.
	level := line.Level
	if level == "" {
		level = "info"
	}

	entry := &db.JobLogEntry{
		JobID:    chi.URLParam(r, "jobID"),
		Level:    level,
		Message:  line.Message,
		Metadata: line.Metadata,
	}
	if appendErr := s.jobs.AppendLog(ctx, entry); appendErr != nil {
		s.logger.Error().Err(appendErr).Msg("failed to append job log")
		writeError(w, http.StatusInternalServerError, "internal_error", "Failed to append log")
		return
	}

	writeJSON(w, http.StatusCreated, entry)
}
// HandleRunnerSyncDAG allows a runner to push DAG results for a job's item.
//
// Flow:
//  1. Resolve the job from the "jobID" URL parameter and require that it has
//     an associated item.
//  2. Decode the dagSyncRequest body. A RevisionNumber of 0 means "use the
//     item's current revision", which is then looked up.
//  3. Convert the request nodes (an empty validation state becomes "clean")
//     and sync them via SyncFeatureTree.
//  4. If the request contains edges, delete the existing edges for the
//     item/revision and create the new ones. Edges that reference a node key
//     not present in this request are skipped, and a failure to create an
//     individual edge is logged without aborting the request.
//  5. Publish "dag.updated" and reply with the node and edge counts.
//
// Responses: 401 without a runner identity; 404 when the job or the item
// cannot be loaded; 400 when the job has no item or the body is not valid
// JSON; 500 when node sync or edge deletion fails; 200 on success.
//
// Lookup errors are reported to the client as 404, and are logged so a
// storage failure can be told apart from a genuinely missing record.
//
// Note that the reported edge_count is the number of edges in the request,
// not the number actually created.
func (s *Server) HandleRunnerSyncDAG(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	jobID := chi.URLParam(r, "jobID")

	// Get the job to find the item.
	job, err := s.jobs.GetJob(ctx, jobID)
	if err != nil {
		// The client still sees 404, but the underlying failure must not vanish.
		s.logger.Error().Err(err).Msg("failed to look up job for DAG sync")
	}
	if err != nil || job == nil {
		writeError(w, http.StatusNotFound, "not_found", "Job not found")
		return
	}
	if job.ItemID == nil {
		writeError(w, http.StatusBadRequest, "no_item", "Job has no associated item")
		return
	}

	// Delegate to the DAG sync handler logic.
	var req dagSyncRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	if req.RevisionNumber == 0 {
		// No revision supplied: fall back to the item's current revision.
		item, err := s.items.GetByID(ctx, *job.ItemID)
		if err != nil {
			s.logger.Error().Err(err).Msg("failed to look up item for DAG sync")
		}
		if err != nil || item == nil {
			writeError(w, http.StatusNotFound, "not_found", "Item not found")
			return
		}
		req.RevisionNumber = item.CurrentRevision
	}

	// Convert and sync nodes.
	nodes := make([]db.DAGNode, len(req.Nodes))
	for i, n := range req.Nodes {
		state := n.ValidationState
		if state == "" {
			state = "clean"
		}
		nodes[i] = db.DAGNode{
			NodeKey:         n.NodeKey,
			NodeType:        n.NodeType,
			PropertiesHash:  n.PropertiesHash,
			ValidationState: state,
			Metadata:        n.Metadata,
		}
	}

	if err := s.dag.SyncFeatureTree(ctx, *job.ItemID, req.RevisionNumber, nodes, nil); err != nil {
		s.logger.Error().Err(err).Msg("failed to sync DAG from runner")
		writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG")
		return
	}

	// Build key→ID map and sync edges. The IDs are read from the same slice
	// that was handed to SyncFeatureTree; presumably it fills in each node's
	// ID in place — TODO confirm against the DAG repository.
	keyToID := make(map[string]string, len(nodes))
	for i := range nodes {
		keyToID[nodes[i].NodeKey] = nodes[i].ID
	}

	if len(req.Edges) > 0 {
		// Replace, rather than merge with, the edges already stored for this revision.
		if err := s.dag.DeleteEdgesForItem(ctx, *job.ItemID, req.RevisionNumber); err != nil {
			s.logger.Error().Err(err).Msg("failed to delete old edges")
			writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges")
			return
		}

		for _, e := range req.Edges {
			// Skip edges whose endpoints are not part of this request.
			sourceID, ok := keyToID[e.SourceKey]
			if !ok {
				continue
			}
			targetID, ok := keyToID[e.TargetKey]
			if !ok {
				continue
			}

			edgeType := e.EdgeType
			if edgeType == "" {
				edgeType = "depends_on"
			}

			edge := &db.DAGEdge{
				SourceNodeID: sourceID,
				TargetNodeID: targetID,
				EdgeType:     edgeType,
				Metadata:     e.Metadata,
			}
			// Best-effort: one bad edge must not fail the whole sync.
			if err := s.dag.CreateEdge(ctx, edge); err != nil {
				s.logger.Error().Err(err).Msg("failed to create edge from runner")
			}
		}
	}

	s.broker.Publish("dag.updated", mustMarshal(map[string]any{
		"item_id":    *job.ItemID,
		"job_id":     jobID,
		"runner":     runner.Name,
		"node_count": len(req.Nodes),
		"edge_count": len(req.Edges),
	}))

	writeJSON(w, http.StatusOK, map[string]any{
		"synced":     true,
		"node_count": len(req.Nodes),
		"edge_count": len(req.Edges),
	})
}
// generateRunnerToken mints a fresh runner credential.
//
// Results:
//   - raw:    "silo_runner_" followed by 64 lowercase hex characters
//     (32 bytes read from crypto/rand).
//   - hash:   hex-encoded SHA-256 of the full raw token string.
//   - prefix: the first 20 characters of raw, i.e. "silo_runner_" plus the
//     first 8 hex characters.
//
// It panics if the random source cannot be read.
func generateRunnerToken() (raw, hash, prefix string) {
	const (
		tokenScheme  = "silo_runner_"
		entropyBytes = 32
		prefixHexLen = 8
	)

	var secret [entropyBytes]byte
	if _, err := rand.Read(secret[:]); err != nil {
		panic(fmt.Sprintf("generating random bytes: %v", err))
	}

	raw = tokenScheme + hex.EncodeToString(secret[:])

	digest := sha256.Sum256([]byte(raw))
	hash = hex.EncodeToString(digest[:])

	prefix = raw[:len(tokenScheme)+prefixHexLen]
	return raw, hash, prefix
}
// loadAndUpsertJobDefs loads job definitions from dir via jobdef.LoadAll and
// upserts each one into the database through repo.
//
// Every definition is round-tripped through JSON to obtain the generic map
// stored in the record's Definition field, and is upserted with Enabled set
// to true. On success the definitions returned by LoadAll are handed back
// unchanged.
//
// The function stops at the first failure and returns a wrapped error. That
// covers loading, the JSON round-trip (previously ignored, which could
// upsert a record with a nil Definition) and the upsert itself. Definitions
// upserted before the failure are not rolled back.
func loadAndUpsertJobDefs(ctx context.Context, dir string, repo *db.JobRepository) (map[string]*jobdef.Definition, error) {
	defs, err := jobdef.LoadAll(dir)
	if err != nil {
		return nil, fmt.Errorf("loading job definitions: %w", err)
	}

	for _, def := range defs {
		// Convert the typed definition into a generic map for storage.
		defJSON, err := json.Marshal(def)
		if err != nil {
			return nil, fmt.Errorf("encoding definition %s: %w", def.Name, err)
		}
		var defMap map[string]any
		if err := json.Unmarshal(defJSON, &defMap); err != nil {
			return nil, fmt.Errorf("decoding definition %s: %w", def.Name, err)
		}

		rec := &db.JobDefinitionRecord{
			Name:           def.Name,
			Version:        def.Version,
			TriggerType:    def.Trigger.Type,
			ScopeType:      def.Scope.Type,
			ComputeType:    def.Compute.Type,
			RunnerTags:     def.Runner.Tags,
			TimeoutSeconds: def.Timeout,
			MaxRetries:     def.MaxRetries,
			Priority:       def.Priority,
			Definition:     defMap,
			Enabled:        true,
		}
		if err := repo.UpsertDefinition(ctx, rec); err != nil {
			return nil, fmt.Errorf("upserting definition %s: %w", def.Name, err)
		}
	}

	return defs, nil
}