feat: add DAG API handlers for graph queries and sync

This commit is contained in:
Forbes
2026-02-14 13:16:19 -06:00
parent 0eb891667b
commit df073709ce

View File

@@ -0,0 +1,272 @@
package api
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/kindredsystems/silo/internal/db"
)
// dagSyncRequest is the payload for PUT /api/items/{partNumber}/dag.
type dagSyncRequest struct {
RevisionNumber int `json:"revision_number"`
Nodes []dagSyncNode `json:"nodes"`
Edges []dagSyncEdge `json:"edges"`
}
type dagSyncNode struct {
NodeKey string `json:"node_key"`
NodeType string `json:"node_type"`
PropertiesHash *string `json:"properties_hash,omitempty"`
ValidationState string `json:"validation_state,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
type dagSyncEdge struct {
SourceKey string `json:"source_key"`
TargetKey string `json:"target_key"`
EdgeType string `json:"edge_type,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// HandleGetDAG returns the feature DAG for an item's current revision.
func (s *Server) HandleGetDAG(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
partNumber := chi.URLParam(r, "partNumber")
item, err := s.items.GetByPartNumber(ctx, partNumber)
if err != nil || item == nil {
writeError(w, http.StatusNotFound, "not_found", "Item not found")
return
}
nodes, err := s.dag.GetNodes(ctx, item.ID, item.CurrentRevision)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get DAG nodes")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG")
return
}
edges, err := s.dag.GetEdges(ctx, item.ID, item.CurrentRevision)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get DAG edges")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get DAG edges")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"item_id": item.ID,
"part_number": item.PartNumber,
"revision_number": item.CurrentRevision,
"nodes": nodes,
"edges": edges,
})
}
// HandleGetForwardCone returns all downstream dependents of a node.
func (s *Server) HandleGetForwardCone(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
partNumber := chi.URLParam(r, "partNumber")
nodeKey := chi.URLParam(r, "nodeKey")
item, err := s.items.GetByPartNumber(ctx, partNumber)
if err != nil || item == nil {
writeError(w, http.StatusNotFound, "not_found", "Item not found")
return
}
node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get DAG node")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node")
return
}
if node == nil {
writeError(w, http.StatusNotFound, "not_found", "Node not found")
return
}
cone, err := s.dag.GetForwardCone(ctx, node.ID)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get forward cone")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get forward cone")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"root_node": node,
"cone": cone,
})
}
// HandleGetDirtySubgraph returns all non-clean nodes for an item.
func (s *Server) HandleGetDirtySubgraph(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
partNumber := chi.URLParam(r, "partNumber")
item, err := s.items.GetByPartNumber(ctx, partNumber)
if err != nil || item == nil {
writeError(w, http.StatusNotFound, "not_found", "Item not found")
return
}
nodes, err := s.dag.GetDirtySubgraph(ctx, item.ID)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get dirty subgraph")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get dirty subgraph")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"item_id": item.ID,
"nodes": nodes,
})
}
// HandleSyncDAG accepts a full feature tree from a client or runner.
func (s *Server) HandleSyncDAG(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
partNumber := chi.URLParam(r, "partNumber")
item, err := s.items.GetByPartNumber(ctx, partNumber)
if err != nil || item == nil {
writeError(w, http.StatusNotFound, "not_found", "Item not found")
return
}
var req dagSyncRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid_body", "Invalid JSON body")
return
}
if req.RevisionNumber == 0 {
req.RevisionNumber = item.CurrentRevision
}
// Convert request nodes to DB nodes
nodes := make([]db.DAGNode, len(req.Nodes))
for i, n := range req.Nodes {
state := n.ValidationState
if state == "" {
state = "clean"
}
nodes[i] = db.DAGNode{
NodeKey: n.NodeKey,
NodeType: n.NodeType,
PropertiesHash: n.PropertiesHash,
ValidationState: state,
Metadata: n.Metadata,
}
}
// Sync nodes first to get IDs
if err := s.dag.SyncFeatureTree(ctx, item.ID, req.RevisionNumber, nodes, nil); err != nil {
s.logger.Error().Err(err).Msg("failed to sync DAG nodes")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG")
return
}
// Build key→ID map from synced nodes
keyToID := make(map[string]string, len(nodes))
for _, n := range nodes {
keyToID[n.NodeKey] = n.ID
}
// Convert request edges, resolving keys to IDs
edges := make([]db.DAGEdge, len(req.Edges))
for i, e := range req.Edges {
sourceID, ok := keyToID[e.SourceKey]
if !ok {
writeError(w, http.StatusBadRequest, "invalid_edge",
"Unknown source_key: "+e.SourceKey)
return
}
targetID, ok := keyToID[e.TargetKey]
if !ok {
writeError(w, http.StatusBadRequest, "invalid_edge",
"Unknown target_key: "+e.TargetKey)
return
}
edgeType := e.EdgeType
if edgeType == "" {
edgeType = "depends_on"
}
edges[i] = db.DAGEdge{
SourceNodeID: sourceID,
TargetNodeID: targetID,
EdgeType: edgeType,
Metadata: e.Metadata,
}
}
// Sync edges (nodes already synced, so pass empty nodes to skip re-upsert)
if len(edges) > 0 {
// Delete old edges and insert new ones
if err := s.dag.DeleteEdgesForItem(ctx, item.ID, req.RevisionNumber); err != nil {
s.logger.Error().Err(err).Msg("failed to delete old edges")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to sync DAG edges")
return
}
for i := range edges {
if err := s.dag.CreateEdge(ctx, &edges[i]); err != nil {
s.logger.Error().Err(err).Msg("failed to create edge")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to create edge")
return
}
}
}
// Publish SSE event
evData, _ := json.Marshal(map[string]any{
"item_id": item.ID,
"part_number": item.PartNumber,
"revision_number": req.RevisionNumber,
"node_count": len(req.Nodes),
"edge_count": len(req.Edges),
})
s.broker.Publish("dag.updated", string(evData))
writeJSON(w, http.StatusOK, map[string]any{
"synced": true,
"node_count": len(req.Nodes),
"edge_count": len(req.Edges),
})
}
// HandleMarkDirty marks a node and all its downstream dependents as dirty.
func (s *Server) HandleMarkDirty(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
partNumber := chi.URLParam(r, "partNumber")
nodeKey := chi.URLParam(r, "nodeKey")
item, err := s.items.GetByPartNumber(ctx, partNumber)
if err != nil || item == nil {
writeError(w, http.StatusNotFound, "not_found", "Item not found")
return
}
node, err := s.dag.GetNodeByKey(ctx, item.ID, item.CurrentRevision, nodeKey)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get DAG node")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to get node")
return
}
if node == nil {
writeError(w, http.StatusNotFound, "not_found", "Node not found")
return
}
affected, err := s.dag.MarkDirty(ctx, node.ID)
if err != nil {
s.logger.Error().Err(err).Msg("failed to mark dirty")
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to mark dirty")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"node_key": nodeKey,
"nodes_affected": affected,
})
}