package api import ( "encoding/json" "net/http" "github.com/go-chi/chi/v5" "github.com/kindredsystems/silo/internal/db" ) // dagSyncRequest is the payload for PUT /api/items/{partNumber}/dag. type dagSyncRequest struct { RevisionNumber int `json:"revision_number"` Nodes []dagSyncNode `json:"nodes"` Edges []dagSyncEdge `json:"edges"` } type dagSyncNode struct { NodeKey string `json:"node_key"` NodeType string `json:"node_type"` PropertiesHash *string `json:"properties_hash,omitempty"` ValidationState string `json:"validation_state,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` } type dagSyncEdge struct { SourceKey string `json:"source_key"` TargetKey string `json:"target_key"` EdgeType string `json:"edge_type,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` } // HandleGetDAG returns the feature DAG for an item's current revision. func (s *Server) HandleGetDAG(w http.ResponseWriter, r *http.Request) { ctx := r.Context() partNumber := chi.URLParam(r, "partNumber") item, err := s.items.GetByPartNumber(ctx, partNumber) if err != nil || item == nil { writeError(w, http.StatusNotFound, "not_found", "Item not found") return } nodes, err := s.dag.GetNodes(ctx, item.ID, item.CurrentRevision) if err != nil { s.logger.Error().Err(err).Msg("failed to get DAG nodes") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG") return } edges, err := s.dag.GetEdges(ctx, item.ID, item.CurrentRevision) if err != nil { s.logger.Error().Err(err).Msg("failed to get DAG edges") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG edges") return } writeJSON(w, http.StatusOK, map[string]any{ "item_id": item.ID, "part_number": item.PartNumber, "revision_number": item.CurrentRevision, "nodes": nodes, "edges": edges, }) } // HandleGetForwardCone returns all downstream dependents of a node. func (s *Server) HandleGetForwardCone(w http.ResponseWriter, r *http.Request) { ctx := r.Context() partNumber := chi.URLParam(r, "partNumber") nodeKey := chi.URLParam(r, "nodeKey") item, err := s.items.GetByPartNumber(ctx, partNumber) if err != nil || item == nil { writeError(w, http.StatusNotFound, "not_found", "Item not found") return } node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey) if err != nil { s.logger.Error().Err(err).Msg("failed to get DAG node") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node") return } if node == nil { writeError(w, http.StatusNotFound, "not_found", "Node not found") return } cone, err := s.dag.GetForwardCone(ctx, node.ID) if err != nil { s.logger.Error().Err(err).Msg("failed to get forward cone") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get forward cone") return } writeJSON(w, http.StatusOK, map[string]any{ "root_node": node, "cone": cone, }) } // HandleGetDirtySubgraph returns all non-clean nodes for an item. func (s *Server) HandleGetDirtySubgraph(w http.ResponseWriter, r *http.Request) { ctx := r.Context() partNumber := chi.URLParam(r, "partNumber") item, err := s.items.GetByPartNumber(ctx, partNumber) if err != nil || item == nil { writeError(w, http.StatusNotFound, "not_found", "Item not found") return } nodes, err := s.dag.GetDirtySubgraph(ctx, item.ID) if err != nil { s.logger.Error().Err(err).Msg("failed to get dirty subgraph") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get dirty subgraph") return } writeJSON(w, http.StatusOK, map[string]any{ "item_id": item.ID, "nodes": nodes, }) } // HandleSyncDAG accepts a full feature tree from a client or runner. func (s *Server) HandleSyncDAG(w http.ResponseWriter, r *http.Request) { ctx := r.Context() partNumber := chi.URLParam(r, "partNumber") item, err := s.items.GetByPartNumber(ctx, partNumber) if err != nil || item == nil { writeError(w, http.StatusNotFound, "not_found", "Item not found") return } var req dagSyncRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") return } if req.RevisionNumber == 0 { req.RevisionNumber = item.CurrentRevision } // Convert request nodes to DB nodes nodes := make([]db.DAGNode, len(req.Nodes)) for i, n := range req.Nodes { state := n.ValidationState if state == "" { state = "clean" } nodes[i] = db.DAGNode{ NodeKey: n.NodeKey, NodeType: n.NodeType, PropertiesHash: n.PropertiesHash, ValidationState: state, Metadata: n.Metadata, } } // Sync nodes first to get IDs if err := s.dag.SyncFeatureTree(ctx, item.ID, req.RevisionNumber, nodes, nil); err != nil { s.logger.Error().Err(err).Msg("failed to sync DAG nodes") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG") return } // Build key→ID map from synced nodes keyToID := make(map[string]string, len(nodes)) for _, n := range nodes { keyToID[n.NodeKey] = n.ID } // Convert request edges, resolving keys to IDs edges := make([]db.DAGEdge, len(req.Edges)) for i, e := range req.Edges { sourceID, ok := keyToID[e.SourceKey] if !ok { writeError(w, http.StatusBadRequest, "invalid_edge", "Unknown source_key: "+e.SourceKey) return } targetID, ok := keyToID[e.TargetKey] if !ok { writeError(w, http.StatusBadRequest, "invalid_edge", "Unknown target_key: "+e.TargetKey) return } edgeType := e.EdgeType if edgeType == "" { edgeType = "depends_on" } edges[i] = db.DAGEdge{ SourceNodeID: sourceID, TargetNodeID: targetID, EdgeType: edgeType, Metadata: e.Metadata, } } // Sync edges (nodes already synced, so pass empty nodes to skip re-upsert) if len(edges) > 0 { // Delete old edges and insert new ones if err := s.dag.DeleteEdgesForItem(ctx, item.ID, req.RevisionNumber); err != nil { s.logger.Error().Err(err).Msg("failed to delete old edges") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges") return } for i := range edges { if err := s.dag.CreateEdge(ctx, &edges[i]); err != nil { s.logger.Error().Err(err).Msg("failed to create edge") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to create edge") return } } } // Publish SSE event s.broker.Publish("dag.updated", mustMarshal(map[string]any{ "item_id": item.ID, "part_number": item.PartNumber, "revision_number": req.RevisionNumber, "node_count": len(req.Nodes), "edge_count": len(req.Edges), })) writeJSON(w, http.StatusOK, map[string]any{ "synced": true, "node_count": len(req.Nodes), "edge_count": len(req.Edges), }) } // HandleMarkDirty marks a node and all its downstream dependents as dirty. func (s *Server) HandleMarkDirty(w http.ResponseWriter, r *http.Request) { ctx := r.Context() partNumber := chi.URLParam(r, "partNumber") nodeKey := chi.URLParam(r, "nodeKey") item, err := s.items.GetByPartNumber(ctx, partNumber) if err != nil || item == nil { writeError(w, http.StatusNotFound, "not_found", "Item not found") return } node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey) if err != nil { s.logger.Error().Err(err).Msg("failed to get DAG node") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node") return } if node == nil { writeError(w, http.StatusNotFound, "not_found", "Node not found") return } affected, err := s.dag.MarkDirty(ctx, node.ID) if err != nil { s.logger.Error().Err(err).Msg("failed to mark dirty") writeError(w, http.StatusInternalServerError, "internal_error", "Failed to mark dirty") return } writeJSON(w, http.StatusOK, map[string]any{ "node_key": nodeKey, "nodes_affected": affected, }) }