Compare commits

...

1 Commits

Author SHA1 Message Date
Zoe Forbes
3a9fe6aed8 feat: add DAG extraction engine
Implements extract_dag(), classify_type(), and compute_properties_hash()
for extracting feature trees from FreeCAD documents.

- classify_type: maps ~50 FreeCAD TypeIds to 8 DAG node types
- compute_properties_hash: SHA-256 of per-feature parametric inputs
- extract_dag: two-pass walk of doc.Objects producing nodes + edges

No GUI dependencies -- works in both desktop and headless mode.

Closes kindred/create#214
2026-02-14 14:41:35 -06:00

463
freecad/dag.py Normal file
View File

@@ -0,0 +1,463 @@
"""DAG extraction engine for FreeCAD documents.
Extracts the feature tree from a FreeCAD document as nodes and edges
for syncing to the Silo server. No GUI dependencies -- usable in both
desktop and headless (``--console``) mode.
Public API
----------
classify_type(type_id) -> Optional[str]
compute_properties_hash(obj) -> str
extract_dag(doc) -> (nodes, edges)
"""
import hashlib
import json
import math
from typing import Any, Dict, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# TypeId -> DAG node type mapping
# ---------------------------------------------------------------------------
_TYPE_MAP: Dict[str, str] = {
# Sketch
"Sketcher::SketchObject": "sketch",
# Sketch-based additive
"PartDesign::Pad": "pad",
"PartDesign::Revolution": "pad",
"PartDesign::AdditivePipe": "pad",
"PartDesign::AdditiveLoft": "pad",
"PartDesign::AdditiveHelix": "pad",
# Sketch-based subtractive
"PartDesign::Pocket": "pocket",
"PartDesign::Groove": "pocket",
"PartDesign::Hole": "pocket",
"PartDesign::SubtractivePipe": "pocket",
"PartDesign::SubtractiveLoft": "pocket",
"PartDesign::SubtractiveHelix": "pocket",
# Dress-up
"PartDesign::Fillet": "fillet",
"PartDesign::Chamfer": "chamfer",
"PartDesign::Draft": "chamfer",
"PartDesign::Thickness": "chamfer",
# Transformations
"PartDesign::Mirrored": "pad",
"PartDesign::LinearPattern": "pad",
"PartDesign::PolarPattern": "pad",
"PartDesign::Scaled": "pad",
"PartDesign::MultiTransform": "pad",
# Boolean
"PartDesign::Boolean": "pad",
# Additive primitives
"PartDesign::AdditiveBox": "pad",
"PartDesign::AdditiveCylinder": "pad",
"PartDesign::AdditiveSphere": "pad",
"PartDesign::AdditiveCone": "pad",
"PartDesign::AdditiveEllipsoid": "pad",
"PartDesign::AdditiveTorus": "pad",
"PartDesign::AdditivePrism": "pad",
"PartDesign::AdditiveWedge": "pad",
# Subtractive primitives
"PartDesign::SubtractiveBox": "pocket",
"PartDesign::SubtractiveCylinder": "pocket",
"PartDesign::SubtractiveSphere": "pocket",
"PartDesign::SubtractiveCone": "pocket",
"PartDesign::SubtractiveEllipsoid": "pocket",
"PartDesign::SubtractiveTorus": "pocket",
"PartDesign::SubtractivePrism": "pocket",
"PartDesign::SubtractiveWedge": "pocket",
# Containers
"PartDesign::Body": "body",
"App::Part": "part",
"Part::Feature": "part",
# Datum / reference
"PartDesign::Point": "datum",
"PartDesign::Line": "datum",
"PartDesign::Plane": "datum",
"PartDesign::CoordinateSystem": "datum",
"PartDesign::ShapeBinder": "datum",
"PartDesign::SubShapeBinder": "datum",
}
def classify_type(type_id: str) -> Optional[str]:
"""Map a FreeCAD TypeId string to a DAG node type.
Returns one of ``sketch``, ``pad``, ``pocket``, ``fillet``,
``chamfer``, ``body``, ``part``, ``datum``, or ``None`` if the
TypeId is not a recognized feature.
"""
return _TYPE_MAP.get(type_id)
# ---------------------------------------------------------------------------
# Properties hash
# ---------------------------------------------------------------------------
def _safe_float(val: Any) -> Any:
"""Convert a float to a JSON-safe value, replacing NaN/Infinity with 0."""
if isinstance(val, float) and (math.isnan(val) or math.isinf(val)):
return 0.0
return val
def _prop_value(obj: Any, name: str) -> Any:
"""Safely read ``obj.<name>.Value``, returning *None* on failure."""
try:
return _safe_float(getattr(obj, name).Value)
except Exception:
return None
def _prop_raw(obj: Any, name: str) -> Any:
"""Safely read ``obj.<name>``, returning *None* on failure."""
try:
return getattr(obj, name)
except Exception:
return None
def _link_name(obj: Any, name: str) -> Optional[str]:
"""Return the ``.Name`` of a linked object property, or *None*."""
try:
link = getattr(obj, name)
if isinstance(link, (list, tuple)):
link = link[0]
return link.Name if link is not None else None
except Exception:
return None
def _collect_inputs(obj: Any) -> Dict[str, Any]:
"""Extract the parametric inputs that affect *obj*'s geometry.
The returned dict is JSON-serialized and hashed to produce the
``properties_hash`` for the DAG node.
"""
tid = obj.TypeId
inputs: Dict[str, Any] = {"_type": tid}
# --- Sketch ---
if tid == "Sketcher::SketchObject":
inputs["geometry_count"] = _prop_raw(obj, "GeometryCount")
inputs["constraint_count"] = _prop_raw(obj, "ConstraintCount")
try:
inputs["geometry"] = obj.Shape.exportBrepToString()
except Exception:
pass
return inputs
# --- Extrude (Pad / Pocket) ---
if tid in ("PartDesign::Pad", "PartDesign::Pocket"):
inputs["length"] = _prop_value(obj, "Length")
inputs["type"] = str(_prop_raw(obj, "Type") or "")
inputs["reversed"] = _prop_raw(obj, "Reversed")
inputs["sketch"] = _link_name(obj, "Profile")
return inputs
# --- Revolution / Groove ---
if tid in ("PartDesign::Revolution", "PartDesign::Groove"):
inputs["angle"] = _prop_value(obj, "Angle")
inputs["type"] = str(_prop_raw(obj, "Type") or "")
inputs["reversed"] = _prop_raw(obj, "Reversed")
inputs["sketch"] = _link_name(obj, "Profile")
return inputs
# --- Hole ---
if tid == "PartDesign::Hole":
inputs["diameter"] = _prop_value(obj, "Diameter")
inputs["depth"] = _prop_value(obj, "Depth")
inputs["threaded"] = _prop_raw(obj, "Threaded")
inputs["thread_type"] = str(_prop_raw(obj, "ThreadType") or "")
inputs["depth_type"] = str(_prop_raw(obj, "DepthType") or "")
inputs["sketch"] = _link_name(obj, "Profile")
return inputs
# --- Pipe / Loft / Helix (additive + subtractive) ---
if tid in (
"PartDesign::AdditivePipe",
"PartDesign::SubtractivePipe",
"PartDesign::AdditiveLoft",
"PartDesign::SubtractiveLoft",
"PartDesign::AdditiveHelix",
"PartDesign::SubtractiveHelix",
):
inputs["sketch"] = _link_name(obj, "Profile")
inputs["spine"] = _link_name(obj, "Spine")
return inputs
# --- Fillet ---
if tid == "PartDesign::Fillet":
inputs["radius"] = _prop_value(obj, "Radius")
return inputs
# --- Chamfer ---
if tid == "PartDesign::Chamfer":
inputs["chamfer_type"] = str(_prop_raw(obj, "ChamferType") or "")
inputs["size"] = _prop_value(obj, "Size")
inputs["size2"] = _prop_value(obj, "Size2")
inputs["angle"] = _prop_value(obj, "Angle")
return inputs
# --- Draft ---
if tid == "PartDesign::Draft":
inputs["angle"] = _prop_value(obj, "Angle")
inputs["reversed"] = _prop_raw(obj, "Reversed")
return inputs
# --- Thickness ---
if tid == "PartDesign::Thickness":
inputs["value"] = _prop_value(obj, "Value")
inputs["reversed"] = _prop_raw(obj, "Reversed")
inputs["mode"] = str(_prop_raw(obj, "Mode") or "")
inputs["join"] = str(_prop_raw(obj, "Join") or "")
return inputs
# --- Mirrored ---
if tid == "PartDesign::Mirrored":
inputs["mirror_plane"] = _link_name(obj, "MirrorPlane")
return inputs
# --- LinearPattern ---
if tid == "PartDesign::LinearPattern":
inputs["direction"] = _link_name(obj, "Direction")
inputs["reversed"] = _prop_raw(obj, "Reversed")
inputs["length"] = _prop_value(obj, "Length")
inputs["occurrences"] = _prop_value(obj, "Occurrences")
return inputs
# --- PolarPattern ---
if tid == "PartDesign::PolarPattern":
inputs["axis"] = _link_name(obj, "Axis")
inputs["reversed"] = _prop_raw(obj, "Reversed")
inputs["angle"] = _prop_value(obj, "Angle")
inputs["occurrences"] = _prop_value(obj, "Occurrences")
return inputs
# --- Scaled ---
if tid == "PartDesign::Scaled":
inputs["factor"] = _prop_value(obj, "Factor")
inputs["occurrences"] = _prop_value(obj, "Occurrences")
return inputs
# --- MultiTransform ---
if tid == "PartDesign::MultiTransform":
try:
inputs["transform_count"] = len(obj.Transformations)
except Exception:
pass
return inputs
# --- Boolean ---
if tid == "PartDesign::Boolean":
inputs["type"] = str(_prop_raw(obj, "Type") or "")
return inputs
# --- Primitives (additive) ---
if tid in (
"PartDesign::AdditiveBox",
"PartDesign::SubtractiveBox",
):
inputs["length"] = _prop_value(obj, "Length")
inputs["width"] = _prop_value(obj, "Width")
inputs["height"] = _prop_value(obj, "Height")
return inputs
if tid in (
"PartDesign::AdditiveCylinder",
"PartDesign::SubtractiveCylinder",
):
inputs["radius"] = _prop_value(obj, "Radius")
inputs["height"] = _prop_value(obj, "Height")
inputs["angle"] = _prop_value(obj, "Angle")
return inputs
if tid in (
"PartDesign::AdditiveSphere",
"PartDesign::SubtractiveSphere",
):
inputs["radius"] = _prop_value(obj, "Radius")
return inputs
if tid in (
"PartDesign::AdditiveCone",
"PartDesign::SubtractiveCone",
):
inputs["radius1"] = _prop_value(obj, "Radius1")
inputs["radius2"] = _prop_value(obj, "Radius2")
inputs["height"] = _prop_value(obj, "Height")
return inputs
if tid in (
"PartDesign::AdditiveEllipsoid",
"PartDesign::SubtractiveEllipsoid",
):
inputs["radius1"] = _prop_value(obj, "Radius1")
inputs["radius2"] = _prop_value(obj, "Radius2")
inputs["radius3"] = _prop_value(obj, "Radius3")
return inputs
if tid in (
"PartDesign::AdditiveTorus",
"PartDesign::SubtractiveTorus",
):
inputs["radius1"] = _prop_value(obj, "Radius1")
inputs["radius2"] = _prop_value(obj, "Radius2")
return inputs
if tid in (
"PartDesign::AdditivePrism",
"PartDesign::SubtractivePrism",
):
inputs["polygon"] = _prop_raw(obj, "Polygon")
inputs["circumradius"] = _prop_value(obj, "Circumradius")
inputs["height"] = _prop_value(obj, "Height")
return inputs
if tid in (
"PartDesign::AdditiveWedge",
"PartDesign::SubtractiveWedge",
):
for dim in (
"Xmin",
"Ymin",
"Zmin",
"X2min",
"Z2min",
"Xmax",
"Ymax",
"Zmax",
"X2max",
"Z2max",
):
inputs[dim.lower()] = _prop_value(obj, dim)
return inputs
# --- Datum / ShapeBinder ---
if tid in (
"PartDesign::Point",
"PartDesign::Line",
"PartDesign::Plane",
"PartDesign::CoordinateSystem",
"PartDesign::ShapeBinder",
"PartDesign::SubShapeBinder",
):
try:
p = obj.Placement
inputs["position"] = {
"x": _safe_float(p.Base.x),
"y": _safe_float(p.Base.y),
"z": _safe_float(p.Base.z),
}
inputs["rotation"] = {
"axis_x": _safe_float(p.Rotation.Axis.x),
"axis_y": _safe_float(p.Rotation.Axis.y),
"axis_z": _safe_float(p.Rotation.Axis.z),
"angle": _safe_float(p.Rotation.Angle),
}
except Exception:
pass
return inputs
# --- Body / Part (containers) ---
if tid in ("PartDesign::Body", "App::Part", "Part::Feature"):
try:
inputs["child_count"] = len(obj.Group)
except Exception:
inputs["child_count"] = 0
return inputs
# --- Fallback ---
inputs["label"] = obj.Label
return inputs
def compute_properties_hash(obj: Any) -> str:
"""Return a SHA-256 hex digest of *obj*'s parametric inputs.
The hash is used for memoization -- if a node's inputs haven't
changed since the last validation run, re-validation can be skipped.
"""
inputs = _collect_inputs(obj)
canonical = json.dumps(inputs, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ---------------------------------------------------------------------------
# DAG extraction
# ---------------------------------------------------------------------------
def extract_dag(
doc: Any,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Walk a FreeCAD document and return ``(nodes, edges)``.
*nodes* is a list of dicts matching the Silo ``PUT /dag`` payload
schema. *edges* connects dependencies (source) to dependents
(target).
Only objects whose TypeId is recognized by :func:`classify_type`
are included. Edges are limited to pairs where **both** endpoints
are included, preventing dangling references to internal objects
such as ``App::Origin``.
"""
# Pass 1 -- identify included objects
included: Set[str] = set()
classified: Dict[str, str] = {} # obj.Name -> node_type
for obj in doc.Objects:
if not hasattr(obj, "TypeId"):
continue
node_type = classify_type(obj.TypeId)
if node_type is not None:
included.add(obj.Name)
classified[obj.Name] = node_type
# Pass 2 -- build nodes and edges
nodes: List[Dict[str, Any]] = []
edges: List[Dict[str, Any]] = []
seen_edges: Set[Tuple[str, str]] = set()
for obj in doc.Objects:
if obj.Name not in included:
continue
nodes.append(
{
"node_key": obj.Name,
"node_type": classified[obj.Name],
"properties_hash": compute_properties_hash(obj),
"metadata": {
"label": obj.Label,
"type_id": obj.TypeId,
},
}
)
# Walk dependencies: OutList contains objects this one depends on
try:
out_list = obj.OutList
except Exception:
continue
for dep in out_list:
if not hasattr(dep, "Name"):
continue
if dep.Name not in included:
continue
edge_key = (dep.Name, obj.Name)
if edge_key in seen_edges:
continue
seen_edges.add(edge_key)
edges.append(
{
"source_key": dep.Name,
"target_key": obj.Name,
"edge_type": "depends_on",
}
)
return nodes, edges