14 Commits

9d86bb203e Merge pull request 'fix(solver): prevent orientation flips during interactive drag' (#36) from fix/drag-orientation-stability into main
Reviewed-on: #36
2026-02-25 02:47:26 +00:00
forbes-0023
c2ebcc3169 fix(solver): prevent orientation flips during interactive drag
Add half-space tracking for all compound constraints with branch
ambiguity: Planar, Revolute, Concentric, Cylindrical, Slider, Screw,
Universal, PointInPlane, and LineInPlane.  Previously only
DistancePointPoint, Parallel, Angle, and Perpendicular were tracked,
so the Newton-Raphson solver could converge to the wrong branch for
compound constraints — causing parts to drift through plane
constraints while honoring revolute joints.
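The branch ambiguity can be pictured in one dimension (a standalone sketch, not the solver's code): a squared residual vanishes on both sides of zero, and Newton-Raphson converges to whichever root the iterate happens to start nearest.

```python
# Illustrative 1-D sketch (not the solver's code): the residual x**2 - 4
# vanishes at both x = +2 and x = -2.  Newton-Raphson lands on whichever
# root the iterate starts nearest, so a drag that pushes the start value
# across zero silently flips the configuration branch -- exactly what
# half-space tracking guards against.
def newton_1d(f, df, x, iters=50):
    for _ in range(iters):
        x -= f(x) / df(x)
    return x

f = lambda x: x * x - 4.0
df = lambda x: 2.0 * x
print(newton_1d(f, df, -1.5))  # converges to -2: same half-space
print(newton_1d(f, df, +0.5))  # converges to +2: branch flipped
```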

Add quaternion continuity enforcement in drag_step(): after solving,
each non-dragged body's quaternion is checked against its pre-step
value and negated if in the opposite hemisphere (standard SLERP
short-arc correction).  This prevents the C++ validateNewPlacements()
from rejecting valid solutions as 'flipped orientation' due to the
quaternion double-cover ambiguity (q and -q encode the same rotation
but measure as ~340° apart).
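The short-arc correction described above can be sketched in a few lines of numpy; the helper name here is illustrative, not the solver's API.

```python
import numpy as np

# Minimal sketch of the short-arc correction (hypothetical helper, not the
# solver's API): q and -q encode the same rotation, so keep each solved
# quaternion in the same hemisphere as its pre-step value.
def enforce_continuity(q_prev, q_new):
    q_prev = np.asarray(q_prev, dtype=float)
    q_new = np.asarray(q_new, dtype=float)
    # Opposite hemisphere: the 4-D dot product is negative.
    if np.dot(q_prev, q_new) < 0.0:
        return -q_new
    return q_new

q_prev = np.array([1.0, 0.0, 0.0, 0.0])       # identity (w, x, y, z)
q_new = np.array([-0.999, 0.01, 0.03, 0.02])  # same rotation, flipped sign
q_fixed = enforce_continuity(q_prev, q_new)
print(q_fixed)  # negated back into q_prev's hemisphere
```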
2026-02-24 20:46:42 -06:00
e7e4266f3d Merge pull request 'fix(solver): build weight vector after pre-passes to match free param count' (#35) from fix/weight-vector-after-prepass into main
Reviewed-on: #35
2026-02-23 03:19:26 +00:00
forbes-0023
0825578778 fix(solver): build weight vector after pre-passes to match free param count
The weight vector was built before substitution_pass and
single_equation_pass, which can fix variables and reduce the free
parameter count. This caused a shape mismatch in newton_solve when
the Jacobian had fewer columns than the weight vector had entries:

  ValueError: operands could not be broadcast together with shapes
  (55,27) (1,28)

Move build_weight_vector() after both pre-passes so its length
matches the actual free parameters used by the Jacobian.
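The mismatch is easy to reproduce with plain numpy (illustrative shapes only, not the solver's internals):

```python
import numpy as np

# Illustrative reproduction of the bug (shapes only, not the solver's
# internals): a pre-pass fixes one variable, shrinking the Jacobian to 27
# columns, while an early-built weight vector still has 28 entries.
n_res, n_params = 55, 28
free_mask = np.ones(n_params, dtype=bool)
free_mask[3] = False                           # pre-pass fixed one variable
J = np.zeros((n_res, int(free_mask.sum())))    # shape (55, 27)

weights_early = np.ones((1, n_params))         # built before the pre-passes
try:
    J * weights_early                          # (55,27) vs (1,28)
except ValueError as exc:
    print("broadcast error:", exc)

weights_late = np.ones(int(free_mask.sum()))   # built after: 27 entries
scaled = J * weights_late                      # OK, shape (55, 27)
```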
2026-02-22 21:06:21 -06:00
forbes-0023
8e521b4519 fix(solver): use all 3 cross-product components to avoid XY-plane singularity
The parallel-normal constraints (ParallelConstraint, PlanarConstraint,
ConcentricConstraint, RevoluteConstraint, CylindricalConstraint,
SliderConstraint, ScrewConstraint) and point-on-line constraints
previously used only the x and y components of the cross product,
dropping the z component.

This created a singularity when both normal vectors lay in the XY
plane: a yaw rotation produced a cross product entirely along Z,
which was discarded, making the constraint blind to the rotation.

Fix: return all 3 cross-product components. The Jacobian has a
rank deficiency at the solution (3 residuals, rank 2), but the
Newton solver handles this correctly via its pseudoinverse.

Similarly, point_line_perp_components now returns all 3 components
of the displacement cross product to avoid singularity when the
line direction aligns with a coordinate axis.
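The XY-plane blind spot is easy to reproduce with numpy (a standalone illustration, independent of the solver's Expr code):

```python
import numpy as np

# Standalone illustration (not the solver's Expr code): two unit normals
# in the XY plane, misaligned by a small yaw.  Their cross product lies
# entirely along Z, so dropping the z component hides the error.
a = np.array([1.0, 0.0, 0.0])
theta = 0.1
b = np.array([np.cos(theta), np.sin(theta), 0.0])

c = np.cross(a, b)
print(c[:2])  # [0. 0.]  -- the two kept components see nothing
print(c[2])   # sin(0.1) -- the discarded component carries the yaw error
```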
2026-02-22 15:51:59 -06:00
forbes-0023
bfb787157c perf(solver): cache compiled system across drag steps
During interactive drag, the constraint topology is invariant — only the
dragged part's parameter values change between steps. Previously,
drag_step() called solve() which rebuilt everything from scratch each
frame: new ParamTable, new Expr trees, symbolic differentiation, CSE,
and compilation (~150 ms overhead per frame).

Now pre_drag() builds and caches the system, symbolic Jacobian, compiled
evaluator, half-spaces, and weight vector. drag_step() reuses all cached
artifacts, only updating the dragged part's 7 parameter values before
running Newton-Raphson.

Expected ~1.5-2x speedup on drag step latency (eliminating rebuild
overhead, leaving only the irreducible Newton iteration cost).
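The caching pattern can be sketched as follows; all names here are illustrative stand-ins, not the real API:

```python
# Hypothetical sketch of the drag-cache pattern (names are illustrative,
# not the real API): pay the symbolic build once in pre_drag(), then reuse
# the compiled artifacts on every drag_step().
class DragCache:
    def __init__(self, build_system):
        self._build = build_system  # expensive: Expr trees, diff, CSE, compile
        self._system = None

    def pre_drag(self, parts, constraints):
        self._system = self._build(parts, constraints)

    def drag_step(self, dragged_id, seven_params):
        system = self._system
        system["params"][dragged_id] = seven_params  # cheap per-frame update
        return system                                # then run Newton on it

builds = []
cache = DragCache(lambda p, c: builds.append(1) or {"params": {}})
cache.pre_drag(["a", "b"], ["rev"])
for frame in range(3):
    cache.drag_step("a", (0, 0, frame, 1, 0, 0, 0))
print(len(builds))  # 1: rebuilt once per gesture, not once per frame
```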
2026-02-21 12:23:32 -06:00
forbes-0023
e0468cd3c1 fix(solver): redirect distance=0 constraint to CoincidentConstraint
DistancePointPointConstraint uses a squared residual (||p_i-p_j||^2 - d^2)
which has a degenerate Jacobian when d=0 and the constraint is satisfied
(all partial derivatives vanish). This made the constraint invisible to
the Newton solver during drag, allowing constrained points to drift apart.

When distance=0, use CoincidentConstraint instead (3 linear residuals:
dx, dy, dz) which always has a well-conditioned Jacobian.
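The degeneracy can be checked directly (an illustrative calculation, not the solver's code):

```python
import numpy as np

# Illustrative check (not the solver's code): at a satisfied d = 0
# constraint, the squared-distance residual has a vanishing gradient,
# while the coincident form keeps constant full-rank Jacobian rows.
p_i = np.array([1.0, 2.0, 3.0])
p_j = p_i.copy()                  # constraint satisfied with d = 0
delta = p_i - p_j

# r = ||p_i - p_j||^2 - d^2  =>  dr/dp_i = 2 * (p_i - p_j) = 0
grad_squared = 2.0 * delta
print(grad_squared)               # [0. 0. 0.] -- Newton sees nothing

# Coincident form: r = (dx, dy, dz)  =>  J = [I | -I], always rank 3
J_coincident = np.hstack([np.eye(3), -np.eye(3)])
print(np.linalg.matrix_rank(J_coincident))  # 3
```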
2026-02-21 11:46:47 -06:00
forbes-0023
64b1e24467 feat(solver): compile symbolic Jacobian to flat Python for fast evaluation
Add a code generation pipeline that compiles Expr DAGs into flat Python
functions, eliminating recursive tree-walk dispatch in the Newton-Raphson
inner loop.

Key changes:
- Add to_code() method to all 11 Expr node types (expr.py)
- New codegen.py module with CSE (common subexpression elimination),
  sparsity detection, and compile()/exec() compilation pipeline
- Add ParamTable.env_ref() to avoid dict copies per iteration (params.py)
- Newton and BFGS solvers accept pre-built jac_exprs and compiled_eval
  to avoid redundant diff/simplify and enable compiled evaluation
- count_dof() and diagnostics accept pre-built jac_exprs
- solver.py builds symbolic Jacobian once, compiles once, passes to all
  consumers (_monolithic_solve, count_dof, diagnostics)
- Automatic fallback: if codegen fails, tree-walk eval is used

Expected performance impact:
- ~10-20x faster Jacobian evaluation (no recursive dispatch)
- ~2-5x additional from CSE on quaternion-heavy systems
- ~3x fewer entries evaluated via sparsity detection
- Eliminates redundant diff().simplify() in DOF/diagnostics
2026-02-21 11:22:36 -06:00
forbes-0023
d20b38e760 feat(solver): add diagnostic logging throughout solver pipeline
- solver.py: log solve entry (parts/constraints counts), system build
  stats, convergence result with timing, decomposition decisions,
  Newton/BFGS fallback events, and per-constraint diagnostics on failure
- solver.py: log drag lifecycle (pre_drag parts, drag_step timing/status,
  post_drag step count summary)
- decompose.py: log cluster count, per-cluster body/constraint/residual
  stats, and per-cluster convergence failures
- Init.py: add _FreeCADLogHandler routing Python logging.* calls to
  FreeCAD.Console (PrintLog/PrintWarning/PrintError) with kindred_solver
  logger at DEBUG level
2026-02-21 10:07:54 -06:00
318a1c17da Merge pull request 'feat(solver): Phase 4+5 — diagnostics, preferences, assembly integration' (#34) from feat/phase5-assembly-integration into main
Reviewed-on: #34
2026-02-21 05:48:26 +00:00
forbes-0023
adaa0f9a69 test(solver): add in-client console tests for Phase 5 assembly integration
Paste-into-console test script exercising the full pipeline:
- Solver registry and loading
- Preference switching between kindred/ondsel
- Fixed joint placement matching
- Revolute joint DOF reporting
- No-ground error code
- Solve determinism/stability
- Standalone kcsolve API (no FreeCAD Assembly objects)
- Diagnose API for overconstrained detection
2026-02-20 23:34:39 -06:00
forbes-0023
9dad25e947 feat(solver): assembly integration — diagnose, drag protocol, system extraction (phase 5)
- Extract _build_system() from solve() to enable reuse by diagnose()
- Add diagnose(ctx) method: runs find_overconstrained() unconditionally
- Add interactive drag protocol: pre_drag(), drag_step(), post_drag()
- Add _run_diagnostics() and _extract_placements() helpers
- Log warning when joint limits are present (not yet enforced)
- KindredSolver now implements all IKCSolver methods needed for
  full Assembly workbench integration
2026-02-20 23:32:51 -06:00
forbes-0023
b4b8724ff1 feat(solver): diagnostics, half-space preference, and weight vectors (phase 4)
- Add per-entity DOF analysis via Jacobian SVD (diagnostics.py)
- Add overconstrained detection: redundant vs conflicting constraints
- Add half-space tracking to preserve configuration branch (preference.py)
- Add minimum-movement weighting for least-squares solve
- Extend BFGS fallback with weight vector and quaternion renormalization
- Add snapshot/restore and env accessor to ParamTable
- Fix DistancePointPointConstraint sign for half-space tracking
2026-02-20 23:32:45 -06:00
3f5f7905b5 Merge pull request 'feat(solver): graph decomposition for cluster-by-cluster solving (phase 3)' (#33) from feat/phase3-graph-decomposition into main 2026-02-21 04:21:10 +00:00
21 changed files with 3692 additions and 181 deletions

Init.py

@@ -1,11 +1,40 @@
 """Register the Kindred solver with the KCSolve solver registry."""
+import logging
+
 import FreeCAD
+
+
+class _FreeCADLogHandler(logging.Handler):
+    """Route Python logging to FreeCAD's Console."""
+
+    def emit(self, record):
+        msg = self.format(record) + "\n"
+        if record.levelno >= logging.ERROR:
+            FreeCAD.Console.PrintError(msg)
+        elif record.levelno >= logging.WARNING:
+            FreeCAD.Console.PrintWarning(msg)
+        elif record.levelno >= logging.INFO:
+            FreeCAD.Console.PrintLog(msg)
+        else:
+            FreeCAD.Console.PrintLog(msg)
+
+
+def _setup_logging():
+    """Attach FreeCAD log handler to the kindred_solver logger."""
+    logger = logging.getLogger("kindred_solver")
+    if not logger.handlers:
+        handler = _FreeCADLogHandler()
+        handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
+        logger.addHandler(handler)
+        logger.setLevel(logging.DEBUG)
+
+
 try:
     import kcsolve
     from kindred_solver import KindredSolver
+
+    _setup_logging()
     kcsolve.register_solver("kindred", KindredSolver)
     FreeCAD.Console.PrintLog("kindred-solver registered\n")
 except Exception as exc:


@@ -7,7 +7,7 @@ analytic gradient from the Expr DAG's symbolic differentiation.
 from __future__ import annotations

 import math
-from typing import List
+from typing import Callable, List

 import numpy as np
@@ -28,11 +28,25 @@ def bfgs_solve(
     quat_groups: List[tuple[str, str, str, str]] | None = None,
     max_iter: int = 200,
     tol: float = 1e-10,
+    weight_vector: "np.ndarray | None" = None,
+    jac_exprs: "List[List[Expr]] | None" = None,
+    compiled_eval: "Callable | None" = None,
 ) -> bool:
     """Solve ``residuals == 0`` by minimizing sum of squared residuals.

     Falls back gracefully to False if scipy is not available.

+    When *weight_vector* is provided, residuals are scaled by
+    ``sqrt(w)`` so that the objective becomes
+    ``0.5 * sum(w_i * r_i^2)`` — equivalent to weighted least-squares.
+
+    Parameters
+    ----------
+    jac_exprs:
+        Pre-built symbolic Jacobian (list-of-lists of Expr).
+    compiled_eval:
+        Pre-compiled evaluation function from :mod:`codegen`.
+
     Returns True if converged (||r|| < tol).
     """
     if not _HAS_SCIPY:
@@ -46,54 +60,99 @@ def bfgs_solve(
         return True

     # Build symbolic gradient expressions once: d(r_i)/d(x_j)
-    jac_exprs: List[List[Expr]] = []
-    for r in residuals:
-        row = []
-        for name in free:
-            row.append(r.diff(name).simplify())
-        jac_exprs.append(row)
+    if jac_exprs is None:
+        jac_exprs = []
+        for r in residuals:
+            row = []
+            for name in free:
+                row.append(r.diff(name).simplify())
+            jac_exprs.append(row)
+
+    # Try compilation if not provided
+    if compiled_eval is None:
+        from .codegen import try_compile_system
+        compiled_eval = try_compile_system(residuals, jac_exprs, n_res, n_free)
+
+    # Pre-compute scaling for weighted minimum-norm
+    if weight_vector is not None:
+        w_sqrt = np.sqrt(weight_vector)
+        w_inv_sqrt = 1.0 / w_sqrt
+    else:
+        w_sqrt = None
+        w_inv_sqrt = None
+
+    # Pre-allocate arrays reused across objective calls
+    r_vals = np.empty(n_res)
+    J = np.zeros((n_res, n_free))

-    def objective_and_grad(x_vec):
+    def objective_and_grad(y_vec):
+        # Transform back from scaled space if weighted
+        if w_inv_sqrt is not None:
+            x_vec = y_vec * w_inv_sqrt
+        else:
+            x_vec = y_vec
         # Update params
         params.set_free_vector(x_vec)
         if quat_groups:
             _renormalize_quats(params, quat_groups)
-        env = params.get_env()
-        # Evaluate residuals
-        r_vals = np.array([r.eval(env) for r in residuals])
+        if compiled_eval is not None:
+            J[:] = 0.0
+            compiled_eval(params.env_ref(), r_vals, J)
+        else:
+            env = params.get_env()
+            for i, r in enumerate(residuals):
+                r_vals[i] = r.eval(env)
+            for i in range(n_res):
+                for j in range(n_free):
+                    J[i, j] = jac_exprs[i][j].eval(env)
         f = 0.5 * np.dot(r_vals, r_vals)
-        # Evaluate Jacobian
-        J = np.empty((n_res, n_free))
-        for i in range(n_res):
-            for j in range(n_free):
-                J[i, j] = jac_exprs[i][j].eval(env)
-        # Gradient of f = sum(r_i * dr_i/dx_j) = J^T @ r
-        grad = J.T @ r_vals
+        # Gradient of f w.r.t. x = J^T @ r
+        grad_x = J.T @ r_vals
+        # Chain rule: df/dy = df/dx * dx/dy = grad_x * w_inv_sqrt
+        if w_inv_sqrt is not None:
+            grad = grad_x * w_inv_sqrt
+        else:
+            grad = grad_x
         return f, grad

     x0 = params.get_free_vector().copy()
+    # Transform initial guess to scaled space
+    if w_sqrt is not None:
+        y0 = x0 * w_sqrt
+    else:
+        y0 = x0
     result = _scipy_minimize(
         objective_and_grad,
-        x0,
+        y0,
         method="L-BFGS-B",
         jac=True,
         options={"maxiter": max_iter, "ftol": tol * tol, "gtol": tol},
     )
-    # Apply final result
-    params.set_free_vector(result.x)
+    # Apply final result (transform back from scaled space)
+    if w_inv_sqrt is not None:
+        params.set_free_vector(result.x * w_inv_sqrt)
+    else:
+        params.set_free_vector(result.x)
     if quat_groups:
         _renormalize_quats(params, quat_groups)
     # Check convergence on actual residual norm
-    env = params.get_env()
-    r_vals = np.array([r.eval(env) for r in residuals])
+    if compiled_eval is not None:
+        compiled_eval(params.env_ref(), r_vals, J)
+    else:
+        env = params.get_env()
+        for i, r in enumerate(residuals):
+            r_vals[i] = r.eval(env)
     return bool(np.linalg.norm(r_vals) < tol)
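The change of variables in the hunk above can be sanity-checked numerically (a standalone illustration, independent of the solver):

```python
import numpy as np

# Standalone numeric check of the scaling used above: with y = sqrt(w) * x,
# the chain rule gives df/dy = (df/dx) / sqrt(w).
w = np.array([4.0, 9.0])
x = np.array([1.0, 2.0])

def f(x):
    return 0.5 * np.sum((x - 3.0) ** 2)

grad_x = x - 3.0               # analytic df/dx
grad_y = grad_x / np.sqrt(w)   # claimed df/dy

y = np.sqrt(w) * x
eps = 1e-6
for j in range(2):
    dy = np.zeros(2)
    dy[j] = eps
    # central finite difference in y-space
    num = (f((y + dy) / np.sqrt(w)) - f((y - dy) / np.sqrt(w))) / (2 * eps)
    assert abs(num - grad_y[j]) < 1e-6
print("chain rule holds")
```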

kindred_solver/codegen.py Normal file

@@ -0,0 +1,308 @@
"""Compile Expr DAGs into flat Python functions for fast evaluation.

The compilation pipeline:

1. Collect all Expr nodes to be evaluated (residuals + Jacobian entries).
2. Identify common subexpressions (CSE) by ``id()`` — the Expr DAG
   already shares node objects via ParamTable's Var instances.
3. Generate a single Python function body that computes CSE temps,
   then fills ``r_vec`` and ``J`` arrays in-place.
4. Compile with ``compile()`` + ``exec()`` and return the callable.

The generated function signature is::

    fn(env: dict[str, float], r_vec: ndarray, J: ndarray) -> None
"""
from __future__ import annotations

import logging
import math
from collections import Counter
from typing import Callable, List

import numpy as np

from .expr import Const, Expr, Var

log = logging.getLogger(__name__)

# Namespace injected into compiled functions.
_CODEGEN_NS = {
    "_sin": math.sin,
    "_cos": math.cos,
    "_sqrt": math.sqrt,
}


# ---------------------------------------------------------------------------
# CSE (Common Subexpression Elimination)
# ---------------------------------------------------------------------------
def _collect_nodes(expr: Expr, counts: Counter, visited: set[int]) -> None:
    """Walk *expr* and count how many times each node ``id()`` appears."""
    eid = id(expr)
    counts[eid] += 1
    if eid in visited:
        return
    visited.add(eid)
    # Recurse into children
    if isinstance(expr, (Const, Var)):
        return
    if hasattr(expr, "child"):
        _collect_nodes(expr.child, counts, visited)
    elif hasattr(expr, "a"):
        _collect_nodes(expr.a, counts, visited)
        _collect_nodes(expr.b, counts, visited)
    elif hasattr(expr, "base"):
        _collect_nodes(expr.base, counts, visited)
        _collect_nodes(expr.exp, counts, visited)


def _build_cse(
    exprs: list[Expr],
) -> tuple[dict[int, str], list[tuple[str, Expr]]]:
    """Identify shared sub-trees and assign temporary variable names.

    Returns:
        id_to_temp: mapping from ``id(node)`` to temp variable name
        temps_ordered: ``(temp_name, expr)`` pairs in dependency order
    """
    counts: Counter = Counter()
    visited: set[int] = set()
    id_to_expr: dict[int, Expr] = {}
    for expr in exprs:
        _collect_nodes(expr, counts, visited)
    # Map id -> Expr for nodes we visited
    for expr in exprs:
        _map_ids(expr, id_to_expr)
    # Nodes referenced more than once and not trivial (Const/Var) become temps
    shared_ids = set()
    for eid, cnt in counts.items():
        if cnt > 1:
            node = id_to_expr.get(eid)
            if node is not None and not isinstance(node, (Const, Var)):
                shared_ids.add(eid)
    if not shared_ids:
        return {}, []
    # Topological order: a temp must be computed before any temp that uses it.
    # Walk each shared node, collect in post-order.
    ordered_ids: list[int] = []
    order_visited: set[int] = set()

    def _topo(expr: Expr) -> None:
        eid = id(expr)
        if eid in order_visited:
            return
        order_visited.add(eid)
        if isinstance(expr, (Const, Var)):
            return
        if hasattr(expr, "child"):
            _topo(expr.child)
        elif hasattr(expr, "a"):
            _topo(expr.a)
            _topo(expr.b)
        elif hasattr(expr, "base"):
            _topo(expr.base)
            _topo(expr.exp)
        if eid in shared_ids:
            ordered_ids.append(eid)

    for expr in exprs:
        _topo(expr)
    id_to_temp: dict[int, str] = {}
    temps_ordered: list[tuple[str, Expr]] = []
    for i, eid in enumerate(ordered_ids):
        name = f"_c{i}"
        id_to_temp[eid] = name
        temps_ordered.append((name, id_to_expr[eid]))
    return id_to_temp, temps_ordered


def _map_ids(expr: Expr, mapping: dict[int, Expr]) -> None:
    """Populate id -> Expr mapping for all nodes in *expr*."""
    eid = id(expr)
    if eid in mapping:
        return
    mapping[eid] = expr
    if isinstance(expr, (Const, Var)):
        return
    if hasattr(expr, "child"):
        _map_ids(expr.child, mapping)
    elif hasattr(expr, "a"):
        _map_ids(expr.a, mapping)
        _map_ids(expr.b, mapping)
    elif hasattr(expr, "base"):
        _map_ids(expr.base, mapping)
        _map_ids(expr.exp, mapping)


def _expr_to_code(expr: Expr, id_to_temp: dict[int, str]) -> str:
    """Emit code for *expr*, substituting temp names for shared nodes."""
    eid = id(expr)
    temp = id_to_temp.get(eid)
    if temp is not None:
        return temp
    return expr.to_code()


def _expr_to_code_recursive(expr: Expr, id_to_temp: dict[int, str]) -> str:
    """Emit code for *expr*, recursing into children but respecting temps."""
    eid = id(expr)
    temp = id_to_temp.get(eid)
    if temp is not None:
        return temp
    # For leaf nodes, just use to_code() directly
    if isinstance(expr, (Const, Var)):
        return expr.to_code()
    # For non-leaf nodes, recurse into children with temp substitution
    from .expr import Add, Cos, Div, Mul, Neg, Pow, Sin, Sqrt, Sub

    if isinstance(expr, Neg):
        return f"(-{_expr_to_code_recursive(expr.child, id_to_temp)})"
    if isinstance(expr, Sin):
        return f"_sin({_expr_to_code_recursive(expr.child, id_to_temp)})"
    if isinstance(expr, Cos):
        return f"_cos({_expr_to_code_recursive(expr.child, id_to_temp)})"
    if isinstance(expr, Sqrt):
        return f"_sqrt({_expr_to_code_recursive(expr.child, id_to_temp)})"
    if isinstance(expr, Add):
        a = _expr_to_code_recursive(expr.a, id_to_temp)
        b = _expr_to_code_recursive(expr.b, id_to_temp)
        return f"({a} + {b})"
    if isinstance(expr, Sub):
        a = _expr_to_code_recursive(expr.a, id_to_temp)
        b = _expr_to_code_recursive(expr.b, id_to_temp)
        return f"({a} - {b})"
    if isinstance(expr, Mul):
        a = _expr_to_code_recursive(expr.a, id_to_temp)
        b = _expr_to_code_recursive(expr.b, id_to_temp)
        return f"({a} * {b})"
    if isinstance(expr, Div):
        a = _expr_to_code_recursive(expr.a, id_to_temp)
        b = _expr_to_code_recursive(expr.b, id_to_temp)
        return f"({a} / {b})"
    if isinstance(expr, Pow):
        base = _expr_to_code_recursive(expr.base, id_to_temp)
        exp = _expr_to_code_recursive(expr.exp, id_to_temp)
        return f"({base} ** {exp})"
    # Fallback — should not happen for known node types
    return expr.to_code()


# ---------------------------------------------------------------------------
# Sparsity detection
# ---------------------------------------------------------------------------
def _find_nonzero_entries(
    jac_exprs: list[list[Expr]],
) -> list[tuple[int, int]]:
    """Return ``(row, col)`` pairs for non-zero Jacobian entries."""
    nz = []
    for i, row in enumerate(jac_exprs):
        for j, expr in enumerate(row):
            if isinstance(expr, Const) and expr.value == 0.0:
                continue
            nz.append((i, j))
    return nz


# ---------------------------------------------------------------------------
# Code generation
# ---------------------------------------------------------------------------
def compile_system(
    residuals: list[Expr],
    jac_exprs: list[list[Expr]],
    n_res: int,
    n_free: int,
) -> Callable[[dict, np.ndarray, np.ndarray], None]:
    """Compile residuals + Jacobian into a single evaluation function.

    Returns a callable ``fn(env, r_vec, J)`` that fills *r_vec* and *J*
    in-place. *J* must be pre-zeroed by the caller (only non-zero
    entries are written).
    """
    # Detect non-zero Jacobian entries
    nz_entries = _find_nonzero_entries(jac_exprs)
    # Collect all expressions for CSE analysis
    all_exprs: list[Expr] = list(residuals)
    nz_jac_exprs: list[Expr] = [jac_exprs[i][j] for i, j in nz_entries]
    all_exprs.extend(nz_jac_exprs)
    # CSE
    id_to_temp, temps_ordered = _build_cse(all_exprs)
    # Generate function body
    lines: list[str] = ["def _eval(env, r_vec, J):"]
    # Temporaries — temporarily remove each temp's own id so its RHS
    # is expanded rather than self-referencing.
    for temp_name, temp_expr in temps_ordered:
        eid = id(temp_expr)
        saved = id_to_temp.pop(eid)
        code = _expr_to_code_recursive(temp_expr, id_to_temp)
        id_to_temp[eid] = saved
        lines.append(f"    {temp_name} = {code}")
    # Residuals
    for i, r in enumerate(residuals):
        code = _expr_to_code_recursive(r, id_to_temp)
        lines.append(f"    r_vec[{i}] = {code}")
    # Jacobian (sparse)
    for idx, (i, j) in enumerate(nz_entries):
        code = _expr_to_code_recursive(nz_jac_exprs[idx], id_to_temp)
        lines.append(f"    J[{i}, {j}] = {code}")
    source = "\n".join(lines)
    # Compile
    code_obj = compile(source, "<kindred_codegen>", "exec")
    ns = dict(_CODEGEN_NS)
    exec(code_obj, ns)
    fn = ns["_eval"]
    n_temps = len(temps_ordered)
    n_nz = len(nz_entries)
    n_total = n_res * n_free
    log.debug(
        "codegen: compiled %d residuals + %d/%d Jacobian entries, %d CSE temps",
        n_res,
        n_nz,
        n_total,
        n_temps,
    )
    return fn


def try_compile_system(
    residuals: list[Expr],
    jac_exprs: list[list[Expr]],
    n_res: int,
    n_free: int,
) -> Callable[[dict, np.ndarray, np.ndarray], None] | None:
    """Compile with automatic fallback. Returns ``None`` on failure."""
    try:
        return compile_system(residuals, jac_exprs, n_res, n_free)
    except Exception:
        log.debug(
            "codegen: compilation failed, falling back to tree-walk eval", exc_info=True
        )
        return None
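The id()-based sharing the docstring relies on can be seen with a toy DAG (a standalone illustration, independent of this module):

```python
from collections import Counter

# Toy illustration of id()-based CSE (independent of the module above):
# sharing is detected only when the *same object* appears twice, which is
# why the pipeline relies on the Expr DAG reusing node instances.
class Node:
    def __init__(self, op, *children):
        self.op, self.children = op, children

shared = Node("mul", Node("var"), Node("var"))
root_a = Node("add", shared, Node("const"))
root_b = Node("sin", shared)          # same object reused in a second root

counts, visited = Counter(), set()

def walk(n):
    counts[id(n)] += 1
    if id(n) in visited:
        return
    visited.add(id(n))
    for c in n.children:
        walk(c)

for root in (root_a, root_b):
    walk(root)
print(counts[id(shared)])             # 2 -> worth hoisting into a temp
```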


@@ -77,9 +77,15 @@ class DistancePointPointConstraint(ConstraintBase):
         self.marker_j_pos = marker_j_pos
         self.distance = distance

+    def world_points(self) -> tuple[tuple[Expr, Expr, Expr], tuple[Expr, Expr, Expr]]:
+        """Return (world_point_i, world_point_j) expression tuples."""
+        return (
+            self.body_i.world_point(*self.marker_i_pos),
+            self.body_j.world_point(*self.marker_j_pos),
+        )
+
     def residuals(self) -> List[Expr]:
-        wx_i, wy_i, wz_i = self.body_i.world_point(*self.marker_i_pos)
-        wx_j, wy_j, wz_j = self.body_j.world_point(*self.marker_j_pos)
+        (wx_i, wy_i, wz_i), (wx_j, wy_j, wz_j) = self.world_points()
         dx = wx_i - wx_j
         dy = wy_i - wy_j
         dz = wz_i - wz_j
@@ -190,8 +196,8 @@ class PointOnLineConstraint(ConstraintBase):
         p_i = self.body_i.world_point(*self.marker_i_pos)
         p_j = self.body_j.world_point(*self.marker_j_pos)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy = point_line_perp_components(p_i, p_j, z_j)
-        return [cx, cy]
+        cx, cy, cz = point_line_perp_components(p_i, p_j, z_j)
+        return [cx, cy, cz]
class PointInPlaneConstraint(ConstraintBase):
@@ -238,8 +244,9 @@ class ParallelConstraint(ConstraintBase):
     """Parallel axes — 2 DOF removed.

     marker Z-axes are parallel: z_i x z_j = 0.
-    2 residuals from the cross product (only 2 of 3 components are
-    independent for unit vectors).
+    3 cross-product residuals (rank 2 at the solution). Using all 3
+    avoids a singularity when the axes lie in the XY plane, where
+    dropping cz would leave the constraint blind to yaw rotations.
     """
def __init__(
@@ -258,7 +265,7 @@ class ParallelConstraint(ConstraintBase):
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
         cx, cy, cz = cross3(z_i, z_j)
-        return [cx, cy]
+        return [cx, cy, cz]
class PerpendicularConstraint(ConstraintBase):
@@ -344,17 +351,17 @@ class ConcentricConstraint(ConstraintBase):
         self.distance = distance

     def residuals(self) -> List[Expr]:
-        # Parallel axes (2 residuals)
+        # Parallel axes (3 cross-product residuals, rank 2 at solution)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

         # Point-on-line: marker_i origin on line through marker_j along z_j
         p_i = self.body_i.world_point(*self.marker_i_pos)
         p_j = self.body_j.world_point(*self.marker_j_pos)
-        lx, ly = point_line_perp_components(p_i, p_j, z_j)
+        lx, ly, lz = point_line_perp_components(p_i, p_j, z_j)

-        return [cx, cy, lx, ly]
+        return [cx, cy, cz, lx, ly, lz]
class TangentConstraint(ConstraintBase):
@@ -411,10 +418,10 @@ class PlanarConstraint(ConstraintBase):
         self.offset = offset

     def residuals(self) -> List[Expr]:
-        # Parallel normals
+        # Parallel normals (3 cross-product residuals, rank 2 at solution)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

         # Point-in-plane
         p_i = self.body_i.world_point(*self.marker_i_pos)
@@ -423,7 +430,7 @@ class PlanarConstraint(ConstraintBase):
         if self.offset != 0.0:
             d = d - Const(self.offset)

-        return [cx, cy, d]
+        return [cx, cy, cz, d]
class LineInPlaneConstraint(ConstraintBase):
@@ -521,12 +528,12 @@ class RevoluteConstraint(ConstraintBase):
         p_j = self.body_j.world_point(*self.marker_j_pos)
         pos = [p_i[0] - p_j[0], p_i[1] - p_j[1], p_i[2] - p_j[2]]

-        # Parallel Z-axes
+        # Parallel Z-axes (3 cross-product residuals, rank 2 at solution)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

-        return pos + [cx, cy]
+        return pos + [cx, cy, cz]
class CylindricalConstraint(ConstraintBase):
@@ -553,17 +560,17 @@ class CylindricalConstraint(ConstraintBase):
         self.marker_j_quat = marker_j_quat

     def residuals(self) -> List[Expr]:
-        # Parallel Z-axes
+        # Parallel Z-axes (3 cross-product residuals, rank 2 at solution)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

         # Point-on-line
         p_i = self.body_i.world_point(*self.marker_i_pos)
         p_j = self.body_j.world_point(*self.marker_j_pos)
-        lx, ly = point_line_perp_components(p_i, p_j, z_j)
+        lx, ly, lz = point_line_perp_components(p_i, p_j, z_j)

-        return [cx, cy, lx, ly]
+        return [cx, cy, cz, lx, ly, lz]
class SliderConstraint(ConstraintBase):
@@ -592,22 +599,22 @@ class SliderConstraint(ConstraintBase):
         self.marker_j_quat = marker_j_quat

     def residuals(self) -> List[Expr]:
-        # Parallel Z-axes
+        # Parallel Z-axes (3 cross-product residuals, rank 2 at solution)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

         # Point-on-line
         p_i = self.body_i.world_point(*self.marker_i_pos)
         p_j = self.body_j.world_point(*self.marker_j_pos)
-        lx, ly = point_line_perp_components(p_i, p_j, z_j)
+        lx, ly, lz = point_line_perp_components(p_i, p_j, z_j)

         # Rotation lock: x_i . y_j = 0
         x_i = marker_x_axis(self.body_i, self.marker_i_quat)
         y_j = marker_y_axis(self.body_j, self.marker_j_quat)
         twist = dot3(x_i, y_j)

-        return [cx, cy, lx, ly, twist]
+        return [cx, cy, cz, lx, ly, lz, twist]
class ScrewConstraint(ConstraintBase):
@@ -642,14 +649,14 @@ class ScrewConstraint(ConstraintBase):
         self.pitch = pitch

     def residuals(self) -> List[Expr]:
-        # Cylindrical residuals (4)
+        # Cylindrical residuals (6: 3 parallel + 3 point-on-line)
         z_i = marker_z_axis(self.body_i, self.marker_i_quat)
         z_j = marker_z_axis(self.body_j, self.marker_j_quat)
-        cx, cy, _cz = cross3(z_i, z_j)
+        cx, cy, cz = cross3(z_i, z_j)

         p_i = self.body_i.world_point(*self.marker_i_pos)
         p_j = self.body_j.world_point(*self.marker_j_pos)
-        lx, ly = point_line_perp_components(p_i, p_j, z_j)
+        lx, ly, lz = point_line_perp_components(p_i, p_j, z_j)

         # Pitch coupling: axial_disp = pitch * angle / (2*pi)
         # Axial displacement
@@ -678,7 +685,7 @@ class ScrewConstraint(ConstraintBase):
         # = axial - pitch * qz / pi
         coupling = axial - Const(self.pitch / math.pi) * rel[3]

-        return [cx, cy, lx, ly, coupling]
+        return [cx, cy, cz, lx, ly, lz, coupling]
class UniversalConstraint(ConstraintBase):
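The pitch-coupling linearization in the screw hunk above can be checked numerically (a standalone illustration):

```python
import math

# Standalone numeric check of the pitch coupling: a relative rotation
# theta about the screw axis gives qz = sin(theta / 2), so for small
# theta the nominal coupling pitch * theta / (2*pi) is approximated by
# pitch * qz / pi.
pitch, theta = 2.0, 0.05
qz = math.sin(theta / 2.0)
exact = pitch * theta / (2.0 * math.pi)
linearized = pitch * qz / math.pi
print(abs(exact - linearized) < 1e-5)  # True for small theta
```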


@@ -382,6 +382,13 @@ def solve_decomposed(
     Returns True if all clusters converged.
     """
+    log.info(
+        "solve_decomposed: %d clusters, %d bodies, %d constraints",
+        len(clusters),
+        len(bodies),
+        len(constraint_objs),
+    )
     # Build reverse map: constraint_index → position in constraint_objs list
     idx_to_obj: dict[int, "ConstraintBase"] = {}
     for pos, ci in enumerate(constraint_indices_map):
@@ -390,7 +397,7 @@ def solve_decomposed(
     solved_bodies: set[str] = set()
     all_converged = True

-    for cluster in clusters:
+    for cluster_idx, cluster in enumerate(clusters):
         # 1. Fix boundary bodies that were already solved
         fixed_boundary_params: list[str] = []
         for body_id in cluster.boundary_bodies:
@@ -424,6 +431,14 @@ def solve_decomposed(
         # 5. Newton solve (+ BFGS fallback)
         if cluster_residuals:
+            log.debug(
+                " cluster[%d]: %d bodies (%d boundary), %d constraints, %d residuals",
+                cluster_idx,
+                len(cluster.bodies),
+                len(cluster.boundary_bodies),
+                len(cluster.constraint_indices),
+                len(cluster_residuals),
+            )
             converged = newton_solve(
                 cluster_residuals,
                 params,
@@ -432,6 +447,9 @@ def solve_decomposed(
                 tol=1e-10,
             )
             if not converged:
+                log.info(
+                    " cluster[%d]: Newton-Raphson failed, trying BFGS", cluster_idx
+                )
                 converged = bfgs_solve(
                     cluster_residuals,
                     params,
@@ -440,6 +458,7 @@ def solve_decomposed(
                     tol=1e-10,
                 )
             if not converged:
+                log.warning(" cluster[%d]: failed to converge", cluster_idx)
                 all_converged = False

         # 6. Mark this cluster's bodies as solved


@@ -0,0 +1,312 @@
"""Per-entity DOF diagnostics and overconstrained detection.
Provides per-body remaining degrees of freedom, human-readable free
motion labels, and redundant/conflicting constraint identification.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List
import numpy as np
from .entities import RigidBody
from .expr import Expr
from .params import ParamTable
# -- Per-entity DOF -----------------------------------------------------------
@dataclass
class EntityDOF:
"""DOF report for a single entity (rigid body)."""
entity_id: str
remaining_dof: int # 0 = well-constrained
free_motions: list[str] = field(default_factory=list)
def per_entity_dof(
residuals: list[Expr],
params: ParamTable,
bodies: dict[str, RigidBody],
rank_tol: float = 1e-8,
jac_exprs: "list[list[Expr]] | None" = None,
) -> list[EntityDOF]:
"""Compute remaining DOF for each non-grounded body.
For each body, extracts the Jacobian columns corresponding to its
7 parameters, performs SVD to find constrained directions, and
classifies null-space vectors as translations or rotations.
"""
free = params.free_names()
n_res = len(residuals)
env = params.get_env()
if n_res == 0:
# No constraints — every free body has 6 DOF
result = []
for pid, body in bodies.items():
if body.grounded:
continue
result.append(
EntityDOF(
entity_id=pid,
remaining_dof=6,
free_motions=[
"translation along X",
"translation along Y",
"translation along Z",
"rotation about X",
"rotation about Y",
"rotation about Z",
],
)
)
return result
# Build column index mapping: param_name -> column index in free list
free_index = {name: i for i, name in enumerate(free)}
# Build full Jacobian (for efficiency, compute once)
n_free = len(free)
J_full = np.empty((n_res, n_free))
if jac_exprs is not None:
for i in range(n_res):
for j in range(n_free):
J_full[i, j] = jac_exprs[i][j].eval(env)
else:
for i, r in enumerate(residuals):
for j, name in enumerate(free):
J_full[i, j] = r.diff(name).simplify().eval(env)
result = []
for pid, body in bodies.items():
if body.grounded:
continue
# Find column indices for this body's params
pfx = pid + "/"
body_param_names = [
pfx + "tx",
pfx + "ty",
pfx + "tz",
pfx + "qw",
pfx + "qx",
pfx + "qy",
pfx + "qz",
]
col_indices = [free_index[n] for n in body_param_names if n in free_index]
if not col_indices:
# All params fixed (shouldn't happen for non-grounded, but be safe)
result.append(EntityDOF(entity_id=pid, remaining_dof=0))
continue
# Extract submatrix: all residual rows, only this body's columns
J_sub = J_full[:, col_indices]
# SVD
U, sv, Vt = np.linalg.svd(J_sub, full_matrices=True)
constrained = int(np.sum(sv > rank_tol))
# The quaternion unit-norm residual is itself one of the rows counted
# in `constrained`, so the non-geometric 7th parameter direction is
# already subtracted out: remaining = len(col_indices) - constrained
# yields the geometric DOF (at most 6) directly.
remaining = len(col_indices) - constrained
# Classify null-space vectors as free motions
free_motions = []
if remaining > 0 and Vt.shape[0] > constrained:
null_space = Vt[constrained:] # rows = null vectors in param space
# Map column indices back to param types
param_types = []
for n in body_param_names:
if n in free_index:
if n.endswith(("/tx", "/ty", "/tz")):
param_types.append("t")
else:
param_types.append("q")
for null_vec in null_space:
label = _classify_motion(
null_vec, param_types, body_param_names, free_index
)
if label:
free_motions.append(label)
result.append(
EntityDOF(
entity_id=pid,
remaining_dof=remaining,
free_motions=free_motions,
)
)
return result
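The SVD step above can be sketched in isolation. This toy Jacobian uses two invented rows for a single body (a plane contact on tz plus the quaternion-norm gradient at identity), not the solver's real constraint set:

```python
import numpy as np

# Hypothetical Jacobian for one body: 2 residual rows over the 7 pose
# parameters (tx, ty, tz, qw, qx, qy, qz). Row 0 pins tz; row 1 is
# the gradient of |q|^2 - 1 at q = (1, 0, 0, 0).
J_sub = np.zeros((2, 7))
J_sub[0, 2] = 1.0
J_sub[1, 3] = 2.0

U, sv, Vt = np.linalg.svd(J_sub, full_matrices=True)
constrained = int(np.sum(sv > 1e-8))
remaining = 7 - constrained      # quat-norm row is already counted
null_space = Vt[constrained:]    # each row is a free motion direction
print(constrained, remaining)    # 2 5
```

With the quaternion-norm row counted among the constrained directions, the body keeps 5 of its 6 geometric DOF (everything except translation along Z).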
def _classify_motion(
null_vec: np.ndarray,
param_types: list[str],
body_param_names: list[str],
free_index: dict[str, int],
) -> str:
"""Classify a null-space vector as translation, rotation, or helical."""
# Split components into translation and rotation parts
trans_indices = [i for i, t in enumerate(param_types) if t == "t"]
rot_indices = [i for i, t in enumerate(param_types) if t == "q"]
trans_norm = np.linalg.norm(null_vec[trans_indices]) if trans_indices else 0.0
rot_norm = np.linalg.norm(null_vec[rot_indices]) if rot_indices else 0.0
total = trans_norm + rot_norm
if total < 1e-14:
return ""
trans_frac = trans_norm / total
rot_frac = rot_norm / total
# Determine dominant axis
if trans_frac > 0.8:
# Pure translation
axis = _dominant_axis(null_vec, trans_indices)
return f"translation along {axis}"
elif rot_frac > 0.8:
# Pure rotation
axis = _dominant_axis(null_vec, rot_indices)
return f"rotation about {axis}"
else:
# Mixed — helical
axis = _dominant_axis(null_vec, trans_indices)
return f"helical motion along {axis}"
def _dominant_axis(vec: np.ndarray, indices: list[int]) -> str:
"""Find the dominant axis (X/Y/Z) among the given component indices."""
if not indices:
return "?"
components = np.abs(vec[indices])
# Map to axis names — first 3 in group are X/Y/Z
axis_names = ["X", "Y", "Z"]
if len(components) >= 3:
idx = int(np.argmax(components[:3]))
return axis_names[idx]
elif len(components) == 1:
return axis_names[0]
else:
# Partial group (some params fixed): the mapping is best-effort,
# since the surviving components may not start at X.
idx = int(np.argmax(components))
return axis_names[min(idx, 2)]
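A standalone sketch of the classification rule implemented by `_classify_motion` and `_dominant_axis`, assuming the full 7-parameter layout (tx, ty, tz, qw, qx, qy, qz); reading the rotation axis from the quaternion vector part is a simplification for illustration:

```python
import numpy as np

def classify(null_vec: np.ndarray) -> str:
    """Label a 7-vector null direction (tx, ty, tz, qw, qx, qy, qz)."""
    axes = ["X", "Y", "Z"]
    trans = np.linalg.norm(null_vec[:3])
    rot = np.linalg.norm(null_vec[3:])
    total = trans + rot
    if total < 1e-14:
        return ""
    if trans / total > 0.8:
        return f"translation along {axes[int(np.argmax(np.abs(null_vec[:3])))]}"
    if rot / total > 0.8:
        # simplification: read the axis from the quaternion vector part
        return f"rotation about {axes[int(np.argmax(np.abs(null_vec[4:7])))]}"
    return "helical motion"

t_label = classify(np.array([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]))
r_label = classify(np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]))
print(t_label, "|", r_label)   # translation along Y | rotation about Z
```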
# -- Overconstrained detection ------------------------------------------------
@dataclass
class ConstraintDiag:
"""Diagnostic for a single constraint."""
constraint_index: int
kind: str # "redundant" | "conflicting"
detail: str
def find_overconstrained(
residuals: list[Expr],
params: ParamTable,
residual_ranges: list[tuple[int, int, int]],
rank_tol: float = 1e-8,
jac_exprs: "list[list[Expr]] | None" = None,
) -> list[ConstraintDiag]:
"""Identify redundant and conflicting constraints.
Algorithm (following SolvSpace's FindWhichToRemoveToFixJacobian):
1. Build full Jacobian J, compute rank.
2. If rank == n_residuals, not overconstrained — return empty.
3. For each constraint: remove its rows, check if rank is preserved
→ if so, the constraint is **redundant**.
4. Compute left null space, project residual vector F → if a
constraint's residuals contribute to this projection, it is
**conflicting** (redundant + unsatisfied).
"""
free = params.free_names()
n_free = len(free)
n_res = len(residuals)
if n_free == 0 or n_res == 0:
return []
env = params.get_env()
# Build Jacobian and residual vector
J = np.empty((n_res, n_free))
r_vec = np.empty(n_res)
for i, r in enumerate(residuals):
r_vec[i] = r.eval(env)
if jac_exprs is not None:
for i in range(n_res):
for j in range(n_free):
J[i, j] = jac_exprs[i][j].eval(env)
else:
for i, r in enumerate(residuals):
for j, name in enumerate(free):
J[i, j] = r.diff(name).simplify().eval(env)
# Full rank
sv_full = np.linalg.svd(J, compute_uv=False)
full_rank = int(np.sum(sv_full > rank_tol))
if full_rank >= n_res:
return [] # not overconstrained
# Left null space: columns of U beyond rank
U, sv, Vt = np.linalg.svd(J, full_matrices=True)
left_null = U[:, full_rank:] # shape (n_res, n_res - rank)
# Project residual onto left null space
null_residual = left_null.T @ r_vec # shape (n_res - rank,)
residual_projection = left_null @ null_residual # back to residual space
diags = []
for start, end, c_idx in residual_ranges:
# Remove this constraint's rows and check rank
mask = np.ones(n_res, dtype=bool)
mask[start:end] = False
J_reduced = J[mask]
if J_reduced.shape[0] == 0:
continue
sv_reduced = np.linalg.svd(J_reduced, compute_uv=False)
reduced_rank = int(np.sum(sv_reduced > rank_tol))
if reduced_rank >= full_rank:
# Removing this constraint preserves rank → redundant
# Check if it's also conflicting (contributes to unsatisfied null projection)
constraint_proj = np.linalg.norm(residual_projection[start:end])
if constraint_proj > rank_tol:
kind = "conflicting"
detail = (
f"Constraint {c_idx} is conflicting (redundant and unsatisfied)"
)
else:
kind = "redundant"
detail = (
f"Constraint {c_idx} is redundant (can be removed without effect)"
)
diags.append(
ConstraintDiag(
constraint_index=c_idx,
kind=kind,
detail=detail,
)
)
return diags
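A toy instance of the left null-space test above, with invented rows rather than the module's real residuals. Row 2 duplicates row 0's gradient but asks for a different value, making it redundant and conflicting:

```python
import numpy as np

J = np.array([
    [1.0, 0.0],   # constraint: x = 0
    [0.0, 1.0],   # constraint: y = 0
    [1.0, 0.0],   # constraint: x = 5 (same gradient as row 0)
])
r = np.array([0.0, 0.0, -5.0])   # residuals evaluated at x = y = 0

rank = int(np.sum(np.linalg.svd(J, compute_uv=False) > 1e-8))

U, _, _ = np.linalg.svd(J, full_matrices=True)
left_null = U[:, rank:]                   # row combos no parameter affects
proj = left_null @ (left_null.T @ r)      # unsatisfiable residual part
conflicting = float(np.linalg.norm(proj)) > 1e-8
print(rank, conflicting)   # 2 True
```

Removing row 2 (or row 0) preserves rank 2, so it is redundant; its nonzero contribution to the left null-space projection marks it as conflicting rather than merely redundant.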


@@ -14,11 +14,15 @@ def count_dof(
residuals: List[Expr],
params: ParamTable,
rank_tol: float = 1e-8,
jac_exprs: "List[List[Expr]] | None" = None,
) -> int:
"""Compute DOF = n_free_params - rank(Jacobian).
Evaluates the Jacobian numerically at the current parameter values
and computes its rank via SVD.
When *jac_exprs* is provided, reuses the pre-built symbolic
Jacobian instead of re-differentiating every residual.
"""
free = params.free_names()
n_free = len(free)
@@ -32,9 +36,14 @@ def count_dof(
env = params.get_env()
J = np.empty((n_res, n_free))
for i, r in enumerate(residuals):
for j, name in enumerate(free):
J[i, j] = r.diff(name).simplify().eval(env)
if jac_exprs is not None:
for i in range(n_res):
for j in range(n_free):
J[i, j] = jac_exprs[i][j].eval(env)
else:
for i, r in enumerate(residuals):
for j, name in enumerate(free):
J[i, j] = r.diff(name).simplify().eval(env)
if J.size == 0:
return n_free


@@ -24,6 +24,16 @@ class Expr:
"""Return the set of variable names in this expression."""
raise NotImplementedError
def to_code(self) -> str:
"""Emit a Python arithmetic expression string.
The returned string, when evaluated with a dict ``env`` mapping
parameter names to floats (and ``_sin``, ``_cos``, ``_sqrt``
bound to their ``math`` equivalents), produces the same result
as ``self.eval(env)``.
"""
raise NotImplementedError
# -- operator overloads --------------------------------------------------
def __add__(self, other):
@@ -90,6 +100,9 @@ class Const(Expr):
def vars(self):
return set()
def to_code(self):
return repr(self.value)
def __repr__(self):
return f"Const({self.value})"
@@ -118,6 +131,9 @@ class Var(Expr):
def vars(self):
return {self.name}
def to_code(self):
return f"env[{self.name!r}]"
def __repr__(self):
return f"Var({self.name!r})"
@@ -154,6 +170,9 @@ class Neg(Expr):
def vars(self):
return self.child.vars()
def to_code(self):
return f"(-{self.child.to_code()})"
def __repr__(self):
return f"Neg({self.child!r})"
@@ -180,6 +199,9 @@ class Sin(Expr):
def vars(self):
return self.child.vars()
def to_code(self):
return f"_sin({self.child.to_code()})"
def __repr__(self):
return f"Sin({self.child!r})"
@@ -206,6 +228,9 @@ class Cos(Expr):
def vars(self):
return self.child.vars()
def to_code(self):
return f"_cos({self.child.to_code()})"
def __repr__(self):
return f"Cos({self.child!r})"
@@ -232,6 +257,9 @@ class Sqrt(Expr):
def vars(self):
return self.child.vars()
def to_code(self):
return f"_sqrt({self.child.to_code()})"
def __repr__(self):
return f"Sqrt({self.child!r})"
@@ -266,6 +294,9 @@ class Add(Expr):
def vars(self):
return self.a.vars() | self.b.vars()
def to_code(self):
return f"({self.a.to_code()} + {self.b.to_code()})"
def __repr__(self):
return f"Add({self.a!r}, {self.b!r})"
@@ -297,6 +328,9 @@ class Sub(Expr):
def vars(self):
return self.a.vars() | self.b.vars()
def to_code(self):
return f"({self.a.to_code()} - {self.b.to_code()})"
def __repr__(self):
return f"Sub({self.a!r}, {self.b!r})"
@@ -337,6 +371,9 @@ class Mul(Expr):
def vars(self):
return self.a.vars() | self.b.vars()
def to_code(self):
return f"({self.a.to_code()} * {self.b.to_code()})"
def __repr__(self):
return f"Mul({self.a!r}, {self.b!r})"
@@ -372,6 +409,9 @@ class Div(Expr):
def vars(self):
return self.a.vars() | self.b.vars()
def to_code(self):
return f"({self.a.to_code()} / {self.b.to_code()})"
def __repr__(self):
return f"Div({self.a!r}, {self.b!r})"
@@ -414,6 +454,9 @@ class Pow(Expr):
def vars(self):
return self.base.vars() | self.exp.vars()
def to_code(self):
return f"({self.base.to_code()} ** {self.exp.to_code()})"
def __repr__(self):
return f"Pow({self.base!r}, {self.exp!r})"
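The `to_code()` contract described above can be exercised with a hand-written expression string; the lambda wrapping here is an illustrative sketch of the compile-once idea, not the actual `try_compile_system` implementation:

```python
import math

# A stand-in for what Mul(Sin(Var("theta")), Var("r")).to_code() would
# emit, per the Expr.to_code docstring: parameters come from `env`,
# and _sin/_cos/_sqrt are bound to their math equivalents.
code = "(_sin(env['theta']) * env['r'])"
fn = eval(
    "lambda env: " + code,
    {"_sin": math.sin, "_cos": math.cos, "_sqrt": math.sqrt},
)
result = fn({"theta": math.pi / 2, "r": 3.0})
print(result)   # 3.0
```

Building one flat string per residual (and per Jacobian entry) and compiling it once replaces thousands of recursive `eval()` tree walks per Newton iteration with plain arithmetic.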


@@ -117,15 +117,15 @@ def point_line_perp_components(
point: Vec3,
line_origin: Vec3,
line_dir: Vec3,
) -> tuple[Expr, Expr]:
"""Two independent perpendicular-distance components from point to line.
) -> tuple[Expr, Expr, Expr]:
"""Three perpendicular-distance components from point to line.
The line passes through line_origin along line_dir.
Returns the x and y components of (point - line_origin) x line_dir,
which are zero when the point lies on the line.
Returns all 3 components of (point - line_origin) x line_dir,
which are zero when the point lies on the line. Only 2 of 3 are
independent, but using all 3 avoids a singularity when the line
direction aligns with a coordinate axis.
"""
d = sub3(point, line_origin)
cx, cy, cz = cross3(d, line_dir)
# All three components of d x line_dir are zero when d is parallel
# to line_dir, but only 2 are independent. We return x and y.
return cx, cy
return cx, cy, cz
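The singularity this diff fixes is easy to reproduce with numpy: with the line along +X, the x component of the cross product is identically zero, so a point can leave the line while the old (x, y)-only residual stays partly blind to it.

```python
import numpy as np

origin = np.array([0.0, 0.0, 0.0])
direction = np.array([1.0, 0.0, 0.0])    # line along +X: the bad case

c_on = np.cross(np.array([5.0, 0.0, 0.0]) - origin, direction)
c_off = np.cross(np.array([5.0, 2.0, 0.0]) - origin, direction)

print(c_on)    # on the line: all three components vanish
print(c_off)   # off the line: only the z component is nonzero
```

Here the off-line point produces (0, 0, -2): both of the previously returned components are zero, so only the newly added z component detects the violation.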


@@ -3,7 +3,7 @@
from __future__ import annotations
import math
from typing import List
from typing import Callable, List
import numpy as np
@@ -17,6 +17,10 @@ def newton_solve(
quat_groups: List[tuple[str, str, str, str]] | None = None,
max_iter: int = 50,
tol: float = 1e-10,
post_step: "Callable[[ParamTable], None] | None" = None,
weight_vector: "np.ndarray | None" = None,
jac_exprs: "List[List[Expr]] | None" = None,
compiled_eval: "Callable | None" = None,
) -> bool:
"""Solve ``residuals == 0`` by Newton-Raphson.
@@ -33,6 +37,20 @@ def newton_solve(
Maximum Newton iterations.
tol:
Convergence threshold on ``||r||``.
post_step:
Optional callback invoked after each parameter update, before
quaternion renormalization. Used for half-space correction.
weight_vector:
Optional 1-D array of length ``n_free``. When provided, the
lstsq step is column-scaled to produce the weighted
minimum-norm solution (prefer small movements in
high-weight parameters).
jac_exprs:
Pre-built symbolic Jacobian (list-of-lists of Expr). When
provided, skips the ``diff().simplify()`` step.
compiled_eval:
Pre-compiled evaluation function from :mod:`codegen`. When
provided, uses flat compiled code instead of tree-walk eval.
Returns True if converged within *max_iter* iterations.
"""
@@ -43,45 +61,72 @@ def newton_solve(
if n_free == 0 or n_res == 0:
return True
# Build symbolic Jacobian once (list-of-lists of simplified Expr)
jac_exprs: List[List[Expr]] = []
for r in residuals:
row = []
for name in free:
row.append(r.diff(name).simplify())
jac_exprs.append(row)
# Build symbolic Jacobian once (or reuse pre-built)
if jac_exprs is None:
jac_exprs = []
for r in residuals:
row = []
for name in free:
row.append(r.diff(name).simplify())
jac_exprs.append(row)
# Try compilation if not provided
if compiled_eval is None:
from .codegen import try_compile_system
compiled_eval = try_compile_system(residuals, jac_exprs, n_res, n_free)
# Pre-allocate arrays reused across iterations
r_vec = np.empty(n_res)
J = np.zeros((n_res, n_free))
for _it in range(max_iter):
env = params.get_env()
if compiled_eval is not None:
J[:] = 0.0
compiled_eval(params.env_ref(), r_vec, J)
else:
env = params.get_env()
for i, r in enumerate(residuals):
r_vec[i] = r.eval(env)
for i in range(n_res):
for j in range(n_free):
J[i, j] = jac_exprs[i][j].eval(env)
# Evaluate residual vector
r_vec = np.array([r.eval(env) for r in residuals])
r_norm = np.linalg.norm(r_vec)
if r_norm < tol:
return True
# Evaluate Jacobian matrix
J = np.empty((n_res, n_free))
for i in range(n_res):
for j in range(n_free):
J[i, j] = jac_exprs[i][j].eval(env)
# Solve J @ dx = -r (least-squares handles rank-deficient)
dx, _, _, _ = np.linalg.lstsq(J, -r_vec, rcond=None)
if weight_vector is not None:
# Column-scale J by W^{-1/2} for weighted minimum-norm
w_inv_sqrt = 1.0 / np.sqrt(weight_vector)
J_scaled = J * w_inv_sqrt[np.newaxis, :]
dx_scaled, _, _, _ = np.linalg.lstsq(J_scaled, -r_vec, rcond=None)
dx = dx_scaled * w_inv_sqrt
else:
dx, _, _, _ = np.linalg.lstsq(J, -r_vec, rcond=None)
# Update parameters
x = params.get_free_vector()
x += dx
params.set_free_vector(x)
# Half-space correction (before quat renormalization)
if post_step:
post_step(params)
# Re-normalize quaternions
if quat_groups:
_renormalize_quats(params, quat_groups)
# Check final residual
env = params.get_env()
r_vec = np.array([r.eval(env) for r in residuals])
return np.linalg.norm(r_vec) < tol
if compiled_eval is not None:
compiled_eval(params.env_ref(), r_vec, J)
else:
env = params.get_env()
for i, r in enumerate(residuals):
r_vec[i] = r.eval(env)
return bool(np.linalg.norm(r_vec) < tol)
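The column-scaling trick in the `weight_vector` branch can be checked in isolation. The numbers here are invented: one underdetermined residual row that can be satisfied by moving either a translation parameter (weight 1) or a rotation parameter (weight 3283):

```python
import numpy as np

J = np.array([[1.0, 1.0]])     # one residual row, two free params
r = np.array([1.0])            # want J @ dx = r
w = np.array([1.0, 3283.0])    # [translation, quaternion] weights

plain, *_ = np.linalg.lstsq(J, r, rcond=None)   # min-norm: splits evenly

w_inv_sqrt = 1.0 / np.sqrt(w)
J_scaled = J * w_inv_sqrt[np.newaxis, :]
dx_scaled, *_ = np.linalg.lstsq(J_scaled, r, rcond=None)
dx = dx_scaled * w_inv_sqrt    # minimizes sum(w * dx**2) instead

print(plain, dx)
```

The unweighted minimum-norm step is (0.5, 0.5); the weighted one puts nearly the whole step (about 0.9997) into the translation parameter while still satisfying the residual exactly.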
def _renormalize_quats(


@@ -60,6 +60,14 @@ class ParamTable:
"""Return a snapshot of all current values (for Expr.eval)."""
return dict(self._values)
def env_ref(self) -> Dict[str, float]:
"""Return a direct reference to the internal values dict.
Faster than :meth:`get_env` (no copy). Safe when the caller
only reads during evaluation and mutates via :meth:`set_free_vector`.
"""
return self._values
def free_names(self) -> List[str]:
"""Return ordered list of free (non-fixed) parameter names."""
return list(self._free_order)
@@ -81,3 +89,26 @@ class ParamTable:
"""Bulk-update free parameters from a 1-D array."""
for i, name in enumerate(self._free_order):
self._values[name] = float(vec[i])
def snapshot(self) -> Dict[str, float]:
"""Capture current values as a checkpoint."""
return dict(self._values)
def restore(self, snap: Dict[str, float]):
"""Restore parameter values from a checkpoint."""
for name, val in snap.items():
if name in self._values:
self._values[name] = val
def movement_cost(
self,
start: Dict[str, float],
weights: Dict[str, float] | None = None,
) -> float:
"""Weighted sum of squared displacements from start."""
cost = 0.0
for name in self._free_order:
w = weights.get(name, 1.0) if weights else 1.0
delta = self._values[name] - start.get(name, self._values[name])
cost += delta * delta * w
return cost
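A standalone sketch of the `snapshot()`/`movement_cost()` bookkeeping above, using plain dicts in place of `ParamTable` (the parameter names are made up):

```python
start = {"p/tx": 0.0, "p/qw": 1.0}       # snapshot() checkpoint
current = {"p/tx": 2.0, "p/qw": 0.9}     # values after a solve
weights = {"p/qw": 3283.0}               # rotation penalised, default 1.0

# Weighted sum of squared displacements, as movement_cost computes it.
cost = sum(
    weights.get(name, 1.0) * (current[name] - start[name]) ** 2
    for name in current
)
print(round(cost, 2))   # 36.83
```

The 2-unit translation contributes 4.0, while the small 0.1 quaternion change contributes 32.83: under this weighting even tiny rotations dominate the cost.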


@@ -0,0 +1,667 @@
"""Solution preference: half-space tracking and minimum-movement weighting.
Half-space tracking preserves the initial configuration branch across
Newton iterations. For constraints with multiple valid solutions
(e.g. distance can be satisfied on either side), we record which
"half-space" the initial state lives in and correct the solver step
if it crosses to the wrong branch.
Minimum-movement weighting scales the Newton/BFGS step so that
quaternion parameters (rotation) are penalised more than translation
parameters, yielding the physically-nearest solution.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Callable, List
import numpy as np
from .constraints import (
AngleConstraint,
ConcentricConstraint,
ConstraintBase,
CylindricalConstraint,
DistancePointPointConstraint,
LineInPlaneConstraint,
ParallelConstraint,
PerpendicularConstraint,
PlanarConstraint,
PointInPlaneConstraint,
RevoluteConstraint,
ScrewConstraint,
SliderConstraint,
UniversalConstraint,
)
from .geometry import cross3, dot3, marker_z_axis, point_plane_distance, sub3
from .params import ParamTable
@dataclass
class HalfSpace:
"""Tracks which branch of a branching constraint the solution should stay in."""
constraint_index: int # index in ctx.constraints
reference_sign: float # +1.0 or -1.0, captured at setup
indicator_fn: Callable[[dict[str, float]], float] # returns signed value
param_names: list[str] = field(default_factory=list) # params to flip
correction_fn: Callable[[ParamTable, float], None] | None = None
def compute_half_spaces(
constraint_objs: list[ConstraintBase],
constraint_indices: list[int],
params: ParamTable,
) -> list[HalfSpace]:
"""Build half-space trackers for all branching constraints.
Evaluates each constraint's indicator function at the current
parameter values to capture the reference sign.
"""
env = params.get_env()
half_spaces: list[HalfSpace] = []
for i, obj in enumerate(constraint_objs):
hs = _build_half_space(obj, constraint_indices[i], env, params)
if hs is not None:
half_spaces.append(hs)
return half_spaces
def apply_half_space_correction(
params: ParamTable,
half_spaces: list[HalfSpace],
) -> None:
"""Check each half-space and correct if the solver crossed a branch.
Called as a post_step callback from newton_solve.
"""
if not half_spaces:
return
env = params.get_env()
for hs in half_spaces:
current_val = hs.indicator_fn(env)
current_sign = (
math.copysign(1.0, current_val)
if abs(current_val) > 1e-14
else hs.reference_sign
)
if current_sign != hs.reference_sign and hs.correction_fn is not None:
hs.correction_fn(params, current_val)
# Re-read env after correction for subsequent half-spaces
env = params.get_env()
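The sign-tracking pattern in the loop above, reduced to a minimal standalone form with a toy one-parameter "table" and a trivial indicator (names here are illustrative, not the module's):

```python
import math

params = {"x": 2.0}                    # toy parameter table

def indicator(p):
    return p["x"]                      # signed branch indicator

# Capture which half-space the initial configuration occupies.
reference_sign = math.copysign(1.0, indicator(params))

params["x"] = -0.5                     # pretend a Newton step overshot
current_sign = math.copysign(1.0, indicator(params))
if current_sign != reference_sign:
    params["x"] = -params["x"]         # correction: reflect across x = 0

print(params["x"])   # 0.5
```

After the correction the solution is back in the original half-space, and Newton can continue converging on the intended branch.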
def _build_half_space(
obj: ConstraintBase,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Build a HalfSpace for a branching constraint, or None if not branching."""
if isinstance(obj, DistancePointPointConstraint) and obj.distance > 0:
return _distance_half_space(obj, constraint_idx, env, params)
if isinstance(obj, ParallelConstraint):
return _parallel_half_space(obj, constraint_idx, env, params)
if isinstance(obj, AngleConstraint):
return _angle_half_space(obj, constraint_idx, env, params)
if isinstance(obj, PerpendicularConstraint):
return _perpendicular_half_space(obj, constraint_idx, env, params)
if isinstance(obj, PlanarConstraint):
return _planar_half_space(obj, constraint_idx, env, params)
if isinstance(obj, RevoluteConstraint):
return _revolute_half_space(obj, constraint_idx, env, params)
if isinstance(obj, ConcentricConstraint):
return _concentric_half_space(obj, constraint_idx, env, params)
if isinstance(obj, PointInPlaneConstraint):
return _point_in_plane_half_space(obj, constraint_idx, env, params)
if isinstance(obj, LineInPlaneConstraint):
return _line_in_plane_half_space(obj, constraint_idx, env, params)
if isinstance(obj, CylindricalConstraint):
return _axis_direction_half_space(obj, constraint_idx, env)
if isinstance(obj, SliderConstraint):
return _axis_direction_half_space(obj, constraint_idx, env)
if isinstance(obj, ScrewConstraint):
return _axis_direction_half_space(obj, constraint_idx, env)
if isinstance(obj, UniversalConstraint):
return _universal_half_space(obj, constraint_idx, env)
return None
def _distance_half_space(
obj: DistancePointPointConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for DistancePointPoint: track displacement direction.
The indicator is the dot product of the current displacement with
the reference displacement direction. If the solver flips to the
opposite side, we reflect the moving body's position.
"""
p_i, p_j = obj.world_points()
# Evaluate reference displacement direction
dx = p_i[0].eval(env) - p_j[0].eval(env)
dy = p_i[1].eval(env) - p_j[1].eval(env)
dz = p_i[2].eval(env) - p_j[2].eval(env)
dist = math.sqrt(dx * dx + dy * dy + dz * dz)
if dist < 1e-14:
return None # points coincident, no branch to track
# Reference unit direction
nx, ny, nz = dx / dist, dy / dist, dz / dist
# Build indicator: dot(displacement, reference_direction)
# Use Expr evaluation for speed
disp_x, disp_y, disp_z = p_i[0] - p_j[0], p_i[1] - p_j[1], p_i[2] - p_j[2]
def indicator(e: dict[str, float]) -> float:
return disp_x.eval(e) * nx + disp_y.eval(e) * ny + disp_z.eval(e) * nz
ref_sign = math.copysign(1.0, indicator(env))
# Correction: reflect body_j position along reference direction
# (or body_i if body_j is grounded)
moving_body = obj.body_j if not obj.body_j.grounded else obj.body_i
if moving_body.grounded:
return None # both grounded, nothing to correct
px_name = f"{moving_body.part_id}/tx"
py_name = f"{moving_body.part_id}/ty"
pz_name = f"{moving_body.part_id}/tz"
sign_flip = 1.0 if moving_body is obj.body_j else -1.0
def correction(p: ParamTable, _val: float) -> None:
# Reflect the displacement across the plane normal to the reference
# direction: disp' = disp - 2*proj*n restores the original branch.
e = p.get_env()
cur_dx = disp_x.eval(e)
cur_dy = disp_y.eval(e)
cur_dz = disp_z.eval(e)
# Project displacement onto reference direction
proj = cur_dx * nx + cur_dy * ny + cur_dz * nz
# disp = p_i - p_j, so shift the moving body by sign_flip*2*proj*n
# to subtract 2*proj*n from disp (sign_flip = +1 moves p_j, -1 p_i).
if not p.is_fixed(px_name):
p.set_value(px_name, p.get_value(px_name) + sign_flip * 2.0 * proj * nx)
if not p.is_fixed(py_name):
p.set_value(py_name, p.get_value(py_name) + sign_flip * 2.0 * proj * ny)
if not p.is_fixed(pz_name):
p.set_value(pz_name, p.get_value(pz_name) + sign_flip * 2.0 * proj * nz)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=indicator,
param_names=[px_name, py_name, pz_name],
correction_fn=correction,
)
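The geometry behind the correction closure above: reflecting a displacement across the plane perpendicular to the reference direction n negates exactly its component along n, flipping the indicator sign back without disturbing the tangential part.

```python
import numpy as np

n = np.array([0.0, 0.0, 1.0])          # reference unit direction
disp = np.array([3.0, 1.0, -4.0])      # flipped branch: disp . n < 0

proj = float(disp @ n)                 # signed component along n (-4)
reflected = disp - 2.0 * proj * n      # disp' = disp - 2*(disp . n)*n
print(reflected)                       # normal component sign restored
```

The tangential components (3, 1) are untouched and the normal component becomes +4, so the distance residual is unchanged while the half-space indicator returns to its reference sign.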
def _parallel_half_space(
obj: ParallelConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace:
"""Half-space for Parallel: track same-direction vs opposite-direction.
Indicator: dot(z_i, z_j). Positive = same direction, negative = opposite.
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
dot_expr = dot3(z_i, z_j)
def indicator(e: dict[str, float]) -> float:
return dot_expr.eval(e)
ref_val = indicator(env)
ref_sign = math.copysign(1.0, ref_val) if abs(ref_val) > 1e-14 else 1.0
# No geometric correction — just let the indicator track.
# The Newton solver naturally handles this via the cross-product residual.
# We only need to detect and report branch flips.
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=indicator,
)
def _planar_half_space(
obj: PlanarConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for Planar: track which side of the plane the point is on
AND which direction the normals face.
A Planar constraint has parallel normals (cross product = 0) plus
point-in-plane (signed distance = 0). Both have branch ambiguity:
the normals can be same-direction or opposite, and the point can
approach the plane from either side. We track the signed distance
from marker_i to the plane defined by marker_j — this captures
the plane-side and is the primary drift mode during drag.
"""
# Point-in-plane signed distance as indicator
p_i = obj.body_i.world_point(*obj.marker_i_pos)
p_j = obj.body_j.world_point(*obj.marker_j_pos)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
d_expr = point_plane_distance(p_i, p_j, z_j)
d_val = d_expr.eval(env)
# Also track normal alignment (same as Parallel half-space)
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
dot_expr = dot3(z_i, z_j)
dot_val = dot_expr.eval(env)
normal_ref_sign = math.copysign(1.0, dot_val) if abs(dot_val) > 1e-14 else 1.0
# If offset is zero and distance is near-zero, we still need the normal
# direction indicator to prevent flipping through the plane.
# Use the normal dot product as the primary indicator when the point
# is already on the plane (distance ≈ 0).
if abs(d_val) < 1e-10:
# Point is on the plane — track normal direction instead
def indicator(e: dict[str, float]) -> float:
return dot_expr.eval(e)
ref_sign = normal_ref_sign
else:
# Point is off-plane — track which side
def indicator(e: dict[str, float]) -> float:
return d_expr.eval(e)
ref_sign = math.copysign(1.0, d_val)
# Correction: reflect the moving body's position through the plane
moving_body = obj.body_j if not obj.body_j.grounded else obj.body_i
if moving_body.grounded:
return None
px_name = f"{moving_body.part_id}/tx"
py_name = f"{moving_body.part_id}/ty"
pz_name = f"{moving_body.part_id}/tz"
def correction(p: ParamTable, _val: float) -> None:
e = p.get_env()
# Recompute signed distance and normal direction
d_cur = d_expr.eval(e)
nx = z_j[0].eval(e)
ny = z_j[1].eval(e)
nz = z_j[2].eval(e)
n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
if n_len < 1e-15:
return
nx, ny, nz = nx / n_len, ny / n_len, nz / n_len
# Reflect through plane: move body by -2*d along normal
sign = -1.0 if moving_body is obj.body_j else 1.0
if not p.is_fixed(px_name):
p.set_value(px_name, p.get_value(px_name) + sign * 2.0 * d_cur * nx)
if not p.is_fixed(py_name):
p.set_value(py_name, p.get_value(py_name) + sign * 2.0 * d_cur * ny)
if not p.is_fixed(pz_name):
p.set_value(pz_name, p.get_value(pz_name) + sign * 2.0 * d_cur * nz)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=indicator,
correction_fn=correction,
)
def _revolute_half_space(
obj: RevoluteConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for Revolute: track hinge axis direction.
A revolute has coincident origins + parallel Z-axes. The parallel
axes can flip direction (same ambiguity as Parallel). Track
dot(z_i, z_j) to prevent the axis from inverting.
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
dot_expr = dot3(z_i, z_j)
ref_val = dot_expr.eval(env)
ref_sign = math.copysign(1.0, ref_val) if abs(ref_val) > 1e-14 else 1.0
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: dot_expr.eval(e),
)
def _concentric_half_space(
obj: ConcentricConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for Concentric: track axis direction.
Concentric has parallel axes + point-on-line. The parallel axes
can flip direction. Track dot(z_i, z_j).
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
dot_expr = dot3(z_i, z_j)
ref_val = dot_expr.eval(env)
ref_sign = math.copysign(1.0, ref_val) if abs(ref_val) > 1e-14 else 1.0
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: dot_expr.eval(e),
)
def _point_in_plane_half_space(
obj: PointInPlaneConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for PointInPlane: track which side of the plane.
The signed distance to the plane can be satisfied from either side.
Track which side the initial configuration is on.
"""
p_i = obj.body_i.world_point(*obj.marker_i_pos)
p_j = obj.body_j.world_point(*obj.marker_j_pos)
n_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
d_expr = point_plane_distance(p_i, p_j, n_j)
d_val = d_expr.eval(env)
if abs(d_val) < 1e-10:
return None # already on the plane, no branch to track
ref_sign = math.copysign(1.0, d_val)
moving_body = obj.body_j if not obj.body_j.grounded else obj.body_i
if moving_body.grounded:
return None
px_name = f"{moving_body.part_id}/tx"
py_name = f"{moving_body.part_id}/ty"
pz_name = f"{moving_body.part_id}/tz"
def correction(p: ParamTable, _val: float) -> None:
e = p.get_env()
d_cur = d_expr.eval(e)
nx = n_j[0].eval(e)
ny = n_j[1].eval(e)
nz = n_j[2].eval(e)
n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
if n_len < 1e-15:
return
nx, ny, nz = nx / n_len, ny / n_len, nz / n_len
sign = -1.0 if moving_body is obj.body_j else 1.0
if not p.is_fixed(px_name):
p.set_value(px_name, p.get_value(px_name) + sign * 2.0 * d_cur * nx)
if not p.is_fixed(py_name):
p.set_value(py_name, p.get_value(py_name) + sign * 2.0 * d_cur * ny)
if not p.is_fixed(pz_name):
p.set_value(pz_name, p.get_value(pz_name) + sign * 2.0 * d_cur * nz)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: d_expr.eval(e),
correction_fn=correction,
)
def _line_in_plane_half_space(
obj: LineInPlaneConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for LineInPlane: track which side of the plane.
Same plane-side ambiguity as PointInPlane.
"""
p_i = obj.body_i.world_point(*obj.marker_i_pos)
p_j = obj.body_j.world_point(*obj.marker_j_pos)
n_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
d_expr = point_plane_distance(p_i, p_j, n_j)
d_val = d_expr.eval(env)
if abs(d_val) < 1e-10:
return None
ref_sign = math.copysign(1.0, d_val)
moving_body = obj.body_j if not obj.body_j.grounded else obj.body_i
if moving_body.grounded:
return None
px_name = f"{moving_body.part_id}/tx"
py_name = f"{moving_body.part_id}/ty"
pz_name = f"{moving_body.part_id}/tz"
def correction(p: ParamTable, _val: float) -> None:
e = p.get_env()
d_cur = d_expr.eval(e)
nx = n_j[0].eval(e)
ny = n_j[1].eval(e)
nz = n_j[2].eval(e)
n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
if n_len < 1e-15:
return
nx, ny, nz = nx / n_len, ny / n_len, nz / n_len
sign = -1.0 if moving_body is obj.body_j else 1.0
if not p.is_fixed(px_name):
p.set_value(px_name, p.get_value(px_name) + sign * 2.0 * d_cur * nx)
if not p.is_fixed(py_name):
p.set_value(py_name, p.get_value(py_name) + sign * 2.0 * d_cur * ny)
if not p.is_fixed(pz_name):
p.set_value(pz_name, p.get_value(pz_name) + sign * 2.0 * d_cur * nz)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: d_expr.eval(e),
correction_fn=correction,
)
def _axis_direction_half_space(
obj,
constraint_idx: int,
env: dict[str, float],
) -> HalfSpace | None:
"""Half-space for any constraint with parallel Z-axes (Cylindrical, Slider, Screw).
Tracks dot(z_i, z_j) to prevent axis inversion.
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
dot_expr = dot3(z_i, z_j)
ref_val = dot_expr.eval(env)
ref_sign = math.copysign(1.0, ref_val) if abs(ref_val) > 1e-14 else 1.0
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: dot_expr.eval(e),
)
def _universal_half_space(
obj: UniversalConstraint,
constraint_idx: int,
env: dict[str, float],
) -> HalfSpace | None:
"""Half-space for Universal: track which quadrant of perpendicularity.
Universal has dot(z_i, z_j) = 0 (perpendicular). The cross product
sign distinguishes which "side" of perpendicular.
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
cx, cy, cz = cross3(z_i, z_j)
cx_val = cx.eval(env)
cy_val = cy.eval(env)
cz_val = cz.eval(env)
components = [
(abs(cx_val), cx, cx_val),
(abs(cy_val), cy, cy_val),
(abs(cz_val), cz, cz_val),
]
_, best_expr, best_val = max(components, key=lambda t: t[0])
if abs(best_val) < 1e-14:
return None
ref_sign = math.copysign(1.0, best_val)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=lambda e: best_expr.eval(e),
)
# ============================================================================
# Minimum-movement weighting
# ============================================================================
# Scale factor so that a 1-radian rotation is penalised as much as a
# (180/pi)-unit translation. This makes the weighted minimum-norm
# step prefer translating over rotating for the same residual reduction.
QUAT_WEIGHT = (180.0 / math.pi) ** 2 # ~3283
def build_weight_vector(params: ParamTable) -> np.ndarray:
"""Build diagonal weight vector: 1.0 for translation, QUAT_WEIGHT for quaternion.
Returns a 1-D array of length ``params.n_free()``.
"""
free = params.free_names()
w = np.ones(len(free))
quat_suffixes = ("/qw", "/qx", "/qy", "/qz")
for i, name in enumerate(free):
if any(name.endswith(s) for s in quat_suffixes):
w[i] = QUAT_WEIGHT
return w
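The weighting scheme can be shown with a small self-contained sketch (the `build_weights` helper below is hypothetical, not the module's API): names ending in a quaternion suffix get `QUAT_WEIGHT`, everything else gets 1.0, so for equal residual reduction the weighted minimum-norm step prefers moving translation parameters.

```python
import math
import numpy as np

# 1 radian of rotation is penalised like ~57.3 units of translation
QUAT_WEIGHT = (180.0 / math.pi) ** 2  # ~3283

def build_weights(free_names):
    """Diagonal weights: 1.0 for translation, QUAT_WEIGHT for quaternion
    components, matched by parameter-name suffix."""
    quat_suffixes = ("/qw", "/qx", "/qy", "/qz")
    return np.array([QUAT_WEIGHT if name.endswith(quat_suffixes) else 1.0
                     for name in free_names])

names = ["arm/tx", "arm/ty", "arm/tz", "arm/qw", "arm/qx", "arm/qy", "arm/qz"]
w = build_weights(names)
# first three entries 1.0 (translation), last four ~3283 (rotation)
```

Because the vector is built from whatever names are currently free, its length automatically tracks the Jacobian's column count, which is why the real code builds it only after the pre-passes have fixed variables.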
def _angle_half_space(
obj: AngleConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for Angle: track sign of sin(angle) via cross product.
For angle constraints, the dot product is fixed (= cos(angle)),
but sin can be +/-. We track the cross product magnitude sign.
"""
if abs(obj.angle) < 1e-14 or abs(obj.angle - math.pi) < 1e-14:
return None # 0 or 180 degrees — no branch ambiguity
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
cx, cy, cz = cross3(z_i, z_j)
# Track the sign of the dominant cross-product component
# (whichever component is largest in magnitude at setup time)
cx_val = cx.eval(env)
cy_val = cy.eval(env)
cz_val = cz.eval(env)
# Pick the dominant cross product component
components = [
(abs(cx_val), cx, cx_val),
(abs(cy_val), cy, cy_val),
(abs(cz_val), cz, cz_val),
]
_, best_expr, best_val = max(components, key=lambda t: t[0])
if abs(best_val) < 1e-14:
return None
def indicator(e: dict[str, float]) -> float:
return best_expr.eval(e)
ref_sign = math.copysign(1.0, best_val)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=indicator,
)
def _perpendicular_half_space(
obj: PerpendicularConstraint,
constraint_idx: int,
env: dict[str, float],
params: ParamTable,
) -> HalfSpace | None:
"""Half-space for Perpendicular: track which quadrant.
The dot product is constrained to 0, but the cross product sign
distinguishes which "side" of perpendicular.
"""
z_i = marker_z_axis(obj.body_i, obj.marker_i_quat)
z_j = marker_z_axis(obj.body_j, obj.marker_j_quat)
cx, cy, cz = cross3(z_i, z_j)
# Pick the dominant cross product component
cx_val = cx.eval(env)
cy_val = cy.eval(env)
cz_val = cz.eval(env)
components = [
(abs(cx_val), cx, cx_val),
(abs(cy_val), cy, cy_val),
(abs(cz_val), cz, cz_val),
]
_, best_expr, best_val = max(components, key=lambda t: t[0])
if abs(best_val) < 1e-14:
return None
def indicator(e: dict[str, float]) -> float:
return best_expr.eval(e)
ref_sign = math.copysign(1.0, best_val)
return HalfSpace(
constraint_index=constraint_idx,
reference_sign=ref_sign,
indicator_fn=indicator,
)


@@ -3,8 +3,14 @@ expression-based Newton-Raphson solver."""
from __future__ import annotations
import logging
import math
import time
import kcsolve
log = logging.getLogger(__name__)
from .bfgs import bfgs_solve
from .constraints import (
AngleConstraint,
@@ -33,10 +39,16 @@ from .constraints import (
UniversalConstraint,
)
from .decompose import decompose, solve_decomposed
from .diagnostics import find_overconstrained
from .dof import count_dof
from .entities import RigidBody
from .newton import newton_solve
from .params import ParamTable
from .preference import (
apply_half_space_correction,
build_weight_vector,
compute_half_spaces,
)
from .prepass import single_equation_pass, substitution_pass
# Assemblies with fewer free bodies than this use the monolithic path.
@@ -76,141 +88,658 @@ _SUPPORTED = {
class KindredSolver(kcsolve.IKCSolver):
"""Expression-based Newton-Raphson constraint solver."""
def __init__(self):
super().__init__()
self._drag_ctx = None
self._drag_parts = None
self._drag_cache = None
self._limits_warned = False
def name(self):
return "Kindred (Newton-Raphson)"
def supported_joints(self):
return list(_SUPPORTED)
# ── Static solve ────────────────────────────────────────────────
def solve(self, ctx):
params = ParamTable()
bodies = {} # part_id -> RigidBody
t0 = time.perf_counter()
n_parts = len(ctx.parts)
n_constraints = len(ctx.constraints)
n_grounded = sum(1 for p in ctx.parts if p.grounded)
log.info(
"solve: %d parts (%d grounded), %d constraints",
n_parts,
n_grounded,
n_constraints,
)
# 1. Build entities from parts
for part in ctx.parts:
pos = tuple(part.placement.position)
quat = tuple(part.placement.quaternion) # (w, x, y, z)
body = RigidBody(
part.id,
params,
position=pos,
quaternion=quat,
grounded=part.grounded,
)
bodies[part.id] = body
system = _build_system(ctx)
n_free_bodies = sum(1 for b in system.bodies.values() if not b.grounded)
n_residuals = len(system.all_residuals)
n_free_params = len(system.params.free_names())
log.info(
"solve: system built — %d free bodies, %d residuals, %d free params",
n_free_bodies,
n_residuals,
n_free_params,
)
# 2. Build constraint residuals (track index mapping for decomposition)
all_residuals = []
constraint_objs = []
constraint_indices = [] # parallel to constraint_objs: index in ctx.constraints
# Warn once per solver instance if any constraints have limits
if not self._limits_warned:
for c in ctx.constraints:
if c.limits:
log.warning(
"Joint limits on '%s' ignored "
"(not yet supported by Kindred solver)",
c.id,
)
self._limits_warned = True
break
for idx, c in enumerate(ctx.constraints):
if not c.activated:
continue
body_i = bodies.get(c.part_i)
body_j = bodies.get(c.part_j)
if body_i is None or body_j is None:
continue
# Solution preference: half-spaces, weight vector
half_spaces = compute_half_spaces(
system.constraint_objs,
system.constraint_indices,
system.params,
)
if half_spaces:
post_step_fn = lambda p: apply_half_space_correction(p, half_spaces)
else:
post_step_fn = None
marker_i_pos = tuple(c.marker_i.position)
marker_j_pos = tuple(c.marker_j.position)
# Pre-passes on full system
residuals = substitution_pass(system.all_residuals, system.params)
residuals = single_equation_pass(residuals, system.params)
obj = _build_constraint(
c.type,
body_i,
marker_i_pos,
body_j,
marker_j_pos,
c.marker_i,
c.marker_j,
c.params,
)
if obj is None:
continue
constraint_objs.append(obj)
constraint_indices.append(idx)
all_residuals.extend(obj.residuals())
# Build weight vector *after* pre-passes so its length matches the
# remaining free parameters (single_equation_pass may fix some).
weight_vec = build_weight_vector(system.params)
# 3. Add quaternion normalization residuals for non-grounded bodies
quat_groups = []
for body in bodies.values():
if not body.grounded:
all_residuals.append(body.quat_norm_residual())
quat_groups.append(body.quat_param_names())
# 4. Pre-passes on full system
all_residuals = substitution_pass(all_residuals, params)
all_residuals = single_equation_pass(all_residuals, params)
# 5. Solve (decomposed for large assemblies, monolithic for small)
n_free_bodies = sum(1 for b in bodies.values() if not b.grounded)
# Solve (decomposed for large assemblies, monolithic for small)
jac_exprs = None # may be populated by _monolithic_solve
if n_free_bodies >= _DECOMPOSE_THRESHOLD:
grounded_ids = {pid for pid, b in bodies.items() if b.grounded}
grounded_ids = {pid for pid, b in system.bodies.items() if b.grounded}
clusters = decompose(ctx.constraints, grounded_ids)
log.info(
"solve: decomposed into %d cluster(s) (%d free bodies >= threshold %d)",
len(clusters),
n_free_bodies,
_DECOMPOSE_THRESHOLD,
)
if len(clusters) > 1:
converged = solve_decomposed(
clusters,
bodies,
constraint_objs,
constraint_indices,
params,
system.bodies,
system.constraint_objs,
system.constraint_indices,
system.params,
)
else:
converged = _monolithic_solve(
all_residuals,
params,
quat_groups,
converged, jac_exprs = _monolithic_solve(
residuals,
system.params,
system.quat_groups,
post_step=post_step_fn,
weight_vector=weight_vec,
)
else:
converged = _monolithic_solve(all_residuals, params, quat_groups)
log.debug(
"solve: monolithic path (%d free bodies < threshold %d)",
n_free_bodies,
_DECOMPOSE_THRESHOLD,
)
converged, jac_exprs = _monolithic_solve(
residuals,
system.params,
system.quat_groups,
post_step=post_step_fn,
weight_vector=weight_vec,
)
# 6. DOF
dof = count_dof(all_residuals, params)
# DOF
dof = count_dof(residuals, system.params, jac_exprs=jac_exprs)
# 7. Build result
# Build result
result = kcsolve.SolveResult()
result.status = (
kcsolve.SolveStatus.Success if converged else kcsolve.SolveStatus.Failed
)
result.dof = dof
env = params.get_env()
placements = []
for body in bodies.values():
if body.grounded:
continue
pr = kcsolve.SolveResult.PartResult()
pr.id = body.part_id
pr.placement = kcsolve.Transform()
pr.placement.position = list(body.extract_position(env))
pr.placement.quaternion = list(body.extract_quaternion(env))
placements.append(pr)
# Diagnostics on failure
if not converged:
result.diagnostics = _run_diagnostics(
residuals,
system.params,
system.residual_ranges,
ctx,
jac_exprs=jac_exprs,
)
result.placements = _extract_placements(system.params, system.bodies)
elapsed = (time.perf_counter() - t0) * 1000
log.info(
"solve: %s in %.1f ms — dof=%d, %d placements",
"converged" if converged else "FAILED",
elapsed,
dof,
len(result.placements),
)
if not converged and result.diagnostics:
for d in result.diagnostics:
log.warning(
" diagnostic: [%s] %s%s", d.kind, d.constraint_id, d.detail
)
result.placements = placements
return result
# ── Incremental update ──────────────────────────────────────────
# The base class default (delegates to solve()) is correct here:
# solve() uses current placements as initial guess, so small
# parameter changes converge quickly without special handling.
# ── Interactive drag ────────────────────────────────────────────
def pre_drag(self, ctx, drag_parts):
log.info("pre_drag: drag_parts=%s", drag_parts)
self._drag_ctx = ctx
self._drag_parts = set(drag_parts)
self._drag_step_count = 0
# Build the system once and cache everything for drag_step() reuse.
t0 = time.perf_counter()
system = _build_system(ctx)
half_spaces = compute_half_spaces(
system.constraint_objs,
system.constraint_indices,
system.params,
)
if half_spaces:
post_step_fn = lambda p: apply_half_space_correction(p, half_spaces)
else:
post_step_fn = None
residuals = substitution_pass(system.all_residuals, system.params)
residuals = single_equation_pass(residuals, system.params)
# Build weight vector *after* pre-passes so its length matches the
# remaining free parameters (single_equation_pass may fix some).
weight_vec = build_weight_vector(system.params)
# Build symbolic Jacobian + compile once
from .codegen import try_compile_system
free = system.params.free_names()
n_res = len(residuals)
n_free = len(free)
jac_exprs = [[r.diff(name).simplify() for name in free] for r in residuals]
compiled_eval = try_compile_system(residuals, jac_exprs, n_res, n_free)
# Initial solve
converged = newton_solve(
residuals,
system.params,
quat_groups=system.quat_groups,
max_iter=100,
tol=1e-10,
post_step=post_step_fn,
weight_vector=weight_vec,
jac_exprs=jac_exprs,
compiled_eval=compiled_eval,
)
if not converged:
converged = bfgs_solve(
residuals,
system.params,
quat_groups=system.quat_groups,
max_iter=200,
tol=1e-10,
weight_vector=weight_vec,
jac_exprs=jac_exprs,
compiled_eval=compiled_eval,
)
# Cache for drag_step() reuse
cache = _DragCache()
cache.system = system
cache.residuals = residuals
cache.jac_exprs = jac_exprs
cache.compiled_eval = compiled_eval
cache.half_spaces = half_spaces
cache.weight_vec = weight_vec
cache.post_step_fn = post_step_fn
# Snapshot solved quaternions for continuity enforcement in drag_step()
env = system.params.get_env()
cache.pre_step_quats = {}
for body in system.bodies.values():
if not body.grounded:
cache.pre_step_quats[body.part_id] = body.extract_quaternion(env)
self._drag_cache = cache
# Build result
dof = count_dof(residuals, system.params, jac_exprs=jac_exprs)
result = kcsolve.SolveResult()
result.status = (
kcsolve.SolveStatus.Success if converged else kcsolve.SolveStatus.Failed
)
result.dof = dof
result.placements = _extract_placements(system.params, system.bodies)
elapsed = (time.perf_counter() - t0) * 1000
log.info(
"pre_drag: initial solve %s in %.1f ms — dof=%d",
"converged" if converged else "FAILED",
elapsed,
dof,
)
return result
def drag_step(self, drag_placements):
ctx = self._drag_ctx
if ctx is None:
log.warning("drag_step: no drag context (pre_drag not called?)")
return kcsolve.SolveResult()
self._drag_step_count = getattr(self, "_drag_step_count", 0) + 1
# Update dragged part placements in ctx (for caller consistency)
for pr in drag_placements:
for part in ctx.parts:
if part.id == pr.id:
part.placement = pr.placement
break
cache = getattr(self, "_drag_cache", None)
if cache is None:
# Fallback: no cache, do a full solve
log.debug(
"drag_step #%d: no cache, falling back to full solve",
self._drag_step_count,
)
return self.solve(ctx)
t0 = time.perf_counter()
params = cache.system.params
# Update only the dragged part's 7 parameter values
for pr in drag_placements:
pfx = pr.id + "/"
params.set_value(pfx + "tx", pr.placement.position[0])
params.set_value(pfx + "ty", pr.placement.position[1])
params.set_value(pfx + "tz", pr.placement.position[2])
params.set_value(pfx + "qw", pr.placement.quaternion[0])
params.set_value(pfx + "qx", pr.placement.quaternion[1])
params.set_value(pfx + "qy", pr.placement.quaternion[2])
params.set_value(pfx + "qz", pr.placement.quaternion[3])
# Solve with cached artifacts — no rebuild
converged = newton_solve(
cache.residuals,
params,
quat_groups=cache.system.quat_groups,
max_iter=100,
tol=1e-10,
post_step=cache.post_step_fn,
weight_vector=cache.weight_vec,
jac_exprs=cache.jac_exprs,
compiled_eval=cache.compiled_eval,
)
if not converged:
converged = bfgs_solve(
cache.residuals,
params,
quat_groups=cache.system.quat_groups,
max_iter=200,
tol=1e-10,
weight_vector=cache.weight_vec,
jac_exprs=cache.jac_exprs,
compiled_eval=cache.compiled_eval,
)
# Quaternion continuity: ensure solved quaternions stay in the
# same hemisphere as the previous step. q and -q encode the
# same rotation, but the C++ side measures angle between the
# old and new quaternion — if we're in the -q branch, that
# shows up as a ~340° flip and gets rejected.
dragged_ids = self._drag_parts or set()
_enforce_quat_continuity(
params, cache.system.bodies, cache.pre_step_quats, dragged_ids
)
# Update the stored quaternions for the next drag step
env = params.get_env()
for body in cache.system.bodies.values():
if not body.grounded:
cache.pre_step_quats[body.part_id] = body.extract_quaternion(env)
result = kcsolve.SolveResult()
result.status = (
kcsolve.SolveStatus.Success if converged else kcsolve.SolveStatus.Failed
)
result.dof = -1 # skip DOF counting during drag for speed
result.placements = _extract_placements(params, cache.system.bodies)
elapsed = (time.perf_counter() - t0) * 1000
if not converged:
log.warning(
"drag_step #%d: solve FAILED in %.1f ms",
self._drag_step_count,
elapsed,
)
else:
log.debug(
"drag_step #%d: ok in %.1f ms",
self._drag_step_count,
elapsed,
)
return result
def post_drag(self):
steps = getattr(self, "_drag_step_count", 0)
log.info("post_drag: completed after %d drag steps", steps)
self._drag_ctx = None
self._drag_parts = None
self._drag_step_count = 0
self._drag_cache = None
# ── Diagnostics ─────────────────────────────────────────────────
def diagnose(self, ctx):
system = _build_system(ctx)
residuals = substitution_pass(system.all_residuals, system.params)
return _run_diagnostics(
residuals,
system.params,
system.residual_ranges,
ctx,
)
def is_deterministic(self):
return True
def _monolithic_solve(all_residuals, params, quat_groups):
"""Newton-Raphson solve with BFGS fallback on the full system."""
class _DragCache:
"""Cached artifacts from pre_drag() reused across drag_step() calls.
During interactive drag the constraint topology is invariant — only
the dragged part's parameter values change. Caching the built
system, symbolic Jacobian, and compiled evaluator eliminates the
expensive rebuild overhead (~150 ms) on every frame.
"""
__slots__ = (
"system", # _System — owns ParamTable + Expr trees
"residuals", # list[Expr] — after substitution + single-equation pass
"jac_exprs", # list[list[Expr]] — symbolic Jacobian
"compiled_eval", # Callable or None
"half_spaces", # list[HalfSpace]
"weight_vec", # ndarray or None
"post_step_fn", # Callable or None
"pre_step_quats", # dict[str, tuple] — last-accepted quaternions per body
)
class _System:
"""Intermediate representation of a built constraint system."""
__slots__ = (
"params",
"bodies",
"constraint_objs",
"constraint_indices",
"all_residuals",
"residual_ranges",
"quat_groups",
)
def _enforce_quat_continuity(
params: ParamTable,
bodies: dict,
pre_step_quats: dict,
dragged_ids: set,
) -> None:
"""Ensure solved quaternions stay in the same hemisphere as pre-step.
For each non-grounded, non-dragged body, check whether the solved
quaternion is in the opposite hemisphere from the pre-step quaternion
(dot product < 0). If so, negate it — q and -q represent the same
rotation, but staying in the same hemisphere prevents the C++ side
from seeing a large-angle "flip".
This is the standard short-arc correction used in SLERP interpolation.
"""
for body in bodies.values():
if body.grounded or body.part_id in dragged_ids:
continue
prev = pre_step_quats.get(body.part_id)
if prev is None:
continue
pfx = body.part_id + "/"
qw = params.get_value(pfx + "qw")
qx = params.get_value(pfx + "qx")
qy = params.get_value(pfx + "qy")
qz = params.get_value(pfx + "qz")
# Quaternion dot product: positive means same hemisphere
dot = prev[0] * qw + prev[1] * qx + prev[2] * qy + prev[3] * qz
if dot < 0.0:
# Negate to stay in the same hemisphere (identical rotation)
params.set_value(pfx + "qw", -qw)
params.set_value(pfx + "qx", -qx)
params.set_value(pfx + "qy", -qy)
params.set_value(pfx + "qz", -qz)
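The hemisphere check reduces to a four-component dot product and a conditional negation. A standalone sketch of the correction in isolation (hypothetical `short_arc` helper, plain tuples):

```python
def short_arc(prev, q):
    """Return q or -q, whichever lies in the same hemisphere as prev.
    q and -q encode the same rotation (quaternion double cover), but a
    consistent sign avoids spurious near-360-degree 'flips' when
    downstream code measures the angle between successive quaternions."""
    dot = sum(p * c for p, c in zip(prev, q))
    return q if dot >= 0.0 else tuple(-c for c in q)

prev = (1.0, 0.0, 0.0, 0.0)        # identity
q = (-0.999, 0.0, 0.0, 0.04)       # ~identity, but on the negated branch
fixed = short_arc(prev, q)
assert fixed[0] > 0.0              # back in prev's hemisphere
```

This is the same short-arc test SLERP performs before interpolating; applying it per body after each solve keeps successive placements continuous without changing any rotation.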
def _build_system(ctx):
"""Build the solver's internal representation from a SolveContext.
Returns a _System with params, bodies, constraint objects,
residuals, residual-to-constraint mapping, and quaternion groups.
"""
system = _System()
params = ParamTable()
bodies = {} # part_id -> RigidBody
# 1. Build entities from parts
for part in ctx.parts:
pos = tuple(part.placement.position)
quat = tuple(part.placement.quaternion) # (w, x, y, z)
body = RigidBody(
part.id,
params,
position=pos,
quaternion=quat,
grounded=part.grounded,
)
bodies[part.id] = body
# 2. Build constraint residuals (track index mapping for decomposition)
all_residuals = []
constraint_objs = []
constraint_indices = [] # parallel to constraint_objs: index in ctx.constraints
skipped_inactive = 0
skipped_missing_body = 0
skipped_unsupported = 0
for idx, c in enumerate(ctx.constraints):
if not c.activated:
skipped_inactive += 1
continue
body_i = bodies.get(c.part_i)
body_j = bodies.get(c.part_j)
if body_i is None or body_j is None:
skipped_missing_body += 1
log.debug(
"_build_system: constraint[%d] %s skipped — missing body (%s or %s)",
idx,
c.id,
c.part_i,
c.part_j,
)
continue
marker_i_pos = tuple(c.marker_i.position)
marker_j_pos = tuple(c.marker_j.position)
obj = _build_constraint(
c.type,
body_i,
marker_i_pos,
body_j,
marker_j_pos,
c.marker_i,
c.marker_j,
c.params,
)
if obj is None:
skipped_unsupported += 1
log.debug(
"_build_system: constraint[%d] %s type=%s — unsupported, skipped",
idx,
c.id,
c.type,
)
continue
constraint_objs.append(obj)
constraint_indices.append(idx)
all_residuals.extend(obj.residuals())
if skipped_inactive or skipped_missing_body or skipped_unsupported:
log.debug(
"_build_system: skipped constraints — %d inactive, %d missing body, %d unsupported",
skipped_inactive,
skipped_missing_body,
skipped_unsupported,
)
# 3. Build residual-to-constraint mapping
residual_ranges = [] # (start_row, end_row, constraint_idx)
row = 0
for i, obj in enumerate(constraint_objs):
n = len(obj.residuals())
residual_ranges.append((row, row + n, constraint_indices[i]))
row += n
# 4. Add quaternion normalization residuals for non-grounded bodies
quat_groups = []
for body in bodies.values():
if not body.grounded:
all_residuals.append(body.quat_norm_residual())
quat_groups.append(body.quat_param_names())
system.params = params
system.bodies = bodies
system.constraint_objs = constraint_objs
system.constraint_indices = constraint_indices
system.all_residuals = all_residuals
system.residual_ranges = residual_ranges
system.quat_groups = quat_groups
return system
def _run_diagnostics(residuals, params, residual_ranges, ctx, jac_exprs=None):
"""Run overconstrained detection and return kcsolve diagnostics."""
diagnostics = []
if not hasattr(kcsolve, "ConstraintDiagnostic"):
return diagnostics
diags = find_overconstrained(
residuals, params, residual_ranges, jac_exprs=jac_exprs
)
for d in diags:
cd = kcsolve.ConstraintDiagnostic()
cd.constraint_id = ctx.constraints[d.constraint_index].id
cd.kind = (
kcsolve.DiagnosticKind.Redundant
if d.kind == "redundant"
else kcsolve.DiagnosticKind.Conflicting
)
cd.detail = d.detail
diagnostics.append(cd)
return diagnostics
def _extract_placements(params, bodies):
"""Extract solved placements from the parameter table."""
env = params.get_env()
placements = []
for body in bodies.values():
if body.grounded:
continue
pr = kcsolve.SolveResult.PartResult()
pr.id = body.part_id
pr.placement = kcsolve.Transform()
pr.placement.position = list(body.extract_position(env))
pr.placement.quaternion = list(body.extract_quaternion(env))
placements.append(pr)
return placements
def _monolithic_solve(
all_residuals, params, quat_groups, post_step=None, weight_vector=None
):
"""Newton-Raphson solve with BFGS fallback on the full system.
Returns ``(converged, jac_exprs)`` so the caller can reuse the
symbolic Jacobian for DOF counting / diagnostics.
"""
from .codegen import try_compile_system
free = params.free_names()
n_res = len(all_residuals)
n_free = len(free)
# Build symbolic Jacobian once
jac_exprs = [[r.diff(name).simplify() for name in free] for r in all_residuals]
# Compile once
compiled_eval = try_compile_system(all_residuals, jac_exprs, n_res, n_free)
t0 = time.perf_counter()
converged = newton_solve(
all_residuals,
params,
quat_groups=quat_groups,
max_iter=100,
tol=1e-10,
post_step=post_step,
weight_vector=weight_vector,
jac_exprs=jac_exprs,
compiled_eval=compiled_eval,
)
nr_ms = (time.perf_counter() - t0) * 1000
if not converged:
log.info(
"_monolithic_solve: Newton-Raphson failed (%.1f ms), trying BFGS", nr_ms
)
t1 = time.perf_counter()
converged = bfgs_solve(
all_residuals,
params,
quat_groups=quat_groups,
max_iter=200,
tol=1e-10,
weight_vector=weight_vector,
jac_exprs=jac_exprs,
compiled_eval=compiled_eval,
)
return converged
bfgs_ms = (time.perf_counter() - t1) * 1000
if converged:
log.info("_monolithic_solve: BFGS converged (%.1f ms)", bfgs_ms)
else:
log.warning("_monolithic_solve: BFGS also failed (%.1f ms)", bfgs_ms)
else:
log.debug("_monolithic_solve: Newton-Raphson converged (%.1f ms)", nr_ms)
return converged, jac_exprs
def _build_constraint(
@@ -234,6 +763,16 @@ def _build_constraint(
if kind == kcsolve.BaseJointKind.DistancePointPoint:
distance = c_params[0] if c_params else 0.0
if distance == 0.0:
# Distance=0 is point coincidence. Use CoincidentConstraint
# (3 linear residuals) instead of the squared form which has
# a degenerate Jacobian when the constraint is satisfied.
return CoincidentConstraint(
body_i,
marker_i_pos,
body_j,
marker_j_pos,
)
return DistancePointPointConstraint(
body_i,
marker_i_pos,


@@ -0,0 +1,355 @@
"""
Phase 5 in-client console tests.
Paste into the FreeCAD Python console (or run via: exec(open(...).read())).
Tests the full Assembly -> KindredSolver pipeline without the unittest harness.
Expected output: all lines print PASS. Any FAIL indicates a regression.
"""
import FreeCAD as App
import JointObject
import kcsolve
_pref = App.ParamGet("User parameter:BaseApp/Preferences/Mod/Assembly")
_orig_solver = _pref.GetString("Solver", "")
_results = []
def _report(name, passed, detail=""):
status = "PASS" if passed else "FAIL"
msg = f" [{status}] {name}"
if detail:
msg += f": {detail}"
print(msg)
_results.append((name, passed))
def _new_doc(name="Phase5Test"):
if App.ActiveDocument and App.ActiveDocument.Name == name:
App.closeDocument(name)
App.newDocument(name)
App.setActiveDocument(name)
return App.ActiveDocument
def _cleanup(doc):
App.closeDocument(doc.Name)
def _make_assembly(doc):
asm = doc.addObject("Assembly::AssemblyObject", "Assembly")
asm.resetSolver()
jg = asm.newObject("Assembly::JointGroup", "Joints")
return asm, jg
def _make_box(asm, x=0, y=0, z=0, size=10):
box = asm.newObject("Part::Box", "Box")
box.Length = size
box.Width = size
box.Height = size
box.Placement = App.Placement(App.Vector(x, y, z), App.Rotation())
return box
def _ground(jg, obj):
gnd = jg.newObject("App::FeaturePython", "GroundedJoint")
JointObject.GroundedJoint(gnd, obj)
return gnd
def _make_joint(jg, joint_type, ref1, ref2):
joint = jg.newObject("App::FeaturePython", "Joint")
JointObject.Joint(joint, joint_type)
refs = [[ref1[0], ref1[1]], [ref2[0], ref2[1]]]
joint.Proxy.setJointConnectors(joint, refs)
return joint
# ── Test 1: Registry ────────────────────────────────────────────────
def test_solver_registry():
"""Verify kindred solver is registered and available."""
names = kcsolve.available()
_report(
"registry: kindred in available()", "kindred" in names, f"available={names}"
)
solver = kcsolve.load("kindred")
_report("registry: load('kindred') succeeds", solver is not None)
_report(
"registry: solver name",
solver.name() == "Kindred (Newton-Raphson)",
f"got '{solver.name()}'",
)
joints = solver.supported_joints()
_report(
"registry: supported_joints non-empty",
len(joints) > 0,
f"{len(joints)} joint types",
)
# ── Test 2: Preference switching ────────────────────────────────────
def test_preference_switching():
"""Verify solver preference controls which backend is used."""
doc = _new_doc("PrefTest")
try:
# Set to kindred
_pref.SetString("Solver", "kindred")
asm, jg = _make_assembly(doc)
box1 = _make_box(asm, 0, 0, 0)
box2 = _make_box(asm, 50, 0, 0)
_ground(jg, box1)
_make_joint(jg, 0, [box1, ["Face6", "Vertex7"]], [box2, ["Face6", "Vertex7"]])
result = asm.solve()
_report(
"pref: kindred solve succeeds", result == 0, f"solve() returned {result}"
)
# Switch back to ondsel
_pref.SetString("Solver", "ondsel")
asm.resetSolver()
result2 = asm.solve()
_report(
"pref: ondsel solve succeeds after switch",
result2 == 0,
f"solve() returned {result2}",
)
finally:
_cleanup(doc)
# ── Test 3: Fixed joint ─────────────────────────────────────────────
def test_fixed_joint():
"""Two boxes + ground + fixed joint -> placements match."""
_pref.SetString("Solver", "kindred")
doc = _new_doc("FixedTest")
try:
asm, jg = _make_assembly(doc)
box1 = _make_box(asm, 10, 20, 30)
box2 = _make_box(asm, 40, 50, 60)
_ground(jg, box2)
_make_joint(jg, 0, [box2, ["Face6", "Vertex7"]], [box1, ["Face6", "Vertex7"]])
asm.solve()
same = box1.Placement.isSame(box2.Placement, 1e-6)
_report(
"fixed: box1 matches box2 placement",
same,
f"box1={box1.Placement.Base}, box2={box2.Placement.Base}",
)
finally:
_cleanup(doc)
# ── Test 4: Revolute joint + DOF ─────────────────────────────────────
def test_revolute_dof():
"""Revolute joint -> solve succeeds, DOF = 1."""
_pref.SetString("Solver", "kindred")
doc = _new_doc("RevoluteTest")
try:
asm, jg = _make_assembly(doc)
box1 = _make_box(asm, 0, 0, 0)
box2 = _make_box(asm, 100, 0, 0)
_ground(jg, box1)
_make_joint(jg, 1, [box1, ["Face6", "Vertex7"]], [box2, ["Face6", "Vertex7"]])
result = asm.solve()
_report("revolute: solve succeeds", result == 0, f"solve() returned {result}")
dof = asm.getLastDoF()
_report("revolute: DOF = 1", dof == 1, f"DOF = {dof}")
finally:
_cleanup(doc)
# ── Test 5: No ground ───────────────────────────────────────────────
def test_no_ground():
"""No grounded parts -> returns -6."""
_pref.SetString("Solver", "kindred")
doc = _new_doc("NoGroundTest")
try:
asm, jg = _make_assembly(doc)
box1 = _make_box(asm, 0, 0, 0)
box2 = _make_box(asm, 50, 0, 0)
joint = jg.newObject("App::FeaturePython", "Joint")
JointObject.Joint(joint, 0)
refs = [[box1, ["Face6", "Vertex7"]], [box2, ["Face6", "Vertex7"]]]
joint.Proxy.setJointConnectors(joint, refs)
result = asm.solve()
_report("no-ground: returns -6", result == -6, f"solve() returned {result}")
finally:
_cleanup(doc)
# ── Test 6: Solve stability ─────────────────────────────────────────
def test_stability():
"""Solving twice gives identical placements."""
_pref.SetString("Solver", "kindred")
doc = _new_doc("StabilityTest")
try:
asm, jg = _make_assembly(doc)
box1 = _make_box(asm, 10, 20, 30)
box2 = _make_box(asm, 40, 50, 60)
_ground(jg, box2)
_make_joint(jg, 0, [box2, ["Face6", "Vertex7"]], [box1, ["Face6", "Vertex7"]])
asm.solve()
plc1 = App.Placement(box1.Placement)
asm.solve()
plc2 = box1.Placement
same = plc1.isSame(plc2, 1e-6)
_report("stability: two solves identical", same)
finally:
_cleanup(doc)
# ── Test 7: Standalone solver API ────────────────────────────────────
def test_standalone_api():
"""Use kcsolve types directly without FreeCAD Assembly objects."""
solver = kcsolve.load("kindred")
# Two parts: one grounded, one free
p1 = kcsolve.Part()
p1.id = "base"
p1.placement = kcsolve.Transform.identity()
p1.grounded = True
p2 = kcsolve.Part()
p2.id = "arm"
p2.placement = kcsolve.Transform()
p2.placement.position = [100.0, 0.0, 0.0]
p2.placement.quaternion = [1.0, 0.0, 0.0, 0.0]
p2.grounded = False
# Fixed joint
c = kcsolve.Constraint()
c.id = "fix1"
c.part_i = "base"
c.marker_i = kcsolve.Transform.identity()
c.part_j = "arm"
c.marker_j = kcsolve.Transform.identity()
c.type = kcsolve.BaseJointKind.Fixed
ctx = kcsolve.SolveContext()
ctx.parts = [p1, p2]
ctx.constraints = [c]
result = solver.solve(ctx)
_report(
"standalone: solve status",
result.status == kcsolve.SolveStatus.Success,
f"status={result.status}",
)
_report("standalone: DOF = 0", result.dof == 0, f"dof={result.dof}")
# Check that arm moved to origin
for pr in result.placements:
if pr.id == "arm":
dist = sum(x**2 for x in pr.placement.position) ** 0.5
_report("standalone: arm at origin", dist < 1e-6, f"distance={dist:.2e}")
break
# ── Test 8: Diagnose API ────────────────────────────────────────────
def test_diagnose():
"""Diagnose overconstrained system via standalone API."""
solver = kcsolve.load("kindred")
p1 = kcsolve.Part()
p1.id = "base"
p1.placement = kcsolve.Transform.identity()
p1.grounded = True
p2 = kcsolve.Part()
p2.id = "arm"
p2.placement = kcsolve.Transform()
p2.placement.position = [50.0, 0.0, 0.0]
p2.placement.quaternion = [1.0, 0.0, 0.0, 0.0]
p2.grounded = False
# Two fixed joints = overconstrained
c1 = kcsolve.Constraint()
c1.id = "fix1"
c1.part_i = "base"
c1.marker_i = kcsolve.Transform.identity()
c1.part_j = "arm"
c1.marker_j = kcsolve.Transform.identity()
c1.type = kcsolve.BaseJointKind.Fixed
c2 = kcsolve.Constraint()
c2.id = "fix2"
c2.part_i = "base"
c2.marker_i = kcsolve.Transform.identity()
c2.part_j = "arm"
c2.marker_j = kcsolve.Transform.identity()
c2.type = kcsolve.BaseJointKind.Fixed
ctx = kcsolve.SolveContext()
ctx.parts = [p1, p2]
ctx.constraints = [c1, c2]
diags = solver.diagnose(ctx)
_report(
"diagnose: returns diagnostics", len(diags) > 0, f"{len(diags)} diagnostic(s)"
)
if diags:
kinds = [d.kind for d in diags]
_report(
"diagnose: found redundant",
kcsolve.DiagnosticKind.Redundant in kinds,
f"kinds={[str(k) for k in kinds]}",
)
# ── Run all ──────────────────────────────────────────────────────────
def run_all():
print("\n=== Phase 5 Console Tests ===\n")
test_solver_registry()
test_preference_switching()
test_fixed_joint()
test_revolute_dof()
test_no_ground()
test_stability()
test_standalone_api()
test_diagnose()
# Restore original preference
_pref.SetString("Solver", _orig_solver)
# Summary
passed = sum(1 for _, p in _results if p)
total = len(_results)
print(f"\n=== {passed}/{total} passed ===\n")
if passed < total:
failed = [name for name, p in _results if not p]
print(f"FAILED: {', '.join(failed)}")
run_all()

tests/test_codegen.py Normal file

@@ -0,0 +1,357 @@
"""Tests for the codegen module — CSE, compilation, and compiled evaluation."""
import math
import numpy as np
import pytest
from kindred_solver.codegen import (
_build_cse,
_find_nonzero_entries,
compile_system,
try_compile_system,
)
from kindred_solver.expr import (
ZERO,
Add,
Const,
Cos,
Div,
Mul,
Neg,
Pow,
Sin,
Sqrt,
Sub,
Var,
)
from kindred_solver.newton import newton_solve
from kindred_solver.params import ParamTable
# ---------------------------------------------------------------------------
# to_code() — round-trip correctness for each Expr type
# ---------------------------------------------------------------------------
class TestToCode:
"""Verify that eval(expr.to_code()) == expr.eval(env) for each node."""
NS = {"_sin": math.sin, "_cos": math.cos, "_sqrt": math.sqrt}
def _check(self, expr, env):
code = expr.to_code()
ns = dict(self.NS)
ns["env"] = env
compiled = eval(code, ns)
expected = expr.eval(env)
assert abs(compiled - expected) < 1e-15, (
f"{code} = {compiled}, expected {expected}"
)
def test_const(self):
self._check(Const(3.14), {})
def test_const_negative(self):
self._check(Const(-2.5), {})
def test_const_zero(self):
self._check(Const(0.0), {})
def test_var(self):
self._check(Var("x"), {"x": 7.0})
def test_neg(self):
self._check(Neg(Var("x")), {"x": 3.0})
def test_add(self):
self._check(Add(Var("x"), Const(2.0)), {"x": 5.0})
def test_sub(self):
self._check(Sub(Var("x"), Var("y")), {"x": 5.0, "y": 3.0})
def test_mul(self):
self._check(Mul(Var("x"), Const(3.0)), {"x": 4.0})
def test_div(self):
self._check(Div(Var("x"), Const(2.0)), {"x": 6.0})
def test_pow(self):
self._check(Pow(Var("x"), Const(3.0)), {"x": 2.0})
def test_sin(self):
self._check(Sin(Var("x")), {"x": 1.0})
def test_cos(self):
self._check(Cos(Var("x")), {"x": 1.0})
def test_sqrt(self):
self._check(Sqrt(Var("x")), {"x": 9.0})
def test_nested(self):
"""Complex nested expression."""
x, y = Var("x"), Var("y")
expr = Add(Mul(Sin(x), Cos(y)), Sqrt(Sub(x, Neg(y))))
self._check(expr, {"x": 2.0, "y": 1.0})
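The round-trip above can be illustrated with a minimal sketch of `to_code()`-style source emission (a hypothetical mini-AST, not `kindred_solver.expr`): each node renders a Python expression string that evaluates against an `env` dict, so `eval(expr.to_code())` reproduces `expr.eval(env)`.

```python
import math

# Hypothetical mini-AST: two node types, each emitting source that
# reads variables from `env` and calls `_sin` from the eval namespace.
class Var:
    def __init__(self, name):
        self.name = name
    def to_code(self):
        return f"env[{self.name!r}]"
    def eval(self, env):
        return env[self.name]

class Sin:
    def __init__(self, arg):
        self.arg = arg
    def to_code(self):
        return f"_sin({self.arg.to_code()})"
    def eval(self, env):
        return math.sin(self.arg.eval(env))

expr = Sin(Var("x"))
env = {"x": 1.0}
# eval(code) with _sin and env bound matches the tree-walk eval
compiled = eval(expr.to_code(), {"_sin": math.sin, "env": env})
assert abs(compiled - expr.eval(env)) < 1e-15
```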
# ---------------------------------------------------------------------------
# CSE
# ---------------------------------------------------------------------------
class TestCSE:
def test_no_sharing(self):
"""Distinct expressions produce no CSE temps."""
a = Var("x") + Const(1.0)
b = Var("y") + Const(2.0)
id_to_temp, temps = _build_cse([a, b])
assert len(temps) == 0
def test_shared_subtree(self):
"""Same node object used in two places is extracted."""
x = Var("x")
shared = x * Const(2.0) # single Mul node
a = shared + Const(1.0)
b = shared + Const(3.0)
id_to_temp, temps = _build_cse([a, b])
assert len(temps) >= 1
# The shared Mul node should be a temp
assert id(shared) in id_to_temp
def test_leaf_nodes_not_extracted(self):
"""Const and Var nodes are never extracted as temps."""
x = Var("x")
c = Const(5.0)
a = x + c
b = x + c
id_to_temp, temps = _build_cse([a, b])
for _, expr in temps:
assert not isinstance(expr, (Const, Var))
def test_dependency_order(self):
"""Temps are in dependency order (dependencies first)."""
x = Var("x")
inner = x * Const(2.0)
outer = inner + inner # uses inner twice
wrapper_a = outer * Const(3.0)
wrapper_b = outer * Const(4.0)
id_to_temp, temps = _build_cse([wrapper_a, wrapper_b])
# If both inner and outer are temps, inner must come first
temp_names = [name for name, _ in temps]
temp_ids = [id(expr) for _, expr in temps]
if id(inner) in set(id_to_temp) and id(outer) in set(id_to_temp):
inner_idx = temp_ids.index(id(inner))
outer_idx = temp_ids.index(id(outer))
assert inner_idx < outer_idx
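The CSE tests above key on object identity: a subtree is extracted only when the *same node object* appears more than once. A minimal sketch of identity-based CSE (hypothetical, not the library's `_build_cse`) counts occurrences per `id(node)` and emits temps in post-order, which guarantees dependencies come first:

```python
# Hypothetical node type; leaves (non-Node values) are never extracted.
class Node:
    def __init__(self, op, *children):
        self.op, self.children = op, children

def build_cse(roots):
    counts = {}   # id(node) -> occurrence count
    order = []    # post-order: dependencies before dependents
    def visit(n):
        if not isinstance(n, Node):      # leaf: skip
            return
        if id(n) in counts:
            counts[id(n)] += 1
            return                       # subtree already walked
        counts[id(n)] = 1
        for c in n.children:
            visit(c)
        order.append(n)
    for r in roots:
        visit(r)
    id_to_temp, temps = {}, []
    for n in order:                      # post-order => dependency order
        if counts[id(n)] > 1:
            name = f"_t{len(temps)}"
            id_to_temp[id(n)] = name
            temps.append((name, n))
    return id_to_temp, temps

shared = Node("mul", "x", 2.0)           # one shared node object
a = Node("add", shared, 1.0)
b = Node("add", shared, 3.0)
id_to_temp, temps = build_cse([a, b])
assert id(shared) in id_to_temp and len(temps) == 1
```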
# ---------------------------------------------------------------------------
# Sparsity detection
# ---------------------------------------------------------------------------
class TestSparsity:
def test_zero_entries_skipped(self):
nz = _find_nonzero_entries(
[
[Const(0.0), Var("x"), Const(0.0)],
[Const(1.0), Const(0.0), Var("y")],
]
)
assert nz == [(0, 1), (1, 0), (1, 2)]
def test_all_nonzero(self):
nz = _find_nonzero_entries(
[
[Var("x"), Const(1.0)],
]
)
assert nz == [(0, 0), (0, 1)]
def test_all_zero(self):
nz = _find_nonzero_entries(
[
[Const(0.0), Const(0.0)],
]
)
assert nz == []
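The sparsity pass is conceptually simple: an entry is structurally zero when it is the constant `0.0`, and compiled code never needs to write it. An illustrative analog of `_find_nonzero_entries` (hypothetical predicate, not the library's implementation):

```python
# Collect (row, col) indices of entries that are not structurally zero.
def find_nonzero_entries(jac, is_zero=lambda e: e == 0.0):
    return [
        (i, j)
        for i, row in enumerate(jac)
        for j, entry in enumerate(row)
        if not is_zero(entry)
    ]

# Mirrors the test matrix above, with plain floats/strings as stand-ins
assert find_nonzero_entries([[0.0, "x", 0.0], [1.0, 0.0, "y"]]) == [
    (0, 1), (1, 0), (1, 2)
]
```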
# ---------------------------------------------------------------------------
# Full compilation pipeline
# ---------------------------------------------------------------------------
class TestCompileSystem:
def test_simple_linear(self):
"""Compile and evaluate a trivial system: r = x - 3, J = [[1]]."""
x = Var("x")
residuals = [x - Const(3.0)]
jac_exprs = [[Const(1.0)]] # d(x-3)/dx = 1
fn = compile_system(residuals, jac_exprs, 1, 1)
env = {"x": 5.0}
r_vec = np.empty(1)
J = np.zeros((1, 1))
fn(env, r_vec, J)
assert abs(r_vec[0] - 2.0) < 1e-15 # 5 - 3 = 2
assert abs(J[0, 0] - 1.0) < 1e-15
def test_two_variable_system(self):
"""Compile: r0 = x + y - 5, r1 = x - y - 1."""
x, y = Var("x"), Var("y")
residuals = [x + y - Const(5.0), x - y - Const(1.0)]
jac_exprs = [
[Const(1.0), Const(1.0)], # d(r0)/dx, d(r0)/dy
[Const(1.0), Const(-1.0)], # d(r1)/dx, d(r1)/dy
]
fn = compile_system(residuals, jac_exprs, 2, 2)
env = {"x": 3.0, "y": 2.0}
r_vec = np.empty(2)
J = np.zeros((2, 2))
fn(env, r_vec, J)
assert abs(r_vec[0] - 0.0) < 1e-15
assert abs(r_vec[1] - 0.0) < 1e-15
assert abs(J[0, 0] - 1.0) < 1e-15
assert abs(J[0, 1] - 1.0) < 1e-15
assert abs(J[1, 0] - 1.0) < 1e-15
assert abs(J[1, 1] - (-1.0)) < 1e-15
def test_sparse_jacobian(self):
"""Zero Jacobian entries remain zero after compiled evaluation."""
x = Var("x")
y = Var("y")
# r0 depends on x only, r1 depends on y only
residuals = [x - Const(1.0), y - Const(2.0)]
jac_exprs = [
[Const(1.0), Const(0.0)],
[Const(0.0), Const(1.0)],
]
fn = compile_system(residuals, jac_exprs, 2, 2)
env = {"x": 3.0, "y": 4.0}
r_vec = np.empty(2)
J = np.zeros((2, 2))
fn(env, r_vec, J)
assert abs(J[0, 1]) < 1e-15 # should remain zero
assert abs(J[1, 0]) < 1e-15 # should remain zero
assert abs(J[0, 0] - 1.0) < 1e-15
assert abs(J[1, 1] - 1.0) < 1e-15
def test_trig_functions(self):
"""Compiled evaluation handles Sin/Cos/Sqrt."""
x = Var("x")
residuals = [Sin(x), Cos(x), Sqrt(x)]
jac_exprs = [
[Cos(x)],
[Neg(Sin(x))],
[Div(Const(1.0), Mul(Const(2.0), Sqrt(x)))],
]
fn = compile_system(residuals, jac_exprs, 3, 1)
env = {"x": 1.0}
r_vec = np.empty(3)
J = np.zeros((3, 1))
fn(env, r_vec, J)
assert abs(r_vec[0] - math.sin(1.0)) < 1e-15
assert abs(r_vec[1] - math.cos(1.0)) < 1e-15
assert abs(r_vec[2] - math.sqrt(1.0)) < 1e-15
assert abs(J[0, 0] - math.cos(1.0)) < 1e-15
assert abs(J[1, 0] - (-math.sin(1.0))) < 1e-15
assert abs(J[2, 0] - (1.0 / (2.0 * math.sqrt(1.0)))) < 1e-15
def test_matches_tree_walk(self):
"""Compiled eval produces identical results to tree-walk eval."""
pt = ParamTable()
x = pt.add("x", 2.0)
y = pt.add("y", 3.0)
residuals = [x * y - Const(6.0), x * x + y - Const(7.0)]
free = pt.free_names()
jac_exprs = [[r.diff(name).simplify() for name in free] for r in residuals]
fn = compile_system(residuals, jac_exprs, 2, 2)
# Tree-walk eval
env = pt.get_env()
r_tree = np.array([r.eval(env) for r in residuals])
J_tree = np.empty((2, 2))
for i in range(2):
for j in range(2):
J_tree[i, j] = jac_exprs[i][j].eval(env)
# Compiled eval
r_comp = np.empty(2)
J_comp = np.zeros((2, 2))
fn(pt.env_ref(), r_comp, J_comp)
np.testing.assert_allclose(r_comp, r_tree, atol=1e-15)
np.testing.assert_allclose(J_comp, J_tree, atol=1e-15)
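The pipeline these tests exercise can be sketched end to end: emit one function body that fills `r_vec` and `J` in place, skipping structurally-zero Jacobian entries, and `exec` it. This is an assumed shape for a `compile_system`-style generator, not the library's actual output.

```python
import math
import numpy as np

def compile_simple(residual_src, jac_src):
    """Build fn(env, r_vec, J) from expression source strings."""
    lines = ["def _fn(env, r_vec, J):"]
    for i, src in enumerate(residual_src):
        lines.append(f"    r_vec[{i}] = {src}")
    for i, row in enumerate(jac_src):
        for j, src in enumerate(row):
            if src != "0.0":              # sparsity: zero entries skipped
                lines.append(f"    J[{i}, {j}] = {src}")
    ns = {"_sin": math.sin, "_cos": math.cos, "_sqrt": math.sqrt}
    exec("\n".join(lines), ns)
    return ns["_fn"]

# r0 = x + y - 5, r1 = x - y - 1 (same system as the test above)
fn = compile_simple(
    ["env['x'] + env['y'] - 5.0", "env['x'] - env['y'] - 1.0"],
    [["1.0", "1.0"], ["1.0", "-1.0"]],
)
r, J = np.empty(2), np.zeros((2, 2))
fn({"x": 3.0, "y": 2.0}, r, J)
assert abs(r[0]) < 1e-15 and abs(r[1]) < 1e-15
assert J[1, 1] == -1.0
```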
class TestTryCompile:
def test_returns_callable(self):
x = Var("x")
fn = try_compile_system([x], [[Const(1.0)]], 1, 1)
assert fn is not None
def test_empty_system(self):
"""Empty system returns None (nothing to compile)."""
fn = try_compile_system([], [], 0, 0)
# Empty system is handled by the solver before codegen is reached,
# so returning None is acceptable.
assert fn is None or callable(fn)
# ---------------------------------------------------------------------------
# Integration: Newton with compiled eval matches tree-walk
# ---------------------------------------------------------------------------
class TestCompiledNewton:
def test_single_linear(self):
"""Solve x - 3 = 0 with compiled eval."""
pt = ParamTable()
x = pt.add("x", 0.0)
residuals = [x - Const(3.0)]
assert newton_solve(residuals, pt) is True
assert abs(pt.get_value("x") - 3.0) < 1e-10
def test_two_variables(self):
"""Solve x + y = 5, x - y = 1 with compiled eval."""
pt = ParamTable()
x = pt.add("x", 0.0)
y = pt.add("y", 0.0)
residuals = [x + y - Const(5.0), x - y - Const(1.0)]
assert newton_solve(residuals, pt) is True
assert abs(pt.get_value("x") - 3.0) < 1e-10
assert abs(pt.get_value("y") - 2.0) < 1e-10
def test_quadratic(self):
"""Solve x^2 - 4 = 0 starting from x=1."""
pt = ParamTable()
x = pt.add("x", 1.0)
residuals = [x * x - Const(4.0)]
assert newton_solve(residuals, pt) is True
assert abs(pt.get_value("x") - 2.0) < 1e-10
def test_nonlinear_system(self):
"""Compiled eval converges for a nonlinear system: xy=6, x+y=5."""
pt = ParamTable()
x = pt.add("x", 2.0)
y = pt.add("y", 3.5)
residuals = [x * y - Const(6.0), x + y - Const(5.0)]
assert newton_solve(residuals, pt, max_iter=100) is True
# Solutions are (2, 3) or (3, 2) — check they satisfy both equations
xv, yv = pt.get_value("x"), pt.get_value("y")
assert abs(xv * yv - 6.0) < 1e-10
assert abs(xv + yv - 5.0) < 1e-10
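The integration tests above reduce to the same core loop: evaluate residuals and Jacobian, take a least-squares step, repeat until the residual norm is small. A minimal Newton-Raphson sketch under that assumption (the real `newton_solve` additionally handles quaternion groups, weights, and `post_step` hooks):

```python
import numpy as np

def newton(f, jac, x0, tol=1e-12, max_iter=50):
    """Return (solution, converged) for f(x) = 0."""
    x = np.asarray(x0, dtype=float)
    for _ in range(max_iter):
        r = np.atleast_1d(f(x))
        if np.max(np.abs(r)) < tol:
            return x, True
        J = np.atleast_2d(jac(x))
        # Min-norm least-squares step handles non-square Jacobians too
        dx, *_ = np.linalg.lstsq(J, -r, rcond=None)
        x = x + dx
    return x, False

# x^2 - 4 = 0 from x = 1 converges to the nearby root x = 2
x, ok = newton(lambda v: v[0] ** 2 - 4.0,
               lambda v: [[2.0 * v[0]]], [1.0])
assert ok and abs(x[0] - 2.0) < 1e-10
```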


@@ -65,7 +65,7 @@ class TestPointOnLine:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = PointOnLineConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 2
+assert len(c.residuals()) == 3
class TestPointInPlane:
@@ -135,7 +135,7 @@ class TestParallel:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = ParallelConstraint(b1, ID_QUAT, b2, ID_QUAT)
-assert len(c.residuals()) == 2
+assert len(c.residuals()) == 3
class TestPerpendicular:
@@ -222,7 +222,7 @@ class TestConcentric:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = ConcentricConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 4
+assert len(c.residuals()) == 6
class TestTangent:
@@ -279,7 +279,7 @@ class TestPlanar:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = PlanarConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 3
+assert len(c.residuals()) == 4
class TestLineInPlane:
@@ -334,7 +334,7 @@ class TestRevolute:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = RevoluteConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 5
+assert len(c.residuals()) == 6
class TestCylindrical:
@@ -353,7 +353,7 @@ class TestCylindrical:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = CylindricalConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 4
+assert len(c.residuals()) == 6
class TestSlider:
@@ -375,15 +375,15 @@ class TestSlider:
c = SliderConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
env = pt.get_env()
vals = [r.eval(env) for r in c.residuals()]
-# First 4 should be ~0 (parallel + on-line), but twist residual should be ~1
-assert abs(vals[4]) > 0.5
+# First 6 should be ~0 (parallel + on-line), but twist residual should be ~1
+assert abs(vals[6]) > 0.5
def test_residual_count(self):
pt = ParamTable()
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = SliderConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT)
-assert len(c.residuals()) == 5
+assert len(c.residuals()) == 7
class TestUniversal:
@@ -411,7 +411,7 @@ class TestScrew:
b1 = RigidBody("a", pt, (0, 0, 0), (1, 0, 0, 0))
b2 = RigidBody("b", pt, (0, 0, 0), (1, 0, 0, 0))
c = ScrewConstraint(b1, (0, 0, 0), ID_QUAT, b2, (0, 0, 0), ID_QUAT, pitch=10.0)
-assert len(c.residuals()) == 5
+assert len(c.residuals()) == 7
def test_zero_displacement_zero_rotation(self):
"""Both at origin with identity rotation → all residuals 0."""

tests/test_diagnostics.py Normal file

@@ -0,0 +1,296 @@
"""Tests for per-entity DOF diagnostics and overconstrained detection."""
import math
import numpy as np
import pytest
from kindred_solver.constraints import (
CoincidentConstraint,
CylindricalConstraint,
DistancePointPointConstraint,
FixedConstraint,
ParallelConstraint,
RevoluteConstraint,
)
from kindred_solver.diagnostics import (
ConstraintDiag,
EntityDOF,
find_overconstrained,
per_entity_dof,
)
from kindred_solver.entities import RigidBody
from kindred_solver.params import ParamTable
def _make_two_bodies(
params,
pos_a=(0, 0, 0),
pos_b=(5, 0, 0),
quat_a=(1, 0, 0, 0),
quat_b=(1, 0, 0, 0),
ground_a=True,
ground_b=False,
):
body_a = RigidBody(
"a", params, position=pos_a, quaternion=quat_a, grounded=ground_a
)
body_b = RigidBody(
"b", params, position=pos_b, quaternion=quat_b, grounded=ground_b
)
return body_a, body_b
def _build_residuals_and_ranges(constraint_objs, bodies, params):
"""Build residuals list, quat norms, and residual_ranges."""
all_residuals = []
residual_ranges = []
row = 0
for i, obj in enumerate(constraint_objs):
r = obj.residuals()
n = len(r)
residual_ranges.append((row, row + n, i))
all_residuals.extend(r)
row += n
for body in bodies.values():
if not body.grounded:
all_residuals.append(body.quat_norm_residual())
return all_residuals, residual_ranges
# ============================================================================
# Per-entity DOF tests
# ============================================================================
class TestPerEntityDOF:
"""Per-entity DOF computation."""
def test_unconstrained_body_6dof(self):
"""Unconstrained non-grounded body has 6 DOF."""
params = ParamTable()
body = RigidBody(
"b", params, position=(0, 0, 0), quaternion=(1, 0, 0, 0), grounded=False
)
bodies = {"b": body}
# Only quat norm constraint
residuals = [body.quat_norm_residual()]
result = per_entity_dof(residuals, params, bodies)
assert len(result) == 1
assert result[0].entity_id == "b"
assert result[0].remaining_dof == 6
assert len(result[0].free_motions) == 6
def test_fixed_body_0dof(self):
"""Body welded to ground has 0 DOF."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params)
bodies = {"a": body_a, "b": body_b}
c = FixedConstraint(
body_a,
(0, 0, 0),
(1, 0, 0, 0),
body_b,
(0, 0, 0),
(1, 0, 0, 0),
)
residuals, _ = _build_residuals_and_ranges([c], bodies, params)
result = per_entity_dof(residuals, params, bodies)
# Only non-grounded body (b) reported
assert len(result) == 1
assert result[0].entity_id == "b"
assert result[0].remaining_dof == 0
assert len(result[0].free_motions) == 0
def test_revolute_1dof(self):
"""Revolute joint leaves 1 DOF (rotation about Z)."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
c = RevoluteConstraint(
body_a,
(0, 0, 0),
(1, 0, 0, 0),
body_b,
(0, 0, 0),
(1, 0, 0, 0),
)
residuals, _ = _build_residuals_and_ranges([c], bodies, params)
result = per_entity_dof(residuals, params, bodies)
assert len(result) == 1
assert result[0].remaining_dof == 1
# Should have one free motion that mentions rotation
assert len(result[0].free_motions) == 1
assert "rotation" in result[0].free_motions[0].lower()
def test_cylindrical_2dof(self):
"""Cylindrical joint leaves 2 DOF (rotation about Z + translation along Z)."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
c = CylindricalConstraint(
body_a,
(0, 0, 0),
(1, 0, 0, 0),
body_b,
(0, 0, 0),
(1, 0, 0, 0),
)
residuals, _ = _build_residuals_and_ranges([c], bodies, params)
result = per_entity_dof(residuals, params, bodies)
assert len(result) == 1
assert result[0].remaining_dof == 2
assert len(result[0].free_motions) == 2
def test_coincident_3dof(self):
"""Coincident (ball) joint leaves 3 DOF (3 rotations)."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
c = CoincidentConstraint(body_a, (0, 0, 0), body_b, (0, 0, 0))
residuals, _ = _build_residuals_and_ranges([c], bodies, params)
result = per_entity_dof(residuals, params, bodies)
assert len(result) == 1
assert result[0].remaining_dof == 3
# All 3 should be rotations
for motion in result[0].free_motions:
assert "rotation" in motion.lower()
def test_no_constraints_6dof(self):
"""No residuals at all gives 6 DOF."""
params = ParamTable()
body = RigidBody(
"b", params, position=(0, 0, 0), quaternion=(1, 0, 0, 0), grounded=False
)
bodies = {"b": body}
result = per_entity_dof([], params, bodies)
assert len(result) == 1
assert result[0].remaining_dof == 6
def test_grounded_body_excluded(self):
"""Grounded bodies are not reported."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params)
bodies = {"a": body_a, "b": body_b}
residuals = [body_b.quat_norm_residual()]
result = per_entity_dof(residuals, params, bodies)
entity_ids = [r.entity_id for r in result]
assert "a" not in entity_ids # grounded
assert "b" in entity_ids
def test_multiple_bodies(self):
"""Two free bodies: each gets its own DOF report."""
params = ParamTable()
body_g = RigidBody(
"g", params, position=(0, 0, 0), quaternion=(1, 0, 0, 0), grounded=True
)
body_b = RigidBody(
"b", params, position=(5, 0, 0), quaternion=(1, 0, 0, 0), grounded=False
)
body_c = RigidBody(
"c", params, position=(10, 0, 0), quaternion=(1, 0, 0, 0), grounded=False
)
bodies = {"g": body_g, "b": body_b, "c": body_c}
# Fix b to ground, leave c unconstrained
c_fix = FixedConstraint(
body_g,
(0, 0, 0),
(1, 0, 0, 0),
body_b,
(0, 0, 0),
(1, 0, 0, 0),
)
residuals, _ = _build_residuals_and_ranges([c_fix], bodies, params)
result = per_entity_dof(residuals, params, bodies)
result_map = {r.entity_id: r for r in result}
assert result_map["b"].remaining_dof == 0
assert result_map["c"].remaining_dof == 6
# ============================================================================
# Overconstrained detection tests
# ============================================================================
class TestFindOverconstrained:
"""Redundant and conflicting constraint detection."""
def test_well_constrained_no_diagnostics(self):
"""Well-constrained system produces no diagnostics."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
c = FixedConstraint(
body_a,
(0, 0, 0),
(1, 0, 0, 0),
body_b,
(0, 0, 0),
(1, 0, 0, 0),
)
residuals, ranges = _build_residuals_and_ranges([c], bodies, params)
diags = find_overconstrained(residuals, params, ranges)
assert len(diags) == 0
def test_duplicate_coincident_redundant(self):
"""Duplicate coincident constraint is flagged as redundant."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
c1 = CoincidentConstraint(body_a, (0, 0, 0), body_b, (0, 0, 0))
c2 = CoincidentConstraint(body_a, (0, 0, 0), body_b, (0, 0, 0))
residuals, ranges = _build_residuals_and_ranges([c1, c2], bodies, params)
diags = find_overconstrained(residuals, params, ranges)
assert len(diags) > 0
# At least one should be redundant
kinds = {d.kind for d in diags}
assert "redundant" in kinds
def test_conflicting_distance(self):
"""Distance constraint that can't be satisfied is flagged as conflicting."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(0, 0, 0))
bodies = {"a": body_a, "b": body_b}
# Coincident forces distance=0, but distance constraint says 50
c1 = CoincidentConstraint(body_a, (0, 0, 0), body_b, (0, 0, 0))
c2 = DistancePointPointConstraint(
body_a,
(0, 0, 0),
body_b,
(0, 0, 0),
distance=50.0,
)
residuals, ranges = _build_residuals_and_ranges([c1, c2], bodies, params)
diags = find_overconstrained(residuals, params, ranges)
assert len(diags) > 0
kinds = {d.kind for d in diags}
assert "conflicting" in kinds
def test_empty_system_no_diagnostics(self):
"""Empty system has no diagnostics."""
params = ParamTable()
diags = find_overconstrained([], params, [])
assert len(diags) == 0
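The redundancy detection these tests check can be sketched with a rank argument (illustrative, not the library's `find_overconstrained`): a Jacobian row that lies in the span of earlier rows adds no new constraint, so it is redundant.

```python
import numpy as np

def redundant_rows(J, tol=1e-9):
    """Indices of rows that do not increase the Jacobian's rank."""
    kept, redundant = [], []
    for i, row in enumerate(J):
        trial = np.array(kept + [row])
        if np.linalg.matrix_rank(trial, tol=tol) > len(kept):
            kept.append(row)             # row adds a new direction
        else:
            redundant.append(i)          # row is linearly dependent
    return redundant

# A duplicate coincident constraint repeats rows 0-2 exactly,
# so rows 3-5 are flagged as redundant.
J = np.vstack([np.eye(3), np.eye(3)])
assert redundant_rows(J) == [3, 4, 5]
```

Distinguishing "redundant" from "conflicting" additionally needs the residual values: a dependent row whose residual cannot be driven to zero is a conflict rather than a harmless duplicate.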


@@ -173,15 +173,17 @@ class TestPointLinePerp:
pt = (Const(0.0), Const(0.0), Const(5.0))
origin = (Const(0.0), Const(0.0), Const(0.0))
direction = (Const(0.0), Const(0.0), Const(1.0))
-cx, cy = point_line_perp_components(pt, origin, direction)
+cx, cy, cz = point_line_perp_components(pt, origin, direction)
assert abs(cx.eval({})) < 1e-10
assert abs(cy.eval({})) < 1e-10
+assert abs(cz.eval({})) < 1e-10
def test_off_line(self):
pt = (Const(3.0), Const(0.0), Const(0.0))
origin = (Const(0.0), Const(0.0), Const(0.0))
direction = (Const(0.0), Const(0.0), Const(1.0))
-cx, cy = point_line_perp_components(pt, origin, direction)
+cx, cy, cz = point_line_perp_components(pt, origin, direction)
# d = (3,0,0), dir = (0,0,1), d x dir = (0*1-0*0, 0*0-3*1, 3*0-0*0) = (0,-3,0)
assert abs(cx.eval({})) < 1e-10
assert abs(cy.eval({}) - (-3.0)) < 1e-10
+assert abs(cz.eval({})) < 1e-10
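The full 3-component cross product used in these tests is easy to state numerically. A minimal sketch (plain floats, not the Expr-based `point_line_perp_components`):

```python
def point_line_perp(pt, origin, direction):
    """Components of (pt - origin) x direction; all ~0 iff pt is on the line."""
    dx, dy, dz = (p - o for p, o in zip(pt, origin))
    ux, uy, uz = direction
    return (dy * uz - dz * uy,   # cx
            dz * ux - dx * uz,   # cy
            dx * uy - dy * ux)   # cz

# d = (3,0,0), dir = (0,0,1)  ->  d x dir = (0, -3, 0)
assert point_line_perp((3.0, 0.0, 0.0), (0.0, 0.0, 0.0),
                       (0.0, 0.0, 1.0)) == (0.0, -3.0, 0.0)
```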


@@ -421,8 +421,10 @@ class TestSliderCrank:
bodies = [ground, crank, rod, piston]
dof = _dof(pt, bodies, constraints)
-# 3D slider-crank: planar motion + out-of-plane fold modes
-assert dof == 3
+# With full 3-component cross products, the redundant constraint rows
+# eliminate the out-of-plane fold modes, giving the correct 1 DOF
+# (crank rotation only).
+assert dof == 1
def test_slider_crank_solves(self):
"""Slider-crank converges from displaced state."""


@@ -99,3 +99,50 @@ class TestParamTable:
pt.unfix("a")
assert pt.free_names() == ["a"]
assert pt.n_free() == 1
def test_snapshot_restore_roundtrip(self):
"""Snapshot captures values; restore brings them back."""
pt = ParamTable()
pt.add("x", 1.0)
pt.add("y", 2.0)
pt.add("z", 3.0, fixed=True)
snap = pt.snapshot()
pt.set_value("x", 99.0)
pt.set_value("y", 88.0)
pt.set_value("z", 77.0)
pt.restore(snap)
assert pt.get_value("x") == 1.0
assert pt.get_value("y") == 2.0
assert pt.get_value("z") == 3.0
def test_snapshot_is_independent_copy(self):
"""Mutating snapshot dict does not affect the table."""
pt = ParamTable()
pt.add("a", 5.0)
snap = pt.snapshot()
snap["a"] = 999.0
assert pt.get_value("a") == 5.0
def test_movement_cost_no_weights(self):
"""Movement cost is sum of squared displacements for free params."""
pt = ParamTable()
pt.add("x", 0.0)
pt.add("y", 0.0)
pt.add("z", 0.0, fixed=True)
snap = pt.snapshot()
pt.set_value("x", 3.0)
pt.set_value("y", 4.0)
pt.set_value("z", 100.0) # fixed — ignored
assert pt.movement_cost(snap) == pytest.approx(25.0)
def test_movement_cost_with_weights(self):
"""Weighted movement cost scales each displacement."""
pt = ParamTable()
pt.add("a", 0.0)
pt.add("b", 0.0)
snap = pt.snapshot()
pt.set_value("a", 1.0)
pt.set_value("b", 1.0)
weights = {"a": 4.0, "b": 9.0}
# cost = 1^2*4 + 1^2*9 = 13
assert pt.movement_cost(snap, weights) == pytest.approx(13.0)
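A sketch of `movement_cost` consistent with the tests above (assumed semantics, not the library's implementation): a weighted sum of squared displacements over free parameters only, with fixed parameters ignored and a default weight of 1.0.

```python
def movement_cost(values, snapshot, fixed, weights=None):
    """Sum of w * (v - snapshot)^2 over free parameters."""
    cost = 0.0
    for name, v in values.items():
        if name in fixed:
            continue                      # fixed params are ignored
        d = v - snapshot[name]
        cost += d * d * (weights or {}).get(name, 1.0)
    return cost

snap = {"x": 0.0, "y": 0.0, "z": 0.0}
now = {"x": 3.0, "y": 4.0, "z": 100.0}
assert movement_cost(now, snap, fixed={"z"}) == 25.0          # 9 + 16
assert movement_cost({"a": 1.0, "b": 1.0}, {"a": 0.0, "b": 0.0},
                     fixed=set(), weights={"a": 4.0, "b": 9.0}) == 13.0
```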

tests/test_preference.py Normal file

@@ -0,0 +1,384 @@
"""Tests for solution preference: half-space tracking and corrections."""
import math
import numpy as np
import pytest
from kindred_solver.constraints import (
AngleConstraint,
DistancePointPointConstraint,
ParallelConstraint,
PerpendicularConstraint,
)
from kindred_solver.entities import RigidBody
from kindred_solver.newton import newton_solve
from kindred_solver.params import ParamTable
from kindred_solver.preference import (
apply_half_space_correction,
compute_half_spaces,
)
def _make_two_bodies(
params,
pos_a=(0, 0, 0),
pos_b=(5, 0, 0),
quat_a=(1, 0, 0, 0),
quat_b=(1, 0, 0, 0),
ground_a=True,
ground_b=False,
):
"""Create two bodies with given positions/orientations."""
body_a = RigidBody(
"a", params, position=pos_a, quaternion=quat_a, grounded=ground_a
)
body_b = RigidBody(
"b", params, position=pos_b, quaternion=quat_b, grounded=ground_b
)
return body_a, body_b
class TestDistanceHalfSpace:
"""Half-space tracking for DistancePointPoint constraint."""
def test_positive_x_stays_positive(self):
"""Body starting at +X should stay at +X after solve."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(3, 0, 0))
c = DistancePointPointConstraint(
body_a,
(0, 0, 0),
body_b,
(0, 0, 0),
distance=5.0,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
# Solve with half-space correction
residuals = c.residuals()
residuals.append(body_b.quat_norm_residual())
quat_groups = [body_b.quat_param_names()]
def post_step(p):
apply_half_space_correction(p, hs)
converged = newton_solve(
residuals,
params,
quat_groups=quat_groups,
post_step=post_step,
)
assert converged
env = params.get_env()
# Body b should be at +X (x > 0), not -X
bx = env["b/tx"]
assert bx > 0, f"Expected positive X, got {bx}"
# Distance should be 5
dist = math.sqrt(bx**2 + env["b/ty"] ** 2 + env["b/tz"] ** 2)
assert dist == pytest.approx(5.0, abs=1e-8)
def test_negative_x_stays_negative(self):
"""Body starting at -X should stay at -X after solve."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(-3, 0, 0))
c = DistancePointPointConstraint(
body_a,
(0, 0, 0),
body_b,
(0, 0, 0),
distance=5.0,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
residuals = c.residuals()
residuals.append(body_b.quat_norm_residual())
quat_groups = [body_b.quat_param_names()]
def post_step(p):
apply_half_space_correction(p, hs)
converged = newton_solve(
residuals,
params,
quat_groups=quat_groups,
post_step=post_step,
)
assert converged
env = params.get_env()
bx = env["b/tx"]
assert bx < 0, f"Expected negative X, got {bx}"
def test_zero_distance_no_halfspace(self):
"""Zero distance constraint has no branch ambiguity."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(3, 0, 0))
c = DistancePointPointConstraint(
body_a,
(0, 0, 0),
body_b,
(0, 0, 0),
distance=0.0,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 0
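The idea behind half-space tracking can be sketched in a few lines (illustrative mechanics, not `compute_half_spaces` itself): at setup time, record the sign of a discriminating scalar (here, the signed offset along X for a distance constraint); after a solve, a state whose scalar has the opposite sign has crossed to the wrong branch and must be corrected. On the branch boundary there is no ambiguity, so no half-space is recorded.

```python
import math

def half_space_sign(value, eps=1e-9):
    """Reference sign of the discriminating scalar, or None on the boundary."""
    return None if abs(value) < eps else math.copysign(1.0, value)

def needs_correction(reference_sign, current_value):
    """True if the solved state landed in the opposite half-space."""
    return reference_sign is not None and current_value * reference_sign < 0

# Distance constraint along X: body starts at x = +3, so the +X branch
# is the reference; a solve landing at x = -5 must be corrected.
ref = half_space_sign(3.0)
assert ref == 1.0
assert needs_correction(ref, -5.0) is True
assert needs_correction(ref, +5.0) is False
# Zero-distance start: on the boundary, no half-space is recorded.
assert half_space_sign(0.0) is None
```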
class TestParallelHalfSpace:
"""Half-space tracking for Parallel constraint."""
def test_same_direction_tracked(self):
"""Same-direction parallel: positive reference sign."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params)
c = ParallelConstraint(body_a, (1, 0, 0, 0), body_b, (1, 0, 0, 0))
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
assert hs[0].reference_sign == 1.0
def test_opposite_direction_tracked(self):
"""Opposite-direction parallel: negative reference sign."""
params = ParamTable()
# Rotate body_b by 180 degrees about X: Z-axis flips
q_flip = (0, 1, 0, 0) # 180 deg about X
body_a, body_b = _make_two_bodies(params, quat_b=q_flip)
c = ParallelConstraint(body_a, (1, 0, 0, 0), body_b, (1, 0, 0, 0))
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
assert hs[0].reference_sign == -1.0
class TestAngleHalfSpace:
"""Half-space tracking for Angle constraint."""
def test_90_degree_angle(self):
"""90-degree angle constraint creates a half-space."""
params = ParamTable()
# Rotate body_b by 90 degrees about X
q_90x = (math.cos(math.pi / 4), math.sin(math.pi / 4), 0, 0)
body_a, body_b = _make_two_bodies(params, quat_b=q_90x)
c = AngleConstraint(
body_a,
(1, 0, 0, 0),
body_b,
(1, 0, 0, 0),
angle=math.pi / 2,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
def test_zero_angle_no_halfspace(self):
"""0-degree angle has no branch ambiguity."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params)
c = AngleConstraint(
body_a,
(1, 0, 0, 0),
body_b,
(1, 0, 0, 0),
angle=0.0,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 0
def test_180_angle_no_halfspace(self):
"""180-degree angle has no branch ambiguity."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params)
c = AngleConstraint(
body_a,
(1, 0, 0, 0),
body_b,
(1, 0, 0, 0),
angle=math.pi,
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 0
class TestPerpendicularHalfSpace:
"""Half-space tracking for Perpendicular constraint."""
def test_perpendicular_tracked(self):
"""Perpendicular constraint creates a half-space."""
params = ParamTable()
# Rotate body_b by 90 degrees about X
q_90x = (math.cos(math.pi / 4), math.sin(math.pi / 4), 0, 0)
body_a, body_b = _make_two_bodies(params, quat_b=q_90x)
c = PerpendicularConstraint(
body_a,
(1, 0, 0, 0),
body_b,
(1, 0, 0, 0),
)
hs = compute_half_spaces([c], [0], params)
assert len(hs) == 1
class TestNewtonPostStep:
"""Verify Newton post_step callback works correctly."""
def test_callback_fires(self):
"""post_step callback is invoked during Newton iterations."""
params = ParamTable()
x = params.add("x", 2.0)
from kindred_solver.expr import Const
residuals = [x - Const(5.0)]
call_count = [0]
def counter(p):
call_count[0] += 1
converged = newton_solve(residuals, params, post_step=counter)
assert converged
assert call_count[0] >= 1
def test_callback_does_not_break_convergence(self):
"""A no-op callback doesn't prevent convergence."""
params = ParamTable()
x = params.add("x", 1.0)
y = params.add("y", 1.0)
from kindred_solver.expr import Const
residuals = [x - Const(3.0), y - Const(7.0)]
converged = newton_solve(residuals, params, post_step=lambda p: None)
assert converged
assert params.get_value("x") == pytest.approx(3.0)
assert params.get_value("y") == pytest.approx(7.0)
class TestMixedHalfSpaces:
"""Multiple branching constraints in one system."""
def test_multiple_constraints(self):
"""compute_half_spaces handles mixed constraint types."""
params = ParamTable()
body_a, body_b = _make_two_bodies(params, pos_b=(5, 0, 0))
dist_c = DistancePointPointConstraint(
body_a,
(0, 0, 0),
body_b,
(0, 0, 0),
distance=5.0,
)
par_c = ParallelConstraint(body_a, (1, 0, 0, 0), body_b, (1, 0, 0, 0))
hs = compute_half_spaces([dist_c, par_c], [0, 1], params)
assert len(hs) == 2
class TestBuildWeightVector:
"""Weight vector construction."""
def test_translation_weight_one(self):
"""Translation params get weight 1.0."""
from kindred_solver.preference import build_weight_vector
params = ParamTable()
params.add("body/tx", 0.0)
params.add("body/ty", 0.0)
params.add("body/tz", 0.0)
w = build_weight_vector(params)
np.testing.assert_array_equal(w, [1.0, 1.0, 1.0])
def test_quaternion_weight_high(self):
"""Quaternion params get QUAT_WEIGHT."""
from kindred_solver.preference import QUAT_WEIGHT, build_weight_vector
params = ParamTable()
params.add("body/qw", 1.0)
params.add("body/qx", 0.0)
params.add("body/qy", 0.0)
params.add("body/qz", 0.0)
w = build_weight_vector(params)
np.testing.assert_array_equal(w, [QUAT_WEIGHT] * 4)
def test_mixed_params(self):
"""Mixed translation and quaternion params get correct weights."""
from kindred_solver.preference import QUAT_WEIGHT, build_weight_vector
params = ParamTable()
params.add("b/tx", 0.0)
params.add("b/qw", 1.0)
params.add("b/ty", 0.0)
params.add("b/qx", 0.0)
w = build_weight_vector(params)
assert w[0] == pytest.approx(1.0)
assert w[1] == pytest.approx(QUAT_WEIGHT)
assert w[2] == pytest.approx(1.0)
assert w[3] == pytest.approx(QUAT_WEIGHT)
def test_fixed_params_excluded(self):
"""Fixed params are not in free list, so not in weight vector."""
from kindred_solver.preference import build_weight_vector
params = ParamTable()
params.add("b/tx", 0.0, fixed=True)
params.add("b/ty", 0.0)
w = build_weight_vector(params)
assert len(w) == 1
assert w[0] == pytest.approx(1.0)
class TestWeightedNewton:
"""Weighted minimum-norm Newton solve."""
def test_well_constrained_same_result(self):
"""Weighted and unweighted produce identical results for unique solution."""
from kindred_solver.expr import Const
# Fully determined system: x = 3, y = 7
params1 = ParamTable()
x1 = params1.add("x", 1.0)
y1 = params1.add("y", 1.0)
r1 = [x1 - Const(3.0), y1 - Const(7.0)]
params2 = ParamTable()
x2 = params2.add("x", 1.0)
y2 = params2.add("y", 1.0)
r2 = [x2 - Const(3.0), y2 - Const(7.0)]
newton_solve(r1, params1)
newton_solve(r2, params2, weight_vector=np.array([1.0, 100.0]))
assert params1.get_value("x") == pytest.approx(
params2.get_value("x"), abs=1e-10
)
assert params1.get_value("y") == pytest.approx(
params2.get_value("y"), abs=1e-10
)
def test_underconstrained_prefers_low_weight(self):
"""Under-constrained: weighted solve moves high-weight params less."""
from kindred_solver.expr import Const
# 1 equation, 2 unknowns: x + y = 10 (from x=0, y=0)
params_unw = ParamTable()
xu = params_unw.add("x", 0.0)
yu = params_unw.add("y", 0.0)
ru = [xu + yu - Const(10.0)]
params_w = ParamTable()
xw = params_w.add("x", 0.0)
yw = params_w.add("y", 0.0)
rw = [xw + yw - Const(10.0)]
# Unweighted: lstsq gives equal movement
newton_solve(ru, params_unw)
# Weighted: y is 100x more expensive to move
newton_solve(rw, params_w, weight_vector=np.array([1.0, 100.0]))
# Both should satisfy x + y = 10
assert params_unw.get_value("x") + params_unw.get_value("y") == pytest.approx(
10.0
)
assert params_w.get_value("x") + params_w.get_value("y") == pytest.approx(10.0)
# Weighted solve should move y less than x
assert abs(params_w.get_value("y")) < abs(params_w.get_value("x"))
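The weighted minimum-norm behavior checked above can be sketched with column scaling (an assumed approach, not `newton_solve`'s internals): scaling column `j` of the Jacobian by `1/w[j]` makes the least-squares min-norm step minimize the weighted norm `||w * dx||`, so high-weight parameters move less.

```python
import numpy as np

def weighted_step(J, r, w):
    """One weighted min-norm Newton step for J @ dx = -r."""
    Jw = J / w                                    # column j scaled by 1/w[j]
    dz, *_ = np.linalg.lstsq(Jw, -r, rcond=None)  # min-norm in scaled space
    return dz / w                                 # map back to original params

# x + y = 10 from (0, 0); y is 100x costlier to move than x.
J = np.array([[1.0, 1.0]])
r = np.array([-10.0])
dx = weighted_step(J, r, np.array([1.0, 100.0]))
assert abs(dx.sum() - 10.0) < 1e-9                # constraint satisfied
assert abs(dx[1]) < abs(dx[0])                    # y moved less than x
```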