From 7880ffb04123a10c348585c7c038918419cf28e3 Mon Sep 17 00:00:00 2001 From: Ryan McKenna Date: Fri, 12 Jun 2026 15:49:54 -0700 Subject: [PATCH] Define DPMechanism ABC and wrap local_mode primitives Introduces a `DPMechanism` abstract base class with `calibrate`, `dp_event`, and `__call__(rng, data)` methods. Each mechanism is parameterized by its natural privacy parameter. Wraps all local mode primitives and initializers as DPMechanism subclasses. PiperOrigin-RevId: 931374092 --- dpsynth/local_mode/initialization.py | 129 ++++++------ dpsynth/local_mode/primitives.py | 251 ++++++++++++++++++------ tests/local_mode/initialization_test.py | 20 +- tests/local_mode/primitives_test.py | 153 ++++++++++++--- 4 files changed, 398 insertions(+), 155 deletions(-) diff --git a/dpsynth/local_mode/initialization.py b/dpsynth/local_mode/initialization.py index e99db09..94d3e0f 100644 --- a/dpsynth/local_mode/initialization.py +++ b/dpsynth/local_mode/initialization.py @@ -14,12 +14,14 @@ """Utilities for measuring and integer-encoding single columns.""" +from __future__ import annotations + import dataclasses +import functools import dp_accounting from dpsynth import domain from dpsynth import transformations -from dpsynth.discrete_mechanisms import accounting from dpsynth.local_mode import primitives import mbi import numpy as np @@ -33,34 +35,47 @@ class ColumnMeasurement: @dataclasses.dataclass -class NumericalInitializer: - """Mechanism that creates the data encoding transform for numerical data.""" +class NumericalInitializer(primitives.DPMechanism): + """Mechanism that creates the data encoding transform for numerical data. + + Internally delegates to a ``DPQuantiles`` mechanism for privacy accounting + and quantile computation. + + Attributes: + name: Attribute name used as the clique key in the measurement. + num_partitions: Number of quantile partitions (must be a power of 2). + attribute: The NumericalAttribute defining the data domain. 
+ """ name: str num_partitions: int attribute: domain.NumericalAttribute - rng: np.random.Generator - - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - levels = int(np.log2(self.num_partitions)) - budget_weights = 4 ** np.arange(levels)[::-1] - rho_levels = zcdp_rho * budget_weights / budget_weights.sum() - epsilons = [accounting.zcdp_exponential_eps(rho) for rho in rho_levels] - - return dp_accounting.ComposedDpEvent( - [dp_accounting.ExponentialMechanismDpEvent(epsilon=e) for e in epsilons] - ) - - def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement: - """Returns a differentially private measurement of the given data.""" - bucket_edges = primitives.quantiles( - self.rng, - data, - self.attribute.min_value, - self.attribute.max_value, - self.num_partitions, - zcdp_rho, - ) + _zcdp_rho: float | None = dataclasses.field(default=None, repr=False) + + @functools.cached_property + def _mechanism(self) -> primitives.DPQuantiles: + if self._zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return primitives.DPQuantiles( + lower=self.attribute.min_value, + upper=self.attribute.max_value, + num_partitions=self.num_partitions, + ).calibrate(zcdp_rho=self._zcdp_rho) + + def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer: + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace(self, _zcdp_rho=zcdp_rho) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the composed privacy event for the quantile computation.""" + return self._mechanism.dp_event + + def __call__( + self, rng: np.random.Generator, data: np.ndarray + ) -> ColumnMeasurement: + """Returns a ColumnMeasurement with the discretization transform.""" + bucket_edges = self._mechanism(rng, data) attr, discretize_fn = transformations.create_discretize_transformation( self.attribute, bucket_edges ) @@ -69,54 +84,46 @@ def __call__(self, zcdp_rho: float, data: np.ndarray) -> 
ColumnMeasurement: @dataclasses.dataclass -class CategoricalInitializer: +class CategoricalInitializer(primitives.DPMechanism): """Mechanism that measures a noisy histogram for categorical data. - Computes a closed-domain histogram over the pre-specified categories using - the Gaussian mechanism. Values not in the domain are mapped to the - attribute's designated out-of-domain value before histogramming. + Internally delegates to a ``DPGaussianHistogram`` mechanism for privacy + accounting and noise addition. Attributes: name: Attribute name used as the clique key in the measurement. attribute: The CategoricalAttribute defining the closed domain. - rng: A numpy random number generator. """ name: str attribute: domain.CategoricalAttribute - rng: np.random.Generator - - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DpEvent for the Gaussian mechanism. - - Args: - zcdp_rho: Total zCDP privacy budget. - - Returns: - A GaussianDpEvent describing the privacy cost. - """ - # Gaussian mechanism with L2 sensitivity 1: rho = 1 / (2 * sigma^2). - sigma = 1.0 / np.sqrt(2.0 * zcdp_rho) - return dp_accounting.GaussianDpEvent(noise_multiplier=sigma) - - def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement: - """Returns a differentially private measurement of the given data. - - Args: - zcdp_rho: Total zCDP privacy budget for the histogram measurement. - data: 1D array of raw categorical values. - - Returns: - A ColumnMeasurement containing the categorical attribute, the encoding - transform, and a LinearMeasurement with the noisy histogram. 
- """ - sigma = 1.0 / np.sqrt(2.0 * zcdp_rho) + _zcdp_rho: float | None = dataclasses.field(default=None, repr=False) + + @functools.cached_property + def _mechanism(self) -> primitives.DPGaussianHistogram: + if self._zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return primitives.DPGaussianHistogram( + domain_size=self.attribute.size, + ).calibrate(zcdp_rho=self._zcdp_rho) + + def calibrate(self, *, zcdp_rho: float) -> CategoricalInitializer: + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace(self, _zcdp_rho=zcdp_rho) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the Gaussian privacy event for this mechanism.""" + return self._mechanism.dp_event + + def __call__( + self, rng: np.random.Generator, data: np.ndarray + ) -> ColumnMeasurement: + """Returns a ColumnMeasurement with the noisy histogram.""" transform_fn = transformations.discrete_encoder(self.attribute) encoded = np.array([transform_fn(v) for v in data]) - noisy_counts = primitives.gaussian_histogram( - self.rng, encoded, self.attribute.size, sigma - ) + noisy_counts = self._mechanism(rng, encoded) measurement = mbi.LinearMeasurement( - noisy_counts, (self.name,), stddev=sigma + noisy_counts, (self.name,), stddev=self._mechanism.sigma ) return ColumnMeasurement(self.attribute, transform_fn, measurement) diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py index 644181f..b0a9f81 100644 --- a/dpsynth/local_mode/primitives.py +++ b/dpsynth/local_mode/primitives.py @@ -18,19 +18,81 @@ operations for efficiency in single-machine environments. """ +from __future__ import annotations + +import abc +import dataclasses +import math +from typing import Any + +import dp_accounting import numpy as np import scipy.special import scipy.stats -def median( +class DPMechanism(abc.ABC): + """Abstract base class for differentially private mechanisms. 
+ + A DPMechanism encapsulates a randomized algorithm that satisfies differential + privacy. Usage follows a three-step pattern: + + 1. **Configure**: Create the mechanism with algorithm-specific parameters. + 2. **Calibrate**: Call ``calibrate(zcdp_rho=...)`` to bind a privacy budget, + returning a new mechanism instance whose natural privacy parameter (e.g., + Gaussian sigma, exponential mechanism epsilon) has been set accordingly. + 3. **Run**: Call the calibrated mechanism on data via ``__call__``. + + Subclasses should be parameterized by their **natural** privacy parameter + (e.g., ``sigma`` for the Gaussian mechanism, ``epsilon`` for the exponential + mechanism). The ``calibrate`` method converts from the universal zCDP budget + to the mechanism's natural parameter. + + The ``dp_event`` property returns the ``dp_accounting.DpEvent`` characterizing + the privacy cost of the calibrated mechanism. + """ + + @abc.abstractmethod + def calibrate(self, *, zcdp_rho: float) -> DPMechanism: + """Returns a new mechanism calibrated to the given zCDP budget. + + Converts the zCDP budget ``rho`` into the mechanism's natural privacy + parameter and returns a new instance with that parameter set. + + Args: + zcdp_rho: The zCDP privacy budget (rho). + + Returns: + A new DPMechanism instance calibrated to the given budget. + """ + + @property + @abc.abstractmethod + def dp_event(self) -> dp_accounting.DpEvent: + """The DpEvent characterizing the privacy cost of this mechanism.""" + + @abc.abstractmethod + def __call__(self, rng: Any, data: Any) -> Any: + """Runs the mechanism on the given data. + + Args: + rng: A source of randomness (e.g., ``np.random.Generator``). + data: The input data to the mechanism. + """ + + +_UNCALIBRATED_MSG = ( + '{param} has not been set. Set it directly or call calibrate().' 
+) + + +def _median( rng: np.random.Generator, data: np.ndarray, lower: float, upper: float, - zcdp_rho: float, + epsilon: float, jitter_multiple: float = 1e-4, - num_examples_per_user: int = 1, ) -> float: """Computes a differentially private median using the exponential mechanism. @@ -38,29 +100,24 @@ def median( the intervals between sorted data points. The utility of an interval is based on the distance of its rank from N/2. - This mechanism is an instance of the exponential mechanism with parameter - epsilon = sqrt(8 * zcdp_rho) and sensitivity = num_examples_per_user. - Args: rng: A numpy random number generator. data: 1D array of numerical data. lower: Lower bound for the data. upper: Upper bound for the data. - zcdp_rho: Total zCDP privacy budget for the median call. + epsilon: Exponential mechanism privacy parameter. jitter_multiple: Multiplier for the jitter scale, relative to upper-lower. - num_examples_per_user: Number of examples per user. If provided, this - mechanism satisfies user-level DP. Returns: A differentially private median estimate. """ if lower > upper: - raise ValueError(f"{lower=} cannot be greater than {upper=}.") + raise ValueError(f'{lower=} cannot be greater than {upper=}.') clamped_data = np.clip(data, lower, upper) n = clamped_data.size - if zcdp_rho == np.inf: + if epsilon == np.inf: if n == 0: return (lower + upper) / 2 return float(np.median(clamped_data)) @@ -78,13 +135,8 @@ def median( ranks = np.arange(n + 1) utilities = -np.abs(ranks - n / 2) - # Convert zCDP rho to exponential mechanism parameter. - epsilon = np.sqrt(8 * zcdp_rho) - sensitivity = num_examples_per_user - alpha = epsilon / sensitivity - # Compute output probabilities for each interval. - probs = scipy.special.softmax(np.log(lengths) + alpha * utilities) + probs = scipy.special.softmax(np.log(lengths) + epsilon * utilities) # Sample an interval index, and a value uniformly from the interval. 
interval_idx = rng.choice(n + 1, p=probs) @@ -93,62 +145,68 @@ def median( return rng.uniform(v_min, v_max) -def quantiles( +def _quantile_epsilon_levels( + zcdp_rho: float, num_partitions: int +) -> np.ndarray: + """Computes per-level exponential mechanism epsilons for DP quantiles. + + The budget is split so that each level receives noise proportional to + the data size at that level. Concretely, if there are L levels the + per-level zCDP budgets satisfy rho_i = 4 * rho_{i+1} (deeper levels get + more budget) and sum to ``zcdp_rho``. Each rho is then converted to an + exponential mechanism epsilon via ``epsilon = sqrt(8 * rho)``. + + Args: + zcdp_rho: Total zCDP privacy budget. + num_partitions: Number of partitions (must be a power of 2). + + Returns: + A length ``log2(num_partitions)`` array of per-level epsilons, ordered + from the deepest (finest) level to the shallowest (coarsest). + """ + levels = int(np.log2(num_partitions)) + if levels == 0: + return np.array([]) + budget_weights = 4 ** np.arange(levels)[::-1] + rho_levels = zcdp_rho * budget_weights / budget_weights.sum() + return np.sqrt(8 * rho_levels) + + +def _quantiles( rng: np.random.Generator, data: np.ndarray, lower: float, upper: float, - num_partitions: int, - zcdp_rho: float, - num_examples_per_user: int = 1, + epsilon_levels: np.ndarray, ) -> list[float]: """Computes uniformly spaced differentially private quantiles. - This function is a log2(num_partitions) composition of the exponential - mechanism where the fraction of the total zCDP budget assigned to each level - is proportional to 0.25^level. + This function is a ``len(epsilon_levels)``-level composition of the + exponential mechanism. The number of partitions is inferred as + ``2 ** len(epsilon_levels)``. Args: rng: A numpy random number generator. data: 1D array of numerical data. lower: Lower bound for the data. upper: Upper bound for the data. - num_partitions: Number of partitions (n) to compute (must be a power of 2). 
- This function computes n-1 quantiles for [k, 2*k, ..., (n-1)*k] where k = - 1/n, corresponding to the set of n intervals [lower, k), [k, 2k), ..., - [k*(n-1), upper). - zcdp_rho: Total zCDP privacy budget for the quantiles call. - num_examples_per_user: Number of examples per user. If provided, this - mechanism satisfies user-level DP. + epsilon_levels: Per-level exponential mechanism epsilons, as returned by + ``_quantile_epsilon_levels``. Returns: - A length (num_partitions-1) sorted list of private quantile estimates. + A list of ``2 ** len(epsilon_levels) - 1`` sorted private quantile + estimates. """ - if num_partitions <= 0 or (num_partitions & (num_partitions - 1)) != 0: - raise ValueError(f"num_buckets ({num_partitions}) must be a power of 2.") - - if num_examples_per_user != 1: - # It is not obvious if the parallel composition logic holds below when users - # may contribute a subset of their data to multiple partitions. - raise ValueError(f"{num_examples_per_user=} is not currently supported.") - - levels = int(np.log2(num_partitions)) + levels = len(epsilon_levels) if levels == 0: return [] - # Split the budget so that each level gets noise proportional to data size. - # rho_1 + ... 
+ rho_levels = rho - # rho_i = 4 * rho_{i+1} - - budget_weights = 4 ** np.arange(levels)[::-1] - rho_levels = zcdp_rho * budget_weights / budget_weights.sum() - def quantiles_rec(current_data, curr_lower, curr_upper, current_depth): if current_depth == 0: return [] - rho_level = rho_levels[current_depth - 1] - med = median(rng, current_data, curr_lower, curr_upper, rho_level) + eps = epsilon_levels[current_depth - 1] + med = _median(rng, current_data, curr_lower, curr_upper, eps) left_mask = current_data <= med left_data = current_data[left_mask] @@ -171,7 +229,7 @@ def _contribution_bound(prng, user_ids, max_part): diff = np.r_[True, sorted_ids[1:] != sorted_ids[:-1]] kernel = np.ones(max_part, dtype=bool) # This convolution determines if any of previous max_part elements are True. - mask = np.convolve(diff, kernel, mode="full")[: user_ids.size] + mask = np.convolve(diff, kernel, mode='full')[: user_ids.size] return idx[mask] @@ -182,7 +240,7 @@ def _get_threshold(delta, sigma, max_part): return thresholds.max() -def select_partitions_sips( +def _select_partitions_sips( rng: np.random.Generator, data: np.ndarray, gdp_budget: float, @@ -221,9 +279,9 @@ def select_partitions_sips( if num_rounds is None: num_rounds = 1 if user_ids is None else 3 if num_rounds <= 0: - raise ValueError(f"num_rounds ({num_rounds}) must be greater than 0.") + raise ValueError(f'num_rounds ({num_rounds}) must be greater than 0.') if gdp_budget <= 0 or delta <= 0: - raise ValueError(f"{gdp_budget=} and {delta=} must be positive.") + raise ValueError(f'{gdp_budget=} and {delta=} must be positive.') fractions = allocation_factor ** np.arange(num_rounds)[::-1] fractions /= fractions.sum() @@ -237,7 +295,7 @@ def select_partitions_sips( if user_ids is None: user_ids = np.arange(data.size) if user_ids.size != data.size: - raise ValueError("user_ids must have the same size as data.") + raise ValueError('user_ids must have the same size as data.') combined = np.stack((user_ids, data), axis=1) 
unique_combined = np.unique(combined, axis=0) @@ -282,7 +340,7 @@ def select_partitions_sips( return selected_partitions, selected_counts, max_sigma -def gaussian_histogram( +def _gaussian_histogram( rng: np.random.Generator, data: np.ndarray, domain_size: int, @@ -306,3 +364,84 @@ def gaussian_histogram( return np.bincount(data, minlength=domain_size) + rng.normal( scale=sigma, size=domain_size ) + + +# --------------------------------------------------------------------------- +# DPMechanism subclasses +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class DPQuantiles(DPMechanism): + """Differentially private quantiles via composed exponential mechanisms. + + This is a ``log2(num_partitions)``-level composition of the exponential + mechanism. The natural privacy parameter is ``zcdp_rho`` (the total zCDP + budget) since the mechanism internally splits it across levels. + + Attributes: + lower: Lower bound for the data domain. + upper: Upper bound for the data domain. + num_partitions: Number of partitions (must be a power of 2). + zcdp_rho: Total zCDP budget. Set directly or via ``calibrate``. 
+ """ + + lower: float + upper: float + num_partitions: int + zcdp_rho: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> DPQuantiles: + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace(self, zcdp_rho=zcdp_rho) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the composed privacy event for this mechanism.""" + if self.zcdp_rho is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='zcdp_rho')) + eps_levels = _quantile_epsilon_levels(self.zcdp_rho, self.num_partitions) + return dp_accounting.ComposedDpEvent([ + dp_accounting.ExponentialMechanismDpEvent(epsilon=float(eps)) + for eps in eps_levels + ]) + + def __call__(self, rng: np.random.Generator, data: np.ndarray) -> list[float]: + """Computes differentially private quantiles.""" + if self.zcdp_rho is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='zcdp_rho')) + eps_levels = _quantile_epsilon_levels(self.zcdp_rho, self.num_partitions) + return _quantiles(rng, data, self.lower, self.upper, eps_levels) + + +@dataclasses.dataclass +class DPGaussianHistogram(DPMechanism): + """Differentially private histogram via the Gaussian mechanism. + + The natural privacy parameter is ``sigma``, the noise standard deviation. + The conversion from zCDP is ``sigma = sqrt(0.5 / zcdp_rho)``. + + Attributes: + domain_size: Number of categories in the histogram domain. + sigma: Gaussian noise standard deviation. Set directly or via ``calibrate``. 
+ """ + + domain_size: int + sigma: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> DPGaussianHistogram: + """Returns a copy with sigma derived from the zCDP budget.""" + return dataclasses.replace(self, sigma=math.sqrt(0.5 / zcdp_rho)) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the Gaussian privacy event for this mechanism.""" + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return dp_accounting.GaussianDpEvent(noise_multiplier=self.sigma) + + def __call__(self, rng: np.random.Generator, data: np.ndarray) -> np.ndarray: + """Computes a differentially private histogram.""" + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return _gaussian_histogram(rng, data, self.domain_size, self.sigma) diff --git a/tests/local_mode/initialization_test.py b/tests/local_mode/initialization_test.py index 0807c43..f8c0e20 100644 --- a/tests/local_mode/initialization_test.py +++ b/tests/local_mode/initialization_test.py @@ -25,9 +25,9 @@ def test_numerical_initializer_dp_event(self): attr = domain.NumericalAttribute(min_value=0, max_value=10) rng = np.random.default_rng(0) initializer = initialization.NumericalInitializer( - name='test', num_partitions=4, attribute=attr, rng=rng + name='test', num_partitions=4, attribute=attr ) - event = initializer.dp_event(1.0) + event = initializer.calibrate(zcdp_rho=1.0).dp_event self.assertIsInstance(event, dp_accounting.ComposedDpEvent) self.assertLen(event.events, 2) for e in event.events: @@ -37,13 +37,13 @@ def test_numerical_initializer_call(self): attr = domain.NumericalAttribute(min_value=0, max_value=10) rng = np.random.default_rng(0) initializer = initialization.NumericalInitializer( - name='test', num_partitions=4, attribute=attr, rng=rng + name='test', num_partitions=4, attribute=attr ) data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) # Level 0 median [1..9] --> 5 # Level 1 medians: [1..5] --> 3, [6..9] --> 7.5 - 
measurement = initializer(np.inf, data) + measurement = initializer.calibrate(zcdp_rho=np.inf)(rng, data) self.assertIsInstance(measurement, initialization.ColumnMeasurement) self.assertEqual(measurement.categorical_attribute.size, 4) @@ -62,9 +62,9 @@ def test_dp_event(self): attr = domain.CategoricalAttribute(possible_values=['A', 'B', 'C']) rng = np.random.default_rng(0) initializer = initialization.CategoricalInitializer( - name='test', attribute=attr, rng=rng + name='test', attribute=attr ) - event = initializer.dp_event(zcdp_rho=0.5) + event = initializer.calibrate(zcdp_rho=0.5).dp_event self.assertIsInstance(event, dp_accounting.GaussianDpEvent) # rho = 0.5 => sigma = 1/sqrt(2*0.5) = 1.0 self.assertEqual(event.noise_multiplier, 1.0) @@ -73,10 +73,10 @@ def test_call_noiseless(self): attr = domain.CategoricalAttribute(possible_values=['A', 'B', 'C']) rng = np.random.default_rng(0) initializer = initialization.CategoricalInitializer( - name='col', attribute=attr, rng=rng + name='col', attribute=attr ) data = np.array(['A', 'A', 'B', 'C', 'C', 'C']) - result = initializer(zcdp_rho=np.inf, data=data) + result = initializer.calibrate(zcdp_rho=np.inf)(rng, data) self.assertIsInstance(result, initialization.ColumnMeasurement) self.assertEqual(result.categorical_attribute, attr) @@ -93,10 +93,10 @@ def test_out_of_domain_values(self): ) rng = np.random.default_rng(0) initializer = initialization.CategoricalInitializer( - name='col', attribute=attr, rng=rng + name='col', attribute=attr ) data = np.array(['X', 'Y', 'Z', 'W']) - result = initializer(zcdp_rho=np.inf, data=data) + result = initializer.calibrate(zcdp_rho=np.inf)(rng, data) # 'Z' and 'W' are OOD, mapped to index 0 (None). 
np.testing.assert_array_equal( diff --git a/tests/local_mode/primitives_test.py b/tests/local_mode/primitives_test.py index 5f68750..c47e18d 100644 --- a/tests/local_mode/primitives_test.py +++ b/tests/local_mode/primitives_test.py @@ -16,6 +16,7 @@ from absl.testing import absltest from absl.testing import parameterized +import dp_accounting from dpsynth.local_mode import primitives import numpy as np @@ -28,70 +29,77 @@ def setUp(self): def test_median_basic(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - med = primitives.median(self.rng, data, lower=0.0, upper=10.0, zcdp_rho=100) + med = primitives._median( + self.rng, data, lower=0.0, upper=10.0, epsilon=100 + ) self.assertBetween(med, 2.0, 4.5) def test_median_empty(self): data = np.array([]) - med = primitives.median(self.rng, data, lower=0.0, upper=10.0, zcdp_rho=1.0) + med = primitives._median(self.rng, data, lower=0.0, upper=10.0, epsilon=1.0) self.assertBetween(med, 0.0, 10.0) def test_quantiles_basic(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) - qs = primitives.quantiles( - self.rng, data, lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=100.0 + eps_levels = primitives._quantile_epsilon_levels(100.0, 4) + qs = primitives._quantiles( + self.rng, data, lower=0.0, upper=10.0, epsilon_levels=eps_levels ) self.assertLen(qs, 3) self.assertTrue(all(qs[i] <= qs[i + 1] for i in range(len(qs) - 1))) def test_quantiles_empty(self): data = np.array([]) - qs = primitives.quantiles( - self.rng, data, lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=1.0 + eps_levels = primitives._quantile_epsilon_levels(1.0, 4) + qs = primitives._quantiles( + self.rng, data, lower=0.0, upper=10.0, epsilon_levels=eps_levels ) self.assertLen(qs, 3) self.assertTrue(all(qs[i] <= qs[i + 1] for i in range(len(qs) - 1))) def test_median_with_duplicates(self): data = np.array([2.0, 2.0, 2.0, 2.0, 2.0, 5.0, 6.0]) - med = 
primitives._median(self.rng, data, lower=0.0, upper=10.0, epsilon=100) self.assertBetween(med, 1.5, 2.5) def test_median_with_out_of_bounds(self): data = np.array([-5.0, -2.0, 1.0, 2.0, 3.0, 12.0, 15.0]) - med = primitives.median(self.rng, data, lower=0.0, upper=10.0, zcdp_rho=100) + med = primitives._median(self.rng, data, lower=0.0, upper=10.0, epsilon=100) self.assertBetween(med, 1.0, 3.0) def test_quantiles_with_duplicates_and_clamping(self): data = np.array([-1.0, 1.0, 1.0, 1.0, 1.0, 10.0, 12.0]) - qs = primitives.quantiles( - self.rng, data, lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=100 + eps_levels = primitives._quantile_epsilon_levels(100, 4) + qs = primitives._quantiles( + self.rng, data, lower=0.0, upper=10.0, epsilon_levels=eps_levels ) self.assertLen(qs, 3) self.assertTrue(all(qs[i] <= qs[i + 1] for i in range(len(qs) - 1))) def test_median_zcdp_rho_zero(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - med = primitives.median(self.rng, data, lower=0.0, upper=10.0, zcdp_rho=0.0) + med = primitives._median(self.rng, data, lower=0.0, upper=10.0, epsilon=0.0) self.assertBetween(med, 0.0, 10.0) def test_median_zcdp_rho_inf(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - med = primitives.median(self.rng, data, 0, 10, zcdp_rho=np.inf) + med = primitives._median(self.rng, data, 0, 10, epsilon=np.inf) self.assertEqual(med, 3.0) def test_quantiles_zcdp_rho_zero(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - qs = primitives.quantiles( - self.rng, data, lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=0.0 + eps_levels = primitives._quantile_epsilon_levels(0.0, 4) + qs = primitives._quantiles( + self.rng, data, lower=0.0, upper=10.0, epsilon_levels=eps_levels ) self.assertLen(qs, 3) self.assertTrue(all(0.0 <= q <= 10.0 for q in qs)) def test_quantiles_zcdp_rho_inf(self): data = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) - qs = primitives.quantiles( - self.rng, data, lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=np.inf + eps_levels = 
primitives._quantile_epsilon_levels(np.inf, 4) + qs = primitives._quantiles( + self.rng, data, lower=0.0, upper=10.0, epsilon_levels=eps_levels ) self.assertLen(qs, 3) self.assertEqual(qs, [2.5, 4.0, 6.0]) @@ -105,7 +113,7 @@ def setUp(self): def test_basic_operation(self): data = np.array([1] * 50 + [2] * 5) - selected, counts, sigma = primitives.select_partitions_sips( + selected, counts, sigma = primitives._select_partitions_sips( self.rng, data, gdp_budget=10.0, delta=1e-5 ) self.assertIn(1, selected) @@ -114,7 +122,7 @@ def test_basic_operation(self): def test_empty_data(self): data = np.array([], dtype=int) - selected, counts, sigma = primitives.select_partitions_sips( + selected, counts, sigma = primitives._select_partitions_sips( self.rng, data, gdp_budget=1.0, delta=1e-5 ) self.assertEmpty(selected) @@ -123,7 +131,7 @@ def test_empty_data(self): def test_infinite_budget(self): data = np.array([1, 2, 3, 4, 5]) - selected, counts, sigma = primitives.select_partitions_sips( + selected, counts, sigma = primitives._select_partitions_sips( self.rng, data, gdp_budget=np.inf, delta=0.1 ) self.assertCountEqual(selected, [1, 2, 3, 4, 5]) @@ -133,17 +141,17 @@ def test_infinite_budget(self): def test_zero_budget_raises(self): data = np.array([1, 2, 3]) with self.assertRaises(ValueError): - primitives.select_partitions_sips( + primitives._select_partitions_sips( self.rng, data, gdp_budget=-0.1, delta=1e-5 ) with self.assertRaises(ValueError): - primitives.select_partitions_sips( + primitives._select_partitions_sips( self.rng, data, gdp_budget=1.0, delta=-0.001 ) def test_string_data_type(self): data = np.array(["a", "b", "a", "c"]) - selected, _, _ = primitives.select_partitions_sips( + selected, _, _ = primitives._select_partitions_sips( self.rng, data, gdp_budget=10.0, delta=1e-5 ) self.assertTrue(all(isinstance(p, str) for p in selected)) @@ -151,7 +159,7 @@ def test_string_data_type(self): def test_user_level_dp_weighting(self): data = np.array([1] * 10 + [2]) 
user_ids = np.array([1] * 10 + [2]) - selected, counts, sigma = primitives.select_partitions_sips( + selected, counts, sigma = primitives._select_partitions_sips( self.rng, data, gdp_budget=100.0, delta=1e-5, user_ids=user_ids ) self.assertIn(1, selected) @@ -171,7 +179,7 @@ def test_configurations(self, user_ids, num_rounds): ( _, _, - ) = sigma = primitives.select_partitions_sips( + ) = sigma = primitives._select_partitions_sips( self.rng, data, gdp_budget=gdp_budget, @@ -185,7 +193,7 @@ def test_mismatched_user_ids_raises(self): data = np.array([1, 2, 3]) user_ids = np.array([1, 2]) with self.assertRaises(ValueError): - primitives.select_partitions_sips( + primitives._select_partitions_sips( self.rng, data, gdp_budget=10.0, delta=1e-5, user_ids=user_ids ) @@ -198,17 +206,106 @@ def setUp(self): def test_basic_operation(self): data = np.array([0, 0, 1, 1, 1, 2]) - result = primitives.gaussian_histogram(self.rng, data, 4, sigma=1.0) + result = primitives._gaussian_histogram(self.rng, data, 4, sigma=1.0) self.assertLen(result, 4) # Noisy counts should be close to true counts [2, 3, 1, 0]. 
np.testing.assert_allclose(result, [2, 3, 1, 0], atol=5.0) def test_zero_sigma(self): data = np.array([0, 0, 1, 2, 2, 2]) - result = primitives.gaussian_histogram(self.rng, data, 3, sigma=0.0) + result = primitives._gaussian_histogram(self.rng, data, 3, sigma=0.0) np.testing.assert_array_equal(result, [2, 1, 3]) def test_empty_data(self): data = np.array([], dtype=int) - result = primitives.gaussian_histogram(self.rng, data, 3, sigma=1.0) + result = primitives._gaussian_histogram(self.rng, data, 3, sigma=1.0) self.assertLen(result, 3) + + +# --------------------------------------------------------------------------- +# DPMechanism wrapper tests +# --------------------------------------------------------------------------- + + +class DPQuantilesTest(absltest.TestCase): + + def setUp(self): + super().setUp() + self.rng = np.random.default_rng(42) + + def test_calibrate_and_call(self): + mech = primitives.DPQuantiles(lower=0.0, upper=10.0, num_partitions=4) + calibrated = mech.calibrate(zcdp_rho=100.0) + data = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) + result = calibrated(self.rng, data) + self.assertLen(result, 3) + + def test_direct_zcdp_rho(self): + mech = primitives.DPQuantiles( + lower=0.0, upper=10.0, num_partitions=4, zcdp_rho=100.0 + ) + result = mech(self.rng, np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])) + self.assertLen(result, 3) + + def test_dp_event_raises_before_calibration(self): + mech = primitives.DPQuantiles(lower=0.0, upper=10.0, num_partitions=4) + with self.assertRaises(ValueError): + _ = mech.dp_event + + def test_dp_event_type(self): + mech = primitives.DPQuantiles( + lower=0.0, upper=10.0, num_partitions=4 + ).calibrate(zcdp_rho=1.0) + event = mech.dp_event + self.assertIsInstance(event, dp_accounting.ComposedDpEvent) + self.assertLen(event.events, 2) # log2(4) = 2 levels + for e in event.events: + self.assertIsInstance(e, dp_accounting.ExponentialMechanismDpEvent) + + def test_dp_event_single_partition(self): + mech = 
primitives.DPQuantiles( + lower=0.0, upper=10.0, num_partitions=1 + ).calibrate(zcdp_rho=1.0) + event = mech.dp_event + self.assertIsInstance(event, dp_accounting.ComposedDpEvent) + self.assertEmpty(event.events) + + +class DPGaussianHistogramTest(absltest.TestCase): + + def setUp(self): + super().setUp() + self.rng = np.random.default_rng(42) + + def test_calibrate_and_call(self): + mech = primitives.DPGaussianHistogram(domain_size=4) + calibrated = mech.calibrate(zcdp_rho=0.5) + data = np.array([0, 0, 1, 1, 1, 2]) + result = calibrated(self.rng, data) + self.assertLen(result, 4) + np.testing.assert_allclose(result, [2, 3, 1, 0], atol=5.0) + + def test_direct_sigma(self): + mech = primitives.DPGaussianHistogram(domain_size=3, sigma=0.0) + data = np.array([0, 0, 1, 2, 2, 2]) + np.testing.assert_array_equal(mech(self.rng, data), [2, 1, 3]) + + def test_dp_event_raises_before_calibration(self): + mech = primitives.DPGaussianHistogram(domain_size=4) + with self.assertRaises(ValueError): + _ = mech.dp_event + + def test_call_raises_before_calibration(self): + mech = primitives.DPGaussianHistogram(domain_size=4) + with self.assertRaises(ValueError): + mech(self.rng, np.array([0, 1])) + + def test_dp_event_type(self): + mech = primitives.DPGaussianHistogram(domain_size=4).calibrate(zcdp_rho=0.5) + event = mech.dp_event + self.assertIsInstance(event, dp_accounting.GaussianDpEvent) + self.assertAlmostEqual(event.noise_multiplier, 1.0) + + +if __name__ == "__main__": + absltest.main()