feat: dependency DAG and YAML-defined compute jobs #92

Merged
forbes merged 13 commits from feat-dag-workers into main 2026-02-14 19:27:19 +00:00
3 changed files with 62 additions and 0 deletions
Showing only changes of commit b6ac5133c3 - Show all commits

View File

@@ -1,6 +1,7 @@
package api
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
@@ -1219,6 +1220,9 @@ func (s *Server) HandleMergeBOM(w http.ResponseWriter, r *http.Request) {
"unreferenced": len(diff.Removed),
}))
// Trigger auto-jobs (e.g. assembly validation)
go s.triggerJobs(context.Background(), "bom_changed", parent.ID, parent)
writeJSON(w, http.StatusOK, resp)
}

View File

@@ -1489,6 +1489,9 @@ func (s *Server) HandleCreateRevision(w http.ResponseWriter, r *http.Request) {
"part_number": partNumber,
"revision_number": rev.RevisionNumber,
}))
// Trigger auto-jobs (e.g. validation, export)
go s.triggerJobs(context.Background(), "revision_created", item.ID, item)
}
// HandleUploadFile uploads a file and creates a new revision.

View File

@@ -1,6 +1,7 @@
package api
import (
"context"
"encoding/json"
"net/http"
"strconv"
@@ -321,3 +322,57 @@ func (s *Server) HandleDeleteRunner(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// triggerJobs creates jobs for all enabled definitions matching the trigger type.
// It applies trigger filters (e.g. item_type) before creating each job.
func (s *Server) triggerJobs(ctx context.Context, triggerType string, itemID string, item *db.Item) {
defs, err := s.jobs.GetDefinitionsByTrigger(ctx, triggerType)
if err != nil {
s.logger.Error().Err(err).Str("trigger", triggerType).Msg("failed to get job definitions for trigger")
return
}
for _, def := range defs {
// Apply trigger filter (e.g. item_type == "assembly")
if def.Definition != nil {
if triggerCfg, ok := def.Definition["trigger"].(map[string]any); ok {
if filterCfg, ok := triggerCfg["filter"].(map[string]any); ok {
if reqType, ok := filterCfg["item_type"].(string); ok && item != nil {
if item.ItemType != reqType {
continue
}
}
}
}
}
job := &db.Job{
JobDefinitionID: &def.ID,
DefinitionName: def.Name,
Priority: def.Priority,
ItemID: &itemID,
RunnerTags: def.RunnerTags,
TimeoutSeconds: def.TimeoutSeconds,
MaxRetries: def.MaxRetries,
}
if err := s.jobs.CreateJob(ctx, job); err != nil {
s.logger.Error().Err(err).Str("definition", def.Name).Msg("failed to create triggered job")
continue
}
s.broker.Publish("job.created", mustMarshal(map[string]any{
"job_id": job.ID,
"definition_name": def.Name,
"trigger": triggerType,
"item_id": itemID,
}))
s.logger.Info().
Str("job_id", job.ID).
Str("definition", def.Name).
Str("trigger", triggerType).
Str("item_id", itemID).
Msg("triggered job")
}
}