diff --git a/freecad/dag.py b/freecad/dag.py new file mode 100644 index 0000000..8d0af67 --- /dev/null +++ b/freecad/dag.py @@ -0,0 +1,463 @@ +"""DAG extraction engine for FreeCAD documents. + +Extracts the feature tree from a FreeCAD document as nodes and edges +for syncing to the Silo server. No GUI dependencies -- usable in both +desktop and headless (``--console``) mode. + +Public API +---------- +classify_type(type_id) -> Optional[str] +compute_properties_hash(obj) -> str +extract_dag(doc) -> (nodes, edges) +""" + +import hashlib +import json +import math +from typing import Any, Dict, List, Optional, Set, Tuple + +# --------------------------------------------------------------------------- +# TypeId -> DAG node type mapping +# --------------------------------------------------------------------------- + +_TYPE_MAP: Dict[str, str] = { + # Sketch + "Sketcher::SketchObject": "sketch", + # Sketch-based additive + "PartDesign::Pad": "pad", + "PartDesign::Revolution": "pad", + "PartDesign::AdditivePipe": "pad", + "PartDesign::AdditiveLoft": "pad", + "PartDesign::AdditiveHelix": "pad", + # Sketch-based subtractive + "PartDesign::Pocket": "pocket", + "PartDesign::Groove": "pocket", + "PartDesign::Hole": "pocket", + "PartDesign::SubtractivePipe": "pocket", + "PartDesign::SubtractiveLoft": "pocket", + "PartDesign::SubtractiveHelix": "pocket", + # Dress-up + "PartDesign::Fillet": "fillet", + "PartDesign::Chamfer": "chamfer", + "PartDesign::Draft": "chamfer", + "PartDesign::Thickness": "chamfer", + # Transformations + "PartDesign::Mirrored": "pad", + "PartDesign::LinearPattern": "pad", + "PartDesign::PolarPattern": "pad", + "PartDesign::Scaled": "pad", + "PartDesign::MultiTransform": "pad", + # Boolean + "PartDesign::Boolean": "pad", + # Additive primitives + "PartDesign::AdditiveBox": "pad", + "PartDesign::AdditiveCylinder": "pad", + "PartDesign::AdditiveSphere": "pad", + "PartDesign::AdditiveCone": "pad", + "PartDesign::AdditiveEllipsoid": "pad", + "PartDesign::AdditiveTorus": "pad", + "PartDesign::AdditivePrism": "pad", + "PartDesign::AdditiveWedge": "pad", + # Subtractive primitives + "PartDesign::SubtractiveBox": "pocket", + "PartDesign::SubtractiveCylinder": "pocket", + "PartDesign::SubtractiveSphere": "pocket", + "PartDesign::SubtractiveCone": "pocket", + "PartDesign::SubtractiveEllipsoid": "pocket", + "PartDesign::SubtractiveTorus": "pocket", + "PartDesign::SubtractivePrism": "pocket", + "PartDesign::SubtractiveWedge": "pocket", + # Containers + "PartDesign::Body": "body", + "App::Part": "part", + "Part::Feature": "part", + # Datum / reference + "PartDesign::Point": "datum", + "PartDesign::Line": "datum", + "PartDesign::Plane": "datum", + "PartDesign::CoordinateSystem": "datum", + "PartDesign::ShapeBinder": "datum", + "PartDesign::SubShapeBinder": "datum", +} + + +def classify_type(type_id: str) -> Optional[str]: + """Map a FreeCAD TypeId string to a DAG node type. + + Returns one of ``sketch``, ``pad``, ``pocket``, ``fillet``, + ``chamfer``, ``body``, ``part``, ``datum``, or ``None`` if the + TypeId is not a recognized feature. + """ + return _TYPE_MAP.get(type_id) + + +# --------------------------------------------------------------------------- +# Properties hash +# --------------------------------------------------------------------------- + + +def _safe_float(val: Any) -> Any: + """Convert a float to a JSON-safe value, replacing NaN/Infinity with 0.""" + if isinstance(val, float) and (math.isnan(val) or math.isinf(val)): + return 0.0 + return val + + +def _prop_value(obj: Any, name: str) -> Any: + """Safely read ``obj..Value``, returning *None* on failure.""" + try: + return _safe_float(getattr(obj, name).Value) + except Exception: + return None + + +def _prop_raw(obj: Any, name: str) -> Any: + """Safely read ``obj.``, returning *None* on failure.""" + try: + return getattr(obj, name) + except Exception: + return None + + +def _link_name(obj: Any, name: str) -> Optional[str]: + """Return the ``.Name`` of a linked object property, or *None*.""" + try: + link = getattr(obj, name) + if isinstance(link, (list, tuple)): + link = link[0] + return link.Name if link is not None else None + except Exception: + return None + + +def _collect_inputs(obj: Any) -> Dict[str, Any]: + """Extract the parametric inputs that affect *obj*'s geometry. + + The returned dict is JSON-serialized and hashed to produce the + ``properties_hash`` for the DAG node. + """ + tid = obj.TypeId + inputs: Dict[str, Any] = {"_type": tid} + + # --- Sketch --- + if tid == "Sketcher::SketchObject": + inputs["geometry_count"] = _prop_raw(obj, "GeometryCount") + inputs["constraint_count"] = _prop_raw(obj, "ConstraintCount") + try: + inputs["geometry"] = obj.Shape.exportBrepToString() + except Exception: + pass + return inputs + + # --- Extrude (Pad / Pocket) --- + if tid in ("PartDesign::Pad", "PartDesign::Pocket"): + inputs["length"] = _prop_value(obj, "Length") + inputs["type"] = str(_prop_raw(obj, "Type") or "") + inputs["reversed"] = _prop_raw(obj, "Reversed") + inputs["sketch"] = _link_name(obj, "Profile") + return inputs + + # --- Revolution / Groove --- + if tid in ("PartDesign::Revolution", "PartDesign::Groove"): + inputs["angle"] = _prop_value(obj, "Angle") + inputs["type"] = str(_prop_raw(obj, "Type") or "") + inputs["reversed"] = _prop_raw(obj, "Reversed") + inputs["sketch"] = _link_name(obj, "Profile") + return inputs + + # --- Hole --- + if tid == "PartDesign::Hole": + inputs["diameter"] = _prop_value(obj, "Diameter") + inputs["depth"] = _prop_value(obj, "Depth") + inputs["threaded"] = _prop_raw(obj, "Threaded") + inputs["thread_type"] = str(_prop_raw(obj, "ThreadType") or "") + inputs["depth_type"] = str(_prop_raw(obj, "DepthType") or "") + inputs["sketch"] = _link_name(obj, "Profile") + return inputs + + # --- Pipe / Loft / Helix (additive + subtractive) --- + if tid in ( + "PartDesign::AdditivePipe", + "PartDesign::SubtractivePipe", + "PartDesign::AdditiveLoft", + "PartDesign::SubtractiveLoft", + "PartDesign::AdditiveHelix", + "PartDesign::SubtractiveHelix", + ): + inputs["sketch"] = _link_name(obj, "Profile") + inputs["spine"] = _link_name(obj, "Spine") + return inputs + + # --- Fillet --- + if tid == "PartDesign::Fillet": + inputs["radius"] = _prop_value(obj, "Radius") + return inputs + + # --- Chamfer --- + if tid == "PartDesign::Chamfer": + inputs["chamfer_type"] = str(_prop_raw(obj, "ChamferType") or "") + inputs["size"] = _prop_value(obj, "Size") + inputs["size2"] = _prop_value(obj, "Size2") + inputs["angle"] = _prop_value(obj, "Angle") + return inputs + + # --- Draft --- + if tid == "PartDesign::Draft": + inputs["angle"] = _prop_value(obj, "Angle") + inputs["reversed"] = _prop_raw(obj, "Reversed") + return inputs + + # --- Thickness --- + if tid == "PartDesign::Thickness": + inputs["value"] = _prop_value(obj, "Value") + inputs["reversed"] = _prop_raw(obj, "Reversed") + inputs["mode"] = str(_prop_raw(obj, "Mode") or "") + inputs["join"] = str(_prop_raw(obj, "Join") or "") + return inputs + + # --- Mirrored --- + if tid == "PartDesign::Mirrored": + inputs["mirror_plane"] = _link_name(obj, "MirrorPlane") + return inputs + + # --- LinearPattern --- + if tid == "PartDesign::LinearPattern": + inputs["direction"] = _link_name(obj, "Direction") + inputs["reversed"] = _prop_raw(obj, "Reversed") + inputs["length"] = _prop_value(obj, "Length") + inputs["occurrences"] = _prop_value(obj, "Occurrences") + return inputs + + # --- PolarPattern --- + if tid == "PartDesign::PolarPattern": + inputs["axis"] = _link_name(obj, "Axis") + inputs["reversed"] = _prop_raw(obj, "Reversed") + inputs["angle"] = _prop_value(obj, "Angle") + inputs["occurrences"] = _prop_value(obj, "Occurrences") + return inputs + + # --- Scaled --- + if tid == "PartDesign::Scaled": + inputs["factor"] = _prop_value(obj, "Factor") + inputs["occurrences"] = _prop_value(obj, "Occurrences") + return inputs + + # --- MultiTransform --- + if tid == "PartDesign::MultiTransform": + try: + inputs["transform_count"] = len(obj.Transformations) + except Exception: + pass + return inputs + + # --- Boolean --- + if tid == "PartDesign::Boolean": + inputs["type"] = str(_prop_raw(obj, "Type") or "") + return inputs + + # --- Primitives (additive) --- + if tid in ( + "PartDesign::AdditiveBox", + "PartDesign::SubtractiveBox", + ): + inputs["length"] = _prop_value(obj, "Length") + inputs["width"] = _prop_value(obj, "Width") + inputs["height"] = _prop_value(obj, "Height") + return inputs + + if tid in ( + "PartDesign::AdditiveCylinder", + "PartDesign::SubtractiveCylinder", + ): + inputs["radius"] = _prop_value(obj, "Radius") + inputs["height"] = _prop_value(obj, "Height") + inputs["angle"] = _prop_value(obj, "Angle") + return inputs + + if tid in ( + "PartDesign::AdditiveSphere", + "PartDesign::SubtractiveSphere", + ): + inputs["radius"] = _prop_value(obj, "Radius") + return inputs + + if tid in ( + "PartDesign::AdditiveCone", + "PartDesign::SubtractiveCone", + ): + inputs["radius1"] = _prop_value(obj, "Radius1") + inputs["radius2"] = _prop_value(obj, "Radius2") + inputs["height"] = _prop_value(obj, "Height") + return inputs + + if tid in ( + "PartDesign::AdditiveEllipsoid", + "PartDesign::SubtractiveEllipsoid", + ): + inputs["radius1"] = _prop_value(obj, "Radius1") + inputs["radius2"] = _prop_value(obj, "Radius2") + inputs["radius3"] = _prop_value(obj, "Radius3") + return inputs + + if tid in ( + "PartDesign::AdditiveTorus", + "PartDesign::SubtractiveTorus", + ): + inputs["radius1"] = _prop_value(obj, "Radius1") + inputs["radius2"] = _prop_value(obj, "Radius2") + return inputs + + if tid in ( + "PartDesign::AdditivePrism", + "PartDesign::SubtractivePrism", + ): + inputs["polygon"] = _prop_raw(obj, "Polygon") + inputs["circumradius"] = _prop_value(obj, "Circumradius") + inputs["height"] = _prop_value(obj, "Height") + return inputs + + if tid in ( + "PartDesign::AdditiveWedge", + "PartDesign::SubtractiveWedge", + ): + for dim in ( + "Xmin", + "Ymin", + "Zmin", + "X2min", + "Z2min", + "Xmax", + "Ymax", + "Zmax", + "X2max", + "Z2max", + ): + inputs[dim.lower()] = _prop_value(obj, dim) + return inputs + + # --- Datum / ShapeBinder --- + if tid in ( + "PartDesign::Point", + "PartDesign::Line", + "PartDesign::Plane", + "PartDesign::CoordinateSystem", + "PartDesign::ShapeBinder", + "PartDesign::SubShapeBinder", + ): + try: + p = obj.Placement + inputs["position"] = { + "x": _safe_float(p.Base.x), + "y": _safe_float(p.Base.y), + "z": _safe_float(p.Base.z), + } + inputs["rotation"] = { + "axis_x": _safe_float(p.Rotation.Axis.x), + "axis_y": _safe_float(p.Rotation.Axis.y), + "axis_z": _safe_float(p.Rotation.Axis.z), + "angle": _safe_float(p.Rotation.Angle), + } + except Exception: + pass + return inputs + + # --- Body / Part (containers) --- + if tid in ("PartDesign::Body", "App::Part", "Part::Feature"): + try: + inputs["child_count"] = len(obj.Group) + except Exception: + inputs["child_count"] = 0 + return inputs + + # --- Fallback --- + inputs["label"] = obj.Label + return inputs + + +def compute_properties_hash(obj: Any) -> str: + """Return a SHA-256 hex digest of *obj*'s parametric inputs. + + The hash is used for memoization -- if a node's inputs haven't + changed since the last validation run, re-validation can be skipped. + """ + inputs = _collect_inputs(obj) + canonical = json.dumps(inputs, sort_keys=True, default=str) + return hashlib.sha256(canonical.encode()).hexdigest() + + +# --------------------------------------------------------------------------- +# DAG extraction +# --------------------------------------------------------------------------- + + +def extract_dag( + doc: Any, +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """Walk a FreeCAD document and return ``(nodes, edges)``. + + *nodes* is a list of dicts matching the Silo ``PUT /dag`` payload + schema. *edges* connects dependencies (source) to dependents + (target). + + Only objects whose TypeId is recognized by :func:`classify_type` + are included. Edges are limited to pairs where **both** endpoints + are included, preventing dangling references to internal objects + such as ``App::Origin``. + """ + # Pass 1 -- identify included objects + included: Set[str] = set() + classified: Dict[str, str] = {} # obj.Name -> node_type + + for obj in doc.Objects: + if not hasattr(obj, "TypeId"): + continue + node_type = classify_type(obj.TypeId) + if node_type is not None: + included.add(obj.Name) + classified[obj.Name] = node_type + + # Pass 2 -- build nodes and edges + nodes: List[Dict[str, Any]] = [] + edges: List[Dict[str, Any]] = [] + seen_edges: Set[Tuple[str, str]] = set() + + for obj in doc.Objects: + if obj.Name not in included: + continue + + nodes.append( + { + "node_key": obj.Name, + "node_type": classified[obj.Name], + "properties_hash": compute_properties_hash(obj), + "metadata": { + "label": obj.Label, + "type_id": obj.TypeId, + }, + } + ) + + # Walk dependencies: OutList contains objects this one depends on + try: + out_list = obj.OutList + except Exception: + continue + + for dep in out_list: + if not hasattr(dep, "Name"): + continue + if dep.Name not in included: + continue + edge_key = (dep.Name, obj.Name) + if edge_key in seen_edges: + continue + seen_edges.add(edge_key) + edges.append( + { + "source_key": dep.Name, + "target_key": obj.Name, + "edge_type": "depends_on", + } + ) + + return nodes, edges