diff --git a/internal/api/dag_handlers.go b/internal/api/dag_handlers.go index 4e162d8..22b8b47 100644 --- a/internal/api/dag_handlers.go +++ b/internal/api/dag_handlers.go @@ -219,14 +219,13 @@ func (s *Server) HandleSyncDAG(w http.ResponseWriter, r *http.Request) { } // Publish SSE event - evData, _ := json.Marshal(map[string]any{ + s.broker.Publish("dag.updated", mustMarshal(map[string]any{ "item_id": item.ID, "part_number": item.PartNumber, "revision_number": req.RevisionNumber, "node_count": len(req.Nodes), "edge_count": len(req.Edges), - }) - s.broker.Publish("dag.updated", string(evData)) + })) writeJSON(w, http.StatusOK, map[string]any{ "synced": true, diff --git a/internal/api/job_handlers.go b/internal/api/job_handlers.go new file mode 100644 index 0000000..e48a41d --- /dev/null +++ b/internal/api/job_handlers.go @@ -0,0 +1,323 @@ +package api + +import ( + "encoding/json" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + "github.com/kindredsystems/silo/internal/auth" + "github.com/kindredsystems/silo/internal/db" +) + +// HandleListJobs returns jobs filtered by status and/or item. +func (s *Server) HandleListJobs(w http.ResponseWriter, r *http.Request) { + status := r.URL.Query().Get("status") + itemID := r.URL.Query().Get("item_id") + + limit := 50 + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 200 { + limit = n + } + } + offset := 0 + if v := r.URL.Query().Get("offset"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + offset = n + } + } + + jobs, err := s.jobs.ListJobs(r.Context(), status, itemID, limit, offset) + if err != nil { + s.logger.Error().Err(err).Msg("failed to list jobs") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list jobs") + return + } + + writeJSON(w, http.StatusOK, jobs) +} + +// HandleGetJob returns a single job by ID. +func (s *Server) HandleGetJob(w http.ResponseWriter, r *http.Request) { + jobID := chi.URLParam(r, "jobID") + + job, err := s.jobs.GetJob(r.Context(), jobID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get job") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get job") + return + } + if job == nil { + writeError(w, http.StatusNotFound, "not_found", "Job not found") + return + } + + writeJSON(w, http.StatusOK, job) +} + +// HandleGetJobLogs returns log entries for a job. +func (s *Server) HandleGetJobLogs(w http.ResponseWriter, r *http.Request) { + jobID := chi.URLParam(r, "jobID") + + logs, err := s.jobs.GetJobLogs(r.Context(), jobID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get job logs") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get job logs") + return + } + + writeJSON(w, http.StatusOK, logs) +} + +// HandleCreateJob manually triggers a job. +func (s *Server) HandleCreateJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + user := auth.UserFromContext(ctx) + + var req struct { + DefinitionName string `json:"definition_name"` + ItemID *string `json:"item_id,omitempty"` + ProjectID *string `json:"project_id,omitempty"` + ScopeMetadata map[string]any `json:"scope_metadata,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if req.DefinitionName == "" { + writeError(w, http.StatusBadRequest, "missing_field", "definition_name is required") + return + } + + // Look up definition + def, err := s.jobs.GetDefinition(ctx, req.DefinitionName) + if err != nil { + s.logger.Error().Err(err).Msg("failed to look up job definition") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to look up definition") + return + } + if def == nil { + writeError(w, http.StatusNotFound, "not_found", "Job definition not found: "+req.DefinitionName) + return + } + + var createdBy *string + if user != nil { + createdBy = &user.Username + } + + job := &db.Job{ + JobDefinitionID: &def.ID, + DefinitionName: def.Name, + Priority: def.Priority, + ItemID: req.ItemID, + ProjectID: req.ProjectID, + ScopeMetadata: req.ScopeMetadata, + RunnerTags: def.RunnerTags, + TimeoutSeconds: def.TimeoutSeconds, + MaxRetries: def.MaxRetries, + CreatedBy: createdBy, + } + + if err := s.jobs.CreateJob(ctx, job); err != nil { + s.logger.Error().Err(err).Msg("failed to create job") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to create job") + return + } + + s.broker.Publish("job.created", mustMarshal(map[string]any{ + "job_id": job.ID, + "definition_name": job.DefinitionName, + "item_id": job.ItemID, + })) + + writeJSON(w, http.StatusCreated, job) +} + +// HandleCancelJob cancels a pending or active job. +func (s *Server) HandleCancelJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + jobID := chi.URLParam(r, "jobID") + user := auth.UserFromContext(ctx) + + cancelledBy := "system" + if user != nil { + cancelledBy = user.Username + } + + if err := s.jobs.CancelJob(ctx, jobID, cancelledBy); err != nil { + writeError(w, http.StatusBadRequest, "cancel_failed", err.Error()) + return + } + + s.broker.Publish("job.cancelled", mustMarshal(map[string]any{ + "job_id": jobID, + "cancelled_by": cancelledBy, + })) + + writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"}) +} + +// HandleListJobDefinitions returns all loaded job definitions. +func (s *Server) HandleListJobDefinitions(w http.ResponseWriter, r *http.Request) { + defs, err := s.jobs.ListDefinitions(r.Context()) + if err != nil { + s.logger.Error().Err(err).Msg("failed to list job definitions") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list definitions") + return + } + + writeJSON(w, http.StatusOK, defs) +} + +// HandleGetJobDefinition returns a single job definition by name. +func (s *Server) HandleGetJobDefinition(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "name") + + def, err := s.jobs.GetDefinition(r.Context(), name) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get job definition") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get definition") + return + } + if def == nil { + writeError(w, http.StatusNotFound, "not_found", "Job definition not found") + return + } + + writeJSON(w, http.StatusOK, def) +} + +// HandleReloadJobDefinitions re-reads YAML files from disk and upserts them. +func (s *Server) HandleReloadJobDefinitions(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if s.jobDefsDir == "" { + writeError(w, http.StatusBadRequest, "no_directory", "Job definitions directory not configured") + return + } + + defs, err := loadAndUpsertJobDefs(ctx, s.jobDefsDir, s.jobs) + if err != nil { + s.logger.Error().Err(err).Msg("failed to reload job definitions") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to reload definitions") + return + } + + // Update in-memory map + s.jobDefs = defs + + writeJSON(w, http.StatusOK, map[string]any{ + "reloaded": len(defs), + }) +} + +// HandleListRunners returns all registered runners (admin). +func (s *Server) HandleListRunners(w http.ResponseWriter, r *http.Request) { + runners, err := s.jobs.ListRunners(r.Context()) + if err != nil { + s.logger.Error().Err(err).Msg("failed to list runners") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to list runners") + return + } + + // Redact token hashes from response + type runnerResponse struct { + ID string `json:"id"` + Name string `json:"name"` + TokenPrefix string `json:"token_prefix"` + Tags []string `json:"tags"` + Status string `json:"status"` + LastHeartbeat *string `json:"last_heartbeat,omitempty"` + LastJobID *string `json:"last_job_id,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + CreatedAt string `json:"created_at"` + } + + resp := make([]runnerResponse, len(runners)) + for i, runner := range runners { + var hb *string + if runner.LastHeartbeat != nil { + s := runner.LastHeartbeat.Format("2006-01-02T15:04:05Z07:00") + hb = &s + } + resp[i] = runnerResponse{ + ID: runner.ID, + Name: runner.Name, + TokenPrefix: runner.TokenPrefix, + Tags: runner.Tags, + Status: runner.Status, + LastHeartbeat: hb, + LastJobID: runner.LastJobID, + Metadata: runner.Metadata, + CreatedAt: runner.CreatedAt.Format("2006-01-02T15:04:05Z07:00"), + } + } + + writeJSON(w, http.StatusOK, resp) +} + +// HandleRegisterRunner creates a new runner and returns the token (admin). +func (s *Server) HandleRegisterRunner(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var req struct { + Name string `json:"name"` + Tags []string `json:"tags"` + Metadata map[string]any `json:"metadata,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + if req.Name == "" { + writeError(w, http.StatusBadRequest, "missing_field", "name is required") + return + } + if len(req.Tags) == 0 { + writeError(w, http.StatusBadRequest, "missing_field", "tags is required (at least one)") + return + } + + rawToken, tokenHash, tokenPrefix := generateRunnerToken() + + runner := &db.Runner{ + Name: req.Name, + TokenHash: tokenHash, + TokenPrefix: tokenPrefix, + Tags: req.Tags, + Metadata: req.Metadata, + } + + if err := s.jobs.RegisterRunner(ctx, runner); err != nil { + s.logger.Error().Err(err).Msg("failed to register runner") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to register runner") + return + } + + s.broker.Publish("runner.online", mustMarshal(map[string]any{ + "runner_id": runner.ID, + "name": runner.Name, + })) + + writeJSON(w, http.StatusCreated, map[string]any{ + "id": runner.ID, + "name": runner.Name, + "token": rawToken, + "tags": runner.Tags, + }) +} + +// HandleDeleteRunner removes a runner (admin). +func (s *Server) HandleDeleteRunner(w http.ResponseWriter, r *http.Request) { + runnerID := chi.URLParam(r, "runnerID") + + if err := s.jobs.DeleteRunner(r.Context(), runnerID); err != nil { + writeError(w, http.StatusNotFound, "not_found", err.Error()) + return + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/api/routes.go b/internal/api/routes.go index e0057ad..3aa4181 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -150,6 +150,11 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Get("/bom/export.csv", server.HandleExportBOMCSV) r.Get("/bom/export.ods", server.HandleExportBOMODS) + // DAG (read: viewer, write: editor) + r.Get("/dag", server.HandleGetDAG) + r.Get("/dag/forward-cone/{nodeKey}", server.HandleGetForwardCone) + r.Get("/dag/dirty", server.HandleGetDirtySubgraph) + r.Group(func(r chi.Router) { r.Use(server.RequireWritable) r.Use(server.RequireRole(auth.RoleEditor)) @@ -169,6 +174,8 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Post("/bom/merge", server.HandleMergeBOM) r.Put("/bom/{childPartNumber}", server.HandleUpdateBOMEntry) r.Delete("/bom/{childPartNumber}", server.HandleDeleteBOMEntry) + r.Put("/dag", server.HandleSyncDAG) + r.Post("/dag/mark-dirty/{nodeKey}", server.HandleMarkDirty) }) }) }) @@ -201,6 +208,39 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { r.Post("/sheets/diff", server.HandleSheetDiff) }) + // Jobs (read: viewer, write: editor) + r.Route("/jobs", func(r chi.Router) { + r.Get("/", server.HandleListJobs) + r.Get("/{jobID}", server.HandleGetJob) + r.Get("/{jobID}/logs", server.HandleGetJobLogs) + + r.Group(func(r chi.Router) { + r.Use(server.RequireWritable) + r.Use(server.RequireRole(auth.RoleEditor)) + r.Post("/", server.HandleCreateJob) + r.Post("/{jobID}/cancel", server.HandleCancelJob) + }) + }) + + // Job definitions (read: viewer, reload: admin) + r.Route("/job-definitions", func(r chi.Router) { + r.Get("/", server.HandleListJobDefinitions) + r.Get("/{name}", server.HandleGetJobDefinition) + + r.Group(func(r chi.Router) { + r.Use(server.RequireRole(auth.RoleAdmin)) + r.Post("/reload", server.HandleReloadJobDefinitions) + }) + }) + + // Runners (admin) + r.Route("/runners", func(r chi.Router) { + r.Use(server.RequireRole(auth.RoleAdmin)) + r.Get("/", server.HandleListRunners) + r.Post("/", server.HandleRegisterRunner) + r.Delete("/{runnerID}", server.HandleDeleteRunner) + }) + // Part number generation (editor) r.Group(func(r chi.Router) { r.Use(server.RequireWritable) @@ -209,6 +249,19 @@ func NewRouter(server *Server, logger zerolog.Logger) http.Handler { }) }) + // Runner-facing API (runner token auth, not user auth) + r.Route("/api/runner", func(r chi.Router) { + r.Use(server.RequireRunnerAuth) + r.Post("/heartbeat", server.HandleRunnerHeartbeat) + r.Post("/claim", server.HandleRunnerClaim) + r.Post("/jobs/{jobID}/start", server.HandleRunnerStartJob) + r.Put("/jobs/{jobID}/progress", server.HandleRunnerUpdateProgress) + r.Post("/jobs/{jobID}/complete", server.HandleRunnerCompleteJob) + r.Post("/jobs/{jobID}/fail", server.HandleRunnerFailJob) + r.Post("/jobs/{jobID}/log", server.HandleRunnerAppendLog) + r.Put("/jobs/{jobID}/dag", server.HandleRunnerSyncDAG) + }) + // React SPA — serve from web/dist at root, fallback to index.html if info, err := os.Stat("web/dist"); err == nil && info.IsDir() { spa := http.FileServerFS(os.DirFS("web/dist")) diff --git a/internal/api/runner_handlers.go b/internal/api/runner_handlers.go new file mode 100644 index 0000000..2d659b7 --- /dev/null +++ b/internal/api/runner_handlers.go @@ -0,0 +1,385 @@ +package api + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/kindredsystems/silo/internal/auth" + "github.com/kindredsystems/silo/internal/db" + "github.com/kindredsystems/silo/internal/jobdef" +) + +// HandleRunnerHeartbeat updates the runner's heartbeat timestamp. +func (s *Server) HandleRunnerHeartbeat(w http.ResponseWriter, r *http.Request) { + runner := auth.RunnerFromContext(r.Context()) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + // Heartbeat already updated by RequireRunnerAuth middleware + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// HandleRunnerClaim claims the next available job matching the runner's tags. +func (s *Server) HandleRunnerClaim(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + job, err := s.jobs.ClaimJob(ctx, runner.ID, runner.Tags) + if err != nil { + s.logger.Error().Err(err).Msg("failed to claim job") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to claim job") + return + } + if job == nil { + writeJSON(w, http.StatusNoContent, nil) + return + } + + // Look up the full definition to send to the runner + var defPayload map[string]any + if job.JobDefinitionID != nil { + rec, err := s.jobs.GetDefinitionByID(ctx, *job.JobDefinitionID) + if err == nil && rec != nil { + defPayload = rec.Definition + } + } + + s.broker.Publish("job.claimed", mustMarshal(map[string]any{ + "job_id": job.ID, + "runner_id": runner.ID, + "runner": runner.Name, + })) + + writeJSON(w, http.StatusOK, map[string]any{ + "job": job, + "definition": defPayload, + }) +} + +// HandleRunnerStartJob transitions a claimed job to running. +func (s *Server) HandleRunnerStartJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + if err := s.jobs.StartJob(ctx, jobID, runner.ID); err != nil { + writeError(w, http.StatusBadRequest, "start_failed", err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "running"}) +} + +// HandleRunnerUpdateProgress updates a running job's progress. +func (s *Server) HandleRunnerUpdateProgress(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + var req struct { + Progress int `json:"progress"` + Message string `json:"message,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if err := s.jobs.UpdateProgress(ctx, jobID, runner.ID, req.Progress, req.Message); err != nil { + writeError(w, http.StatusBadRequest, "update_failed", err.Error()) + return + } + + s.broker.Publish("job.progress", mustMarshal(map[string]any{ + "job_id": jobID, + "progress": req.Progress, + "message": req.Message, + })) + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// HandleRunnerCompleteJob marks a job as completed. +func (s *Server) HandleRunnerCompleteJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + var req struct { + Result map[string]any `json:"result,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if err := s.jobs.CompleteJob(ctx, jobID, runner.ID, req.Result); err != nil { + writeError(w, http.StatusBadRequest, "complete_failed", err.Error()) + return + } + + s.broker.Publish("job.completed", mustMarshal(map[string]any{ + "job_id": jobID, + "runner_id": runner.ID, + })) + + writeJSON(w, http.StatusOK, map[string]string{"status": "completed"}) +} + +// HandleRunnerFailJob marks a job as failed. +func (s *Server) HandleRunnerFailJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + var req struct { + Error string `json:"error"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if err := s.jobs.FailJob(ctx, jobID, runner.ID, req.Error); err != nil { + writeError(w, http.StatusBadRequest, "fail_failed", err.Error()) + return + } + + s.broker.Publish("job.failed", mustMarshal(map[string]any{ + "job_id": jobID, + "runner_id": runner.ID, + "error": req.Error, + })) + + writeJSON(w, http.StatusOK, map[string]string{"status": "failed"}) +} + +// HandleRunnerAppendLog appends a log entry to a job. +func (s *Server) HandleRunnerAppendLog(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + var req struct { + Level string `json:"level"` + Message string `json:"message"` + Metadata map[string]any `json:"metadata,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if req.Level == "" { + req.Level = "info" + } + + entry := &db.JobLogEntry{ + JobID: jobID, + Level: req.Level, + Message: req.Message, + Metadata: req.Metadata, + } + if err := s.jobs.AppendLog(ctx, entry); err != nil { + s.logger.Error().Err(err).Msg("failed to append job log") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to append log") + return + } + + writeJSON(w, http.StatusCreated, entry) +} + +// HandleRunnerSyncDAG allows a runner to push DAG results for a job's item. +func (s *Server) HandleRunnerSyncDAG(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + runner := auth.RunnerFromContext(ctx) + if runner == nil { + writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required") + return + } + + jobID := chi.URLParam(r, "jobID") + + // Get the job to find the item + job, err := s.jobs.GetJob(ctx, jobID) + if err != nil || job == nil { + writeError(w, http.StatusNotFound, "not_found", "Job not found") + return + } + if job.ItemID == nil { + writeError(w, http.StatusBadRequest, "no_item", "Job has no associated item") + return + } + + // Delegate to the DAG sync handler logic + var req dagSyncRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if req.RevisionNumber == 0 { + // Look up current revision + item, err := s.items.GetByID(ctx, *job.ItemID) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + req.RevisionNumber = item.CurrentRevision + } + + // Convert and sync nodes + nodes := make([]db.DAGNode, len(req.Nodes)) + for i, n := range req.Nodes { + state := n.ValidationState + if state == "" { + state = "clean" + } + nodes[i] = db.DAGNode{ + NodeKey: n.NodeKey, + NodeType: n.NodeType, + PropertiesHash: n.PropertiesHash, + ValidationState: state, + Metadata: n.Metadata, + } + } + + if err := s.dag.SyncFeatureTree(ctx, *job.ItemID, req.RevisionNumber, nodes, nil); err != nil { + s.logger.Error().Err(err).Msg("failed to sync DAG from runner") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG") + return + } + + // Build key→ID map and sync edges + keyToID := make(map[string]string, len(nodes)) + for _, n := range nodes { + keyToID[n.NodeKey] = n.ID + } + + if len(req.Edges) > 0 { + if err := s.dag.DeleteEdgesForItem(ctx, *job.ItemID, req.RevisionNumber); err != nil { + s.logger.Error().Err(err).Msg("failed to delete old edges") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges") + return + } + for _, e := range req.Edges { + sourceID, ok := keyToID[e.SourceKey] + if !ok { + continue + } + targetID, ok := keyToID[e.TargetKey] + if !ok { + continue + } + edgeType := e.EdgeType + if edgeType == "" { + edgeType = "depends_on" + } + edge := &db.DAGEdge{ + SourceNodeID: sourceID, + TargetNodeID: targetID, + EdgeType: edgeType, + Metadata: e.Metadata, + } + if err := s.dag.CreateEdge(ctx, edge); err != nil { + s.logger.Error().Err(err).Msg("failed to create edge from runner") + } + } + } + + s.broker.Publish("dag.updated", mustMarshal(map[string]any{ + "item_id": *job.ItemID, + "job_id": jobID, + "runner": runner.Name, + "node_count": len(req.Nodes), + "edge_count": len(req.Edges), + })) + + writeJSON(w, http.StatusOK, map[string]any{ + "synced": true, + "node_count": len(req.Nodes), + "edge_count": len(req.Edges), + }) +} + +// generateRunnerToken creates a new runner token. Returns raw token, hash, and prefix. +func generateRunnerToken() (raw, hash, prefix string) { + rawBytes := make([]byte, 32) + if _, err := rand.Read(rawBytes); err != nil { + panic(fmt.Sprintf("generating random bytes: %v", err)) + } + + raw = "silo_runner_" + hex.EncodeToString(rawBytes) + + h := sha256.Sum256([]byte(raw)) + hash = hex.EncodeToString(h[:]) + + prefix = raw[:20] // "silo_runner_" + first 8 hex chars + + return +} + +// loadAndUpsertJobDefs loads YAML definitions from a directory and upserts them into the database. +func loadAndUpsertJobDefs(ctx context.Context, dir string, repo *db.JobRepository) (map[string]*jobdef.Definition, error) { + defs, err := jobdef.LoadAll(dir) + if err != nil { + return nil, fmt.Errorf("loading job definitions: %w", err) + } + + for _, def := range defs { + defJSON, _ := json.Marshal(def) + var defMap map[string]any + json.Unmarshal(defJSON, &defMap) + + rec := &db.JobDefinitionRecord{ + Name: def.Name, + Version: def.Version, + TriggerType: def.Trigger.Type, + ScopeType: def.Scope.Type, + ComputeType: def.Compute.Type, + RunnerTags: def.Runner.Tags, + TimeoutSeconds: def.Timeout, + MaxRetries: def.MaxRetries, + Priority: def.Priority, + Definition: defMap, + Enabled: true, + } + if err := repo.UpsertDefinition(ctx, rec); err != nil { + return nil, fmt.Errorf("upserting definition %s: %w", def.Name, err) + } + } + + return defs, nil +}