Add server-side solver service module with REST API endpoints, database
schema, job definitions, and runner result caching.
New files:
- migrations/021_solver_results.sql: solver_results table with upsert constraint
- internal/db/solver_results.go: SolverResultRepository (Upsert, GetByItem, GetByItemRevision)
- internal/api/solver_handlers.go: solver API handlers and maybeCacheSolverResult hook
- jobdefs/assembly-solve.yaml: manual solve job definition
- jobdefs/assembly-validate.yaml: auto-validate on revision creation
- jobdefs/assembly-kinematic.yaml: manual kinematic simulation job
Modified:
- internal/config/config.go: SolverConfig struct with max_context_size_mb, default_timeout
- internal/modules/modules.go, loader.go: register solver module (depends on jobs)
- internal/db/jobs.go: ListSolverJobs helper with definition_name prefix filter
- internal/api/handlers.go: wire SolverResultRepository into Server
- internal/api/routes.go: /api/solver/* routes + /api/items/{partNumber}/solver/results
- internal/api/runner_handlers.go: async result cache hook on job completion
API endpoints:
- POST /api/solver/jobs — submit solver job (editor)
- GET /api/solver/jobs — list solver jobs with filters
- GET /api/solver/jobs/{id} — get solver job status
- POST /api/solver/jobs/{id}/cancel — cancel solver job (editor)
- GET /api/solver/solvers — registry of available solvers
- GET /api/items/{pn}/solver/results — cached results for item
Also fixes pre-existing test compilation errors (missing workflows param
in NewServer calls across 6 test files).
389 lines · 11 KiB · Go
package api
|
|
|
|
import (
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"

	"github.com/go-chi/chi/v5"

	"github.com/kindredsystems/silo/internal/auth"
	"github.com/kindredsystems/silo/internal/db"
	"github.com/kindredsystems/silo/internal/jobdef"
)
|
|
|
|
// HandleRunnerHeartbeat updates the runner's heartbeat timestamp.
|
|
func (s *Server) HandleRunnerHeartbeat(w http.ResponseWriter, r *http.Request) {
|
|
runner := auth.RunnerFromContext(r.Context())
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
// Heartbeat already updated by RequireRunnerAuth middleware
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
// HandleRunnerClaim claims the next available job matching the runner's tags.
|
|
func (s *Server) HandleRunnerClaim(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
job, err := s.jobs.ClaimJob(ctx, runner.ID, runner.Tags)
|
|
if err != nil {
|
|
s.logger.Error().Err(err).Msg("failed to claim job")
|
|
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to claim job")
|
|
return
|
|
}
|
|
if job == nil {
|
|
writeJSON(w, http.StatusNoContent, nil)
|
|
return
|
|
}
|
|
|
|
// Look up the full definition to send to the runner
|
|
var defPayload map[string]any
|
|
if job.JobDefinitionID != nil {
|
|
rec, err := s.jobs.GetDefinitionByID(ctx, *job.JobDefinitionID)
|
|
if err == nil && rec != nil {
|
|
defPayload = rec.Definition
|
|
}
|
|
}
|
|
|
|
s.broker.Publish("job.claimed", mustMarshal(map[string]any{
|
|
"job_id": job.ID,
|
|
"runner_id": runner.ID,
|
|
"runner": runner.Name,
|
|
}))
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"job": job,
|
|
"definition": defPayload,
|
|
})
|
|
}
|
|
|
|
// HandleRunnerStartJob transitions a claimed job to running.
|
|
func (s *Server) HandleRunnerStartJob(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
jobID := chi.URLParam(r, "jobID")
|
|
if err := s.jobs.StartJob(ctx, jobID, runner.ID); err != nil {
|
|
writeError(w, http.StatusBadRequest, "start_failed", err.Error())
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "running"})
|
|
}
|
|
|
|
// HandleRunnerUpdateProgress updates a running job's progress.
|
|
func (s *Server) HandleRunnerUpdateProgress(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
jobID := chi.URLParam(r, "jobID")
|
|
var req struct {
|
|
Progress int `json:"progress"`
|
|
Message string `json:"message,omitempty"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
|
return
|
|
}
|
|
|
|
if err := s.jobs.UpdateProgress(ctx, jobID, runner.ID, req.Progress, req.Message); err != nil {
|
|
writeError(w, http.StatusBadRequest, "update_failed", err.Error())
|
|
return
|
|
}
|
|
|
|
s.broker.Publish("job.progress", mustMarshal(map[string]any{
|
|
"job_id": jobID,
|
|
"progress": req.Progress,
|
|
"message": req.Message,
|
|
}))
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
// HandleRunnerCompleteJob marks a job as completed.
|
|
func (s *Server) HandleRunnerCompleteJob(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
jobID := chi.URLParam(r, "jobID")
|
|
var req struct {
|
|
Result map[string]any `json:"result,omitempty"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
|
return
|
|
}
|
|
|
|
if err := s.jobs.CompleteJob(ctx, jobID, runner.ID, req.Result); err != nil {
|
|
writeError(w, http.StatusBadRequest, "complete_failed", err.Error())
|
|
return
|
|
}
|
|
|
|
// Cache solver results asynchronously (no-op for non-solver jobs).
|
|
go s.maybeCacheSolverResult(context.Background(), jobID)
|
|
|
|
s.broker.Publish("job.completed", mustMarshal(map[string]any{
|
|
"job_id": jobID,
|
|
"runner_id": runner.ID,
|
|
}))
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "completed"})
|
|
}
|
|
|
|
// HandleRunnerFailJob marks a job as failed.
|
|
func (s *Server) HandleRunnerFailJob(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
jobID := chi.URLParam(r, "jobID")
|
|
var req struct {
|
|
Error string `json:"error"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
|
return
|
|
}
|
|
|
|
if err := s.jobs.FailJob(ctx, jobID, runner.ID, req.Error); err != nil {
|
|
writeError(w, http.StatusBadRequest, "fail_failed", err.Error())
|
|
return
|
|
}
|
|
|
|
s.broker.Publish("job.failed", mustMarshal(map[string]any{
|
|
"job_id": jobID,
|
|
"runner_id": runner.ID,
|
|
"error": req.Error,
|
|
}))
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "failed"})
|
|
}
|
|
|
|
// HandleRunnerAppendLog appends a log entry to a job.
|
|
func (s *Server) HandleRunnerAppendLog(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
runner := auth.RunnerFromContext(ctx)
|
|
if runner == nil {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
|
|
return
|
|
}
|
|
|
|
jobID := chi.URLParam(r, "jobID")
|
|
var req struct {
|
|
Level string `json:"level"`
|
|
Message string `json:"message"`
|
|
Metadata map[string]any `json:"metadata,omitempty"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
|
|
return
|
|
}
|
|
|
|
if req.Level == "" {
|
|
req.Level = "info"
|
|
}
|
|
|
|
entry := &db.JobLogEntry{
|
|
JobID: jobID,
|
|
Level: req.Level,
|
|
Message: req.Message,
|
|
Metadata: req.Metadata,
|
|
}
|
|
if err := s.jobs.AppendLog(ctx, entry); err != nil {
|
|
s.logger.Error().Err(err).Msg("failed to append job log")
|
|
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to append log")
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusCreated, entry)
|
|
}
|
|
|
|
// HandleRunnerSyncDAG allows a runner to push DAG results for a job's item.
//
// Flow: resolve the job from the URL, require an associated item, decode a
// dagSyncRequest body, default the revision to the item's current revision
// when unset, replace the feature-tree nodes via SyncFeatureTree, then (if
// edges were supplied) delete all existing edges for the item/revision and
// recreate them from the request. Edges referencing unknown node keys are
// silently skipped; individual edge-creation failures are logged but do not
// fail the request.
func (s *Server) HandleRunnerSyncDAG(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runner := auth.RunnerFromContext(ctx)
	if runner == nil {
		writeError(w, http.StatusUnauthorized, "unauthorized", "Runner identity required")
		return
	}

	jobID := chi.URLParam(r, "jobID")

	// Get the job to find the item
	job, err := s.jobs.GetJob(ctx, jobID)
	if err != nil || job == nil {
		writeError(w, http.StatusNotFound, "not_found", "Job not found")
		return
	}
	if job.ItemID == nil {
		writeError(w, http.StatusBadRequest, "no_item", "Job has no associated item")
		return
	}

	// Delegate to the DAG sync handler logic
	var req dagSyncRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
		return
	}

	// Revision 0 means "unspecified": fall back to the item's current
	// revision. NOTE(review): a runner cannot explicitly target revision 0.
	if req.RevisionNumber == 0 {
		// Look up current revision
		item, err := s.items.GetByID(ctx, *job.ItemID)
		if err != nil || item == nil {
			writeError(w, http.StatusNotFound, "not_found", "Item not found")
			return
		}
		req.RevisionNumber = item.CurrentRevision
	}

	// Convert and sync nodes. An empty ValidationState defaults to "clean".
	nodes := make([]db.DAGNode, len(req.Nodes))
	for i, n := range req.Nodes {
		state := n.ValidationState
		if state == "" {
			state = "clean"
		}
		nodes[i] = db.DAGNode{
			NodeKey:         n.NodeKey,
			NodeType:        n.NodeType,
			PropertiesHash:  n.PropertiesHash,
			ValidationState: state,
			Metadata:        n.Metadata,
		}
	}

	if err := s.dag.SyncFeatureTree(ctx, *job.ItemID, req.RevisionNumber, nodes, nil); err != nil {
		s.logger.Error().Err(err).Msg("failed to sync DAG from runner")
		writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG")
		return
	}

	// Build key→ID map and sync edges.
	// NOTE(review): this assumes SyncFeatureTree populates each node's ID
	// field in place (the slice shares its backing array with the callee) —
	// confirm; otherwise every keyToID value is empty and edges would link
	// nothing.
	keyToID := make(map[string]string, len(nodes))
	for _, n := range nodes {
		keyToID[n.NodeKey] = n.ID
	}

	if len(req.Edges) > 0 {
		// Full replace: drop all edges for this item/revision, then recreate.
		if err := s.dag.DeleteEdgesForItem(ctx, *job.ItemID, req.RevisionNumber); err != nil {
			s.logger.Error().Err(err).Msg("failed to delete old edges")
			writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges")
			return
		}
		for _, e := range req.Edges {
			// Skip edges whose endpoints were not part of this sync.
			sourceID, ok := keyToID[e.SourceKey]
			if !ok {
				continue
			}
			targetID, ok := keyToID[e.TargetKey]
			if !ok {
				continue
			}
			// Default edge type when the runner omits one.
			edgeType := e.EdgeType
			if edgeType == "" {
				edgeType = "depends_on"
			}
			edge := &db.DAGEdge{
				SourceNodeID: sourceID,
				TargetNodeID: targetID,
				EdgeType:     edgeType,
				Metadata:     e.Metadata,
			}
			// Best-effort: a single failed edge is logged, not fatal.
			if err := s.dag.CreateEdge(ctx, edge); err != nil {
				s.logger.Error().Err(err).Msg("failed to create edge from runner")
			}
		}
	}

	s.broker.Publish("dag.updated", mustMarshal(map[string]any{
		"item_id":    *job.ItemID,
		"job_id":     jobID,
		"runner":     runner.Name,
		"node_count": len(req.Nodes),
		"edge_count": len(req.Edges),
	}))

	writeJSON(w, http.StatusOK, map[string]any{
		"synced":     true,
		"node_count": len(req.Nodes),
		"edge_count": len(req.Edges),
	})
}
|
|
|
|
// generateRunnerToken creates a new runner token. Returns raw token, hash, and prefix.
|
|
func generateRunnerToken() (raw, hash, prefix string) {
|
|
rawBytes := make([]byte, 32)
|
|
if _, err := rand.Read(rawBytes); err != nil {
|
|
panic(fmt.Sprintf("generating random bytes: %v", err))
|
|
}
|
|
|
|
raw = "silo_runner_" + hex.EncodeToString(rawBytes)
|
|
|
|
h := sha256.Sum256([]byte(raw))
|
|
hash = hex.EncodeToString(h[:])
|
|
|
|
prefix = raw[:20] // "silo_runner_" + first 8 hex chars
|
|
|
|
return
|
|
}
|
|
|
|
// loadAndUpsertJobDefs loads YAML definitions from a directory and upserts them into the database.
|
|
func loadAndUpsertJobDefs(ctx context.Context, dir string, repo *db.JobRepository) (map[string]*jobdef.Definition, error) {
|
|
defs, err := jobdef.LoadAll(dir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading job definitions: %w", err)
|
|
}
|
|
|
|
for _, def := range defs {
|
|
defJSON, _ := json.Marshal(def)
|
|
var defMap map[string]any
|
|
json.Unmarshal(defJSON, &defMap)
|
|
|
|
rec := &db.JobDefinitionRecord{
|
|
Name: def.Name,
|
|
Version: def.Version,
|
|
TriggerType: def.Trigger.Type,
|
|
ScopeType: def.Scope.Type,
|
|
ComputeType: def.Compute.Type,
|
|
RunnerTags: def.Runner.Tags,
|
|
TimeoutSeconds: def.Timeout,
|
|
MaxRetries: def.MaxRetries,
|
|
Priority: def.Priority,
|
|
Definition: defMap,
|
|
Enabled: true,
|
|
}
|
|
if err := repo.UpsertDefinition(ctx, rec); err != nil {
|
|
return nil, fmt.Errorf("upserting definition %s: %w", def.Name, err)
|
|
}
|
|
}
|
|
|
|
return defs, nil
|
|
}
|