From df073709ce2901940c49b23eb57ef5cf830bd4f2 Mon Sep 17 00:00:00 2001 From: Forbes Date: Sat, 14 Feb 2026 13:16:19 -0600 Subject: [PATCH] feat: add DAG API handlers for graph queries and sync --- internal/api/dag_handlers.go | 272 +++++++++++++++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 internal/api/dag_handlers.go diff --git a/internal/api/dag_handlers.go b/internal/api/dag_handlers.go new file mode 100644 index 0000000..4e162d8 --- /dev/null +++ b/internal/api/dag_handlers.go @@ -0,0 +1,272 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/kindredsystems/silo/internal/db" +) + +// dagSyncRequest is the payload for PUT /api/items/{partNumber}/dag. +type dagSyncRequest struct { + RevisionNumber int `json:"revision_number"` + Nodes []dagSyncNode `json:"nodes"` + Edges []dagSyncEdge `json:"edges"` +} + +type dagSyncNode struct { + NodeKey string `json:"node_key"` + NodeType string `json:"node_type"` + PropertiesHash *string `json:"properties_hash,omitempty"` + ValidationState string `json:"validation_state,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +type dagSyncEdge struct { + SourceKey string `json:"source_key"` + TargetKey string `json:"target_key"` + EdgeType string `json:"edge_type,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +// HandleGetDAG returns the feature DAG for an item's current revision. +func (s *Server) HandleGetDAG(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + nodes, err := s.dag.GetNodes(ctx, item.ID, item.CurrentRevision) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get DAG nodes") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG") + return + } + + edges, err := s.dag.GetEdges(ctx, item.ID, item.CurrentRevision) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get DAG edges") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG edges") + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "item_id": item.ID, + "part_number": item.PartNumber, + "revision_number": item.CurrentRevision, + "nodes": nodes, + "edges": edges, + }) +} + +// HandleGetForwardCone returns all downstream dependents of a node. +func (s *Server) HandleGetForwardCone(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + nodeKey := chi.URLParam(r, "nodeKey") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get DAG node") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node") + return + } + if node == nil { + writeError(w, http.StatusNotFound, "not_found", "Node not found") + return + } + + cone, err := s.dag.GetForwardCone(ctx, node.ID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get forward cone") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get forward cone") + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "root_node": node, + "cone": cone, + }) +} + +// HandleGetDirtySubgraph returns all non-clean nodes for an item. +func (s *Server) HandleGetDirtySubgraph(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + nodes, err := s.dag.GetDirtySubgraph(ctx, item.ID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get dirty subgraph") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get dirty subgraph") + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "item_id": item.ID, + "nodes": nodes, + }) +} + +// HandleSyncDAG accepts a full feature tree from a client or runner. +func (s *Server) HandleSyncDAG(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + var req dagSyncRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body") + return + } + + if req.RevisionNumber == 0 { + req.RevisionNumber = item.CurrentRevision + } + + // Convert request nodes to DB nodes + nodes := make([]db.DAGNode, len(req.Nodes)) + for i, n := range req.Nodes { + state := n.ValidationState + if state == "" { + state = "clean" + } + nodes[i] = db.DAGNode{ + NodeKey: n.NodeKey, + NodeType: n.NodeType, + PropertiesHash: n.PropertiesHash, + ValidationState: state, + Metadata: n.Metadata, + } + } + + // Sync nodes first to get IDs + if err := s.dag.SyncFeatureTree(ctx, item.ID, req.RevisionNumber, nodes, nil); err != nil { + s.logger.Error().Err(err).Msg("failed to sync DAG nodes") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG") + return + } + + // Build key→ID map from synced nodes + keyToID := make(map[string]string, len(nodes)) + for _, n := range nodes { + keyToID[n.NodeKey] = n.ID + } + + // Convert request edges, resolving keys to IDs + edges := make([]db.DAGEdge, len(req.Edges)) + for i, e := range req.Edges { + sourceID, ok := keyToID[e.SourceKey] + if !ok { + writeError(w, http.StatusBadRequest, "invalid_edge", + "Unknown source_key: "+e.SourceKey) + return + } + targetID, ok := keyToID[e.TargetKey] + if !ok { + writeError(w, http.StatusBadRequest, "invalid_edge", + "Unknown target_key: "+e.TargetKey) + return + } + edgeType := e.EdgeType + if edgeType == "" { + edgeType = "depends_on" + } + edges[i] = db.DAGEdge{ + SourceNodeID: sourceID, + TargetNodeID: targetID, + EdgeType: edgeType, + Metadata: e.Metadata, + } + } + + // Sync edges (nodes already synced, so pass empty nodes to skip re-upsert) + if len(edges) > 0 { + // Delete old edges and insert new ones + if err := s.dag.DeleteEdgesForItem(ctx, item.ID, req.RevisionNumber); err != nil { + s.logger.Error().Err(err).Msg("failed to delete old edges") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges") + return + } + for i := range edges { + if err := s.dag.CreateEdge(ctx, &edges[i]); err != nil { + s.logger.Error().Err(err).Msg("failed to create edge") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to create edge") + return + } + } + } + + // Publish SSE event + evData, _ := json.Marshal(map[string]any{ + "item_id": item.ID, + "part_number": item.PartNumber, + "revision_number": req.RevisionNumber, + "node_count": len(req.Nodes), + "edge_count": len(req.Edges), + }) + s.broker.Publish("dag.updated", string(evData)) + + writeJSON(w, http.StatusOK, map[string]any{ + "synced": true, + "node_count": len(req.Nodes), + "edge_count": len(req.Edges), + }) +} + +// HandleMarkDirty marks a node and all its downstream dependents as dirty. +func (s *Server) HandleMarkDirty(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + partNumber := chi.URLParam(r, "partNumber") + nodeKey := chi.URLParam(r, "nodeKey") + + item, err := s.items.GetByPartNumber(ctx, partNumber) + if err != nil || item == nil { + writeError(w, http.StatusNotFound, "not_found", "Item not found") + return + } + + node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get DAG node") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node") + return + } + if node == nil { + writeError(w, http.StatusNotFound, "not_found", "Node not found") + return + } + + affected, err := s.dag.MarkDirty(ctx, node.ID) + if err != nil { + s.logger.Error().Err(err).Msg("failed to mark dirty") + writeError(w, http.StatusInternalServerError, "internal_error", "Failed to mark dirty") + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "node_key": nodeKey, + "nodes_affected": affected, + }) +}