feat: port PebbleGame3D to solver/datagen/pebble_game.py
Some checks failed
CI / lint (push) Has been cancelled
CI / type-check (push) Has been cancelled
CI / test (push) Has been cancelled

Port the (6,6)-pebble game implementation from data/synthetic/pebble-game.py.
Imports shared types from solver.datagen.types. No behavioral changes.

- Full type annotations on all methods (mypy strict)
- Ruff-compliant: ternary, combined if, unpacking
- Re-exported from solver.datagen.__init__

Closes #2
This commit is contained in:
2026-02-02 13:47:36 -06:00
parent 1b6135129e
commit 35d4ef736f
2 changed files with 260 additions and 0 deletions

View File

@@ -1,5 +1,6 @@
"""Data generation utilities for assembly constraint training data."""
from solver.datagen.pebble_game import PebbleGame3D
from solver.datagen.types import (
ConstraintAnalysis,
Joint,
@@ -12,6 +13,7 @@ __all__ = [
"ConstraintAnalysis",
"Joint",
"JointType",
"PebbleGame3D",
"PebbleState",
"RigidBody",
]

View File

@@ -0,0 +1,258 @@
"""(6,6)-Pebble game for 3D body-bar-hinge rigidity analysis.
Implements the pebble game algorithm adapted for CAD assembly constraint
graphs. Each rigid body has 6 DOF (3 translation + 3 rotation). Joints
between bodies remove DOF according to their type.
The pebble game provides a fast combinatorial *necessary* condition for
rigidity via Tay's theorem. It does not detect geometric degeneracies —
use :class:`solver.datagen.jacobian.JacobianVerifier` for the *sufficient*
condition.
References:
- Lee & Streinu, "Pebble Game Algorithms and Sparse Graphs", 2008
- Jacobs & Hendrickson, "An Algorithm for Two-Dimensional Rigidity
Percolation: The Pebble Game", J. Comput. Phys., 1997
- Tay, "Rigidity of Multigraphs I: Linking Rigid Bodies in n-space", 1984
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from solver.datagen.types import Joint, PebbleState
if TYPE_CHECKING:
from typing import Any
__all__ = ["PebbleGame3D"]
class PebbleGame3D:
"""Implements the (6,6)-pebble game for 3D body-bar-hinge frameworks.
For body-bar-hinge structures in 3D, Tay's theorem states that a
multigraph G on n vertices is generically minimally rigid iff:
|E| = 6n - 6 and |E'| <= 6n' - 6 for all subgraphs (n' >= 2)
The (6,6)-pebble game tests this sparsity condition incrementally.
Each vertex starts with 6 pebbles (representing 6 DOF). To insert
an edge, we need to collect 6+1=7 pebbles on its two endpoints.
If we can, the edge is independent (removes a DOF). If not, it's
redundant (overconstrained).
In the CAD assembly context:
- Vertices = rigid bodies
- Edges = scalar constraints from joints
- A revolute joint (5 DOF removed) maps to 5 multigraph edges
- A fixed joint (6 DOF removed) maps to 6 multigraph edges
"""
K = 6 # Pebbles per vertex (DOF per rigid body in 3D)
L = 6 # Sparsity parameter: need K+1=7 pebbles to accept edge
def __init__(self) -> None:
self.state = PebbleState()
self._edge_counter = 0
self._bodies: set[int] = set()
def add_body(self, body_id: int) -> None:
"""Register a rigid body (vertex) with K=6 free pebbles."""
if body_id in self._bodies:
return
self._bodies.add(body_id)
self.state.free_pebbles[body_id] = self.K
self.state.incoming[body_id] = set()
self.state.outgoing[body_id] = set()
def add_joint(self, joint: Joint) -> list[dict[str, Any]]:
"""Expand a joint into multigraph edges and test each for independence.
A joint that removes ``d`` DOF becomes ``d`` edges in the multigraph.
Each edge is tested individually via the pebble game.
Returns a list of dicts, one per scalar constraint, with:
- edge_id: int
- independent: bool
- dof_remaining: int (total free pebbles after this edge)
"""
self.add_body(joint.body_a)
self.add_body(joint.body_b)
num_constraints = joint.joint_type.value
results: list[dict[str, Any]] = []
for i in range(num_constraints):
edge_id = self._edge_counter
self._edge_counter += 1
independent = self._try_insert_edge(edge_id, joint.body_a, joint.body_b)
total_free = sum(self.state.free_pebbles.values())
results.append(
{
"edge_id": edge_id,
"joint_id": joint.joint_id,
"constraint_index": i,
"independent": independent,
"dof_remaining": total_free,
}
)
return results
def _try_insert_edge(self, edge_id: int, u: int, v: int) -> bool:
"""Try to insert a directed edge between u and v.
The edge is accepted (independent) iff we can collect L+1 = 7
pebbles on the two endpoints {u, v} combined.
If accepted, one pebble is consumed and the edge is directed
away from the vertex that gives up the pebble.
"""
# Count current free pebbles on u and v
available = self.state.free_pebbles[u] + self.state.free_pebbles[v]
# Try to gather enough pebbles via DFS reachability search
if available < self.L + 1:
needed = (self.L + 1) - available
# Try to free pebbles by searching from u first, then v
for target in (u, v):
while needed > 0:
found = self._search_and_collect(target, frozenset({u, v}))
if not found:
break
needed -= 1
# Recheck after collection attempts
available = self.state.free_pebbles[u] + self.state.free_pebbles[v]
if available >= self.L + 1:
# Accept: consume a pebble from whichever endpoint has one
source = u if self.state.free_pebbles[u] > 0 else v
self.state.free_pebbles[source] -= 1
self.state.directed_edges[edge_id] = (source, v if source == u else u)
self.state.outgoing[source].add((edge_id, v if source == u else u))
target = v if source == u else u
self.state.incoming[target].add((edge_id, source))
self.state.independent_edges.add(edge_id)
return True
else:
# Reject: edge is redundant (overconstrained)
self.state.redundant_edges.add(edge_id)
return False
def _search_and_collect(self, target: int, forbidden: frozenset[int]) -> bool:
"""DFS to find a free pebble reachable from *target* and move it.
Follows directed edges *backwards* (from destination to source)
to find a vertex with a free pebble that isn't in *forbidden*.
When found, reverses the path to move the pebble to *target*.
Returns True if a pebble was successfully moved to target.
"""
# BFS/DFS through the directed graph following outgoing edges
# from target. An outgoing edge (target -> w) means target spent
# a pebble on that edge. If we can find a vertex with a free
# pebble, we reverse edges along the path to move it.
visited: set[int] = set()
# Stack: (current_vertex, path_of_edge_ids_to_reverse)
stack: list[tuple[int, list[int]]] = [(target, [])]
while stack:
current, path = stack.pop()
if current in visited:
continue
visited.add(current)
# Check if current vertex (not in forbidden, not target)
# has a free pebble
if (
current != target
and current not in forbidden
and self.state.free_pebbles[current] > 0
):
# Found a pebble — reverse the path
self._reverse_path(path, current)
return True
# Follow outgoing edges from current vertex
for eid, neighbor in self.state.outgoing.get(current, set()):
if neighbor not in visited:
stack.append((neighbor, [*path, eid]))
return False
def _reverse_path(self, edge_ids: list[int], pebble_source: int) -> None:
"""Reverse directed edges along a path, moving a pebble to the start.
The pebble at *pebble_source* is consumed by the last edge in
the path, and a pebble is freed at the path's start vertex.
"""
if not edge_ids:
return
# Reverse each edge in the path
for eid in edge_ids:
old_source, old_target = self.state.directed_edges[eid]
# Remove from adjacency
self.state.outgoing[old_source].discard((eid, old_target))
self.state.incoming[old_target].discard((eid, old_source))
# Reverse direction
self.state.directed_edges[eid] = (old_target, old_source)
self.state.outgoing[old_target].add((eid, old_source))
self.state.incoming[old_source].add((eid, old_target))
# Move pebble counts: source loses one, first vertex in path gains one
self.state.free_pebbles[pebble_source] -= 1
# After all reversals, the vertex at the beginning of the
# search path gains a pebble
_first_src, first_tgt = self.state.directed_edges[edge_ids[0]]
self.state.free_pebbles[first_tgt] += 1
def get_dof(self) -> int:
"""Total remaining DOF = sum of free pebbles.
For a fully rigid assembly, this should be 6 (the trivial rigid
body motions of the whole assembly). Internal DOF = total - 6.
"""
return sum(self.state.free_pebbles.values())
def get_internal_dof(self) -> int:
"""Internal (non-trivial) degrees of freedom."""
return max(0, self.get_dof() - 6)
def is_rigid(self) -> bool:
"""Combinatorial rigidity check: rigid iff at most 6 pebbles remain."""
return self.get_dof() <= self.L
def get_redundant_count(self) -> int:
"""Number of redundant (overconstrained) scalar constraints."""
return len(self.state.redundant_edges)
def classify_assembly(self, *, grounded: bool = False) -> str:
"""Classify the assembly state.
Args:
grounded: If True, the baseline trivial DOF is 0 (not 6),
because the ground body's 6 DOF were removed.
"""
total_dof = self.get_dof()
redundant = self.get_redundant_count()
baseline = 0 if grounded else self.L
if redundant > 0 and total_dof > baseline:
return "mixed" # Both under and over-constrained regions
elif redundant > 0:
return "overconstrained"
elif total_dof > baseline:
return "underconstrained"
elif total_dof == baseline:
return "well-constrained"
else:
return "overconstrained"