From 58cef6369791fca638c9a58a2a3f0d9f1eccd546 Mon Sep 17 00:00:00 2001 From: Ivan Chernyshov Date: Thu, 19 Mar 2026 18:53:12 +0300 Subject: [PATCH 1/2] Reworks powerfit solver and adds powerfit edge diagnostics --- CHANGELOG.md | 17 + docs/guide/powerfit.md | 7 +- src/pyvoro2/__about__.py | 2 +- src/pyvoro2/__init__.py | 2 + src/pyvoro2/powerfit/__init__.py | 2 + src/pyvoro2/powerfit/active.py | 5 + src/pyvoro2/powerfit/report.py | 35 ++ src/pyvoro2/powerfit/solver.py | 360 +++++++++++++++--- tests/test_powerfit_fit.py | 34 +- tests/test_powerfit_reports.py | 30 ++ tests/test_powerfit_validation_regressions.py | 84 ++++ 11 files changed, 511 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 506c730..4ff652a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ All notable changes to this project are documented in this file. The format is based on *Keep a Changelog*, and this project follows *Semantic Versioning*. +## [0.6.2] - 2026-03-19 + +### Added + +- `PowerWeightFitResult.edge_diagnostics` with theorem-facing edge-space arrays and summaries: `alpha`, `beta`, `z_obs`, `z_fit`, algebraic residuals, and weighted/unweighted algebraic inconsistency metrics. +- Fit reports and per-constraint record exporters now serialize the new algebraic edge diagnostics alongside the existing measurement-space predictions and residuals. +- A medium-size robust-fit regression test that checks the native Huber/ADMM path on a deterministic sparse-outlier benchmark rather than only on tiny two-point cases. + +### Changed + +- The iterative `solver='admm'` backend now splits on the predicted measurement variable itself (`y = beta + alpha (w_i - w_j)`) instead of on raw weight differences, so the linear solve uses the scientifically natural `alpha^2` scaling of the graph Laplacian. +- The ADMM backend now warm-starts from the quadratic analytic fit when that warm start is identifiable, improving convergence on robust Huber fits without expanding the public optimizer API. + +### Fixed + +- Native Huber fitting via `FitModel(mismatch=HuberLoss(...))` and `solver='admm'` no longer stalls or blows up on the medium-size sparse-outlier benchmark family used during the JCAM paper work; the corrected native path now matches the expected measurement-space residual quality and converges in a small number of iterations on the representative failure case. + ## [0.6.1] - 2026-03-16 ### Added diff --git a/docs/guide/powerfit.md b/docs/guide/powerfit.md index eeb0823..afb900c 100644 --- a/docs/guide/powerfit.md +++ b/docs/guide/powerfit.md @@ -123,6 +123,7 @@ The result contains: - fitted `weights` and shifted `radii`, - predicted separator locations in both fraction and position form, - residuals in the chosen measurement space, +- `edge_diagnostics` with algebraic edge-space quantities such as `z_obs`, `z_fit`, and weighted inconsistency summaries, - solver/termination metadata, - and explicit infeasibility reporting for contradictory hard constraints. @@ -141,7 +142,7 @@ Both low-level fits and active-set results also provide `to_records(...)` helper that turn per-constraint diagnostics into plain Python rows for downstream packages, table exporters, or custom reporting. -For radii output, 0.6.1 makes the gauge choice explicit: +For radii output, 0.6.2 makes the gauge choice explicit: - by default, `weights_to_radii(...)` uses the minimal additive shift that makes all returned radii non-negative; @@ -341,9 +342,9 @@ The main current restriction is geometric, not algebraic: - 2D currently supports `Box` and rectangular `RectangularCell`; - there is **no** planar oblique-periodic `PeriodicCell` yet. -### Objective-model scope for 0.6.1 +### Objective-model scope for 0.6.2 -The 0.6.1 line still keeps the built-in objective family compact: +The 0.6.2 line still keeps the built-in objective family compact: - mismatch terms: `SquaredLoss`, `HuberLoss` - hard feasibility: `Interval`, `FixedValue` diff --git a/src/pyvoro2/__about__.py b/src/pyvoro2/__about__.py index ff52f36..a12635e 100644 --- a/src/pyvoro2/__about__.py +++ b/src/pyvoro2/__about__.py @@ -5,4 +5,4 @@ """ # Keep this as a simple assignment so scikit-build-core can extract it via regex. -__version__ = '0.6.1' +__version__ = '0.6.2' diff --git a/src/pyvoro2/__init__.py b/src/pyvoro2/__init__.py index 2c51c17..f4bdd25 100644 --- a/src/pyvoro2/__init__.py +++ b/src/pyvoro2/__init__.py @@ -52,6 +52,7 @@ ReciprocalBoundaryPenalty, L2Regularization, FitModel, + AlgebraicEdgeDiagnostics, ConstraintGraphDiagnostics, ConnectivityDiagnostics, ConnectivityDiagnosticsError, @@ -114,6 +115,7 @@ 'ReciprocalBoundaryPenalty', 'L2Regularization', 'FitModel', + 'AlgebraicEdgeDiagnostics', 'ConstraintGraphDiagnostics', 'ConnectivityDiagnostics', 'ConnectivityDiagnosticsError', diff --git a/src/pyvoro2/powerfit/__init__.py b/src/pyvoro2/powerfit/__init__.py index fd67d70..c057923 100644 --- a/src/pyvoro2/powerfit/__init__.py +++ b/src/pyvoro2/powerfit/__init__.py @@ -36,6 +36,7 @@ write_report_json, ) from .solver import ( + AlgebraicEdgeDiagnostics, ConnectivityDiagnostics, ConnectivityDiagnosticsError, ConstraintGraphDiagnostics, @@ -59,6 +60,7 @@ 'ReciprocalBoundaryPenalty', 'L2Regularization', 'FitModel', + 'AlgebraicEdgeDiagnostics', 'ConstraintGraphDiagnostics', 'ConnectivityDiagnostics', 'ConnectivityDiagnosticsError', diff --git a/src/pyvoro2/powerfit/active.py b/src/pyvoro2/powerfit/active.py index c3b1419..5b667c8 100644 --- a/src/pyvoro2/powerfit/active.py +++ b/src/pyvoro2/powerfit/active.py @@ -15,6 +15,7 @@ PowerWeightFitResult, _apply_connectivity_policy, _build_active_set_connectivity_diagnostics, + _compute_edge_diagnostics, _connected_components, _difference_identifying_mask, _predict_measurements, @@ -891,6 +892,10 @@ def _rebuild_fit_with_weights( residuals=residuals, rms_residual=rms, max_residual=mx, + edge_diagnostics=_compute_edge_diagnostics( + constraints, + weights=weights, + ), ) diff --git a/src/pyvoro2/powerfit/report.py b/src/pyvoro2/powerfit/report.py index 1a88718..fc7a5be 100644 --- a/src/pyvoro2/powerfit/report.py +++ b/src/pyvoro2/powerfit/report.py @@ -20,6 +20,7 @@ ConstraintGraphDiagnostics, HardConstraintConflict, PowerWeightFitResult, + _edge_diagnostics_for_result, ) @@ -226,6 +227,39 @@ def _conflict_record( } +def _edge_diagnostics_record( + result: PowerWeightFitResult, + constraints: PairBisectorConstraints, +) -> dict[str, object]: + diagnostics = _edge_diagnostics_for_result(result, constraints) + return { + 'alpha': diagnostics.alpha.tolist(), + 'beta': diagnostics.beta.tolist(), + 'z_obs': diagnostics.z_obs.tolist(), + 'z_fit': ( + None if diagnostics.z_fit is None else diagnostics.z_fit.tolist() + ), + 'residual': ( + None + if diagnostics.residual is None + else diagnostics.residual.tolist() + ), + 'edge_weight': diagnostics.edge_weight.tolist(), + 'weighted_l2': ( + None + if diagnostics.weighted_l2 is None + else float(diagnostics.weighted_l2) + ), + 'weighted_rmse': ( + None + if diagnostics.weighted_rmse is None + else float(diagnostics.weighted_rmse) + ), + 'rmse': None if diagnostics.rmse is None else float(diagnostics.rmse), + 'mae': None if diagnostics.mae is None else float(diagnostics.mae), + } + + def build_fit_report( result: PowerWeightFitResult, constraints: PairBisectorConstraints, @@ -260,6 +294,7 @@ def build_fit_report( }, 'constraints': list(constraints.to_records(use_ids=use_ids)), 'fit_records': list(result.to_records(constraints, use_ids=use_ids)), + 'edge_diagnostics': _edge_diagnostics_record(result, constraints), 'weights': None if result.weights is None else result.weights.tolist(), 'radii': None if result.radii is None else result.radii.tolist(), 'weight_shift': ( diff --git a/src/pyvoro2/powerfit/solver.py b/src/pyvoro2/powerfit/solver.py index 0cedb14..30498bd 100644 --- a/src/pyvoro2/powerfit/solver.py +++ b/src/pyvoro2/powerfit/solver.py @@ -76,6 +76,22 @@ def __str__(self) -> str: return str(self.args[0]) +@dataclass(frozen=True, slots=True) +class AlgebraicEdgeDiagnostics: + """Edge-space diagnostics matching the paper-side algebraic formulas.""" + + alpha: np.ndarray + beta: np.ndarray + z_obs: np.ndarray + z_fit: np.ndarray | None + residual: np.ndarray | None + edge_weight: np.ndarray + weighted_l2: float | None + weighted_rmse: float | None + rmse: float | None + mae: float | None + + @dataclass(frozen=True, slots=True) class PowerWeightFitResult: """Result of inverse fitting of power weights.""" @@ -102,6 +118,7 @@ class PowerWeightFitResult: conflict: 'HardConstraintConflict | None' warnings: tuple[str, ...] connectivity: ConnectivityDiagnostics | None = None + edge_diagnostics: AlgebraicEdgeDiagnostics | None = None @property def is_optimal(self) -> bool: @@ -134,6 +151,7 @@ def to_records( if constraints.n_constraints != int(self.target.shape[0]): raise ValueError('constraints do not match the fit result length') left, right = constraints.pair_labels(use_ids=use_ids) + edge_diag = _edge_diagnostics_for_result(self, constraints) rows: list[dict[str, object]] = [] left_is_int = np.issubdtype(np.asarray(left).dtype, np.integer) right_is_int = np.issubdtype(np.asarray(right).dtype, np.integer) @@ -168,6 +186,20 @@ def to_records( if self.residuals is None else float(self.residuals[k]) ), + 'alpha': float(edge_diag.alpha[k]), + 'beta': float(edge_diag.beta[k]), + 'z_obs': float(edge_diag.z_obs[k]), + 'z_fit': ( + None + if edge_diag.z_fit is None + else float(edge_diag.z_fit[k]) + ), + 'algebraic_residual': ( + None + if edge_diag.residual is None + else float(edge_diag.residual[k]) + ), + 'edge_weight': float(edge_diag.edge_weight[k]), } ) return tuple(rows) @@ -430,6 +462,12 @@ def _fit_power_weights_resolved( hard = _hard_constraint_bounds(model.feasible, geom.alpha, geom.beta) z_lo = hard[0] if hard is not None else None z_hi = hard[1] if hard is not None else None + hard_measurement = _hard_constraint_measurement_bounds( + model.feasible, + constraints.n_constraints, + ) + y_lo = hard_measurement[0] if hard_measurement is not None else None + y_hi = hard_measurement[1] if hard_measurement is not None else None nonquadratic = _requires_admm(model) if solver == 'auto': @@ -496,6 +534,11 @@ def _fit_power_weights_resolved( conflict=conflict, warnings=tuple(warnings_list), connectivity=connectivity, + edge_diagnostics=_compute_edge_diagnostics( + constraints, + weights=None, + geom=geom, + ), ) else: conflict = None @@ -546,6 +589,11 @@ def _fit_power_weights_resolved( conflict=conflict, warnings=tuple(warnings_list), connectivity=connectivity, + edge_diagnostics=_compute_edge_diagnostics( + constraints, + weights=weights, + geom=geom, + ), ) weights = np.zeros(n, dtype=np.float64) @@ -585,9 +633,6 @@ def _fit_power_weights_resolved( target_c = geom.target[mask] conf_c = constraints.confidence[mask] w0_c = w0[idx_nodes] - z_lo_c = None if z_lo is None else z_lo[mask] - z_hi_c = None if z_hi is None else z_hi[mask] - if solver_eff == 'analytic': w_c = _solve_component_analytic(ii, jj, a_c, b_c, w0_c, lam) iters = 1 @@ -607,8 +652,8 @@ def _fit_power_weights_resolved( max_iter=max_iter, tol_abs=tol_abs, tol_rel=tol_rel, - z_lo=z_lo_c, - z_hi=z_hi_c, + y_lo=None if y_lo is None else y_lo[mask], + y_hi=None if y_hi is None else y_hi[mask], ) if not np.all(np.isfinite(w_c)): raise _NumericalFailure('component solver returned non-finite weights') @@ -663,6 +708,11 @@ def _fit_power_weights_resolved( conflict=conflict, warnings=tuple(warnings_list), connectivity=connectivity, + edge_diagnostics=_compute_edge_diagnostics( + constraints, + weights=None, + geom=geom, + ), ) if converged_all: @@ -692,6 +742,11 @@ def _fit_power_weights_resolved( conflict=conflict, warnings=tuple(warnings_list), connectivity=connectivity, + edge_diagnostics=_compute_edge_diagnostics( + constraints, + weights=weights, + geom=geom, + ), ) @@ -729,6 +784,77 @@ def _predict_measurements( ) +def _compute_edge_diagnostics( + constraints: PairBisectorConstraints, + *, + weights: np.ndarray | None, + geom: _MeasurementGeometry | None = None, +) -> AlgebraicEdgeDiagnostics: + if geom is None: + geom = _measurement_geometry(constraints) + + alpha = np.asarray(geom.alpha, dtype=np.float64).copy() + beta = np.asarray(geom.beta, dtype=np.float64).copy() + z_obs = (geom.target - beta) / alpha + edge_weight = ( + np.asarray(constraints.confidence, dtype=np.float64) * (alpha * alpha) + ) + + if weights is None: + return AlgebraicEdgeDiagnostics( + alpha=alpha, + beta=beta, + z_obs=np.asarray(z_obs, dtype=np.float64), + z_fit=None, + residual=None, + edge_weight=np.asarray(edge_weight, dtype=np.float64), + weighted_l2=None, + weighted_rmse=None, + rmse=None, + mae=None, + ) + + w = np.asarray(weights, dtype=np.float64) + z_fit = w[constraints.i] - w[constraints.j] + residual = z_obs - z_fit + weighted_sq = edge_weight * residual * residual + if residual.size: + weighted_l2 = float(np.linalg.norm(np.sqrt(edge_weight) * residual)) + weighted_rmse = float(np.sqrt(np.mean(weighted_sq))) + rmse = float(np.sqrt(np.mean(residual * residual))) + mae = float(np.mean(np.abs(residual))) + else: + weighted_l2 = 0.0 + weighted_rmse = 0.0 + rmse = 0.0 + mae = 0.0 + + return AlgebraicEdgeDiagnostics( + alpha=alpha, + beta=beta, + z_obs=np.asarray(z_obs, dtype=np.float64), + z_fit=np.asarray(z_fit, dtype=np.float64), + residual=np.asarray(residual, dtype=np.float64), + edge_weight=np.asarray(edge_weight, dtype=np.float64), + weighted_l2=weighted_l2, + weighted_rmse=weighted_rmse, + rmse=rmse, + mae=mae, + ) + + +def _edge_diagnostics_for_result( + result: PowerWeightFitResult, + constraints: PairBisectorConstraints, +) -> AlgebraicEdgeDiagnostics: + if result.edge_diagnostics is not None: + return result.edge_diagnostics + return _compute_edge_diagnostics( + constraints, + weights=result.weights, + ) + + def _regularization_reference(reg: L2Regularization, n: int) -> np.ndarray: if reg.reference is None: return np.zeros(n, dtype=np.float64) @@ -1000,6 +1126,22 @@ def _apply_connectivity_policy( raise ValueError('unsupported connectivity policy') +def _hard_constraint_measurement_bounds( + feasible: HardConstraint | None, + n_constraints: int, +) -> tuple[np.ndarray, np.ndarray] | None: + if feasible is None: + return None + if isinstance(feasible, Interval): + lower = np.full(n_constraints, float(feasible.lower), dtype=np.float64) + upper = np.full(n_constraints, float(feasible.upper), dtype=np.float64) + return lower, upper + if isinstance(feasible, FixedValue): + lower = np.full(n_constraints, float(feasible.value), dtype=np.float64) + return lower, lower.copy() + raise TypeError(f'unsupported hard constraint: {type(feasible)!r}') + + def _hard_constraint_bounds( feasible: HardConstraint | None, alpha: np.ndarray, @@ -1227,6 +1369,54 @@ def _solve_component_analytic( return w +def _positive_confidence_connects_component( + n_c: int, + I: np.ndarray, + J: np.ndarray, + confidence: np.ndarray, +) -> bool: + mask = np.asarray(confidence, dtype=np.float64) > 0.0 + if not np.any(mask): + return False + comps = _connected_components(n_c, I[mask], J[mask]) + return len(comps) == 1 + + +def _admm_warm_start_weights( + I: np.ndarray, + J: np.ndarray, + alpha: np.ndarray, + beta: np.ndarray, + target: np.ndarray, + confidence: np.ndarray, + w0: np.ndarray, + *, + lambda_regularize: float, +) -> np.ndarray: + n_c = int(np.max(np.maximum(I, J))) + 1 + lam = float(lambda_regularize) + if lam > 0.0 or _positive_confidence_connects_component( + n_c, + I, + J, + confidence, + ): + try: + return _solve_component_analytic( + I, + J, + np.asarray(confidence, dtype=np.float64) * (alpha * alpha), + (target - beta) / alpha, + w0, + lam, + ) + except np.linalg.LinAlgError: + pass + if lam > 0.0: + return np.asarray(w0, dtype=np.float64).copy() + return np.zeros(n_c, dtype=np.float64) + + def _solve_component_admm( I: np.ndarray, J: np.ndarray, @@ -1242,26 +1432,27 @@ def _solve_component_admm( max_iter: int, tol_abs: float, tol_rel: float, - z_lo: np.ndarray | None, - z_hi: np.ndarray | None, + y_lo: np.ndarray | None, + y_hi: np.ndarray | None, ) -> tuple[np.ndarray, int, bool]: n_c = int(np.max(np.maximum(I, J))) + 1 m_c = I.shape[0] lam = float(lambda_regularize) - if lam > 0: + if lam > 0.0: anchor: int | None = None free = np.arange(n_c, dtype=np.int64) else: anchor = 0 free = np.arange(1, n_c, dtype=np.int64) + edge_scale = alpha * alpha L = np.zeros((n_c, n_c), dtype=np.float64) - for i, j in zip(I.tolist(), J.tolist()): - L[i, i] += 1.0 - L[j, j] += 1.0 - L[i, j] -= 1.0 - L[j, i] -= 1.0 + for i, j, scale in zip(I.tolist(), J.tolist(), edge_scale.tolist()): + L[i, i] += scale + L[j, j] += scale + L[i, j] -= scale + L[j, i] -= scale M = rho * L + lam * np.eye(n_c) Mf = M[np.ix_(free, free)] @@ -1292,67 +1483,78 @@ def solve_M(rhs_free: np.ndarray) -> np.ndarray: raise _NumericalFailure('ADMM linear solve produced non-finite values') return x - # Initialize at the target z implied by the chosen measurement. - z = (target - beta) / alpha - if z_lo is not None and z_hi is not None: - z = np.clip(z, z_lo, z_hi) + w = _admm_warm_start_weights( + I, + J, + alpha, + beta, + target, + confidence, + w0, + lambda_regularize=lam, + ) + if not np.all(np.isfinite(w)): + raise _NumericalFailure('ADMM warm start produced non-finite values') + y = beta + alpha * (w[I] - w[J]) + if y_lo is not None: + y = np.maximum(y, y_lo) + if y_hi is not None: + y = np.minimum(y, y_hi) u = np.zeros(m_c, dtype=np.float64) - w = np.zeros(n_c, dtype=np.float64) converged = False for _it in range(1, max_iter + 1): - y = z - u rhs = np.zeros(n_c, dtype=np.float64) - np.add.at(rhs, I, rho * y) - np.add.at(rhs, J, -rho * y) - if lam > 0: + edge_rhs = rho * alpha * (y - u - beta) + np.add.at(rhs, I, edge_rhs) + np.add.at(rhs, J, -edge_rhs) + if lam > 0.0: rhs += lam * w0 - rhs_free = rhs[free] - w_free = solve_M(rhs_free) + w_free = solve_M(rhs[free]) if anchor is not None: w[anchor] = 0.0 w[free] = w_free if not np.all(np.isfinite(w)): raise _NumericalFailure('ADMM primal iterate became non-finite') - v = (w[I] - w[J]) + u - z_prev = z.copy() - z = _prox_edge_objective( - v, - alpha, - beta, + predicted_y = beta + alpha * (w[I] - w[J]) + y_prev = y.copy() + y = _prox_measurement_objective( + predicted_y + u, target, confidence, model=model, rho=rho, - z_lo=z_lo, - z_hi=z_hi, + y_lo=y_lo, + y_hi=y_hi, ) - - Aw = w[I] - w[J] - r = Aw - z + r = predicted_y - y u = u + r if not ( - np.all(np.isfinite(z)) + np.all(np.isfinite(predicted_y)) + and np.all(np.isfinite(y)) and np.all(np.isfinite(r)) and np.all(np.isfinite(u)) - and np.all(np.isfinite(Aw)) ): raise _NumericalFailure('ADMM iterates became non-finite') r_norm = float(np.linalg.norm(r)) - z_norm = float(np.linalg.norm(z)) - Aw_norm = float(np.linalg.norm(Aw)) - eps_pri = np.sqrt(m_c) * tol_abs + tol_rel * max(Aw_norm, z_norm) + predicted_norm = float(np.linalg.norm(predicted_y)) + y_norm = float(np.linalg.norm(y)) + eps_pri = np.sqrt(m_c) * tol_abs + tol_rel * max(predicted_norm, y_norm) - dz = z - z_prev + dy = y - y_prev s_vec = np.zeros(n_c, dtype=np.float64) - np.add.at(s_vec, I, rho * dz) - np.add.at(s_vec, J, -rho * dz) + np.add.at(s_vec, I, rho * alpha * dy) + np.add.at(s_vec, J, -rho * alpha * dy) s_norm = float(np.linalg.norm(s_vec[free])) if free.size else 0.0 - u_norm = float(np.linalg.norm(u)) - eps_dual = np.sqrt(len(free)) * tol_abs + tol_rel * rho * u_norm + + dual_vec = np.zeros(n_c, dtype=np.float64) + np.add.at(dual_vec, I, rho * alpha * u) + np.add.at(dual_vec, J, -rho * alpha * u) + dual_norm = float(np.linalg.norm(dual_vec[free])) if free.size else 0.0 + eps_dual = np.sqrt(len(free)) * tol_abs + tol_rel * dual_norm if r_norm <= eps_pri and s_norm <= eps_dual: converged = True @@ -1361,32 +1563,64 @@ def solve_M(rhs_free: np.ndarray) -> np.ndarray: return w, _it, converged -def _prox_edge_objective( +def _prox_measurement_mismatch_only( + v: np.ndarray, + target: np.ndarray, + confidence: np.ndarray, + mismatch: SquaredLoss | HuberLoss, + rho: float, +) -> np.ndarray: + if isinstance(mismatch, SquaredLoss): + denom = rho + (2.0 * confidence) + return (rho * v + (2.0 * confidence * target)) / denom + if isinstance(mismatch, HuberLoss): + delta = float(mismatch.delta) + y_quad = (rho * v + confidence * target) / (rho + confidence) + lower = target - delta + upper = target + delta + y_lower = v + (confidence * delta) / rho + y_upper = v - (confidence * delta) / rho + return np.where( + y_quad < lower, + y_lower, + np.where(y_quad > upper, y_upper, y_quad), + ) + raise TypeError(f'unsupported mismatch: {type(mismatch)!r}') + + +def _prox_measurement_objective( v: np.ndarray, - alpha: np.ndarray, - beta: np.ndarray, target: np.ndarray, confidence: np.ndarray, *, model: FitModel, rho: float, - z_lo: np.ndarray | None, - z_hi: np.ndarray | None, + y_lo: np.ndarray | None, + y_hi: np.ndarray | None, ) -> np.ndarray: - z = v.copy() - if z_lo is not None and z_hi is not None: - z = np.clip(z, z_lo, z_hi) + y = _prox_measurement_mismatch_only( + v, + target, + confidence, + model.mismatch, + rho, + ) + if y_lo is not None: + y = np.maximum(y, y_lo) + if y_hi is not None: + y = np.minimum(y, y_hi) + if not model.penalties: + return y for _ in range(60): - y = beta + alpha * z fp_y, fpp_y = _mismatch_derivatives(y, target, confidence, model.mismatch) for penalty in model.penalties: p_fp_y, p_fpp_y = _penalty_derivatives(y, penalty) fp_y = fp_y + p_fp_y fpp_y = fpp_y + p_fpp_y - g = fp_y * alpha + rho * (z - v) - gp = fpp_y * (alpha**2) + rho + g = fp_y + rho * (y - v) + gp = fpp_y + rho if not np.all(np.isfinite(gp)) or np.any(np.abs(gp) < 1e-18): raise _NumericalFailure( 'prox Newton derivative became singular or non-finite' @@ -1394,14 +1628,16 @@ def _prox_edge_objective( step = g / gp if not np.all(np.isfinite(step)): raise _NumericalFailure('prox Newton step became non-finite') - z_new = z - step - if z_lo is not None and z_hi is not None: - z_new = np.clip(z_new, z_lo, z_hi) + y_new = y - step + if y_lo is not None: + y_new = np.maximum(y_new, y_lo) + if y_hi is not None: + y_new = np.minimum(y_new, y_hi) if float(np.max(np.abs(step))) < 1e-12: - z = z_new + y = y_new break - z = z_new - return z + y = y_new + return y def _mismatch_derivatives( diff --git a/tests/test_powerfit_fit.py b/tests/test_powerfit_fit.py index 43b6ef7..1ead0ba 100644 --- a/tests/test_powerfit_fit.py +++ b/tests/test_powerfit_fit.py @@ -14,6 +14,37 @@ def test_fit_power_weights_fraction_two_points_analytic(): assert res.status == 'optimal' +def test_fit_result_exposes_algebraic_edge_diagnostics(): + from pyvoro2 import fit_power_weights, resolve_pair_bisector_constraints + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + res = fit_power_weights(pts, [(0, 1, 0.25)], measurement='fraction') + + assert res.edge_diagnostics is not None + diag = res.edge_diagnostics + assert np.allclose(diag.alpha, np.array([0.125])) + assert np.allclose(diag.beta, np.array([0.5])) + assert np.allclose(diag.z_obs, np.array([-2.0])) + assert np.allclose(diag.z_fit, np.array([-2.0])) + assert np.allclose(diag.residual, np.array([0.0])) + assert np.allclose(diag.edge_weight, np.array([0.015625])) + assert np.isclose(diag.weighted_l2, 0.0) + assert np.isclose(diag.weighted_rmse, 0.0) + assert np.isclose(diag.rmse, 0.0) + assert np.isclose(diag.mae, 0.0) + + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + rows = res.to_records(constraints) + assert rows[0]['z_obs'] == pytest.approx(-2.0) + assert rows[0]['z_fit'] == pytest.approx(-2.0) + assert rows[0]['algebraic_residual'] == pytest.approx(0.0) + assert rows[0]['edge_weight'] == pytest.approx(0.015625) + + def test_fit_power_weights_fraction_allows_values_outside_segment(): from pyvoro2 import fit_power_weights @@ -162,7 +193,8 @@ def test_huber_loss_is_available_as_an_alternative_mismatch(): max_iter=5000, ) - assert res.status in ('optimal', 'max_iter') + assert res.status == 'optimal' + assert res.converged is True assert res.predicted is not None diff --git a/tests/test_powerfit_reports.py b/tests/test_powerfit_reports.py index bdb9a7f..a2e0489 100644 --- a/tests/test_powerfit_reports.py +++ b/tests/test_powerfit_reports.py @@ -177,6 +177,36 @@ def test_active_set_report_supports_planar_tessellation_diagnostics() -> None: assert report['tessellation_diagnostics']['ok_area'] is True +def test_fit_report_includes_edge_diagnostics_and_algebraic_rows(): + from pyvoro2 import ( + build_fit_report, + fit_power_weights, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(10, 20, 0.25)], + ids=[10, 20], + index_mode='id', + measurement='fraction', + ) + fit = fit_power_weights(pts, constraints) + + report = build_fit_report(fit, constraints, use_ids=True) + + assert report['edge_diagnostics']['z_obs'] == [-2.0] + assert report['edge_diagnostics']['z_fit'] == [-2.0] + assert report['edge_diagnostics']['residual'] == [0.0] + assert report['edge_diagnostics']['edge_weight'] == [0.015625] + assert report['fit_records'][0]['site_i'] == 10 + assert report['fit_records'][0]['z_obs'] == -2.0 + assert report['fit_records'][0]['z_fit'] == -2.0 + assert report['fit_records'][0]['algebraic_residual'] == 0.0 + assert report['fit_records'][0]['edge_weight'] == 0.015625 + + def test_fit_report_includes_connectivity_diagnostics(): from pyvoro2 import ( build_fit_report, diff --git a/tests/test_powerfit_validation_regressions.py b/tests/test_powerfit_validation_regressions.py index 2e0ec95..a2d3382 100644 --- a/tests/test_powerfit_validation_regressions.py +++ b/tests/test_powerfit_validation_regressions.py @@ -79,6 +79,90 @@ def test_zero_confidence_rows_do_not_join_effective_components(): assert np.allclose(res.weights[2], 0.0, atol=1e-12) +def test_huber_admm_handles_medium_size_sparse_outliers(): + from pyvoro2 import ( + FitModel, + HuberLoss, + fit_power_weights, + resolve_pair_bisector_constraints, + ) + + rng = np.random.default_rng(20260318) + grid = np.array( + [(x, y, z) for x in range(4) for y in range(3) for z in range(2)], + dtype=float, + ) + pts = (2.0 * grid) + (0.07 * rng.normal(size=grid.shape)) + true_weights = 0.08 * rng.normal(size=pts.shape[0]) + true_weights -= np.mean(true_weights) + + clean_rows: list[tuple[int, int, float]] = [] + for i in range(pts.shape[0]): + for j in range(i + 1, pts.shape[0]): + d2 = float(np.sum((pts[i] - pts[j]) ** 2)) + value = 0.5 + (true_weights[i] - true_weights[j]) / (2.0 * d2) + clean_rows.append((i, j, value)) + + noisy_rows = [list(row) for row in clean_rows] + outlier_count = max(1, int(round(0.05 * len(noisy_rows)))) + outlier_idx = rng.choice(len(noisy_rows), size=outlier_count, replace=False) + outlier_step = rng.choice([-0.08, 0.08], size=outlier_idx.size) + for idx, step in zip(outlier_idx, outlier_step, strict=True): + noisy_rows[int(idx)][2] += float(step) + + clean = resolve_pair_bisector_constraints( + pts, + clean_rows, + measurement='fraction', + ) + noisy = resolve_pair_bisector_constraints( + pts, + [tuple(row) for row in noisy_rows], + measurement='fraction', + ) + + squared = fit_power_weights( + pts, + noisy, + measurement='fraction', + solver='analytic', + ) + huber = fit_power_weights( + pts, + noisy, + measurement='fraction', + model=FitModel(mismatch=HuberLoss(delta=0.01)), + solver='admm', + max_iter=4000, + ) + + def _centered_weight_rmse(weights: np.ndarray) -> float: + centered = np.asarray(weights, dtype=float) - np.mean(weights) + target = true_weights - np.mean(true_weights) + return float(np.sqrt(np.mean((centered - target) ** 2))) + + def _clean_measurement_rmse(weights: np.ndarray) -> float: + diff = ( + np.asarray(weights, dtype=float)[clean.i] + - np.asarray(weights, dtype=float)[clean.j] + ) + predicted = 0.5 + diff / (2.0 * clean.distance2) + residual = predicted - clean.target_fraction + return float(np.sqrt(np.mean(residual * residual))) + + assert huber.status == 'optimal' + assert huber.converged is True + assert huber.n_iter < 200 + assert huber.edge_diagnostics is not None + assert np.isfinite(huber.edge_diagnostics.weighted_l2) + assert _centered_weight_rmse(huber.weights) < ( + 0.35 * _centered_weight_rmse(squared.weights) + ) + assert _clean_measurement_rmse(huber.weights) < ( + 0.35 * _clean_measurement_rmse(squared.weights) + ) + + def test_empty_resolved_constraints_use_regularization_only_solution(): from pyvoro2 import FitModel, L2Regularization, fit_power_weights from pyvoro2.powerfit.constraints import resolve_pair_bisector_constraints From 8b4d954a00d9b763a80cc1c17f8bed24354f7aa0 Mon Sep 17 00:00:00 2001 From: Ivan Chernyshov Date: Thu, 19 Mar 2026 19:57:58 +0300 Subject: [PATCH 2/2] Adds public powerfit problem export --- CHANGELOG.md | 21 + docs/guide/powerfit.md | 90 +- src/pyvoro2/__about__.py | 2 +- src/pyvoro2/powerfit/__init__.py | 22 +- src/pyvoro2/powerfit/active.py | 80 +- src/pyvoro2/powerfit/constraints.py | 63 + src/pyvoro2/powerfit/problem.py | 1114 ++++++++++++++ src/pyvoro2/powerfit/report.py | 34 +- src/pyvoro2/powerfit/solver.py | 1309 ++--------------- src/pyvoro2/powerfit/transforms.py | 52 + src/pyvoro2/powerfit/types.py | 314 ++++ tests/test_powerfit_active_set.py | 3 +- tests/test_powerfit_problem_api.py | 164 +++ tests/test_powerfit_validation_regressions.py | 2 +- 14 files changed, 2047 insertions(+), 1223 deletions(-) create mode 100644 src/pyvoro2/powerfit/problem.py create mode 100644 src/pyvoro2/powerfit/transforms.py create mode 100644 src/pyvoro2/powerfit/types.py create mode 100644 tests/test_powerfit_problem_api.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff652a..f8c5598 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,27 @@ All notable changes to this project are documented in this file. The format is based on *Keep a Changelog*, and this project follows *Semantic Versioning*. +## [0.6.3] - 2026-03-19 + +### Added + +- Public power-fit problem export via `pyvoro2.powerfit.build_power_fit_problem(...)`, including stable `PowerFitProblem`, `PowerFitBounds`, and `PowerFitPredictions` objects for external evaluation or solver experiments. +- Public result packaging via `pyvoro2.powerfit.build_power_fit_result(...)`, so externally computed candidate weights can be turned back into a normal `PowerWeightFitResult` with standard residuals, reports, and algebraic diagnostics. +- `PowerFitObjectiveBreakdown` and `PowerWeightFitResult.objective_breakdown`, including explicit hard-bound satisfaction and per-penalty summaries for debugging or research workflows. +- A small advanced problem-API regression suite covering problem export, read-only arrays, result repackaging, and status-detail reporting. + +### Changed + +- Shared public power-fit dataclasses now live in `pyvoro2.powerfit.types`, while pure public weight/radius conversions live in `pyvoro2.powerfit.transforms`. +- Mathematical formulas for prediction, algebraic diagnostics, hard-bound conversion, gauge canonicalization, and objective evaluation are now centralized in `pyvoro2.powerfit.problem` instead of being split across solver/report helpers. +- The standalone native solver, active-set layer, and report layer now consume that shared problem/evaluation machinery instead of maintaining their own private copies of the same formulas. +- `PairBisectorConstraints` and the new problem-definition objects now expose read-only NumPy arrays to match their frozen-dataclass semantics more honestly. +- `PowerWeightFitResult.status` is now open-ended and pairs with `status_detail` so externally packaged results can preserve solver-specific termination text without expanding the native status vocabulary. + +### Fixed + +- Internal formula drift risks between low-level fitting, active-set post-processing, and report serialization were removed by routing them all through one public problem/evaluation layer. + ## [0.6.2] - 2026-03-19 ### Added diff --git a/docs/guide/powerfit.md b/docs/guide/powerfit.md index afb900c..866ace0 100644 --- a/docs/guide/powerfit.md +++ b/docs/guide/powerfit.md @@ -124,7 +124,8 @@ The result contains: - predicted separator locations in both fraction and position form, - residuals in the chosen measurement space, - `edge_diagnostics` with algebraic edge-space quantities such as `z_obs`, `z_fit`, and weighted inconsistency summaries, -- solver/termination metadata, +- `objective_breakdown` with mismatch / penalty / regularization totals for the packaged candidate weights, +- solver/termination metadata including optional `status_detail`, - and explicit infeasibility reporting for contradictory hard constraints. For example, if hard interval or equality restrictions cannot all hold @@ -142,7 +143,47 @@ Both low-level fits and active-set results also provide `to_records(...)` helper that turn per-constraint diagnostics into plain Python rows for downstream packages, table exporters, or custom reporting. -For radii output, 0.6.2 makes the gauge choice explicit: + +### Measurement-space vs algebraic edge-space diagnostics + +`PowerWeightFitResult` now exposes two complementary diagnostic views. + +Measurement-space quantities live in the same space as the chosen separator +targets: + +- `target`, `predicted`, `residuals` + +Algebraic edge-space quantities live in the implied difference model + +\[ + y = \beta + \alpha (w_i - w_j), +\] + +with + +\[ + z_{\mathrm{obs}} = \frac{y_{\mathrm{target}} - \beta}{\alpha}, + \qquad + z_{\mathrm{fit}} = w_i - w_j. +\] + +The edge diagnostics expose `alpha`, `beta`, `z_obs`, `z_fit`, the algebraic +residual `z_obs - z_fit`, and edge weights + +\[ + \omega = \mathrm{confidence} \cdot \alpha^2. +\] + +The exported `weighted_rmse` is defined explicitly as + +\[ + \sqrt{\mathrm{mean}(\omega r^2)}, +\] + +not as `sqrt(sum(w r^2) / sum(w))`. That distinction matters when you compare +results to other code that uses a normalized weighted RMSE convention. + +For radii output, 0.6.3 makes the gauge choice explicit: - by default, `weights_to_radii(...)` uses the minimal additive shift that makes all returned radii non-negative; @@ -328,6 +369,45 @@ text = pv.dumps_report_json(solve_report, sort_keys=True) pv.write_report_json(solve_report, 'solve_report.json', sort_keys=True) ``` +## Native Huber fit on sparse outliers + +A short robust-fitting example looks like this: + +```python +model = pv.FitModel(mismatch=pv.HuberLoss(delta=0.03)) +fit = pv.fit_power_weights(points, constraints, model=model, solver='admm') + +print(fit.status, fit.rms_residual) +print(fit.edge_diagnostics.weighted_rmse) +``` + +This is still the native `pyvoro2` solver path. The robust part comes from the +measurement-space Huber objective, while `edge_diagnostics` lets you inspect +the theorem-facing algebraic residuals directly. + +## Advanced problem export / repackaging + +For research workflows or external solvers, `pyvoro2` now exposes the resolved +inverse problem itself: + +```python +resolved = pv.powerfit.resolve_pair_bisector_constraints(points, raw_constraints) +problem = pv.powerfit.build_power_fit_problem(resolved, model=model) + +weights = some_external_solver(problem) +result = pv.powerfit.build_power_fit_result( + problem, + weights, + solver='external', + status='external_failure', + status_detail='candidate iterate only', +) +``` + +This keeps `fit_power_weights(...)` solver-owned while still giving downstream +code a stable public export of the mathematics, prediction formulas, objective +evaluation, and result packaging. + ## Current scope The current implementation supports both **3D** domains through `pyvoro2` and @@ -342,9 +422,9 @@ The main current restriction is geometric, not algebraic: - 2D currently supports `Box` and rectangular `RectangularCell`; - there is **no** planar oblique-periodic `PeriodicCell` yet. -### Objective-model scope for 0.6.2 +### Objective-model scope for 0.6.3 -The 0.6.2 line still keeps the built-in objective family compact: +The 0.6.3 line still keeps the built-in objective family compact: - mismatch terms: `SquaredLoss`, `HuberLoss` - hard feasibility: `Interval`, `FixedValue` @@ -357,7 +437,7 @@ hard-feasibility checks, residual diagnostics, and solver behavior easy to reason about. Additional mismatch or penalty families should wait until downstream packages -validate a concrete need for them. In particular, 0.6.1 does **not** try to +validate a concrete need for them. In particular, 0.6.3 still does **not** try to freeze an open-ended callback API for arbitrary user-defined objectives. ## Worked example notebooks diff --git a/src/pyvoro2/__about__.py b/src/pyvoro2/__about__.py index a12635e..31b0a08 100644 --- a/src/pyvoro2/__about__.py +++ b/src/pyvoro2/__about__.py @@ -5,4 +5,4 @@ """ # Keep this as a simple assignment so scikit-build-core can extract it via regex. -__version__ = '0.6.2' +__version__ = '0.6.3' diff --git a/src/pyvoro2/powerfit/__init__.py b/src/pyvoro2/powerfit/__init__.py index c057923..3d3674a 100644 --- a/src/pyvoro2/powerfit/__init__.py +++ b/src/pyvoro2/powerfit/__init__.py @@ -22,6 +22,11 @@ SelfConsistentPowerFitResult, solve_self_consistent_power_weights, ) +from .problem import ( + PowerFitProblem, + build_power_fit_problem, + build_power_fit_result, +) from .realize import ( RealizedPairDiagnostics, UnaccountedRealizedPair, @@ -35,17 +40,18 @@ dumps_report_json, write_report_json, ) -from .solver import ( +from .solver import ConnectivityDiagnosticsError, fit_power_weights +from .transforms import radii_to_weights, weights_to_radii +from .types import ( AlgebraicEdgeDiagnostics, ConnectivityDiagnostics, - ConnectivityDiagnosticsError, ConstraintGraphDiagnostics, HardConstraintConflict, HardConstraintConflictTerm, + PowerFitBounds, + PowerFitObjectiveBreakdown, + PowerFitPredictions, PowerWeightFitResult, - fit_power_weights, - radii_to_weights, - weights_to_radii, ) __all__ = [ @@ -66,7 +72,13 @@ 'ConnectivityDiagnosticsError', 'HardConstraintConflictTerm', 'HardConstraintConflict', + 'PowerFitBounds', + 'PowerFitPredictions', + 'PowerFitObjectiveBreakdown', + 'PowerFitProblem', 'PowerWeightFitResult', + 'build_power_fit_problem', + 'build_power_fit_result', 'RealizedPairDiagnostics', 'UnaccountedRealizedPair', 'UnaccountedRealizedPairError', diff --git a/src/pyvoro2/powerfit/active.py b/src/pyvoro2/powerfit/active.py index 5b667c8..2f9a4ca 100644 --- a/src/pyvoro2/powerfit/active.py +++ b/src/pyvoro2/powerfit/active.py @@ -2,7 +2,7 @@ from __future__ import annotations -from dataclasses import dataclass, replace +from dataclasses import dataclass from typing import Literal import numpy as np @@ -10,18 +10,18 @@ from .constraints import PairBisectorConstraints, resolve_pair_bisector_constraints from .model import FitModel from .realize import RealizedPairDiagnostics, match_realized_pairs +from .problem import ( + _build_active_set_connectivity_diagnostics, + build_power_fit_problem, + build_power_fit_result, +) from .solver import ( ConnectivityDiagnostics, PowerWeightFitResult, _apply_connectivity_policy, - _build_active_set_connectivity_diagnostics, - _compute_edge_diagnostics, - _connected_components, - _difference_identifying_mask, - _predict_measurements, fit_power_weights, - weights_to_radii, ) +from .transforms import weights_to_radii from ..diagnostics import TessellationDiagnostics as TessellationDiagnostics3D from ..domains import Box as Box3D, OrthorhombicCell, PeriodicCell from ..planar.diagnostics import TessellationDiagnostics as TessellationDiagnostics2D @@ -324,6 +324,7 @@ def solve_self_consistent_power_weights( raise ValueError('active0 must have shape (m,)') warnings_list = list(resolved.warnings) + full_problem = build_power_fit_problem(resolved, model=model) add_streak = np.zeros(m, dtype=np.int64) drop_streak = np.zeros(m, dtype=np.int64) toggle_count = np.zeros(m, dtype=np.int64) @@ -510,10 +511,10 @@ def solve_self_consistent_power_weights( n_added = int(np.count_nonzero((~active) & new_active)) n_removed = int(np.count_nonzero(active & (~new_active))) - pred_fraction, pred_position, pred = _predict_measurements( - weights_eval, - resolved, - ) + pred_all = full_problem.predict(weights_eval) + pred_fraction = np.asarray(pred_all.fraction, dtype=np.float64) + pred_position = np.asarray(pred_all.position, dtype=np.float64) + pred = np.asarray(pred_all.measurement, dtype=np.float64) target = ( resolved.target_fraction if resolved.measurement == 'fraction' @@ -617,6 +618,7 @@ def solve_self_consistent_power_weights( final_fit, active_constraints, final_weights, + model=model, r_min=r_min, weight_shift=weight_shift, ) @@ -632,10 +634,10 @@ def solve_self_consistent_power_weights( unaccounted_pair_check=unaccounted_pair_check, ) warnings_list.extend(final_realized.warnings) - pred_fraction, pred_position, pred = _predict_measurements( - final_fit.weights, - resolved, - ) + pred_all = full_problem.predict(final_fit.weights) + pred_fraction = np.asarray(pred_all.fraction, dtype=np.float64) + pred_position = np.asarray(pred_all.position, dtype=np.float64) + pred = np.asarray(pred_all.measurement, dtype=np.float64) else: final_realized = last_diag pred_fraction = np.full(m, np.nan, dtype=np.float64) @@ -755,12 +757,11 @@ def _active_alignment_components( constraints: PairBisectorConstraints, model: FitModel, ) -> list[list[int]]: - effective_mask = _difference_identifying_mask(constraints, model) - return _connected_components( - constraints.n_points, - constraints.i[effective_mask], - constraints.j[effective_mask], - ) + problem = build_power_fit_problem(constraints, model=model) + return [ + list(component) + for component in problem.connectivity.effective_graph.connected_components + ] def _self_consistent_gauge_policy_description() -> str: @@ -864,39 +865,24 @@ def _rebuild_fit_with_weights( constraints: PairBisectorConstraints, weights: np.ndarray, *, + model: FitModel, r_min: float, weight_shift: float | None, ) -> PowerWeightFitResult: - radii, shift = weights_to_radii( + problem = build_power_fit_problem(constraints, model=model) + return build_power_fit_result( + problem, weights, + solver=fit.solver, + status=fit.status, + status_detail=fit.status_detail, + converged=fit.converged, + n_iter=fit.n_iter, + warnings=fit.warnings, + canonicalize_gauge=False, r_min=r_min, weight_shift=weight_shift, ) - pred_fraction, pred_position, pred = _predict_measurements(weights, constraints) - target = ( - constraints.target_fraction - if constraints.measurement == 'fraction' - else constraints.target_position - ) - residuals = pred - target - rms = float(np.sqrt(np.mean(residuals * residuals))) if residuals.size else 0.0 - mx = float(np.max(np.abs(residuals))) if residuals.size else 0.0 - return replace( - fit, - weights=np.asarray(weights, dtype=np.float64).copy(), - radii=radii, - weight_shift=shift, - predicted=pred, - predicted_fraction=pred_fraction, - predicted_position=pred_position, - residuals=residuals, - rms_residual=rms, - max_residual=mx, - edge_diagnostics=_compute_edge_diagnostics( - constraints, - weights=weights, - ), - ) def _empty_realized_pair_diagnostics( diff --git a/src/pyvoro2/powerfit/constraints.py b/src/pyvoro2/powerfit/constraints.py index ddad5f9..fae571c 100644 --- a/src/pyvoro2/powerfit/constraints.py +++ b/src/pyvoro2/powerfit/constraints.py @@ -23,6 +23,18 @@ def _plain_value(value: object) -> object: return value.item() if hasattr(value, 'item') else value +def _readonly_array( + value: np.ndarray | None, + *, + dtype: np.dtype | type | None = None, +) -> np.ndarray | None: + if value is None: + return None + arr = np.array(value, dtype=dtype, copy=True) + arr.setflags(write=False) + return arr + + def _validated_ids_array(ids: Sequence[int] | np.ndarray, n_points: int) -> np.ndarray: """Return validated external ids as a 1D NumPy array. @@ -68,6 +80,57 @@ class PairBisectorConstraints: warnings: tuple[str, ...] def __post_init__(self) -> None: + object.__setattr__(self, 'i', _readonly_array(self.i, dtype=np.int64)) + object.__setattr__(self, 'j', _readonly_array(self.j, dtype=np.int64)) + object.__setattr__( + self, + 'shifts', + _readonly_array(self.shifts, dtype=np.int64), + ) + object.__setattr__( + self, + 'target', + _readonly_array(self.target, dtype=np.float64), + ) + object.__setattr__( + self, + 'confidence', + _readonly_array(self.confidence, dtype=np.float64), + ) + object.__setattr__( + self, + 'distance', + _readonly_array(self.distance, dtype=np.float64), + ) + object.__setattr__( + self, + 'distance2', + _readonly_array(self.distance2, dtype=np.float64), + ) + object.__setattr__(self, 'delta', _readonly_array(self.delta, dtype=np.float64)) + object.__setattr__( + self, + 'target_fraction', + _readonly_array(self.target_fraction, dtype=np.float64), + ) + object.__setattr__( + self, + 'target_position', + _readonly_array(self.target_position, dtype=np.float64), + ) + object.__setattr__( + self, + 'input_index', + _readonly_array(self.input_index, dtype=np.int64), + ) + object.__setattr__( + self, + 'explicit_shift', + _readonly_array(self.explicit_shift, dtype=bool), + ) + object.__setattr__(self, 'ids', _readonly_array(self.ids)) + object.__setattr__(self, 'warnings', tuple(self.warnings)) + m = int(self.i.shape[0]) if self.i.shape != (m,) or self.j.shape != (m,): raise ValueError('PairBisectorConstraints.i/j must have shape (m,)') diff --git a/src/pyvoro2/powerfit/problem.py b/src/pyvoro2/powerfit/problem.py new file mode 100644 index 0000000..887bc24 --- /dev/null +++ b/src/pyvoro2/powerfit/problem.py @@ -0,0 +1,1114 @@ +"""Public power-fit problem construction, evaluation, and result packaging.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal + +import numpy as np + +from .constraints import PairBisectorConstraints +from .model import ( + ExponentialBoundaryPenalty, + FitModel, + FixedValue, + HardConstraint, + HuberLoss, + Interval, + L2Regularization, + ReciprocalBoundaryPenalty, + SoftIntervalPenalty, + SquaredLoss, +) +from .transforms import weights_to_radii +from .types import ( + AlgebraicEdgeDiagnostics, + ConnectivityDiagnostics, + ConstraintGraphDiagnostics, + HardConstraintConflict, + HardConstraintConflictTerm, + PowerFitBounds, + PowerFitObjectiveBreakdown, + PowerFitPredictions, + PowerWeightFitResult, + _readonly_array, +) + + +@dataclass(frozen=True, slots=True) +class _MeasurementGeometry: + alpha: np.ndarray + beta: np.ndarray + target: np.ndarray + target_fraction: np.ndarray + target_position: np.ndarray + + +@dataclass(frozen=True, slots=True) +class _DifferenceEdge: + source: int + target: int + weight: float + constraint_index: int + site_i: int + site_j: int + relation: Literal['<=', '>='] + bound_value: float + + +@dataclass(frozen=True, slots=True) +class PowerFitProblem: + """Stable public export of the resolved inverse power-fit problem.""" + + constraints: PairBisectorConstraints + model: FitModel + alpha: np.ndarray + beta: np.ndarray + z_obs: np.ndarray + edge_weight: np.ndarray + regularization_strength: float + regularization_reference: np.ndarray + offset_identifying_constraint_mask: np.ndarray + bounds: PowerFitBounds + connectivity: ConnectivityDiagnostics + hard_feasible: bool + hard_conflict: HardConstraintConflict | None + + def __post_init__(self) -> None: + m = int(self.constraints.n_constraints) + n = int(self.constraints.n_points) + object.__setattr__(self, 'alpha', _readonly_array(self.alpha, dtype=np.float64)) + object.__setattr__(self, 'beta', _readonly_array(self.beta, dtype=np.float64)) + object.__setattr__(self, 'z_obs', _readonly_array(self.z_obs, dtype=np.float64)) + object.__setattr__( + self, + 'edge_weight', + _readonly_array(self.edge_weight, dtype=np.float64), + ) + object.__setattr__( + self, + 'regularization_reference', + _readonly_array(self.regularization_reference, dtype=np.float64), + ) + object.__setattr__( + self, + 'offset_identifying_constraint_mask', + _readonly_array(self.offset_identifying_constraint_mask, dtype=bool), + ) + if self.alpha.shape != (m,): + raise ValueError('alpha must have shape (m,)') + if self.beta.shape != (m,): + raise ValueError('beta must have shape (m,)') + if self.z_obs.shape != (m,): + raise ValueError('z_obs must have shape (m,)') + if self.edge_weight.shape != (m,): + raise ValueError('edge_weight must have shape (m,)') + if self.regularization_reference.shape != (n,): + raise ValueError('regularization_reference must have shape (n_points,)') + if self.offset_identifying_constraint_mask.shape != (m,): + raise ValueError( + 'offset_identifying_constraint_mask must have shape (m,)' + ) + + @property + def measurement(self) -> str: + return self.constraints.measurement + + @property + def measurement_target(self) -> np.ndarray: + target = ( + self.constraints.target_fraction + if self.constraints.measurement == 'fraction' + else self.constraints.target_position + ) + return np.asarray(target, dtype=np.float64) + + @property + def confidence(self) -> np.ndarray: + return np.asarray(self.constraints.confidence, dtype=np.float64) + + @property + def suggested_anchor_indices(self) -> tuple[int, ...]: + if self.connectivity.offsets_identified_in_objective: + return tuple() + return tuple( + int(component[0]) + for component in self.connectivity.effective_graph.connected_components + if component + ) + + def predict(self, weights: np.ndarray) -> PowerFitPredictions: + w = _validated_weight_vector(self, weights) + return _predict_all(self, w) + + def predict_difference(self, weights: np.ndarray) -> np.ndarray: + return np.asarray(self.predict(weights).difference, dtype=np.float64) + + def predict_fraction(self, weights: np.ndarray) -> np.ndarray: + return np.asarray(self.predict(weights).fraction, dtype=np.float64) + + def predict_position(self, weights: np.ndarray) -> np.ndarray: + return np.asarray(self.predict(weights).position, dtype=np.float64) + + def predict_measurement(self, weights: np.ndarray) -> np.ndarray: + return np.asarray(self.predict(weights).measurement, dtype=np.float64) + + def edge_diagnostics(self, weights: np.ndarray) -> AlgebraicEdgeDiagnostics: + w = _validated_weight_vector(self, weights) + predictions = _predict_all(self, w) + return _compute_edge_diagnostics( + self.constraints, + weights=w, + predictions=predictions, + ) + + def objective_breakdown( + self, + weights: np.ndarray, + ) -> PowerFitObjectiveBreakdown: + w = _validated_weight_vector(self, weights) + predictions = _predict_all(self, w) + return _objective_breakdown(self, predictions, w) + + def evaluate_objective(self, weights: np.ndarray) -> float: + parts = self.objective_breakdown(weights) + if not parts.hard_constraints_satisfied: + return float('inf') + return float(parts.total) + + def canonicalize_gauge(self, weights: np.ndarray) -> np.ndarray: + """Apply the standalone component gauge convention to candidate weights.""" + + w = _validated_weight_vector(self, weights) + if self.connectivity.offsets_identified_in_objective: + return w.copy() + comps = [ + list(component) + for component in self.connectivity.effective_graph.connected_components + ] + return _apply_component_mean_gauge( + w, + comps, + reference=( + None + if self.model.regularization.reference is None + else self.regularization_reference + ), + ) + + +def build_power_fit_problem( + constraints: PairBisectorConstraints, + *, + model: FitModel | None = None, +) -> PowerFitProblem: + """Build a stable public power-fit problem from resolved constraints.""" + + if model is None: + model = FitModel() + geom = _measurement_geometry(constraints) + reg_ref = _regularization_reference(model.regularization, constraints.n_points) + hard_diff = _hard_constraint_bounds(model.feasible, geom.alpha, geom.beta) + hard_measurement = _hard_constraint_measurement_bounds( + model.feasible, + constraints.n_constraints, + ) + bounds = PowerFitBounds( + measurement_lower=None if hard_measurement is None else hard_measurement[0], + measurement_upper=None if hard_measurement is None else hard_measurement[1], + difference_lower=None if hard_diff is None else hard_diff[0], + difference_upper=None if hard_diff is None else hard_diff[1], + ) + hard_feasible = True + conflict = None + if hard_diff is not None: + hard_feasible, conflict = _check_hard_feasibility( + int(constraints.n_points), + constraints.i, + constraints.j, + hard_diff[0], + hard_diff[1], + ) + connectivity = _build_fit_connectivity_diagnostics( + constraints, + model=model, + gauge_policy=_standalone_gauge_policy_description(model.regularization), + ) + alpha = np.asarray(geom.alpha, dtype=np.float64) + beta = np.asarray(geom.beta, dtype=np.float64) + target = np.asarray(geom.target, dtype=np.float64) + z_obs = (target - beta) / alpha + edge_weight = np.asarray(constraints.confidence, dtype=np.float64) * (alpha * alpha) + return PowerFitProblem( + constraints=constraints, + model=model, + alpha=alpha, + beta=beta, + z_obs=z_obs, + edge_weight=edge_weight, + regularization_strength=float(model.regularization.strength), + regularization_reference=reg_ref, + offset_identifying_constraint_mask=_offset_identifying_constraint_mask( + constraints, + model, + ), + bounds=bounds, + connectivity=connectivity, + hard_feasible=bool(hard_feasible), + hard_conflict=conflict, + ) + + +def build_power_fit_result( + problem: PowerFitProblem, + weights: np.ndarray, + *, + solver: str = 'external', + status: str = 'optimal', + status_detail: str | None = None, + converged: bool = True, + n_iter: int = 0, + warnings: tuple[str, ...] = (), + canonicalize_gauge: bool = True, + r_min: float = 0.0, + weight_shift: float | None = None, +) -> PowerWeightFitResult: + """Package candidate weights into a standard power-fit result object.""" + + w = _validated_weight_vector(problem, weights) + if canonicalize_gauge: + w = problem.canonicalize_gauge(w) + predictions = _predict_all(problem, w) + residuals = np.asarray( + predictions.measurement - problem.measurement_target, + dtype=np.float64, + ) + edge_diagnostics = _compute_edge_diagnostics( + problem.constraints, + weights=w, + predictions=predictions, + ) + objective_breakdown = _objective_breakdown(problem, predictions, w) + warnings_list = list(warnings) + if not objective_breakdown.hard_constraints_satisfied: + warnings_list.append( + 'candidate weights violate hard measurement bounds' + ) + radii, shift = weights_to_radii( + w, + r_min=r_min, + weight_shift=weight_shift, + ) + rms = float(np.sqrt(np.mean(residuals * residuals))) if residuals.size else 0.0 + mx = float(np.max(np.abs(residuals))) if residuals.size else 0.0 + return PowerWeightFitResult( + status=status, + status_detail=status_detail, + hard_feasible=bool(problem.hard_feasible), + weights=np.asarray(w, dtype=np.float64), + radii=radii, + weight_shift=shift, + measurement=problem.constraints.measurement, + target=np.asarray(problem.measurement_target, dtype=np.float64), + predicted=np.asarray(predictions.measurement, dtype=np.float64), + predicted_fraction=np.asarray(predictions.fraction, dtype=np.float64), + predicted_position=np.asarray(predictions.position, dtype=np.float64), + residuals=residuals, + rms_residual=rms, + max_residual=mx, + used_shifts=np.asarray(problem.constraints.shifts), + solver=solver, + n_iter=int(n_iter), + converged=bool(converged), + conflict=problem.hard_conflict, + warnings=tuple(warnings_list), + connectivity=problem.connectivity, + edge_diagnostics=edge_diagnostics, + objective_breakdown=objective_breakdown, + ) + + +def _validated_weight_vector( + problem: PowerFitProblem, + weights: np.ndarray, +) -> np.ndarray: + w = np.asarray(weights, dtype=float) + if w.ndim != 1 or w.shape != (int(problem.constraints.n_points),): + raise ValueError('weights must have shape (n_points,)') + if not np.all(np.isfinite(w)): + raise ValueError('weights must contain only finite values') + return np.asarray(w, dtype=np.float64) + + +def _measurement_geometry(constraints: PairBisectorConstraints) -> _MeasurementGeometry: + d = constraints.distance + d2 = constraints.distance2 + if constraints.measurement == 'fraction': + alpha = 1.0 / (2.0 * d2) + beta = np.full_like(alpha, 0.5) + target = constraints.target_fraction + else: + alpha = 1.0 / (2.0 * d) + beta = 0.5 * d + target = constraints.target_position + return _MeasurementGeometry( + alpha=np.asarray(alpha, dtype=np.float64), + beta=np.asarray(beta, dtype=np.float64), + target=np.asarray(target, dtype=np.float64), + target_fraction=np.asarray(constraints.target_fraction, dtype=np.float64), + target_position=np.asarray(constraints.target_position, dtype=np.float64), + ) + + +def _predict_all( + problem: PowerFitProblem, + weights: np.ndarray, +) -> PowerFitPredictions: + z_pred = weights[problem.constraints.i] - weights[problem.constraints.j] + fraction = 0.5 + z_pred / (2.0 * problem.constraints.distance2) + position = problem.constraints.distance * fraction + measurement = ( + fraction if problem.constraints.measurement == 'fraction' else position + ) + return PowerFitPredictions( + difference=np.asarray(z_pred, dtype=np.float64), + fraction=np.asarray(fraction, dtype=np.float64), + position=np.asarray(position, dtype=np.float64), + measurement=np.asarray(measurement, dtype=np.float64), + ) + + +def _compute_edge_diagnostics( + constraints: PairBisectorConstraints, + *, + weights: np.ndarray | None, + predictions: PowerFitPredictions | None = None, + geom: _MeasurementGeometry | None = None, +) -> AlgebraicEdgeDiagnostics: + if geom is None: + geom = _measurement_geometry(constraints) + alpha = np.asarray(geom.alpha, dtype=np.float64) + beta = np.asarray(geom.beta, dtype=np.float64) + target = np.asarray(geom.target, dtype=np.float64) + z_obs = (target - beta) / alpha + edge_weight = np.asarray(constraints.confidence, dtype=np.float64) * (alpha * alpha) + if weights is None: + return AlgebraicEdgeDiagnostics( + alpha=alpha, + beta=beta, + z_obs=z_obs, + z_fit=None, + residual=None, + edge_weight=edge_weight, + weighted_l2=None, + weighted_rmse=None, + rmse=None, + mae=None, + ) + if predictions is None: + z_fit = np.asarray( + weights[constraints.i] - weights[constraints.j], + dtype=np.float64, + ) + else: + z_fit = np.asarray(predictions.difference, dtype=np.float64) + residual = z_obs - z_fit + weighted_sq = edge_weight * residual * residual + if residual.size: + weighted_l2 = float(np.linalg.norm(np.sqrt(edge_weight) * residual)) + weighted_rmse = float(np.sqrt(np.mean(weighted_sq))) + rmse = float(np.sqrt(np.mean(residual * residual))) + mae = float(np.mean(np.abs(residual))) + else: + weighted_l2 = 0.0 + weighted_rmse = 0.0 + rmse = 0.0 + mae = 0.0 + return AlgebraicEdgeDiagnostics( + alpha=alpha, + beta=beta, + z_obs=z_obs, + z_fit=z_fit, + residual=np.asarray(residual, dtype=np.float64), + edge_weight=edge_weight, + weighted_l2=weighted_l2, + weighted_rmse=weighted_rmse, + rmse=rmse, + mae=mae, + ) + + +def _edge_diagnostics_for_result( + result: PowerWeightFitResult, + constraints: PairBisectorConstraints, +) -> AlgebraicEdgeDiagnostics: + if result.edge_diagnostics is not None: + return result.edge_diagnostics + return _compute_edge_diagnostics(constraints, weights=result.weights) + + +def _mismatch_values( + measurement: np.ndarray, + target: np.ndarray, + confidence: np.ndarray, + mismatch: SquaredLoss | HuberLoss, +) -> np.ndarray: + residual = measurement - target + if isinstance(mismatch, SquaredLoss): + return confidence * residual * residual + if isinstance(mismatch, HuberLoss): + delta = float(mismatch.delta) + abs_r = np.abs(residual) + return confidence * np.where( + abs_r <= delta, + 0.5 * residual * residual, + delta * (abs_r - 0.5 * delta), + ) + raise TypeError(f'unsupported mismatch: {type(mismatch)!r}') + + +def _penalty_values( + measurement: np.ndarray, + penalty: SoftIntervalPenalty + | ExponentialBoundaryPenalty + | ReciprocalBoundaryPenalty, +) -> np.ndarray: + y = np.asarray(measurement, dtype=np.float64) + if isinstance(penalty, SoftIntervalPenalty): + out = np.zeros_like(y) + lo_mask = y < float(penalty.lower) + hi_mask = y > float(penalty.upper) + if np.any(lo_mask): + out[lo_mask] = float(penalty.strength) * ( + y[lo_mask] - float(penalty.lower) + ) ** 2 + if np.any(hi_mask): + out[hi_mask] = float(penalty.strength) * ( + y[hi_mask] - float(penalty.upper) + ) ** 2 + return out + if isinstance(penalty, ExponentialBoundaryPenalty): + left = float(penalty.lower) + float(penalty.margin) + right = float(penalty.upper) - float(penalty.margin) + tau = float(penalty.tau) + strength = float(penalty.strength) + A = np.exp((left - y) / tau) + B = np.exp((y - right) / tau) + return strength * (A + B) + if isinstance(penalty, ReciprocalBoundaryPenalty): + lower = float(penalty.lower) + upper = float(penalty.upper) + left = lower + float(penalty.margin) + right = upper - float(penalty.margin) + eps = float(penalty.epsilon) + strength = float(penalty.strength) + out = np.zeros_like(y) + lo_mask = y < left + hi_mask = y > right + if np.any(lo_mask): + denom = np.maximum(y[lo_mask] - lower, eps) + base = max(left - lower, eps) + out[lo_mask] = strength * ((1.0 / denom) - (1.0 / base)) + if np.any(hi_mask): + denom = np.maximum(upper - y[hi_mask], eps) + base = max(upper - right, eps) + out[hi_mask] = strength * ((1.0 / denom) - (1.0 / base)) + return out + raise TypeError(f'unsupported penalty: {type(penalty)!r}') + + +def _hard_constraint_status( + problem: PowerFitProblem, + predictions: PowerFitPredictions, +) -> tuple[bool, float]: + lower = problem.bounds.measurement_lower + upper = problem.bounds.measurement_upper + if lower is None or upper is None: + return True, 0.0 + y = np.asarray(predictions.measurement, dtype=np.float64) + lo_violation = np.maximum(lower - y, 0.0) + hi_violation = np.maximum(y - upper, 0.0) + violation = np.maximum(lo_violation, hi_violation) + if violation.size == 0: + return True, 0.0 + max_violation = float(np.max(violation)) + return max_violation <= 1e-12, max_violation + + +def _objective_breakdown( + problem: PowerFitProblem, + predictions: PowerFitPredictions, + weights: np.ndarray, +) -> PowerFitObjectiveBreakdown: + target = np.asarray(problem.measurement_target, dtype=np.float64) + confidence = np.asarray(problem.constraints.confidence, dtype=np.float64) + measurement = np.asarray(predictions.measurement, dtype=np.float64) + mismatch = float( + np.sum( + _mismatch_values( + measurement, + target, + confidence, + problem.model.mismatch, + ) + ) + ) + penalty_terms_list: list[tuple[str, float]] = [] + penalties_total = 0.0 + for penalty in problem.model.penalties: + value = float(np.sum(_penalty_values(measurement, penalty))) + penalty_terms_list.append((type(penalty).__name__, value)) + penalties_total += value + reg = problem.regularization_strength * float( + np.sum((weights - problem.regularization_reference) ** 2) + ) + hard_satisfied, hard_max_violation = _hard_constraint_status(problem, predictions) + total = mismatch + penalties_total + reg + return PowerFitObjectiveBreakdown( + total=float(total), + mismatch=float(mismatch), + penalties_total=float(penalties_total), + penalty_terms=tuple(penalty_terms_list), + regularization=float(reg), + hard_constraints_satisfied=bool(hard_satisfied), + hard_max_violation=float(hard_max_violation), + ) + + +def _regularization_reference(reg: L2Regularization, n: int) -> np.ndarray: + if reg.reference is None: + return np.zeros(n, dtype=np.float64) + w0 = np.asarray(reg.reference, dtype=float) + if w0.shape != (n,): + raise ValueError('regularization.reference must have shape (n,)') + return np.asarray(w0, dtype=np.float64) + + +def _offset_identifying_constraint_mask( + constraints: PairBisectorConstraints, + model: FitModel, +) -> np.ndarray: + mask = np.asarray(constraints.confidence > 0.0, dtype=bool) + if model.feasible is not None or len(model.penalties) > 0: + mask = np.ones(constraints.n_constraints, dtype=bool) + return mask + + +def _apply_component_mean_gauge( + weights: np.ndarray, + comps: list[list[int]], + *, + reference: np.ndarray | None, +) -> np.ndarray: + aligned = np.asarray(weights, dtype=np.float64).copy() + ref = None if reference is None else np.asarray(reference, dtype=np.float64) + for comp in comps: + idx = np.asarray(comp, dtype=np.int64) + if idx.size == 0: + continue + if ref is None: + target_mean = 0.0 + else: + target_mean = float(np.mean(ref[idx])) + current_mean = float(np.mean(aligned[idx])) + aligned[idx] += target_mean - current_mean + return aligned + + +def _standalone_gauge_policy_description(reg: L2Regularization) -> str: + if reg.reference is not None: + return ( + 'each effective component is shifted so its mean matches the ' + 'reference mean on that component' + ) + return 'each effective component is centered to mean zero' + + +def _connected_components( + n: int, + i_idx: np.ndarray, + j_idx: np.ndarray, +) -> list[list[int]]: + adj: list[list[int]] = [[] for _ in range(n)] + for i, j in zip(i_idx.tolist(), j_idx.tolist()): + adj[i].append(j) + adj[j].append(i) + seen = np.zeros(n, dtype=bool) + comps: list[list[int]] = [] + for start in range(n): + if seen[start]: + continue + if len(adj[start]) == 0: + seen[start] = True + comps.append([start]) + continue + stack = [start] + seen[start] = True + comp: list[int] = [] + while stack: + v = stack.pop() + comp.append(v) + for nb in adj[v]: + if not seen[nb]: + seen[nb] = True + stack.append(nb) + comps.append(sorted(comp)) + return comps + + +def _graph_diagnostics( + n: int, + i_idx: np.ndarray, + j_idx: np.ndarray, + *, + n_constraints: int, +) -> ConstraintGraphDiagnostics: + ii = np.asarray(i_idx, dtype=np.int64) + jj = np.asarray(j_idx, dtype=np.int64) + degree = np.zeros(n, dtype=np.int64) + if ii.size: + np.add.at(degree, ii, 1) + np.add.at(degree, jj, 1) + isolated = tuple(np.flatnonzero(degree == 0).tolist()) + components = tuple( + tuple(int(node) for node in comp) + for comp in _connected_components(n, ii, jj) + ) + edges = { + (int(min(i, j)), int(max(i, j))) + for i, j in zip(ii.tolist(), jj.tolist()) + } + return ConstraintGraphDiagnostics( + n_points=int(n), + n_constraints=int(n_constraints), + n_edges=int(len(edges)), + isolated_points=isolated, + connected_components=components, + fully_connected=bool((n <= 1) or len(components) == 1), + ) + + +def _format_component_counts(graph: ConstraintGraphDiagnostics) -> str: + n_components = graph.n_components + if n_components == 1: + return '1 connected component' + return f'{n_components} connected components' + + +def _format_point_list(points: tuple[int, ...]) -> str: + return '[' + ', '.join(str(int(v)) for v in points) + ']' + + +def _build_fit_connectivity_diagnostics( + constraints: PairBisectorConstraints, + *, + model: FitModel, + gauge_policy: str, +) -> ConnectivityDiagnostics: + n = int(constraints.n_points) + candidate_graph = _graph_diagnostics( + n, + constraints.i, + constraints.j, + n_constraints=constraints.n_constraints, + ) + effective_mask = _offset_identifying_constraint_mask(constraints, model) + effective_graph = _graph_diagnostics( + n, + constraints.i[effective_mask], + constraints.j[effective_mask], + n_constraints=int(np.count_nonzero(effective_mask)), + ) + + messages: list[str] = [] + if candidate_graph.isolated_points: + messages.append( + 'candidate graph leaves unconstrained points ' + f'{_format_point_list(candidate_graph.isolated_points)}' + ) + if candidate_graph.n_components > 1: + messages.append( + 'candidate graph has ' f'{_format_component_counts(candidate_graph)}' + ) + if np.any(~effective_mask): + messages.append( + 'zero-confidence candidate rows do not identify pair differences ' + 'in the current objective and are ignored for ' + 'connectivity/gauge diagnostics' + ) + if effective_graph.n_components > 1: + messages.append( + 'pairwise data identify only ' + f'{_format_component_counts(effective_graph)}; relative component ' + 'offsets are not identified by the data' + ) + + return ConnectivityDiagnostics( + unconstrained_points=candidate_graph.isolated_points, + candidate_graph=candidate_graph, + effective_graph=effective_graph, + candidate_offsets_identified_by_data=bool(effective_graph.fully_connected), + active_offsets_identified_by_data=None, + offsets_identified_in_objective=bool( + effective_graph.fully_connected + or float(model.regularization.strength) > 0.0 + ), + gauge_policy=gauge_policy, + messages=tuple(messages), + ) + + +def _build_active_set_connectivity_diagnostics( + constraints: PairBisectorConstraints, + active_mask: np.ndarray, + *, + model: FitModel, + gauge_policy: str, +) -> ConnectivityDiagnostics: + mask = np.asarray(active_mask, dtype=bool) + if mask.shape != (constraints.n_constraints,): + raise ValueError('active_mask must have shape (m,)') + + n = int(constraints.n_points) + candidate_graph = _graph_diagnostics( + n, + constraints.i, + constraints.j, + n_constraints=constraints.n_constraints, + ) + effective_mask = _offset_identifying_constraint_mask(constraints, model) + effective_graph = _graph_diagnostics( + n, + constraints.i[effective_mask], + constraints.j[effective_mask], + n_constraints=int(np.count_nonzero(effective_mask)), + ) + + active_constraints = constraints.subset(mask) + active_graph = _graph_diagnostics( + n, + active_constraints.i, + active_constraints.j, + n_constraints=active_constraints.n_constraints, + ) + active_effective_mask = _offset_identifying_constraint_mask( + active_constraints, + model, + ) + active_effective_graph = _graph_diagnostics( + n, + active_constraints.i[active_effective_mask], + active_constraints.j[active_effective_mask], + n_constraints=int(np.count_nonzero(active_effective_mask)), + ) + + messages: list[str] = [] + if candidate_graph.isolated_points: + messages.append( + 'candidate graph leaves unconstrained points ' + f'{_format_point_list(candidate_graph.isolated_points)}' + ) + if candidate_graph.n_components > 1: + messages.append( + 'candidate graph has ' f'{_format_component_counts(candidate_graph)}' + ) + if np.any(~effective_mask): + messages.append( + 'zero-confidence candidate rows do not identify pair differences ' + 'in the current objective and are ignored for ' + 'connectivity/gauge diagnostics' + ) + if effective_graph.n_components > 1: + messages.append( + 'candidate pairwise data identify only ' + f'{_format_component_counts(effective_graph)}; relative component ' + 'offsets are not identified by the data' + ) + if active_graph.n_components > 1: + messages.append( + 'final active graph has ' f'{_format_component_counts(active_graph)}' + ) + if np.any(mask) and np.any(~active_effective_mask): + messages.append( + 'zero-confidence active rows do not identify pair differences in ' + 'the current objective and are ignored for active-component gauge ' + 'alignment' + ) + if active_effective_graph.n_components > 1: + messages.append( + 'final active pairwise data identify only ' + f'{_format_component_counts(active_effective_graph)}; relative ' + 'component offsets are preserved by the self-consistent gauge ' + 'policy rather than identified by the data' + ) + + return ConnectivityDiagnostics( + unconstrained_points=candidate_graph.isolated_points, + candidate_graph=candidate_graph, + effective_graph=effective_graph, + active_graph=active_graph, + active_effective_graph=active_effective_graph, + candidate_offsets_identified_by_data=bool(effective_graph.fully_connected), + active_offsets_identified_by_data=bool( + active_effective_graph.fully_connected + ), + offsets_identified_in_objective=bool( + active_effective_graph.fully_connected + or float(model.regularization.strength) > 0.0 + ), + gauge_policy=gauge_policy, + messages=tuple(messages), + ) + + +def _hard_constraint_measurement_bounds( + feasible: HardConstraint | None, + n_constraints: int, +) -> tuple[np.ndarray, np.ndarray] | None: + if feasible is None: + return None + if isinstance(feasible, Interval): + lower = np.full(n_constraints, float(feasible.lower), dtype=np.float64) + upper = np.full(n_constraints, float(feasible.upper), dtype=np.float64) + return lower, upper + if isinstance(feasible, FixedValue): + lower = np.full(n_constraints, float(feasible.value), dtype=np.float64) + return lower, lower.copy() + raise TypeError(f'unsupported hard constraint: {type(feasible)!r}') + + +def _hard_constraint_bounds( + feasible: HardConstraint | None, + alpha: np.ndarray, + beta: np.ndarray, +) -> tuple[np.ndarray, np.ndarray] | None: + if feasible is None: + return None + if isinstance(feasible, Interval): + lower = np.full_like(alpha, float(feasible.lower)) + upper = np.full_like(alpha, float(feasible.upper)) + elif isinstance(feasible, FixedValue): + lower = np.full_like(alpha, float(feasible.value)) + upper = lower.copy() + else: + raise TypeError(f'unsupported hard constraint: {type(feasible)!r}') + z_lo = (lower - beta) / alpha + z_hi = (upper - beta) / alpha + lo = np.minimum(z_lo, z_hi) + hi = np.maximum(z_lo, z_hi) + return np.asarray(lo, dtype=np.float64), np.asarray(hi, dtype=np.float64) + + +def _check_hard_feasibility( + n: int, + i_idx: np.ndarray, + j_idx: np.ndarray, + z_lo: np.ndarray, + z_hi: np.ndarray, +) -> tuple[bool, HardConstraintConflict | None]: + edges: list[_DifferenceEdge] = [] + for k, (i, j, lo, hi) in enumerate( + zip(i_idx.tolist(), j_idx.tolist(), z_lo.tolist(), z_hi.tolist()) + ): + edges.append( + _DifferenceEdge( + source=int(j), + target=int(i), + weight=float(hi), + constraint_index=int(k), + site_i=int(i), + site_j=int(j), + relation='<=', + bound_value=float(hi), + ) + ) + edges.append( + _DifferenceEdge( + source=int(i), + target=int(j), + weight=float(-lo), + constraint_index=int(k), + site_i=int(i), + site_j=int(j), + relation='>=', + bound_value=float(lo), + ) + ) + + dist = np.zeros(n, dtype=np.float64) + pred_node = np.full(n, -1, dtype=np.int64) + pred_edge = np.full(n, -1, dtype=np.int64) + last_updated = -1 + tol = 1e-12 + + for _ in range(n): + updated = False + last_updated = -1 + for edge_index, edge in enumerate(edges): + cand = dist[edge.source] + edge.weight + if cand < dist[edge.target] - tol: + dist[edge.target] = cand + pred_node[edge.target] = edge.source + pred_edge[edge.target] = edge_index + updated = True + last_updated = edge.target + if not updated: + return True, None + + if last_updated < 0: + return True, None + + y = int(last_updated) + for _ in range(n): + y = int(pred_node[y]) + if y < 0: + return False, None + + cycle_edges_rev: list[_DifferenceEdge] = [] + cur = y + while True: + edge_index = int(pred_edge[cur]) + if edge_index < 0: + return False, None + edge = edges[edge_index] + cycle_edges_rev.append(edge) + cur = edge.source + if cur == y: + break + + cycle_edges = tuple(reversed(cycle_edges_rev)) + cycle_nodes_list: list[int] = [] + if cycle_edges: + cycle_nodes_list.append(cycle_edges[0].source) + cycle_nodes_list.extend(edge.target for edge in cycle_edges) + if len(cycle_nodes_list) >= 2 and cycle_nodes_list[0] == cycle_nodes_list[-1]: + cycle_nodes_list.pop() + + cycle_node_set = set(cycle_nodes_list) + component_nodes: tuple[int, ...] = () + for comp in _connected_components(n, i_idx, j_idx): + if any(node in cycle_node_set for node in comp): + component_nodes = tuple(int(node) for node in comp) + break + + terms = tuple( + HardConstraintConflictTerm( + constraint_index=edge.constraint_index, + site_i=edge.site_i, + site_j=edge.site_j, + relation=edge.relation, + bound_value=edge.bound_value, + ) + for edge in cycle_edges + ) + unique_constraints = tuple(sorted({term.constraint_index for term in terms})) + component_label = ( + '[' + ', '.join(str(v) for v in component_nodes) + ']' + if component_nodes + else '[]' + ) + cycle_label = '[' + ', '.join(str(v) for v in unique_constraints) + ']' + conflict = HardConstraintConflict( + component_nodes=component_nodes, + cycle_nodes=tuple(int(v) for v in cycle_nodes_list), + terms=terms, + message=( + 'inconsistent hard separator restrictions on connected component ' + f'{component_label}; contradiction cycle uses constraint rows {cycle_label}' + ), + ) + return False, conflict + + +def _requires_admm(model: FitModel) -> bool: + if model.feasible is not None: + return True + if model.penalties: + return True + return not isinstance(model.mismatch, SquaredLoss) + + +def _mismatch_derivatives( + y: np.ndarray, + target: np.ndarray, + confidence: np.ndarray, + mismatch: SquaredLoss | HuberLoss, +) -> tuple[np.ndarray, np.ndarray]: + residual = y - target + if isinstance(mismatch, SquaredLoss): + fp_y = 2.0 * confidence * residual + fpp_y = 2.0 * confidence + return fp_y, fpp_y + if isinstance(mismatch, HuberLoss): + delta = float(mismatch.delta) + abs_r = np.abs(residual) + quad = abs_r <= delta + fp_y = np.where( + quad, + confidence * residual, + confidence * delta * np.sign(residual), + ) + fpp_y = np.where(quad, confidence, 0.0) + return fp_y, fpp_y + raise TypeError(f'unsupported mismatch: {type(mismatch)!r}') + + +def _penalty_derivatives( + y: np.ndarray, + penalty: SoftIntervalPenalty + | ExponentialBoundaryPenalty + | ReciprocalBoundaryPenalty, +) -> tuple[np.ndarray, np.ndarray]: + if isinstance(penalty, SoftIntervalPenalty): + lower = float(penalty.lower) + upper = float(penalty.upper) + strength = float(penalty.strength) + fp = np.zeros_like(y) + fpp = np.zeros_like(y) + lo_mask = y < lower + hi_mask = y > upper + if np.any(lo_mask): + fp[lo_mask] += 2.0 * strength * (y[lo_mask] - lower) + fpp[lo_mask] += 2.0 * strength + if np.any(hi_mask): + fp[hi_mask] += 2.0 * strength * (y[hi_mask] - upper) + fpp[hi_mask] += 2.0 * strength + return fp, fpp + + if isinstance(penalty, ExponentialBoundaryPenalty): + lower = float(penalty.lower) + upper = float(penalty.upper) + margin = float(penalty.margin) + strength = float(penalty.strength) + tau = float(penalty.tau) + left = lower + margin + right = upper - margin + A = np.exp((left - y) / tau) + B = np.exp((y - right) / tau) + fp = strength * (-A + B) / tau + fpp = strength * (A + B) / (tau * tau) + return fp, fpp + + if isinstance(penalty, ReciprocalBoundaryPenalty): + lower = float(penalty.lower) + upper = float(penalty.upper) + margin = float(penalty.margin) + strength = float(penalty.strength) + eps = float(penalty.epsilon) + left = lower + margin + right = upper - margin + fp = np.zeros_like(y) + fpp = np.zeros_like(y) + lo_mask = y < left + if np.any(lo_mask): + denom = np.maximum(y[lo_mask] - lower, eps) + fp[lo_mask] += -strength / (denom**2) + fpp[lo_mask] += 2.0 * strength / (denom**3) + hi_mask = y > right + if np.any(hi_mask): + denom = np.maximum(upper - y[hi_mask], eps) + fp[hi_mask] += strength / (denom**2) + fpp[hi_mask] += 2.0 * strength / (denom**3) + return fp, fpp + + raise TypeError(f'unsupported penalty: {type(penalty)!r}') diff --git a/src/pyvoro2/powerfit/report.py b/src/pyvoro2/powerfit/report.py index fc7a5be..db4e832 100644 --- a/src/pyvoro2/powerfit/report.py +++ b/src/pyvoro2/powerfit/report.py @@ -15,12 +15,11 @@ from .constraints import PairBisectorConstraints from .realize import RealizedPairDiagnostics -from .solver import ( +from .types import ( ConnectivityDiagnostics, ConstraintGraphDiagnostics, HardConstraintConflict, PowerWeightFitResult, - _edge_diagnostics_for_result, ) @@ -227,11 +226,36 @@ def _conflict_record( } +def _objective_breakdown_record( + breakdown: object | None, +) -> dict[str, object] | None: + if breakdown is None: + return None + return { + 'total': float(breakdown.total), + 'mismatch': float(breakdown.mismatch), + 'penalties_total': float(breakdown.penalties_total), + 'penalty_terms': [ + {'name': str(name), 'value': float(value)} + for name, value in breakdown.penalty_terms + ], + 'regularization': float(breakdown.regularization), + 'hard_constraints_satisfied': bool( + breakdown.hard_constraints_satisfied + ), + 'hard_max_violation': float(breakdown.hard_max_violation), + } + + def _edge_diagnostics_record( result: PowerWeightFitResult, constraints: PairBisectorConstraints, ) -> dict[str, object]: - diagnostics = _edge_diagnostics_for_result(result, constraints) + diagnostics = result.edge_diagnostics + if diagnostics is None: + from .problem import _edge_diagnostics_for_result + + diagnostics = _edge_diagnostics_for_result(result, constraints) return { 'alpha': diagnostics.alpha.tolist(), 'beta': diagnostics.beta.tolist(), @@ -281,6 +305,7 @@ def build_fit_report( 'n_constraints': int(constraints.n_constraints), 'n_points': int(constraints.n_points), 'converged': bool(result.converged), + 'status_detail': result.status_detail, 'n_iter': int(result.n_iter), 'rms_residual': ( None if result.rms_residual is None else float(result.rms_residual) @@ -295,6 +320,9 @@ def build_fit_report( 'constraints': list(constraints.to_records(use_ids=use_ids)), 'fit_records': list(result.to_records(constraints, use_ids=use_ids)), 'edge_diagnostics': _edge_diagnostics_record(result, constraints), + 'objective_breakdown': _objective_breakdown_record( + result.objective_breakdown + ), 'weights': None if result.weights is None else result.weights.tolist(), 'radii': None if result.radii is None else result.radii.tolist(), 'weight_shift': ( diff --git a/src/pyvoro2/powerfit/solver.py b/src/pyvoro2/powerfit/solver.py index 30498bd..943825e 100644 --- a/src/pyvoro2/powerfit/solver.py +++ b/src/pyvoro2/powerfit/solver.py @@ -1,66 +1,27 @@ -"""Low-level inverse solver for fitting power weights from pairwise constraints.""" +"""Native numerical solvers for fitting power weights.""" from __future__ import annotations -from dataclasses import dataclass +from dataclasses import replace from typing import Literal import numpy as np from .constraints import PairBisectorConstraints, resolve_pair_bisector_constraints -from .model import ( - ExponentialBoundaryPenalty, - FitModel, - FixedValue, - HardConstraint, - HuberLoss, - Interval, - L2Regularization, - ReciprocalBoundaryPenalty, - SoftIntervalPenalty, - SquaredLoss, +from .model import FitModel, HuberLoss, SquaredLoss +from .problem import ( + _compute_edge_diagnostics, + _connected_components, + _mismatch_derivatives, + _penalty_derivatives, + _requires_admm, + build_power_fit_problem, + build_power_fit_result, ) +from .types import ConnectivityDiagnostics, PowerWeightFitResult from ..domains import Box, OrthorhombicCell, PeriodicCell -def _plain_value(value: object) -> object: - return value.item() if hasattr(value, 'item') else value - - -@dataclass(frozen=True, slots=True) -class ConstraintGraphDiagnostics: - """Connectivity summary for a graph induced by constraint rows.""" - - n_points: int - n_constraints: int - n_edges: int - isolated_points: tuple[int, ...] - connected_components: tuple[tuple[int, ...], ...] - fully_connected: bool - - @property - def n_components(self) -> int: - """Return the number of connected components.""" - - return int(len(self.connected_components)) - - -@dataclass(frozen=True, slots=True) -class ConnectivityDiagnostics: - """Structured connectivity diagnostics for the inverse-fit graph.""" - - unconstrained_points: tuple[int, ...] - candidate_graph: ConstraintGraphDiagnostics - effective_graph: ConstraintGraphDiagnostics - active_graph: ConstraintGraphDiagnostics | None = None - active_effective_graph: ConstraintGraphDiagnostics | None = None - candidate_offsets_identified_by_data: bool = False - active_offsets_identified_by_data: bool | None = None - offsets_identified_in_objective: bool = False - gauge_policy: str = '' - messages: tuple[str, ...] = () - - class ConnectivityDiagnosticsError(ValueError): """Raised when connectivity_check='raise' detects a graph issue.""" @@ -76,275 +37,10 @@ def __str__(self) -> str: return str(self.args[0]) -@dataclass(frozen=True, slots=True) -class AlgebraicEdgeDiagnostics: - """Edge-space diagnostics matching the paper-side algebraic formulas.""" - - alpha: np.ndarray - beta: np.ndarray - z_obs: np.ndarray - z_fit: np.ndarray | None - residual: np.ndarray | None - edge_weight: np.ndarray - weighted_l2: float | None - weighted_rmse: float | None - rmse: float | None - mae: float | None - - -@dataclass(frozen=True, slots=True) -class PowerWeightFitResult: - """Result of inverse fitting of power weights.""" - - status: Literal[ - 'optimal', 'infeasible_hard_constraints', 'max_iter', 'numerical_failure' - ] - hard_feasible: bool - weights: np.ndarray | None - radii: np.ndarray | None - weight_shift: float | None - measurement: Literal['fraction', 'position'] - target: np.ndarray - predicted: np.ndarray | None - predicted_fraction: np.ndarray | None - predicted_position: np.ndarray | None - residuals: np.ndarray | None - rms_residual: float | None - max_residual: float | None - used_shifts: np.ndarray - solver: str - n_iter: int - converged: bool - conflict: 'HardConstraintConflict | None' - warnings: tuple[str, ...] - connectivity: ConnectivityDiagnostics | None = None - edge_diagnostics: AlgebraicEdgeDiagnostics | None = None - - @property - def is_optimal(self) -> bool: - """Whether the fit terminated with a final solution.""" - - return self.status == 'optimal' - - @property - def is_infeasible(self) -> bool: - """Whether hard feasibility failed before optimization.""" - - return self.status == 'infeasible_hard_constraints' - - @property - def conflicting_constraint_indices(self) -> tuple[int, ...]: - """Constraint rows participating in the infeasibility witness.""" - - if self.conflict is None: - return tuple() - return self.conflict.constraint_indices - - def to_records( - self, - constraints: PairBisectorConstraints, - *, - use_ids: bool = False, - ) -> tuple[dict[str, object], ...]: - """Return one plain-Python record per fitted constraint row.""" - - if constraints.n_constraints != int(self.target.shape[0]): - raise ValueError('constraints do not match the fit result length') - left, right = constraints.pair_labels(use_ids=use_ids) - edge_diag = _edge_diagnostics_for_result(self, constraints) - rows: list[dict[str, object]] = [] - left_is_int = np.issubdtype(np.asarray(left).dtype, np.integer) - right_is_int = np.issubdtype(np.asarray(right).dtype, np.integer) - for k in range(constraints.n_constraints): - site_i = int(left[k]) if left_is_int else _plain_value(left[k]) - site_j = int(right[k]) if right_is_int else _plain_value(right[k]) - rows.append( - { - 'constraint_index': int(k), - 'site_i': site_i, - 'site_j': site_j, - 'shift': tuple(int(v) for v in constraints.shifts[k]), - 'measurement': self.measurement, - 'target': float(self.target[k]), - 'predicted': ( - None - if self.predicted is None - else float(self.predicted[k]) - ), - 'predicted_fraction': ( - None - if self.predicted_fraction is None - else float(self.predicted_fraction[k]) - ), - 'predicted_position': ( - None - if self.predicted_position is None - else float(self.predicted_position[k]) - ), - 'residual': ( - None - if self.residuals is None - else float(self.residuals[k]) - ), - 'alpha': float(edge_diag.alpha[k]), - 'beta': float(edge_diag.beta[k]), - 'z_obs': float(edge_diag.z_obs[k]), - 'z_fit': ( - None - if edge_diag.z_fit is None - else float(edge_diag.z_fit[k]) - ), - 'algebraic_residual': ( - None - if edge_diag.residual is None - else float(edge_diag.residual[k]) - ), - 'edge_weight': float(edge_diag.edge_weight[k]), - } - ) - return tuple(rows) - - def to_report( - self, - constraints: PairBisectorConstraints, - *, - use_ids: bool = False, - ) -> dict[str, object]: - """Return a JSON-friendly report for this fit result.""" - - from .report import build_fit_report - - return build_fit_report(self, constraints, use_ids=use_ids) - - -@dataclass(frozen=True, slots=True) -class HardConstraintConflictTerm: - """One bound relation participating in an infeasibility witness. - - Each term refers back to one input constraint row and states which bound on - ``w_i - w_j`` participates in the contradiction cycle. - """ - - constraint_index: int - site_i: int - site_j: int - relation: Literal['<=', '>='] - bound_value: float - - def to_record(self, *, ids: np.ndarray | None = None) -> dict[str, object]: - """Return a plain-Python record for this conflict term.""" - - site_i = int(self.site_i) if ids is None else ids[self.site_i].item() - site_j = int(self.site_j) if ids is None else ids[self.site_j].item() - return { - 'constraint_index': int(self.constraint_index), - 'site_i': site_i, - 'site_j': site_j, - 'relation': self.relation, - 'bound_value': float(self.bound_value), - } - - -@dataclass(frozen=True, slots=True) -class HardConstraintConflict: - """Compact witness for inconsistent hard separator restrictions.""" - - component_nodes: tuple[int, ...] - cycle_nodes: tuple[int, ...] - terms: tuple[HardConstraintConflictTerm, ...] - message: str - - @property - def constraint_indices(self) -> tuple[int, ...]: - """Sorted unique input rows participating in the conflict.""" - - return tuple(sorted({int(term.constraint_index) for term in self.terms})) - - def to_records( - self, *, ids: np.ndarray | None = None - ) -> tuple[dict[str, object], ...]: - """Return plain-Python records for the witness terms.""" - - return tuple(term.to_record(ids=ids) for term in self.terms) - - -@dataclass(frozen=True, slots=True) -class _DifferenceEdge: - source: int - target: int - weight: float - constraint_index: int - site_i: int - site_j: int - relation: Literal['<=', '>='] - bound_value: float - - -@dataclass(frozen=True, slots=True) -class _MeasurementGeometry: - alpha: np.ndarray - beta: np.ndarray - target: np.ndarray - target_fraction: np.ndarray - target_position: np.ndarray - - class _NumericalFailure(RuntimeError): """Raised when the numerical backend fails before producing a result.""" -def radii_to_weights(radii: np.ndarray) -> np.ndarray: - """Convert radii to power weights (``w = r^2``).""" - - r = np.asarray(radii, dtype=float) - if r.ndim != 1: - raise ValueError('radii must be 1D') - if not np.all(np.isfinite(r)): - raise ValueError('radii must contain only finite values') - if np.any(r < 0): - raise ValueError('radii must be non-negative') - return r * r - - -def weights_to_radii( - weights: np.ndarray, - *, - r_min: float = 0.0, - weight_shift: float | None = None, -) -> tuple[np.ndarray, float]: - """Convert power weights to radii using an explicit global shift. - - By default, the returned radii use the minimal additive shift that makes the - smallest radius equal to ``r_min``. Pass ``weight_shift`` to request an - explicit gauge instead. - """ - - w = np.asarray(weights, dtype=float) - if w.ndim != 1: - raise ValueError('weights must be 1D') - if not np.all(np.isfinite(w)): - raise ValueError('weights must contain only finite values') - - if weight_shift is not None: - if r_min != 0.0: - raise ValueError('specify at most one of r_min and weight_shift') - C = float(weight_shift) - if not np.isfinite(C): - raise ValueError('weight_shift must be finite') - else: - r_min = float(r_min) - if r_min < 0: - raise ValueError('r_min must be >= 0') - w_min = float(np.min(w)) if w.size else 0.0 - C = (r_min * r_min) - w_min - - w_shifted = w + C - if np.any(w_shifted < -1e-14): - raise ValueError('weight shift produced negative values (numerical issue)') - w_shifted = np.maximum(w_shifted, 0.0) - return np.sqrt(w_shifted), float(C) - - def fit_power_weights( points: np.ndarray, constraints: PairBisectorConstraints | list[tuple] | tuple[tuple, ...], @@ -366,13 +62,7 @@ def fit_power_weights( tol_rel: float = 1e-5, connectivity_check: Literal['none', 'diagnose', 'warn', 'raise'] = 'warn', ) -> PowerWeightFitResult: - """Fit power weights from resolved pairwise separator constraints. - - The raw constraint tuples are ``(i, j, value[, shift])`` where ``shift`` is - the integer lattice image applied to site ``j``. The returned radii use the - minimal non-negative global gauge by default; pass ``weight_shift`` for an - explicit output gauge or ``r_min`` for the legacy minimum-radius helper. - """ + """Fit power weights from resolved pairwise separator constraints.""" pts = np.asarray(points, dtype=float) if pts.ndim != 2 or pts.shape[1] <= 0: @@ -388,9 +78,7 @@ def fit_power_weights( if resolved.n_points != pts.shape[0]: raise ValueError('resolved constraints do not match the number of points') if resolved.dim != pts.shape[1]: - raise ValueError( - 'resolved constraints do not match the point dimension' - ) + raise ValueError('resolved constraints do not match the point dimension') if resolved.measurement != measurement: measurement = resolved.measurement else: @@ -450,24 +138,13 @@ def _fit_power_weights_resolved( 'connectivity_check must be none, diagnose, warn, or raise' ) - reg = model.regularization - lam = float(reg.strength) - w0 = _regularization_reference(reg, n) - reference = None if reg.reference is None else w0 - - geom = _measurement_geometry(constraints) - z_target = (geom.target - geom.beta) / geom.alpha - a = constraints.confidence * (geom.alpha**2) - - hard = _hard_constraint_bounds(model.feasible, geom.alpha, geom.beta) - z_lo = hard[0] if hard is not None else None - z_hi = hard[1] if hard is not None else None - hard_measurement = _hard_constraint_measurement_bounds( - model.feasible, - constraints.n_constraints, + problem = build_power_fit_problem(constraints, model=model) + lam = float(problem.regularization_strength) + reference = ( + None + if problem.model.regularization.reference is None + else problem.regularization_reference ) - y_lo = hard_measurement[0] if hard_measurement is not None else None - y_hi = hard_measurement[1] if hard_measurement is not None else None nonquadratic = _requires_admm(model) if solver == 'auto': @@ -482,70 +159,51 @@ def _fit_power_weights_resolved( 'or non-quadratic penalties' ) - effective_mask = _difference_identifying_mask(constraints, model) - comps = _connected_components( - n, - constraints.i[effective_mask], - constraints.j[effective_mask], - ) - connectivity = None - if connectivity_check != 'none': - connectivity = _build_fit_connectivity_diagnostics( - constraints, - model=model, - gauge_policy=_standalone_gauge_policy_description(reg), - ) - _apply_connectivity_policy( - connectivity_check, - connectivity, - warnings_list, - ) - - if hard is not None: - feasible, conflict = _check_hard_feasibility( - n, - constraints.i, - constraints.j, - z_lo, - z_hi, - ) - if not feasible: - warnings_list.append('hard feasibility check failed before optimization') - if conflict is not None: - warnings_list.append(conflict.message) - return PowerWeightFitResult( - status='infeasible_hard_constraints', - hard_feasible=False, + connectivity = None if connectivity_check == 'none' else problem.connectivity + if connectivity is not None: + _apply_connectivity_policy(connectivity_check, connectivity, warnings_list) + + if not problem.hard_feasible: + warnings_list.append('hard feasibility check failed before optimization') + if problem.hard_conflict is not None: + warnings_list.append(problem.hard_conflict.message) + result = PowerWeightFitResult( + status='infeasible_hard_constraints', + status_detail=( + None + if problem.hard_conflict is None + else problem.hard_conflict.message + ), + hard_feasible=False, + weights=None, + radii=None, + weight_shift=None, + measurement=constraints.measurement, + target=np.asarray(problem.measurement_target, dtype=np.float64), + predicted=None, + predicted_fraction=None, + predicted_position=None, + residuals=None, + rms_residual=None, + max_residual=None, + used_shifts=np.asarray(constraints.shifts), + solver='none', + n_iter=0, + converged=False, + conflict=problem.hard_conflict, + warnings=tuple(warnings_list), + connectivity=connectivity, + edge_diagnostics=_compute_edge_diagnostics( + problem.constraints, weights=None, - radii=None, - weight_shift=None, - measurement=constraints.measurement, - target=geom.target.copy(), - predicted=None, - predicted_fraction=None, - predicted_position=None, - residuals=None, - rms_residual=None, - max_residual=None, - used_shifts=constraints.shifts.copy(), - solver='none', - n_iter=0, - converged=False, - conflict=conflict, - warnings=tuple(warnings_list), - connectivity=connectivity, - edge_diagnostics=_compute_edge_diagnostics( - constraints, - weights=None, - geom=geom, - ), - ) - else: - conflict = None + ), + objective_breakdown=None, + ) + return result if m == 0: if lam > 0.0: - weights = w0.copy() + weights = problem.regularization_reference.copy() warnings_list.append( 'empty constraint set; using the regularization-only solution' ) @@ -560,56 +218,43 @@ def _fit_power_weights_resolved( warnings_list.append( 'empty constraint set; returning the mean-zero gauge solution' ) - radii, shift = weights_to_radii( + result = build_power_fit_result( + problem, weights, - r_min=r_min, - weight_shift=weight_shift, - ) - pred_fraction = np.zeros(0, dtype=np.float64) - pred_position = np.zeros(0, dtype=np.float64) - pred = pred_fraction if constraints.measurement == 'fraction' else pred_position - return PowerWeightFitResult( - status='optimal', - hard_feasible=True, - weights=weights, - radii=radii, - weight_shift=shift, - measurement=constraints.measurement, - target=geom.target.copy(), - predicted=pred, - predicted_fraction=pred_fraction, - predicted_position=pred_position, - residuals=np.zeros(0, dtype=np.float64), - rms_residual=0.0, - max_residual=0.0, - used_shifts=constraints.shifts.copy(), solver='analytic', - n_iter=0, + status='optimal', converged=True, - conflict=conflict, + n_iter=0, warnings=tuple(warnings_list), - connectivity=connectivity, - edge_diagnostics=_compute_edge_diagnostics( - constraints, - weights=weights, - geom=geom, - ), + canonicalize_gauge=True, + r_min=r_min, + weight_shift=weight_shift, ) + if connectivity is None: + return replace(result, connectivity=None) + return result weights = np.zeros(n, dtype=np.float64) converged_all = True n_iter_max = 0 + comps = _connected_components( + n, + constraints.i[problem.offset_identifying_constraint_mask], + constraints.j[problem.offset_identifying_constraint_mask], + ) try: for nodes in comps: idx_nodes = np.asarray(nodes, dtype=np.int64) if idx_nodes.size <= 1: if lam > 0.0 and idx_nodes.size == 1: - weights[idx_nodes[0]] = w0[idx_nodes[0]] + weights[idx_nodes[0]] = ( + problem.regularization_reference[idx_nodes[0]] + ) continue node_set = set(nodes) - mask = effective_mask & np.fromiter( + mask = problem.offset_identifying_constraint_mask & np.fromiter( ( (int(i) in node_set) and (int(j) in node_set) for i, j in zip(constraints.i, constraints.j) @@ -626,15 +271,20 @@ def _fit_power_weights_resolved( [local_index[int(j)] for j in constraints.j[mask]], dtype=np.int64, ) - a_c = a[mask] - b_c = z_target[mask] - alpha_c = geom.alpha[mask] - beta_c = geom.beta[mask] - target_c = geom.target[mask] + alpha_c = problem.alpha[mask] + beta_c = problem.beta[mask] + target_c = problem.measurement_target[mask] conf_c = constraints.confidence[mask] - w0_c = w0[idx_nodes] + w0_c = problem.regularization_reference[idx_nodes] if solver_eff == 'analytic': - w_c = _solve_component_analytic(ii, jj, a_c, b_c, w0_c, lam) + w_c = _solve_component_analytic( + ii, + jj, + problem.edge_weight[mask], + problem.z_obs[mask], + w0_c, + lam, + ) iters = 1 conv = True else: @@ -652,8 +302,16 @@ def _fit_power_weights_resolved( max_iter=max_iter, tol_abs=tol_abs, tol_rel=tol_rel, - y_lo=None if y_lo is None else y_lo[mask], - y_hi=None if y_hi is None else y_hi[mask], + y_lo=( + None + if problem.bounds.measurement_lower is None + else problem.bounds.measurement_lower[mask] + ), + y_hi=( + None + if problem.bounds.measurement_upper is None + else problem.bounds.measurement_upper[mask] + ), ) if not np.all(np.isfinite(w_c)): raise _NumericalFailure('component solver returned non-finite weights') @@ -661,671 +319,63 @@ def _fit_power_weights_resolved( converged_all = converged_all and conv n_iter_max = max(n_iter_max, iters) - if lam == 0.0: - weights = _apply_component_mean_gauge( - weights, - comps, - reference=reference, - ) if not np.all(np.isfinite(weights)): raise _NumericalFailure('assembled weight vector is non-finite') - try: - radii, shift = weights_to_radii( - weights, - r_min=r_min, - weight_shift=weight_shift, - ) - except ValueError as exc: - raise _NumericalFailure(str(exc)) from exc - pred_fraction, pred_position, pred = _predict_measurements(weights, constraints) - residuals = pred - geom.target - if not np.all(np.isfinite(residuals)): - raise _NumericalFailure( - 'predicted measurements or residuals are non-finite' - ) - rms = float(np.sqrt(np.mean(residuals * residuals))) if residuals.size else 0.0 - mx = float(np.max(np.abs(residuals))) if residuals.size else 0.0 + result = build_power_fit_result( + problem, + weights, + solver=solver_eff, + status='optimal' if converged_all else 'max_iter', + status_detail=( + None + if converged_all + else 'iterative solver reached max_iter before convergence' + ), + converged=bool(converged_all), + n_iter=int(n_iter_max), + warnings=tuple(warnings_list) + ( + () + if converged_all + else ('iterative solver reached max_iter before convergence',) + ), + canonicalize_gauge=True, + r_min=r_min, + weight_shift=weight_shift, + ) + if connectivity is None: + return replace(result, connectivity=None) + return result except (np.linalg.LinAlgError, FloatingPointError, _NumericalFailure) as exc: warnings_list.append(f'numerical solver failure: {exc}') return PowerWeightFitResult( status='numerical_failure', + status_detail=str(exc), hard_feasible=True, weights=None, radii=None, weight_shift=None, measurement=constraints.measurement, - target=geom.target.copy(), + target=np.asarray(problem.measurement_target, dtype=np.float64), predicted=None, predicted_fraction=None, predicted_position=None, residuals=None, rms_residual=None, max_residual=None, - used_shifts=constraints.shifts.copy(), + used_shifts=np.asarray(constraints.shifts), solver=solver_eff, n_iter=int(n_iter_max), converged=False, - conflict=conflict, + conflict=problem.hard_conflict, warnings=tuple(warnings_list), connectivity=connectivity, edge_diagnostics=_compute_edge_diagnostics( - constraints, + problem.constraints, weights=None, - geom=geom, ), + objective_breakdown=None, ) - if converged_all: - status: Literal['optimal', 'max_iter', 'numerical_failure'] = 'optimal' - else: - status = 'max_iter' - warnings_list.append('iterative solver reached max_iter before convergence') - - return PowerWeightFitResult( - status=status, - hard_feasible=True, - weights=weights, - radii=radii, - weight_shift=shift, - measurement=constraints.measurement, - target=geom.target.copy(), - predicted=pred, - predicted_fraction=pred_fraction, - predicted_position=pred_position, - residuals=residuals, - rms_residual=rms, - max_residual=mx, - used_shifts=constraints.shifts.copy(), - solver=solver_eff, - n_iter=int(n_iter_max), - converged=bool(converged_all), - conflict=conflict, - warnings=tuple(warnings_list), - connectivity=connectivity, - edge_diagnostics=_compute_edge_diagnostics( - constraints, - weights=weights, - geom=geom, - ), - ) - - -def _measurement_geometry(constraints: PairBisectorConstraints) -> _MeasurementGeometry: - d = constraints.distance - d2 = constraints.distance2 - if constraints.measurement == 'fraction': - alpha = 1.0 / (2.0 * d2) - beta = np.full_like(alpha, 0.5) - target = constraints.target_fraction - else: - alpha = 1.0 / (2.0 * d) - beta = 0.5 * d - target = constraints.target_position - return _MeasurementGeometry( - alpha=np.asarray(alpha, dtype=np.float64), - beta=np.asarray(beta, dtype=np.float64), - target=np.asarray(target, dtype=np.float64), - target_fraction=constraints.target_fraction.copy(), - target_position=constraints.target_position.copy(), - ) - - -def _predict_measurements( - weights: np.ndarray, constraints: PairBisectorConstraints -) -> tuple[np.ndarray, np.ndarray, np.ndarray]: - z_pred = weights[constraints.i] - weights[constraints.j] - t_pred = 0.5 + z_pred / (2.0 * constraints.distance2) - x_pred = constraints.distance * t_pred - pred = t_pred if constraints.measurement == 'fraction' else x_pred - return ( - np.asarray(t_pred, dtype=np.float64), - np.asarray(x_pred, dtype=np.float64), - np.asarray(pred, dtype=np.float64), - ) - - -def _compute_edge_diagnostics( - constraints: PairBisectorConstraints, - *, - weights: np.ndarray | None, - geom: _MeasurementGeometry | None = None, -) -> AlgebraicEdgeDiagnostics: - if geom is None: - geom = _measurement_geometry(constraints) - - alpha = np.asarray(geom.alpha, dtype=np.float64).copy() - beta = np.asarray(geom.beta, dtype=np.float64).copy() - z_obs = (geom.target - beta) / alpha - edge_weight = ( - np.asarray(constraints.confidence, dtype=np.float64) * (alpha * alpha) - ) - - if weights is None: - return AlgebraicEdgeDiagnostics( - alpha=alpha, - beta=beta, - z_obs=np.asarray(z_obs, dtype=np.float64), - z_fit=None, - residual=None, - edge_weight=np.asarray(edge_weight, dtype=np.float64), - weighted_l2=None, - weighted_rmse=None, - rmse=None, - mae=None, - ) - - w = np.asarray(weights, dtype=np.float64) - z_fit = w[constraints.i] - w[constraints.j] - residual = z_obs - z_fit - weighted_sq = edge_weight * residual * residual - if residual.size: - weighted_l2 = float(np.linalg.norm(np.sqrt(edge_weight) * residual)) - weighted_rmse = float(np.sqrt(np.mean(weighted_sq))) - rmse = float(np.sqrt(np.mean(residual * residual))) - mae = float(np.mean(np.abs(residual))) - else: - weighted_l2 = 0.0 - weighted_rmse = 0.0 - rmse = 0.0 - mae = 0.0 - - return AlgebraicEdgeDiagnostics( - alpha=alpha, - beta=beta, - z_obs=np.asarray(z_obs, dtype=np.float64), - z_fit=np.asarray(z_fit, dtype=np.float64), - residual=np.asarray(residual, dtype=np.float64), - edge_weight=np.asarray(edge_weight, dtype=np.float64), - weighted_l2=weighted_l2, - weighted_rmse=weighted_rmse, - rmse=rmse, - mae=mae, - ) - - -def _edge_diagnostics_for_result( - result: PowerWeightFitResult, - constraints: PairBisectorConstraints, -) -> AlgebraicEdgeDiagnostics: - if result.edge_diagnostics is not None: - return result.edge_diagnostics - return _compute_edge_diagnostics( - constraints, - weights=result.weights, - ) - - -def _regularization_reference(reg: L2Regularization, n: int) -> np.ndarray: - if reg.reference is None: - return np.zeros(n, dtype=np.float64) - w0 = np.asarray(reg.reference, dtype=float) - if w0.shape != (n,): - raise ValueError('regularization.reference must have shape (n,)') - return w0.astype(np.float64) - - -def _difference_identifying_mask( - constraints: PairBisectorConstraints, - model: FitModel, -) -> np.ndarray: - mask = constraints.confidence > 0.0 - if model.feasible is not None or len(model.penalties) > 0: - mask = np.ones(constraints.n_constraints, dtype=bool) - return np.asarray(mask, dtype=bool) - - -def _apply_component_mean_gauge( - weights: np.ndarray, - comps: list[list[int]], - *, - reference: np.ndarray | None, -) -> np.ndarray: - aligned = np.asarray(weights, dtype=np.float64).copy() - ref = None if reference is None else np.asarray(reference, dtype=np.float64) - for comp in comps: - idx = np.asarray(comp, dtype=np.int64) - if idx.size == 0: - continue - if ref is None: - target_mean = 0.0 - else: - target_mean = float(np.mean(ref[idx])) - current_mean = float(np.mean(aligned[idx])) - aligned[idx] += target_mean - current_mean - return aligned - - -def _standalone_gauge_policy_description(reg: L2Regularization) -> str: - if reg.reference is not None: - return ( - 'each effective component is shifted so its mean matches the ' - 'reference mean on that component' - ) - return 'each effective component is centered to mean zero' - - -def _graph_diagnostics( - n: int, - i_idx: np.ndarray, - j_idx: np.ndarray, - *, - n_constraints: int, -) -> ConstraintGraphDiagnostics: - ii = np.asarray(i_idx, dtype=np.int64) - jj = np.asarray(j_idx, dtype=np.int64) - degree = np.zeros(n, dtype=np.int64) - if ii.size: - np.add.at(degree, ii, 1) - np.add.at(degree, jj, 1) - isolated = tuple(np.flatnonzero(degree == 0).tolist()) - components = tuple( - tuple(int(node) for node in comp) - for comp in _connected_components(n, ii, jj) - ) - edges = { - (int(min(i, j)), int(max(i, j))) - for i, j in zip(ii.tolist(), jj.tolist()) - } - return ConstraintGraphDiagnostics( - n_points=int(n), - n_constraints=int(n_constraints), - n_edges=int(len(edges)), - isolated_points=isolated, - connected_components=components, - fully_connected=bool((n <= 1) or len(components) == 1), - ) - - -def _format_component_counts(graph: ConstraintGraphDiagnostics) -> str: - n_components = graph.n_components - return ( - '1 connected component' - if n_components == 1 - else f'{n_components} connected components' - ) - - -def _format_point_list(points: tuple[int, ...]) -> str: - return '[' + ', '.join(str(int(v)) for v in points) + ']' - - -def _build_fit_connectivity_diagnostics( - constraints: PairBisectorConstraints, - *, - model: FitModel, - gauge_policy: str, -) -> ConnectivityDiagnostics: - n = int(constraints.n_points) - candidate_graph = _graph_diagnostics( - n, - constraints.i, - constraints.j, - n_constraints=constraints.n_constraints, - ) - effective_mask = _difference_identifying_mask(constraints, model) - effective_graph = _graph_diagnostics( - n, - constraints.i[effective_mask], - constraints.j[effective_mask], - n_constraints=int(np.count_nonzero(effective_mask)), - ) - - messages: list[str] = [] - if candidate_graph.isolated_points: - messages.append( - 'candidate graph leaves unconstrained points ' - f'{_format_point_list(candidate_graph.isolated_points)}' - ) - if candidate_graph.n_components > 1: - messages.append( - 'candidate graph has ' f'{_format_component_counts(candidate_graph)}' - ) - if np.any(~effective_mask): - messages.append( - 'zero-confidence candidate rows do not identify pair differences ' - 'in the current objective and are ignored for ' - 'connectivity/gauge diagnostics' - ) - if effective_graph.n_components > 1: - messages.append( - 'pairwise data identify only ' - f'{_format_component_counts(effective_graph)}; relative component ' - 'offsets are not identified by the data' - ) - - return ConnectivityDiagnostics( - unconstrained_points=candidate_graph.isolated_points, - candidate_graph=candidate_graph, - effective_graph=effective_graph, - candidate_offsets_identified_by_data=bool(effective_graph.fully_connected), - active_offsets_identified_by_data=None, - offsets_identified_in_objective=bool( - effective_graph.fully_connected - or float(model.regularization.strength) > 0.0 - ), - gauge_policy=gauge_policy, - messages=tuple(messages), - ) - - -def _build_active_set_connectivity_diagnostics( - constraints: PairBisectorConstraints, - active_mask: np.ndarray, - *, - model: FitModel, - gauge_policy: str, -) -> ConnectivityDiagnostics: - mask = np.asarray(active_mask, dtype=bool) - if mask.shape != (constraints.n_constraints,): - raise ValueError('active_mask must have shape (m,)') - - n = int(constraints.n_points) - candidate_graph = _graph_diagnostics( - n, - constraints.i, - constraints.j, - n_constraints=constraints.n_constraints, - ) - effective_mask = _difference_identifying_mask(constraints, model) - effective_graph = _graph_diagnostics( - n, - constraints.i[effective_mask], - constraints.j[effective_mask], - n_constraints=int(np.count_nonzero(effective_mask)), - ) - - active_constraints = constraints.subset(mask) - active_graph = _graph_diagnostics( - n, - active_constraints.i, - active_constraints.j, - n_constraints=active_constraints.n_constraints, - ) - active_effective_mask = _difference_identifying_mask(active_constraints, model) - active_effective_graph = _graph_diagnostics( - n, - active_constraints.i[active_effective_mask], - active_constraints.j[active_effective_mask], - n_constraints=int(np.count_nonzero(active_effective_mask)), - ) - - messages: list[str] = [] - if candidate_graph.isolated_points: - messages.append( - 'candidate graph leaves unconstrained points ' - f'{_format_point_list(candidate_graph.isolated_points)}' - ) - if candidate_graph.n_components > 1: - messages.append( - 'candidate graph has ' f'{_format_component_counts(candidate_graph)}' - ) - if np.any(~effective_mask): - messages.append( - 'zero-confidence candidate rows do not identify pair differences ' - 'in the current objective and are ignored for ' - 'connectivity/gauge diagnostics' - ) - if effective_graph.n_components > 1: - messages.append( - 'candidate pairwise data identify only ' - f'{_format_component_counts(effective_graph)}; relative component ' - 'offsets are not identified by the data' - ) - if active_graph.n_components > 1: - messages.append( - 'final active graph has ' f'{_format_component_counts(active_graph)}' - ) - if np.any(mask) and np.any(~active_effective_mask): - messages.append( - 'zero-confidence active rows do not identify pair differences in ' - 'the current objective and are ignored for active-component gauge ' - 'alignment' - ) - if active_effective_graph.n_components > 1: - messages.append( - 'final active pairwise data identify only ' - f'{_format_component_counts(active_effective_graph)}; relative ' - 'component offsets are preserved by the self-consistent gauge ' - 'policy rather than identified by the data' - ) - - return ConnectivityDiagnostics( - unconstrained_points=candidate_graph.isolated_points, - candidate_graph=candidate_graph, - effective_graph=effective_graph, - active_graph=active_graph, - active_effective_graph=active_effective_graph, - candidate_offsets_identified_by_data=bool(effective_graph.fully_connected), - active_offsets_identified_by_data=bool( - active_effective_graph.fully_connected - ), - offsets_identified_in_objective=bool( - active_effective_graph.fully_connected - or float(model.regularization.strength) > 0.0 - ), - gauge_policy=gauge_policy, - messages=tuple(messages), - ) - - -def _apply_connectivity_policy( - policy: Literal['none', 'diagnose', 'warn', 'raise'], - diagnostics: ConnectivityDiagnostics, - warnings_list: list[str], -) -> None: - if policy in ('none', 'diagnose') or not diagnostics.messages: - return - if policy == 'warn': - warnings_list.extend(diagnostics.messages) - return - if policy == 'raise': - raise ConnectivityDiagnosticsError( - '; '.join(diagnostics.messages), - diagnostics, - ) - raise ValueError('unsupported connectivity policy') - - -def _hard_constraint_measurement_bounds( - feasible: HardConstraint | None, - n_constraints: int, -) -> tuple[np.ndarray, np.ndarray] | None: - if feasible is None: - return None - if isinstance(feasible, Interval): - lower = np.full(n_constraints, float(feasible.lower), dtype=np.float64) - upper = np.full(n_constraints, float(feasible.upper), dtype=np.float64) - return lower, upper - if isinstance(feasible, FixedValue): - lower = np.full(n_constraints, float(feasible.value), dtype=np.float64) - return lower, lower.copy() - raise TypeError(f'unsupported hard constraint: {type(feasible)!r}') - - -def _hard_constraint_bounds( - feasible: HardConstraint | None, - alpha: np.ndarray, - beta: np.ndarray, -) -> tuple[np.ndarray, np.ndarray] | None: - if feasible is None: - return None - if isinstance(feasible, Interval): - lower = np.full_like(alpha, float(feasible.lower)) - upper = np.full_like(alpha, float(feasible.upper)) - elif isinstance(feasible, FixedValue): - lower = np.full_like(alpha, float(feasible.value)) - upper = lower.copy() - else: # pragma: no cover - defensive - raise TypeError(f'unsupported hard constraint: {type(feasible)!r}') - z_lo = (lower - beta) / alpha - z_hi = (upper - beta) / alpha - lo = np.minimum(z_lo, z_hi) - hi = np.maximum(z_lo, z_hi) - return lo.astype(np.float64), hi.astype(np.float64) - - -def _requires_admm(model: FitModel) -> bool: - if model.feasible is not None: - return True - if model.penalties: - return True - return not isinstance(model.mismatch, SquaredLoss) - - -def _connected_components( - n: int, i_idx: np.ndarray, j_idx: np.ndarray -) -> list[list[int]]: - adj: list[list[int]] = [[] for _ in range(n)] - for i, j in zip(i_idx.tolist(), j_idx.tolist()): - adj[i].append(j) - adj[j].append(i) - seen = np.zeros(n, dtype=bool) - comps: list[list[int]] = [] - for start in range(n): - if seen[start]: - continue - if len(adj[start]) == 0: - seen[start] = True - comps.append([start]) - continue - stack = [start] - seen[start] = True - comp: list[int] = [] - while stack: - v = stack.pop() - comp.append(v) - for nb in adj[v]: - if not seen[nb]: - seen[nb] = True - stack.append(nb) - comps.append(sorted(comp)) - return comps - - -def _check_hard_feasibility( - n: int, - i_idx: np.ndarray, - j_idx: np.ndarray, - z_lo: np.ndarray, - z_hi: np.ndarray, -) -> tuple[bool, HardConstraintConflict | None]: - """Check feasibility of difference constraints via Bellman-Ford.""" - - edges: list[_DifferenceEdge] = [] - for k, (i, j, lo, hi) in enumerate( - zip(i_idx.tolist(), j_idx.tolist(), z_lo.tolist(), z_hi.tolist()) - ): - # w_i - w_j <= hi -> w_i <= w_j + hi : edge j -> i with weight hi - edges.append( - _DifferenceEdge( - source=int(j), - target=int(i), - weight=float(hi), - constraint_index=int(k), - site_i=int(i), - site_j=int(j), - relation='<=', - bound_value=float(hi), - ) - ) - # w_i - w_j >= lo -> w_j - w_i <= -lo: edge i -> j with weight -lo - edges.append( - _DifferenceEdge( - source=int(i), - target=int(j), - weight=float(-lo), - constraint_index=int(k), - site_i=int(i), - site_j=int(j), - relation='>=', - bound_value=float(lo), - ) - ) - - dist = np.zeros(n, dtype=np.float64) - pred_node = np.full(n, -1, dtype=np.int64) - pred_edge = np.full(n, -1, dtype=np.int64) - last_updated = -1 - tol = 1e-12 - - for _ in range(n): - updated = False - last_updated = -1 - for edge_index, edge in enumerate(edges): - cand = dist[edge.source] + edge.weight - if cand < dist[edge.target] - tol: - dist[edge.target] = cand - pred_node[edge.target] = edge.source - pred_edge[edge.target] = edge_index - updated = True - last_updated = edge.target - if not updated: - return True, None - - if last_updated < 0: - return True, None - - y = int(last_updated) - for _ in range(n): - y = int(pred_node[y]) - if y < 0: - return False, None - - cycle_edges_rev: list[_DifferenceEdge] = [] - cur = y - while True: - edge_index = int(pred_edge[cur]) - if edge_index < 0: - return False, None - edge = edges[edge_index] - cycle_edges_rev.append(edge) - cur = edge.source - if cur == y: - break - - cycle_edges = tuple(reversed(cycle_edges_rev)) - cycle_nodes_list: list[int] = [] - if cycle_edges: - cycle_nodes_list.append(cycle_edges[0].source) - cycle_nodes_list.extend(edge.target for edge in cycle_edges) - if len(cycle_nodes_list) >= 2 and cycle_nodes_list[0] == cycle_nodes_list[-1]: - cycle_nodes_list.pop() - - cycle_node_set = set(cycle_nodes_list) - component_nodes: tuple[int, ...] = () - for comp in _connected_components(n, i_idx, j_idx): - if any(node in cycle_node_set for node in comp): - component_nodes = tuple(int(node) for node in comp) - break - - terms = tuple( - HardConstraintConflictTerm( - constraint_index=edge.constraint_index, - site_i=edge.site_i, - site_j=edge.site_j, - relation=edge.relation, - bound_value=edge.bound_value, - ) - for edge in cycle_edges - ) - unique_constraints = tuple(sorted({term.constraint_index for term in terms})) - component_label = ( - '[' + ', '.join(str(v) for v in component_nodes) + ']' - if component_nodes - else '[]' - ) - cycle_label = '[' + ', '.join(str(v) for v in unique_constraints) + ']' - conflict = HardConstraintConflict( - component_nodes=component_nodes, - cycle_nodes=tuple(int(v) for v in cycle_nodes_list), - terms=terms, - message=( - 'inconsistent hard separator restrictions on connected component ' - f'{component_label}; contradiction cycle uses constraint rows {cycle_label}' - ), - ) - return False, conflict - def _solve_component_analytic( I: np.ndarray, @@ -1640,83 +690,22 @@ def _prox_measurement_objective( return y -def _mismatch_derivatives( - y: np.ndarray, - target: np.ndarray, - confidence: np.ndarray, - mismatch: SquaredLoss | HuberLoss, -) -> tuple[np.ndarray, np.ndarray]: - r = y - target - if isinstance(mismatch, SquaredLoss): - fp_y = 2.0 * confidence * r - fpp_y = 2.0 * confidence - return fp_y, fpp_y - if isinstance(mismatch, HuberLoss): - delta = float(mismatch.delta) - abs_r = np.abs(r) - quad = abs_r <= delta - fp_y = np.where(quad, confidence * r, confidence * delta * np.sign(r)) - fpp_y = np.where(quad, confidence, 0.0) - return fp_y, fpp_y - raise TypeError(f'unsupported mismatch: {type(mismatch)!r}') - - -def _penalty_derivatives( - y: np.ndarray, - penalty: SoftIntervalPenalty - | ExponentialBoundaryPenalty - | ReciprocalBoundaryPenalty, -) -> tuple[np.ndarray, np.ndarray]: - if isinstance(penalty, SoftIntervalPenalty): - lower = float(penalty.lower) - upper = float(penalty.upper) - strength = float(penalty.strength) - fp = np.zeros_like(y) - fpp = np.zeros_like(y) - lo_mask = y < lower - hi_mask = y > upper - if np.any(lo_mask): - fp[lo_mask] += 2.0 * strength * (y[lo_mask] - lower) - fpp[lo_mask] += 2.0 * strength - if np.any(hi_mask): - fp[hi_mask] += 2.0 * strength * (y[hi_mask] - upper) - fpp[hi_mask] += 2.0 * strength - return fp, fpp - - if isinstance(penalty, ExponentialBoundaryPenalty): - lower = float(penalty.lower) - upper = float(penalty.upper) - margin = float(penalty.margin) - strength = float(penalty.strength) - tau = float(penalty.tau) - left = lower + margin - right = upper - margin - A = np.exp((left - y) / tau) - B = np.exp((y - right) / tau) - fp = strength * (-A + B) / tau - fpp = strength * (A + B) / (tau * tau) - return fp, fpp +def _apply_connectivity_policy( + policy: Literal['none', 'diagnose', 'warn', 'raise'], + diagnostics: ConnectivityDiagnostics, + warnings_list: list[str], +) -> None: + if policy in ('none', 'diagnose') or not diagnostics.messages: + return + if policy == 'warn': + warnings_list.extend(diagnostics.messages) + return + if policy == 'raise': + raise ConnectivityDiagnosticsError( + '; '.join(diagnostics.messages), + diagnostics, + ) + raise ValueError('unsupported connectivity policy') - if isinstance(penalty, ReciprocalBoundaryPenalty): - lower = float(penalty.lower) - upper = float(penalty.upper) - margin = float(penalty.margin) - strength = float(penalty.strength) - eps = float(penalty.epsilon) - left = lower + margin - right = upper - margin - fp = np.zeros_like(y) - fpp = np.zeros_like(y) - lo_mask = y < left - if np.any(lo_mask): - denom = np.maximum(y[lo_mask] - lower, eps) - fp[lo_mask] += -strength / (denom**2) - fpp[lo_mask] += 2.0 * strength / (denom**3) - hi_mask = y > right - if np.any(hi_mask): - denom = np.maximum(upper - y[hi_mask], eps) - fp[hi_mask] += strength / (denom**2) - fpp[hi_mask] += 2.0 * strength / (denom**3) - return fp, fpp - raise TypeError(f'unsupported penalty: {type(penalty)!r}') +__all__ = ['fit_power_weights', 'ConnectivityDiagnosticsError'] diff --git a/src/pyvoro2/powerfit/transforms.py b/src/pyvoro2/powerfit/transforms.py new file mode 100644 index 0000000..5cea212 --- /dev/null +++ b/src/pyvoro2/powerfit/transforms.py @@ -0,0 +1,52 @@ +"""Public scalar transforms for power-fit weights and radii.""" + +from __future__ import annotations + +import numpy as np + + +def radii_to_weights(radii: np.ndarray) -> np.ndarray: + """Convert radii to power weights (``w = r^2``).""" + + r = np.asarray(radii, dtype=float) + if r.ndim != 1: + raise ValueError('radii must be 1D') + if not np.all(np.isfinite(r)): + raise ValueError('radii must contain only finite values') + if np.any(r < 0): + raise ValueError('radii must be non-negative') + return r * r + + +def weights_to_radii( + weights: np.ndarray, + *, + r_min: float = 0.0, + weight_shift: float | None = None, +) -> tuple[np.ndarray, float]: + """Convert power weights to radii using an explicit global shift.""" + + w = np.asarray(weights, dtype=float) + if w.ndim != 1: + raise ValueError('weights must be 1D') + if not np.all(np.isfinite(w)): + raise ValueError('weights must contain only finite values') + + if weight_shift is not None: + if r_min != 0.0: + raise ValueError('specify at most one of r_min and weight_shift') + C = float(weight_shift) + if not np.isfinite(C): + raise ValueError('weight_shift must be finite') + else: + r_min = float(r_min) + if r_min < 0: + raise ValueError('r_min must be >= 0') + w_min = float(np.min(w)) if w.size else 0.0 + C = (r_min * r_min) - w_min + + w_shifted = w + C + if np.any(w_shifted < -1e-14): + raise ValueError('weight shift produced negative values (numerical issue)') + w_shifted = np.maximum(w_shifted, 0.0) + return np.sqrt(w_shifted), float(C) diff --git a/src/pyvoro2/powerfit/types.py b/src/pyvoro2/powerfit/types.py new file mode 100644 index 0000000..aa350d6 --- /dev/null +++ b/src/pyvoro2/powerfit/types.py @@ -0,0 +1,314 @@ +"""Shared public value objects for inverse power fitting.""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + +from .constraints import PairBisectorConstraints + + +def _plain_value(value: object) -> object: + return value.item() if hasattr(value, 'item') else value + + +def _readonly_array( + value: np.ndarray | None, + *, + dtype: np.dtype | type | None = None, +) -> np.ndarray | None: + if value is None: + return None + arr = np.array(value, dtype=dtype, copy=True) + arr.setflags(write=False) + return arr + + +@dataclass(frozen=True, slots=True) +class ConstraintGraphDiagnostics: + """Connectivity summary for a graph induced by constraint rows.""" + + n_points: int + n_constraints: int + n_edges: int + isolated_points: tuple[int, ...] + connected_components: tuple[tuple[int, ...], ...] + fully_connected: bool + + @property + def n_components(self) -> int: + return int(len(self.connected_components)) + + +@dataclass(frozen=True, slots=True) +class ConnectivityDiagnostics: + """Structured connectivity diagnostics for the inverse-fit graph.""" + + unconstrained_points: tuple[int, ...] + candidate_graph: ConstraintGraphDiagnostics + effective_graph: ConstraintGraphDiagnostics + active_graph: ConstraintGraphDiagnostics | None = None + active_effective_graph: ConstraintGraphDiagnostics | None = None + candidate_offsets_identified_by_data: bool = False + active_offsets_identified_by_data: bool | None = None + offsets_identified_in_objective: bool = False + gauge_policy: str = '' + messages: tuple[str, ...] = () + + +@dataclass(frozen=True, slots=True) +class AlgebraicEdgeDiagnostics: + """Edge-space diagnostics matching the paper-side algebraic formulas.""" + + alpha: np.ndarray + beta: np.ndarray + z_obs: np.ndarray + z_fit: np.ndarray | None + residual: np.ndarray | None + edge_weight: np.ndarray + weighted_l2: float | None + weighted_rmse: float | None + rmse: float | None + mae: float | None + + def __post_init__(self) -> None: + object.__setattr__(self, 'alpha', _readonly_array(self.alpha, dtype=np.float64)) + object.__setattr__(self, 'beta', _readonly_array(self.beta, dtype=np.float64)) + object.__setattr__(self, 'z_obs', _readonly_array(self.z_obs, dtype=np.float64)) + object.__setattr__(self, 'z_fit', _readonly_array(self.z_fit, dtype=np.float64)) + object.__setattr__( + self, + 'residual', + _readonly_array(self.residual, dtype=np.float64), + ) + object.__setattr__( + self, + 'edge_weight', + _readonly_array(self.edge_weight, dtype=np.float64), + ) + + +@dataclass(frozen=True, slots=True) +class PowerFitBounds: + measurement_lower: np.ndarray | None + measurement_upper: np.ndarray | None + difference_lower: np.ndarray | None + difference_upper: np.ndarray | None + + def __post_init__(self) -> None: + object.__setattr__( + self, + 'measurement_lower', + _readonly_array(self.measurement_lower, dtype=np.float64), + ) + object.__setattr__( + self, + 'measurement_upper', + _readonly_array(self.measurement_upper, dtype=np.float64), + ) + object.__setattr__( + self, + 'difference_lower', + _readonly_array(self.difference_lower, dtype=np.float64), + ) + object.__setattr__( + self, + 'difference_upper', + _readonly_array(self.difference_upper, dtype=np.float64), + ) + + +@dataclass(frozen=True, slots=True) +class PowerFitPredictions: + difference: np.ndarray + fraction: np.ndarray + position: np.ndarray + measurement: np.ndarray + + def __post_init__(self) -> None: + object.__setattr__( + self, + 'difference', + _readonly_array(self.difference, dtype=np.float64), + ) + object.__setattr__( + self, + 'fraction', + _readonly_array(self.fraction, dtype=np.float64), + ) + object.__setattr__( + self, + 'position', + _readonly_array(self.position, dtype=np.float64), + ) + object.__setattr__( + self, + 'measurement', + _readonly_array(self.measurement, dtype=np.float64), + ) + + +@dataclass(frozen=True, slots=True) +class PowerFitObjectiveBreakdown: + total: float + mismatch: float + penalties_total: float + penalty_terms: tuple[tuple[str, float], ...] + regularization: float + hard_constraints_satisfied: bool + hard_max_violation: float + + +@dataclass(frozen=True, slots=True) +class HardConstraintConflictTerm: + """One bound relation participating in an infeasibility witness.""" + + constraint_index: int + site_i: int + site_j: int + relation: str + bound_value: float + + def to_record(self, *, ids: np.ndarray | None = None) -> dict[str, object]: + site_i = int(self.site_i) if ids is None else ids[self.site_i].item() + site_j = int(self.site_j) if ids is None else ids[self.site_j].item() + return { + 'constraint_index': int(self.constraint_index), + 'site_i': site_i, + 'site_j': site_j, + 'relation': self.relation, + 'bound_value': float(self.bound_value), + } + + +@dataclass(frozen=True, slots=True) +class HardConstraintConflict: + """Compact witness for inconsistent hard separator restrictions.""" + + component_nodes: tuple[int, ...] + cycle_nodes: tuple[int, ...] + terms: tuple[HardConstraintConflictTerm, ...] + message: str + + @property + def constraint_indices(self) -> tuple[int, ...]: + return tuple(sorted({int(term.constraint_index) for term in self.terms})) + + def to_records( + self, + *, + ids: np.ndarray | None = None, + ) -> tuple[dict[str, object], ...]: + return tuple(term.to_record(ids=ids) for term in self.terms) + + +@dataclass(frozen=True, slots=True) +class PowerWeightFitResult: + """Result of inverse fitting of power weights.""" + + status: str + hard_feasible: bool + weights: np.ndarray | None + radii: np.ndarray | None + weight_shift: float | None + measurement: str + target: np.ndarray + predicted: np.ndarray | None + predicted_fraction: np.ndarray | None + predicted_position: np.ndarray | None + residuals: np.ndarray | None + rms_residual: float | None + max_residual: float | None + used_shifts: np.ndarray + solver: str + n_iter: int + converged: bool + conflict: HardConstraintConflict | None + warnings: tuple[str, ...] + status_detail: str | None = None + connectivity: ConnectivityDiagnostics | None = None + edge_diagnostics: AlgebraicEdgeDiagnostics | None = None + objective_breakdown: PowerFitObjectiveBreakdown | None = None + + @property + def is_optimal(self) -> bool: + return self.status == 'optimal' + + @property + def is_infeasible(self) -> bool: + return self.status == 'infeasible_hard_constraints' + + @property + def conflicting_constraint_indices(self) -> tuple[int, ...]: + if self.conflict is None: + return tuple() + return self.conflict.constraint_indices + + def to_records( + self, + constraints: PairBisectorConstraints, + *, + use_ids: bool = False, + ) -> tuple[dict[str, object], ...]: + if constraints.n_constraints != int(self.target.shape[0]): + raise ValueError('constraints do not match the fit result length') + left, right = constraints.pair_labels(use_ids=use_ids) + from .problem import _edge_diagnostics_for_result + + edge_diag = _edge_diagnostics_for_result(self, constraints) + rows: list[dict[str, object]] = [] + left_is_int = np.issubdtype(np.asarray(left).dtype, np.integer) + right_is_int = np.issubdtype(np.asarray(right).dtype, np.integer) + for k in range(constraints.n_constraints): + site_i = int(left[k]) if left_is_int else _plain_value(left[k]) + site_j = int(right[k]) if right_is_int else _plain_value(right[k]) + rows.append( + { + 'constraint_index': int(k), + 'site_i': site_i, + 'site_j': site_j, + 'shift': tuple(int(v) for v in constraints.shifts[k]), + 'measurement': self.measurement, + 'target': float(self.target[k]), + 'predicted': ( + None if self.predicted is None else float(self.predicted[k]) + ), + 'predicted_fraction': ( + None + if self.predicted_fraction is None + else float(self.predicted_fraction[k]) + ), + 'predicted_position': ( + None + if self.predicted_position is None + else float(self.predicted_position[k]) + ), + 'residual': ( + None if self.residuals is None else float(self.residuals[k]) + ), + 'alpha': float(edge_diag.alpha[k]), + 'beta': float(edge_diag.beta[k]), + 'z_obs': float(edge_diag.z_obs[k]), + 'z_fit': ( + None if edge_diag.z_fit is None else float(edge_diag.z_fit[k]) + ), + 'algebraic_residual': ( + None + if edge_diag.residual is None + else float(edge_diag.residual[k]) + ), + 'edge_weight': float(edge_diag.edge_weight[k]), + } + ) + return tuple(rows) + + def to_report( + self, + constraints: PairBisectorConstraints, + *, + use_ids: bool = False, + ) -> dict[str, object]: + from .report import build_fit_report + + return build_fit_report(self, constraints, use_ids=use_ids) diff --git a/tests/test_powerfit_active_set.py b/tests/test_powerfit_active_set.py index 97b9d52..fbd817c 100644 --- a/tests/test_powerfit_active_set.py +++ b/tests/test_powerfit_active_set.py @@ -317,7 +317,8 @@ def test_self_consistent_solver_preserves_active_component_offsets_on_final_refi import pyvoro2.powerfit.active as active_mod from pyvoro2 import ActiveSetOptions, Box from pyvoro2.powerfit.realize import RealizedPairDiagnostics - from pyvoro2.powerfit.solver import PowerWeightFitResult, weights_to_radii + from pyvoro2.powerfit.transforms import weights_to_radii + from pyvoro2.powerfit.types import PowerWeightFitResult pts = np.array( [[0.0, 0.0, 0.0], [2.0, 0.0, 0.0], [10.0, 0.0, 0.0], [12.0, 0.0, 0.0]], diff --git a/tests/test_powerfit_problem_api.py b/tests/test_powerfit_problem_api.py new file mode 100644 index 0000000..2412e20 --- /dev/null +++ b/tests/test_powerfit_problem_api.py @@ -0,0 +1,164 @@ +import numpy as np + + +def test_build_power_fit_problem_exposes_resolved_numeric_problem(): + from pyvoro2.powerfit import ( + build_power_fit_problem, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + problem = build_power_fit_problem(constraints) + + assert np.allclose(problem.alpha, np.array([0.125])) + assert np.allclose(problem.beta, np.array([0.5])) + assert np.allclose(problem.z_obs, np.array([-2.0])) + assert np.allclose(problem.edge_weight, np.array([0.015625])) + assert np.allclose(problem.measurement_target, np.array([0.25])) + assert problem.suggested_anchor_indices == tuple() + + +def test_build_power_fit_problem_reports_advisory_anchors_for_disconnected_case(): + from pyvoro2.powerfit import ( + build_power_fit_problem, + resolve_pair_bisector_constraints, + ) + + pts = np.array( + [[0.0, 0.0, 0.0], [2.0, 0.0, 0.0], [10.0, 0.0, 0.0], [12.0, 0.0, 0.0]], + dtype=float, + ) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25), (2, 3, 0.75)], + measurement='fraction', + ) + problem = build_power_fit_problem(constraints) + + assert problem.connectivity.effective_graph.n_components == 2 + assert problem.suggested_anchor_indices == (0, 2) + + +def test_problem_definition_objects_are_read_only(): + from pyvoro2.powerfit import ( + FitModel, + Interval, + build_power_fit_problem, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + problem = build_power_fit_problem( + constraints, + model=FitModel(feasible=Interval(0.0, 1.0)), + ) + predictions = problem.predict(np.array([0.0, 2.0])) + + assert not constraints.i.flags.writeable + assert not problem.alpha.flags.writeable + assert not problem.bounds.measurement_lower.flags.writeable + assert not predictions.measurement.flags.writeable + + +def test_build_power_fit_result_round_trips_native_weights_and_reports_objective(): + from pyvoro2.powerfit import ( + build_power_fit_problem, + build_power_fit_result, + fit_power_weights, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + fit = fit_power_weights(pts, constraints) + problem = build_power_fit_problem(constraints) + rebuilt = build_power_fit_result(problem, fit.weights, solver='external-lbfgsb') + + assert np.allclose(rebuilt.weights, fit.weights) + assert np.allclose(rebuilt.predicted, fit.predicted) + assert rebuilt.objective_breakdown is not None + assert rebuilt.objective_breakdown.hard_constraints_satisfied is True + assert rebuilt.objective_breakdown.total == 0.0 + + +def test_build_power_fit_result_can_package_imperfect_external_weights(): + from pyvoro2.powerfit import ( + FitModel, + Interval, + build_power_fit_problem, + build_power_fit_result, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + problem = build_power_fit_problem( + constraints, + model=FitModel(feasible=Interval(0.0, 1.0)), + ) + result = build_power_fit_result( + problem, + np.array([0.0, 8.0]), + solver='external', + status='external_failure', + status_detail='line search failed', + ) + + assert result.objective_breakdown is not None + assert result.objective_breakdown.hard_constraints_satisfied is False + assert result.status == 'external_failure' + assert result.status_detail == 'line search failed' + assert any('hard measurement bounds' in warning for warning in result.warnings) + + +def test_fit_report_includes_objective_breakdown_and_status_detail(): + from pyvoro2.powerfit import ( + FitModel, + Interval, + build_fit_report, + build_power_fit_problem, + build_power_fit_result, + resolve_pair_bisector_constraints, + ) + + pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) + constraints = resolve_pair_bisector_constraints( + pts, + [(0, 1, 0.25)], + measurement='fraction', + ) + problem = build_power_fit_problem( + constraints, + model=FitModel(feasible=Interval(0.0, 1.0)), + ) + result = build_power_fit_result( + problem, + np.array([0.0, 2.0]), + solver='external', + status='external_failure', + status_detail='test detail', + ) + + report = build_fit_report(result, constraints) + + assert report['summary']['status_detail'] == 'test detail' + assert report['objective_breakdown'] is not None + assert report['objective_breakdown']['hard_constraints_satisfied'] is True diff --git a/tests/test_powerfit_validation_regressions.py b/tests/test_powerfit_validation_regressions.py index a2d3382..b3ddf20 100644 --- a/tests/test_powerfit_validation_regressions.py +++ b/tests/test_powerfit_validation_regressions.py @@ -228,7 +228,7 @@ def boom(*args, **kwargs): def test_active_set_propagates_numerical_failure(monkeypatch): import pyvoro2.powerfit.active as active_mod from pyvoro2 import Box - from pyvoro2.powerfit.solver import PowerWeightFitResult + from pyvoro2.powerfit.types import PowerWeightFitResult pts = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]], dtype=float) domain = Box(((-5.0, 5.0), (-5.0, 5.0), (-5.0, 5.0)))