From 35d4ef736f27c826c9f17abc5f9513c932cfb1eb Mon Sep 17 00:00:00 2001 From: forbes-0023 Date: Mon, 2 Feb 2026 13:47:36 -0600 Subject: [PATCH] feat: port PebbleGame3D to solver/datagen/pebble_game.py Port the (6,6)-pebble game implementation from data/synthetic/pebble-game.py. Imports shared types from solver.datagen.types. No behavioral changes. - Full type annotations on all methods (mypy strict) - Ruff-compliant: ternary, combined if, unpacking - Re-exported from solver.datagen.__init__ Closes #2 --- solver/datagen/__init__.py | 2 + solver/datagen/pebble_game.py | 258 ++++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+) create mode 100644 solver/datagen/pebble_game.py diff --git a/solver/datagen/__init__.py b/solver/datagen/__init__.py index 6e53b70..6504939 100644 --- a/solver/datagen/__init__.py +++ b/solver/datagen/__init__.py @@ -1,5 +1,6 @@ """Data generation utilities for assembly constraint training data.""" +from solver.datagen.pebble_game import PebbleGame3D from solver.datagen.types import ( ConstraintAnalysis, Joint, @@ -12,6 +13,7 @@ __all__ = [ "ConstraintAnalysis", "Joint", "JointType", + "PebbleGame3D", "PebbleState", "RigidBody", ] diff --git a/solver/datagen/pebble_game.py b/solver/datagen/pebble_game.py new file mode 100644 index 0000000..655cd13 --- /dev/null +++ b/solver/datagen/pebble_game.py @@ -0,0 +1,258 @@ +"""(6,6)-Pebble game for 3D body-bar-hinge rigidity analysis. + +Implements the pebble game algorithm adapted for CAD assembly constraint +graphs. Each rigid body has 6 DOF (3 translation + 3 rotation). Joints +between bodies remove DOF according to their type. + +The pebble game provides a fast combinatorial *necessary* condition for +rigidity via Tay's theorem. It does not detect geometric degeneracies — +use :class:`solver.datagen.jacobian.JacobianVerifier` for the *sufficient* +condition. + +References: + - Lee & Streinu, "Pebble Game Algorithms and Sparse Graphs", 2008 + - Jacobs & Hendrickson, "An Algorithm for Two-Dimensional Rigidity + Percolation: The Pebble Game", J. Comput. Phys., 1997 + - Tay, "Rigidity of Multigraphs I: Linking Rigid Bodies in n-space", 1984 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from solver.datagen.types import Joint, PebbleState + +if TYPE_CHECKING: + from typing import Any + +__all__ = ["PebbleGame3D"] + + +class PebbleGame3D: + """Implements the (6,6)-pebble game for 3D body-bar-hinge frameworks. + + For body-bar-hinge structures in 3D, Tay's theorem states that a + multigraph G on n vertices is generically minimally rigid iff: + |E| = 6n - 6 and |E'| <= 6n' - 6 for all subgraphs (n' >= 2) + + The (6,6)-pebble game tests this sparsity condition incrementally. + Each vertex starts with 6 pebbles (representing 6 DOF). To insert + an edge, we need to collect 6+1=7 pebbles on its two endpoints. + If we can, the edge is independent (removes a DOF). If not, it's + redundant (overconstrained). + + In the CAD assembly context: + - Vertices = rigid bodies + - Edges = scalar constraints from joints + - A revolute joint (5 DOF removed) maps to 5 multigraph edges + - A fixed joint (6 DOF removed) maps to 6 multigraph edges + """ + + K = 6 # Pebbles per vertex (DOF per rigid body in 3D) + L = 6 # Sparsity parameter: need K+1=7 pebbles to accept edge + + def __init__(self) -> None: + self.state = PebbleState() + self._edge_counter = 0 + self._bodies: set[int] = set() + + def add_body(self, body_id: int) -> None: + """Register a rigid body (vertex) with K=6 free pebbles.""" + if body_id in self._bodies: + return + self._bodies.add(body_id) + self.state.free_pebbles[body_id] = self.K + self.state.incoming[body_id] = set() + self.state.outgoing[body_id] = set() + + def add_joint(self, joint: Joint) -> list[dict[str, Any]]: + """Expand a joint into multigraph edges and test each for independence. + + A joint that removes ``d`` DOF becomes ``d`` edges in the multigraph. + Each edge is tested individually via the pebble game. + + Returns a list of dicts, one per scalar constraint, with: + - edge_id: int + - independent: bool + - dof_remaining: int (total free pebbles after this edge) + """ + self.add_body(joint.body_a) + self.add_body(joint.body_b) + + num_constraints = joint.joint_type.value + results: list[dict[str, Any]] = [] + + for i in range(num_constraints): + edge_id = self._edge_counter + self._edge_counter += 1 + + independent = self._try_insert_edge(edge_id, joint.body_a, joint.body_b) + total_free = sum(self.state.free_pebbles.values()) + + results.append( + { + "edge_id": edge_id, + "joint_id": joint.joint_id, + "constraint_index": i, + "independent": independent, + "dof_remaining": total_free, + } + ) + + return results + + def _try_insert_edge(self, edge_id: int, u: int, v: int) -> bool: + """Try to insert a directed edge between u and v. + + The edge is accepted (independent) iff we can collect L+1 = 7 + pebbles on the two endpoints {u, v} combined. + + If accepted, one pebble is consumed and the edge is directed + away from the vertex that gives up the pebble. + """ + # Count current free pebbles on u and v + available = self.state.free_pebbles[u] + self.state.free_pebbles[v] + + # Try to gather enough pebbles via DFS reachability search + if available < self.L + 1: + needed = (self.L + 1) - available + # Try to free pebbles by searching from u first, then v + for target in (u, v): + while needed > 0: + found = self._search_and_collect(target, frozenset({u, v})) + if not found: + break + needed -= 1 + + # Recheck after collection attempts + available = self.state.free_pebbles[u] + self.state.free_pebbles[v] + + if available >= self.L + 1: + # Accept: consume a pebble from whichever endpoint has one + source = u if self.state.free_pebbles[u] > 0 else v + + self.state.free_pebbles[source] -= 1 + self.state.directed_edges[edge_id] = (source, v if source == u else u) + self.state.outgoing[source].add((edge_id, v if source == u else u)) + target = v if source == u else u + self.state.incoming[target].add((edge_id, source)) + self.state.independent_edges.add(edge_id) + return True + else: + # Reject: edge is redundant (overconstrained) + self.state.redundant_edges.add(edge_id) + return False + + def _search_and_collect(self, target: int, forbidden: frozenset[int]) -> bool: + """DFS to find a free pebble reachable from *target* and move it. + + Follows directed edges *backwards* (from destination to source) + to find a vertex with a free pebble that isn't in *forbidden*. + When found, reverses the path to move the pebble to *target*. + + Returns True if a pebble was successfully moved to target. + """ + # BFS/DFS through the directed graph following outgoing edges + # from target. An outgoing edge (target -> w) means target spent + # a pebble on that edge. If we can find a vertex with a free + # pebble, we reverse edges along the path to move it. + + visited: set[int] = set() + # Stack: (current_vertex, path_of_edge_ids_to_reverse) + stack: list[tuple[int, list[int]]] = [(target, [])] + + while stack: + current, path = stack.pop() + if current in visited: + continue + visited.add(current) + + # Check if current vertex (not in forbidden, not target) + # has a free pebble + if ( + current != target + and current not in forbidden + and self.state.free_pebbles[current] > 0 + ): + # Found a pebble — reverse the path + self._reverse_path(path, current) + return True + + # Follow outgoing edges from current vertex + for eid, neighbor in self.state.outgoing.get(current, set()): + if neighbor not in visited: + stack.append((neighbor, [*path, eid])) + + return False + + def _reverse_path(self, edge_ids: list[int], pebble_source: int) -> None: + """Reverse directed edges along a path, moving a pebble to the start. + + The pebble at *pebble_source* is consumed by the last edge in + the path, and a pebble is freed at the path's start vertex. + """ + if not edge_ids: + return + + # Reverse each edge in the path + for eid in edge_ids: + old_source, old_target = self.state.directed_edges[eid] + + # Remove from adjacency + self.state.outgoing[old_source].discard((eid, old_target)) + self.state.incoming[old_target].discard((eid, old_source)) + + # Reverse direction + self.state.directed_edges[eid] = (old_target, old_source) + self.state.outgoing[old_target].add((eid, old_source)) + self.state.incoming[old_source].add((eid, old_target)) + + # Move pebble counts: source loses one, first vertex in path gains one + self.state.free_pebbles[pebble_source] -= 1 + + # After all reversals, the vertex at the beginning of the + # search path gains a pebble + _first_src, first_tgt = self.state.directed_edges[edge_ids[0]] + self.state.free_pebbles[first_tgt] += 1 + + def get_dof(self) -> int: + """Total remaining DOF = sum of free pebbles. + + For a fully rigid assembly, this should be 6 (the trivial rigid + body motions of the whole assembly). Internal DOF = total - 6. + """ + return sum(self.state.free_pebbles.values()) + + def get_internal_dof(self) -> int: + """Internal (non-trivial) degrees of freedom.""" + return max(0, self.get_dof() - 6) + + def is_rigid(self) -> bool: + """Combinatorial rigidity check: rigid iff at most 6 pebbles remain.""" + return self.get_dof() <= self.L + + def get_redundant_count(self) -> int: + """Number of redundant (overconstrained) scalar constraints.""" + return len(self.state.redundant_edges) + + def classify_assembly(self, *, grounded: bool = False) -> str: + """Classify the assembly state. + + Args: + grounded: If True, the baseline trivial DOF is 0 (not 6), + because the ground body's 6 DOF were removed. + """ + total_dof = self.get_dof() + redundant = self.get_redundant_count() + baseline = 0 if grounded else self.L + + if redundant > 0 and total_dof > baseline: + return "mixed" # Both under and over-constrained regions + elif redundant > 0: + return "overconstrained" + elif total_dof > baseline: + return "underconstrained" + elif total_dof == baseline: + return "well-constrained" + else: + return "overconstrained"