main #111
@@ -219,14 +219,13 @@ func (s *Server) HandleSyncDAG(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Publish SSE event
|
||||
evData, _ := json.Marshal(map[string]any{
|
||||
s.broker.Publish("dag.updated", mustMarshal(map[string]any{
|
||||
"item_id": item.ID,
|
||||
"part_number": item.PartNumber,
|
||||
"revision_number": req.RevisionNumber,
|
||||
"node_count": len(req.Nodes),
|
||||
"edge_count": len(req.Edges),
|
||||
})
|
||||
s.broker.Publish("dag.updated", string(evData))
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"synced": true,
|
||||
|
||||
323
internal/api/job_handlers.go
Normal file
323
internal/api/job_handlers.go
Normal file
@@ -0,0 +1,323 @@
|
||||
package api
|
||||
|
||||
import (
	"encoding/json"
	"net/http"
	"strconv"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/kindredsystems/silo/internal/auth"
	"github.com/kindredsystems/silo/internal/db"
)
|
||||
|
||||
// HandleListJobs returns jobs filtered by status and/or item.
|
||||
func (s *Server) HandleListJobs(w http.ResponseWriter, r *http.Request) {
|
||||
status := r.URL.Query().Get("status")
|
||||
itemID := r.URL.Query().Get("item_id")
|
||||
|
||||
limit := 50
|
||||
if v := r.URL.Query().Get("limit"); v != "" {
|
||||
if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 200 {
|
||||
limit = n
|
||||
}
|
||||
}
|
||||
offset := 0
|
||||
if v := r.URL.Query().Get("offset"); v != "" {
|
||||
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
|
||||
offset = n
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := s.jobs.ListJobs(r.Context(), status, itemID, limit, offset)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to list jobs")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list jobs")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, jobs)
|
||||
}
|
||||
|
||||
// HandleGetJob returns a single job by ID.
|
||||
func (s *Server) HandleGetJob(w http.ResponseWriter, r *http.Request) {
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
|
||||
job, err := s.jobs.GetJob(r.Context(), jobID)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to get job")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get job")
|
||||
return
|
||||
}
|
||||
if job == nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "Job not found")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, job)
|
||||
}
|
||||
|
||||
// HandleGetJobLogs returns log entries for a job.
|
||||
func (s *Server) HandleGetJobLogs(w http.ResponseWriter, r *http.Request) {
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
|
||||
logs, err := s.jobs.GetJobLogs(r.Context(), jobID)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to get job logs")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get job logs")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// HandleCreateJob manually triggers a job.
|
||||
func (s *Server) HandleCreateJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
user := auth.UserFromContext(ctx)
|
||||
|
||||
var req struct {
|
||||
DefinitionName string `json:"definition_name"`
|
||||
ItemID *string `json:"item_id,omitempty"`
|
||||
ProjectID *string `json:"project_id,omitempty"`
|
||||
ScopeMetadata map[string]any `json:"scope_metadata,omitempty"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.DefinitionName == "" {
|
||||
writeError(w, http.StatusBadRequest, "missing_field", "definition_name is required")
|
||||
return
|
||||
}
|
||||
|
||||
// Look up definition
|
||||
def, err := s.jobs.GetDefinition(ctx, req.DefinitionName)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to look up job definition")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to look up definition")
|
||||
return
|
||||
}
|
||||
if def == nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "Job definition not found: "+req.DefinitionName)
|
||||
return
|
||||
}
|
||||
|
||||
var createdBy *string
|
||||
if user != nil {
|
||||
createdBy = &user.Username
|
||||
}
|
||||
|
||||
job := &db.Job{
|
||||
JobDefinitionID: &def.ID,
|
||||
DefinitionName: def.Name,
|
||||
Priority: def.Priority,
|
||||
ItemID: req.ItemID,
|
||||
ProjectID: req.ProjectID,
|
||||
ScopeMetadata: req.ScopeMetadata,
|
||||
RunnerTags: def.RunnerTags,
|
||||
TimeoutSeconds: def.TimeoutSeconds,
|
||||
MaxRetries: def.MaxRetries,
|
||||
CreatedBy: createdBy,
|
||||
}
|
||||
|
||||
if err := s.jobs.CreateJob(ctx, job); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to create job")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to create job")
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("job.created", mustMarshal(map[string]any{
|
||||
"job_id": job.ID,
|
||||
"definition_name": job.DefinitionName,
|
||||
"item_id": job.ItemID,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusCreated, job)
|
||||
}
|
||||
|
||||
// HandleCancelJob cancels a pending or active job.
|
||||
func (s *Server) HandleCancelJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
user := auth.UserFromContext(ctx)
|
||||
|
||||
cancelledBy := "system"
|
||||
if user != nil {
|
||||
cancelledBy = user.Username
|
||||
}
|
||||
|
||||
if err := s.jobs.CancelJob(ctx, jobID, cancelledBy); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "cancel_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("job.cancelled", mustMarshal(map[string]any{
|
||||
"job_id": jobID,
|
||||
"cancelled_by": cancelledBy,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"})
|
||||
}
|
||||
|
||||
// HandleListJobDefinitions returns all loaded job definitions.
|
||||
func (s *Server) HandleListJobDefinitions(w http.ResponseWriter, r *http.Request) {
|
||||
defs, err := s.jobs.ListDefinitions(r.Context())
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to list job definitions")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list definitions")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, defs)
|
||||
}
|
||||
|
||||
// HandleGetJobDefinition returns a single job definition by name.
|
||||
func (s *Server) HandleGetJobDefinition(w http.ResponseWriter, r *http.Request) {
|
||||
name := chi.URLParam(r, "name")
|
||||
|
||||
def, err := s.jobs.GetDefinition(r.Context(), name)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to get job definition")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get definition")
|
||||
return
|
||||
}
|
||||
if def == nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "Job definition not found")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, def)
|
||||
}
|
||||
|
||||
// HandleReloadJobDefinitions re-reads YAML files from disk and upserts them.
|
||||
func (s *Server) HandleReloadJobDefinitions(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
if s.jobDefsDir == "" {
|
||||
writeError(w, http.StatusBadRequest, "no_directory", "Job definitions directory not configured")
|
||||
return
|
||||
}
|
||||
|
||||
defs, err := loadAndUpsertJobDefs(ctx, s.jobDefsDir, s.jobs)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to reload job definitions")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to reload definitions")
|
||||
return
|
||||
}
|
||||
|
||||
// Update in-memory map
|
||||
s.jobDefs = defs
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"reloaded": len(defs),
|
||||
})
|
||||
}
|
||||
|
||||
// HandleListRunners returns all registered runners (admin).
|
||||
func (s *Server) HandleListRunners(w http.ResponseWriter, r *http.Request) {
|
||||
runners, err := s.jobs.ListRunners(r.Context())
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to list runners")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list runners")
|
||||
return
|
||||
}
|
||||
|
||||
// Redact token hashes from response
|
||||
type runnerResponse struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
TokenPrefix string `json:"token_prefix"`
|
||||
Tags []string `json:"tags"`
|
||||
Status string `json:"status"`
|
||||
LastHeartbeat *string `json:"last_heartbeat,omitempty"`
|
||||
LastJobID *string `json:"last_job_id,omitempty"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
resp := make([]runnerResponse, len(runners))
|
||||
for i, runner := range runners {
|
||||
var hb *string
|
||||
if runner.LastHeartbeat != nil {
|
||||
s := runner.LastHeartbeat.Format("2006-01-02T15:04:05Z07:00")
|
||||
hb = &s
|
||||
}
|
||||
resp[i] = runnerResponse{
|
||||
ID: runner.ID,
|
||||
Name: runner.Name,
|
||||
TokenPrefix: runner.TokenPrefix,
|
||||
Tags: runner.Tags,
|
||||
Status: runner.Status,
|
||||
LastHeartbeat: hb,
|
||||
LastJobID: runner.LastJobID,
|
||||
Metadata: runner.Metadata,
|
||||
CreatedAt: runner.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// HandleRegisterRunner creates a new runner and returns the token (admin).
|
||||
func (s *Server) HandleRegisterRunner(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
var req struct {
|
||||
Name string `json:"name"`
|
||||
Tags []string `json:"tags"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
if req.Name == "" {
|
||||
writeError(w, http.StatusBadRequest, "missing_field", "name is required")
|
||||
return
|
||||
}
|
||||
if len(req.Tags) == 0 {
|
||||
writeError(w, http.StatusBadRequest, "missing_field", "tags is required (at least one)")
|
||||
return
|
||||
}
|
||||
|
||||
rawToken, tokenHash, tokenPrefix := generateRunnerToken()
|
||||
|
||||
runner := &db.Runner{
|
||||
Name: req.Name,
|
||||
TokenHash: tokenHash,
|
||||
TokenPrefix: tokenPrefix,
|
||||
Tags: req.Tags,
|
||||
Metadata: req.Metadata,
|
||||
}
|
||||
|
||||
if err := s.jobs.RegisterRunner(ctx, runner); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to register runner")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to register runner")
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("runner.online", mustMarshal(map[string]any{
|
||||
"runner_id": runner.ID,
|
||||
"name": runner.Name,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]any{
|
||||
"id": runner.ID,
|
||||
"name": runner.Name,
|
||||
"token": rawToken,
|
||||
"tags": runner.Tags,
|
||||
})
|
||||
}
|
||||
|
||||
// HandleDeleteRunner removes a runner (admin).
|
||||
func (s *Server) HandleDeleteRunner(w http.ResponseWriter, r *http.Request) {
|
||||
runnerID := chi.URLParam(r, "runnerID")
|
||||
|
||||
if err := s.jobs.DeleteRunner(r.Context(), runnerID); err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
@@ -150,6 +150,11 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
|
||||
r.Get("/bom/export.csv", server.HandleExportBOMCSV)
|
||||
r.Get("/bom/export.ods", server.HandleExportBOMODS)
|
||||
|
||||
// DAG (read: viewer, write: editor)
|
||||
r.Get("/dag", server.HandleGetDAG)
|
||||
r.Get("/dag/forward-cone/{nodeKey}", server.HandleGetForwardCone)
|
||||
r.Get("/dag/dirty", server.HandleGetDirtySubgraph)
|
||||
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(server.RequireWritable)
|
||||
r.Use(server.RequireRole(auth.RoleEditor))
|
||||
@@ -169,6 +174,8 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
|
||||
r.Post("/bom/merge", server.HandleMergeBOM)
|
||||
r.Put("/bom/{childPartNumber}", server.HandleUpdateBOMEntry)
|
||||
r.Delete("/bom/{childPartNumber}", server.HandleDeleteBOMEntry)
|
||||
r.Put("/dag", server.HandleSyncDAG)
|
||||
r.Post("/dag/mark-dirty/{nodeKey}", server.HandleMarkDirty)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -201,6 +208,39 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
|
||||
r.Post("/sheets/diff", server.HandleSheetDiff)
|
||||
})
|
||||
|
||||
// Jobs (read: viewer, write: editor)
|
||||
r.Route("/jobs", func(r chi.Router) {
|
||||
r.Get("/", server.HandleListJobs)
|
||||
r.Get("/{jobID}", server.HandleGetJob)
|
||||
r.Get("/{jobID}/logs", server.HandleGetJobLogs)
|
||||
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(server.RequireWritable)
|
||||
r.Use(server.RequireRole(auth.RoleEditor))
|
||||
r.Post("/", server.HandleCreateJob)
|
||||
r.Post("/{jobID}/cancel", server.HandleCancelJob)
|
||||
})
|
||||
})
|
||||
|
||||
// Job definitions (read: viewer, reload: admin)
|
||||
r.Route("/job-definitions", func(r chi.Router) {
|
||||
r.Get("/", server.HandleListJobDefinitions)
|
||||
r.Get("/{name}", server.HandleGetJobDefinition)
|
||||
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(server.RequireRole(auth.RoleAdmin))
|
||||
r.Post("/reload", server.HandleReloadJobDefinitions)
|
||||
})
|
||||
})
|
||||
|
||||
// Runners (admin)
|
||||
r.Route("/runners", func(r chi.Router) {
|
||||
r.Use(server.RequireRole(auth.RoleAdmin))
|
||||
r.Get("/", server.HandleListRunners)
|
||||
r.Post("/", server.HandleRegisterRunner)
|
||||
r.Delete("/{runnerID}", server.HandleDeleteRunner)
|
||||
})
|
||||
|
||||
// Part number generation (editor)
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(server.RequireWritable)
|
||||
@@ -209,6 +249,19 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler {
|
||||
})
|
||||
})
|
||||
|
||||
// Runner-facing API (runner token auth, not user auth)
|
||||
r.Route("/api/runner", func(r chi.Router) {
|
||||
r.Use(server.RequireRunnerAuth)
|
||||
r.Post("/heartbeat", server.HandleRunnerHeartbeat)
|
||||
r.Post("/claim", server.HandleRunnerClaim)
|
||||
r.Post("/jobs/{jobID}/start", server.HandleRunnerStartJob)
|
||||
r.Put("/jobs/{jobID}/progress", server.HandleRunnerUpdateProgress)
|
||||
r.Post("/jobs/{jobID}/complete", server.HandleRunnerCompleteJob)
|
||||
r.Post("/jobs/{jobID}/fail", server.HandleRunnerFailJob)
|
||||
r.Post("/jobs/{jobID}/log", server.HandleRunnerAppendLog)
|
||||
r.Put("/jobs/{jobID}/dag", server.HandleRunnerSyncDAG)
|
||||
})
|
||||
|
||||
// React SPA — serve from web/dist at root, fallback to index.html
|
||||
if info, err := os.Stat("web/dist"); err == nil && info.IsDir() {
|
||||
spa := http.FileServerFS(os.DirFS("web/dist"))
|
||||
|
||||
385
internal/api/runner_handlers.go
Normal file
385
internal/api/runner_handlers.go
Normal file
@@ -0,0 +1,385 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/kindredsystems/silo/internal/auth"
|
||||
"github.com/kindredsystems/silo/internal/db"
|
||||
"github.com/kindredsystems/silo/internal/jobdef"
|
||||
)
|
||||
|
||||
// HandleRunnerHeartbeat updates the runner's heartbeat timestamp.
|
||||
func (s *Server) HandleRunnerHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
runner := auth.RunnerFromContext(r.Context())
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
// Heartbeat already updated by RequireRunnerAuth middleware
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
// HandleRunnerClaim claims the next available job matching the runner's tags.
|
||||
func (s *Server) HandleRunnerClaim(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
job, err := s.jobs.ClaimJob(ctx, runner.ID, runner.Tags)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to claim job")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to claim job")
|
||||
return
|
||||
}
|
||||
if job == nil {
|
||||
writeJSON(w, http.StatusNoContent, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Look up the full definition to send to the runner
|
||||
var defPayload map[string]any
|
||||
if job.JobDefinitionID != nil {
|
||||
rec, err := s.jobs.GetDefinitionByID(ctx, *job.JobDefinitionID)
|
||||
if err == nil && rec != nil {
|
||||
defPayload = rec.Definition
|
||||
}
|
||||
}
|
||||
|
||||
s.broker.Publish("job.claimed", mustMarshal(map[string]any{
|
||||
"job_id": job.ID,
|
||||
"runner_id": runner.ID,
|
||||
"runner": runner.Name,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"job": job,
|
||||
"definition": defPayload,
|
||||
})
|
||||
}
|
||||
|
||||
// HandleRunnerStartJob transitions a claimed job to running.
|
||||
func (s *Server) HandleRunnerStartJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
if err := s.jobs.StartJob(ctx, jobID, runner.ID); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "start_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "running"})
|
||||
}
|
||||
|
||||
// HandleRunnerUpdateProgress updates a running job's progress.
|
||||
func (s *Server) HandleRunnerUpdateProgress(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
var req struct {
|
||||
Progress int `json:"progress"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.jobs.UpdateProgress(ctx, jobID, runner.ID, req.Progress, req.Message); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "update_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("job.progress", mustMarshal(map[string]any{
|
||||
"job_id": jobID,
|
||||
"progress": req.Progress,
|
||||
"message": req.Message,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
// HandleRunnerCompleteJob marks a job as completed.
|
||||
func (s *Server) HandleRunnerCompleteJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
var req struct {
|
||||
Result map[string]any `json:"result,omitempty"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.jobs.CompleteJob(ctx, jobID, runner.ID, req.Result); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "complete_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("job.completed", mustMarshal(map[string]any{
|
||||
"job_id": jobID,
|
||||
"runner_id": runner.ID,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "completed"})
|
||||
}
|
||||
|
||||
// HandleRunnerFailJob marks a job as failed.
|
||||
func (s *Server) HandleRunnerFailJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
var req struct {
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.jobs.FailJob(ctx, jobID, runner.ID, req.Error); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "fail_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
s.broker.Publish("job.failed", mustMarshal(map[string]any{
|
||||
"job_id": jobID,
|
||||
"runner_id": runner.ID,
|
||||
"error": req.Error,
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "failed"})
|
||||
}
|
||||
|
||||
// HandleRunnerAppendLog appends a log entry to a job.
|
||||
func (s *Server) HandleRunnerAppendLog(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
var req struct {
|
||||
Level string `json:"level"`
|
||||
Message string `json:"message"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Level == "" {
|
||||
req.Level = "info"
|
||||
}
|
||||
|
||||
entry := &db.JobLogEntry{
|
||||
JobID: jobID,
|
||||
Level: req.Level,
|
||||
Message: req.Message,
|
||||
Metadata: req.Metadata,
|
||||
}
|
||||
if err := s.jobs.AppendLog(ctx, entry); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to append job log")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to append log")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, entry)
|
||||
}
|
||||
|
||||
// HandleRunnerSyncDAG allows a runner to push DAG results for a job's item.
|
||||
func (s *Server) HandleRunnerSyncDAG(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
runner := auth.RunnerFromContext(ctx)
|
||||
if runner == nil {
|
||||
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := chi.URLParam(r, "jobID")
|
||||
|
||||
// Get the job to find the item
|
||||
job, err := s.jobs.GetJob(ctx, jobID)
|
||||
if err != nil || job == nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "Job not found")
|
||||
return
|
||||
}
|
||||
if job.ItemID == nil {
|
||||
writeError(w, http.StatusBadRequest, "no_item", "Job has no associated item")
|
||||
return
|
||||
}
|
||||
|
||||
// Delegate to the DAG sync handler logic
|
||||
var req dagSyncRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.RevisionNumber == 0 {
|
||||
// Look up current revision
|
||||
item, err := s.items.GetByID(ctx, *job.ItemID)
|
||||
if err != nil || item == nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "Item not found")
|
||||
return
|
||||
}
|
||||
req.RevisionNumber = item.CurrentRevision
|
||||
}
|
||||
|
||||
// Convert and sync nodes
|
||||
nodes := make([]db.DAGNode, len(req.Nodes))
|
||||
for i, n := range req.Nodes {
|
||||
state := n.ValidationState
|
||||
if state == "" {
|
||||
state = "clean"
|
||||
}
|
||||
nodes[i] = db.DAGNode{
|
||||
NodeKey: n.NodeKey,
|
||||
NodeType: n.NodeType,
|
||||
PropertiesHash: n.PropertiesHash,
|
||||
ValidationState: state,
|
||||
Metadata: n.Metadata,
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.dag.SyncFeatureTree(ctx, *job.ItemID, req.RevisionNumber, nodes, nil); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to sync DAG from runner")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG")
|
||||
return
|
||||
}
|
||||
|
||||
// Build key→ID map and sync edges
|
||||
keyToID := make(map[string]string, len(nodes))
|
||||
for _, n := range nodes {
|
||||
keyToID[n.NodeKey] = n.ID
|
||||
}
|
||||
|
||||
if len(req.Edges) > 0 {
|
||||
if err := s.dag.DeleteEdgesForItem(ctx, *job.ItemID, req.RevisionNumber); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to delete old edges")
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges")
|
||||
return
|
||||
}
|
||||
for _, e := range req.Edges {
|
||||
sourceID, ok := keyToID[e.SourceKey]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
targetID, ok := keyToID[e.TargetKey]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
edgeType := e.EdgeType
|
||||
if edgeType == "" {
|
||||
edgeType = "depends_on"
|
||||
}
|
||||
edge := &db.DAGEdge{
|
||||
SourceNodeID: sourceID,
|
||||
TargetNodeID: targetID,
|
||||
EdgeType: edgeType,
|
||||
Metadata: e.Metadata,
|
||||
}
|
||||
if err := s.dag.CreateEdge(ctx, edge); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to create edge from runner")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.broker.Publish("dag.updated", mustMarshal(map[string]any{
|
||||
"item_id": *job.ItemID,
|
||||
"job_id": jobID,
|
||||
"runner": runner.Name,
|
||||
"node_count": len(req.Nodes),
|
||||
"edge_count": len(req.Edges),
|
||||
}))
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"synced": true,
|
||||
"node_count": len(req.Nodes),
|
||||
"edge_count": len(req.Edges),
|
||||
})
|
||||
}
|
||||
|
||||
// generateRunnerToken creates a new runner token. Returns raw token, hash, and prefix.
|
||||
func generateRunnerToken() (raw, hash, prefix string) {
|
||||
rawBytes := make([]byte, 32)
|
||||
if _, err := rand.Read(rawBytes); err != nil {
|
||||
panic(fmt.Sprintf("generating random bytes: %v", err))
|
||||
}
|
||||
|
||||
raw = "silo_runner_" + hex.EncodeToString(rawBytes)
|
||||
|
||||
h := sha256.Sum256([]byte(raw))
|
||||
hash = hex.EncodeToString(h[:])
|
||||
|
||||
prefix = raw[:20] // "silo_runner_" + first 8 hex chars
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// loadAndUpsertJobDefs loads YAML definitions from a directory and upserts them into the database.
|
||||
func loadAndUpsertJobDefs(ctx context.Context, dir string, repo *db.JobRepository) (map[string]*jobdef.Definition, error) {
|
||||
defs, err := jobdef.LoadAll(dir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading job definitions: %w", err)
|
||||
}
|
||||
|
||||
for _, def := range defs {
|
||||
defJSON, _ := json.Marshal(def)
|
||||
var defMap map[string]any
|
||||
json.Unmarshal(defJSON, &defMap)
|
||||
|
||||
rec := &db.JobDefinitionRecord{
|
||||
Name: def.Name,
|
||||
Version: def.Version,
|
||||
TriggerType: def.Trigger.Type,
|
||||
ScopeType: def.Scope.Type,
|
||||
ComputeType: def.Compute.Type,
|
||||
RunnerTags: def.Runner.Tags,
|
||||
TimeoutSeconds: def.Timeout,
|
||||
MaxRetries: def.MaxRetries,
|
||||
Priority: def.Priority,
|
||||
Definition: defMap,
|
||||
Enabled: true,
|
||||
}
|
||||
if err := repo.UpsertDefinition(ctx, rec); err != nil {
|
||||
return nil, fmt.Errorf("upserting definition %s: %w", def.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return defs, nil
|
||||
}
|
||||
Reference in New Issue
Block a user