Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions dpsynth/local_mode/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,80 @@ def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement:
noisy_counts, (self.name,), stddev=sigma
)
return ColumnMeasurement(self.attribute, transform_fn, measurement)


@dataclasses.dataclass
class OpenSetCategoricalInitializer:
  """Privately discovers and measures the domain of an open-set column.

  The raw values are run through Gaussian Thresholding (Algorithm 2 from the
  DP-SIPS paper), which both selects the values that are frequent enough to
  release and yields a noisy count for each of them. The resulting
  CategoricalAttribute lists the attribute's default_value first (it doubles
  as the out-of-domain bucket for every value that was not discovered),
  followed by the discovered values.

  Attributes:
    name: Attribute name; used as the single-element clique of the
      measurement.
    attribute: The OpenSetCategoricalAttribute supplying the default value.
    delta: Failure probability handed to the partition selection threshold.
    rng: A numpy random number generator used to draw the noise.
  """

  name: str
  attribute: domain.OpenSetCategoricalAttribute
  delta: float
  rng: np.random.Generator

  def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent:
    """Builds the DpEvent describing the Gaussian noise of this mechanism.

    Args:
      zcdp_rho: Total zCDP privacy budget.

    Returns:
      A GaussianDpEvent whose noise multiplier is 1 / sqrt(gdp_budget), where
      gdp_budget is zcdp_rho converted by accounting.zcdp_to_gdp.
    """
    # NOTE(review): self.delta is not reflected in the returned event --
    # confirm that the thresholding delta is accounted for by the caller.
    noise_multiplier = 1.0 / np.sqrt(accounting.zcdp_to_gdp(zcdp_rho))
    return dp_accounting.GaussianDpEvent(noise_multiplier=noise_multiplier)

  def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement:
    """Discovers the column's domain and measures it under differential privacy.

    Args:
      zcdp_rho: Total zCDP privacy budget for partition selection.
      data: 1D array of raw categorical values.

    Returns:
      A ColumnMeasurement bundling the discovered CategoricalAttribute, the
      encoder built from it, and a LinearMeasurement of the noisy counts. The
      default_value sits at index 0 of the domain (which is also the
      out-of-domain index); the measurement's query drops that index, so only
      the discovered values at indices 1: are measured.
    """
    # np.unique's inverse gives every record the integer ID of its value,
    # which is the representation the thresholding primitive is fed.
    distinct_values, partition_ids = np.unique(data, return_inverse=True)
    kept_ids, noisy_counts, noise_stddev = (
        primitives.select_partitions_gaussian_thresholding(
            self.rng,
            partition_ids,
            accounting.zcdp_to_gdp(zcdp_rho),
            self.delta,
        )
    )
    discovered = list(distinct_values[kept_ids])

    # Domain layout: [default_value, discovered_0, discovered_1, ...].
    categorical = domain.CategoricalAttribute(
        possible_values=[self.attribute.default_value, *discovered],
        out_of_domain_index=0,
    )
    encoder = transformations.discrete_encoder(categorical)

    def drop_default(x):
      # Skip index 0 (the unmeasured default bucket) so the remaining entries
      # line up one-to-one with noisy_counts.
      return x[1:]

    measurement = mbi.LinearMeasurement(
        noisy_counts,
        (self.name,),
        stddev=noise_stddev,
        query=drop_default,
    )
    return ColumnMeasurement(categorical, encoder, measurement)
56 changes: 56 additions & 0 deletions dpsynth/local_mode/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,62 @@ def _get_threshold(delta, sigma, max_part):
return thresholds.max()


def select_partitions_gaussian_thresholding(
    rng: np.random.Generator,
    data: np.ndarray,
    gdp_budget: float,
    delta: float,
) -> tuple[np.ndarray, np.ndarray, float]:
  """Selects partitions using Gaussian Thresholding (Weighted Gaussian).

  This implements Algorithm 2 from the DP-SIPS paper (Swanberg et al., 2023)
  under item-level DP. It is the simplest partition selection mechanism:

  1. Compute the histogram of partition counts.
  2. Add Gaussian noise calibrated to the privacy budget.
  3. Return the partitions whose noisy count reaches a threshold T.

  Under item-level DP each record is treated as a distinct user contributing
  to exactly one partition, so the histogram has L2 sensitivity 1. The
  threshold is T = 1 + sigma * Phi^{-1}(1 - delta), following the paper's
  formula with max_part = 1. For sigma > 0, a partition that holds a single
  record therefore passes with probability delta, independent of the budget.

  Args:
    rng: A numpy random number generator.
    data: 1D array in which each element is the partition ID of one record.
    gdp_budget: Privacy budget in terms of squared Gaussian DP mu parameter
      (gdp_budget = mu^2 = 1 / sigma^2). May be infinite, in which case no
      noise is added.
    delta: Failure probability used to place the threshold; must lie strictly
      between 0 and 1.

  Returns:
    A tuple containing:
      - selected_partitions: 1D array of partition IDs that passed the
        threshold.
      - estimated_counts: 1D array of noisy counts for each selected
        partition.
      - sigma: The standard deviation of the Gaussian noise added.

  Raises:
    ValueError: If gdp_budget or delta is not positive, or if delta >= 1.
  """
  if gdp_budget <= 0 or delta <= 0:
    raise ValueError(f"{gdp_budget=} and {delta=} must be positive.")
  if delta >= 1:
    # isf(delta) would be -inf (delta == 1) or nan (delta > 1), which silently
    # selects everything or nothing; reject it instead.
    raise ValueError(f"{delta=} must be less than 1.")

  sigma = 1.0 / np.sqrt(gdp_budget)

  if data.size == 0:
    return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), sigma

  unique_parts, counts = np.unique(data, return_counts=True)
  noisy_counts = counts + rng.normal(scale=sigma, size=counts.size)

  # For max_part=1 the paper's threshold simplifies to
  #   T = 1/sqrt(1) + sigma * Phi^{-1}(1 - delta).
  # The quantile is taken with the inverse survival function: it equals
  # ppf(1 - delta) mathematically, but forming 1 - delta rounds to 1.0 for
  # very small delta, which would turn the threshold into inf (or nan when
  # sigma is 0) and drop every partition.
  threshold = 1.0 + sigma * scipy.stats.norm.isf(delta)
  passed = noisy_counts >= threshold

  return unique_parts[passed], noisy_counts[passed], sigma


def select_partitions_sips(
rng: np.random.Generator,
data: np.ndarray,
Expand Down
63 changes: 63 additions & 0 deletions tests/local_mode/initialization_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,68 @@ def test_out_of_domain_values(self):
)


class OpenSetCategoricalInitializerTest(absltest.TestCase):
  """Tests for initialization.OpenSetCategoricalInitializer."""

  @staticmethod
  def _make_initializer(name, default_value, seed):
    """Builds an initializer with delta=1e-5 and a freshly seeded generator."""
    open_set_attr = domain.OpenSetCategoricalAttribute(
        default_value=default_value
    )
    return initialization.OpenSetCategoricalInitializer(
        name=name,
        attribute=open_set_attr,
        delta=1e-5,
        rng=np.random.default_rng(seed),
    )

  def test_dp_event(self):
    # The mechanism must report itself as a Gaussian event.
    mechanism = self._make_initializer('test', None, seed=0)
    self.assertIsInstance(
        mechanism.dp_event(zcdp_rho=0.5), dp_accounting.GaussianDpEvent
    )

  def test_call_noiseless(self):
    mechanism = self._make_initializer('col', None, seed=42)
    # 'A' appears 100 times, 'B' 50, 'C' 1 (rare).
    column = np.array(['A'] * 100 + ['B'] * 50 + ['C'] * 1)
    result = mechanism(zcdp_rho=np.inf, data=column)

    self.assertIsInstance(result, initialization.ColumnMeasurement)
    self.assertIsNotNone(result.measurement)

    # With infinite budget the frequent values must be discovered, and the
    # default value is always part of the domain.
    cat_attr = result.categorical_attribute
    domain_values = set(cat_attr.possible_values)
    for expected in ('A', 'B', None):
      self.assertIn(expected, domain_values)

    # The default value leads the domain and serves as out-of-domain bucket.
    self.assertIsNone(cat_attr.possible_values[0])
    self.assertEqual(cat_attr.out_of_domain_index, 0)

  def test_undiscovered_values_map_to_default(self):
    mechanism = self._make_initializer('col', 'OTHER', seed=0)
    column = np.array(['A'] * 100 + ['B'] * 50)
    result = mechanism(zcdp_rho=np.inf, data=column)
    encode = result.transform_fn

    # A discovered value is encoded as an integer index.
    self.assertIsInstance(encode('A'), int)

    # A value never seen in the data falls back to the default bucket at 0.
    self.assertEqual(result.categorical_attribute.out_of_domain_index, 0)
    self.assertEqual(encode('Z'), 0)

  def test_empty_data(self):
    mechanism = self._make_initializer('col', None, seed=0)
    result = mechanism(zcdp_rho=np.inf, data=np.array([], dtype=str))

    # Nothing can be discovered, so the domain is just the default value.
    cat_attr = result.categorical_attribute
    self.assertEqual(cat_attr.possible_values, [None])
    self.assertEqual(cat_attr.size, 1)


# Run the tests through absltest when this file is executed as a script.
if __name__ == '__main__':
  absltest.main()
64 changes: 64 additions & 0 deletions tests/local_mode/primitives_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,70 @@ def test_mismatched_user_ids_raises(self):
)


class SelectPartitionsGaussianThresholdingTest(absltest.TestCase):
  """Tests for primitives.select_partitions_gaussian_thresholding."""

  def setUp(self):
    super().setUp()
    # A fresh fixed-seed generator per test keeps the noise reproducible.
    self.rng = np.random.default_rng(42)

  def test_basic_operation(self):
    data = np.array([1] * 50 + [2] * 5)
    selected, counts, sigma = (
        primitives.select_partitions_gaussian_thresholding(
            self.rng, data, gdp_budget=10.0, delta=1e-5
        )
    )
    self.assertIn(1, selected)
    self.assertEqual(sigma, 1.0 / np.sqrt(10.0))
    self.assertEqual(selected.size, counts.size)

  def test_empty_data(self):
    data = np.array([], dtype=int)
    selected, counts, sigma = (
        primitives.select_partitions_gaussian_thresholding(
            self.rng, data, gdp_budget=1.0, delta=1e-5
        )
    )
    self.assertEmpty(selected)
    self.assertEmpty(counts)
    self.assertEqual(sigma, 1.0)

  def test_high_budget_selects_all(self):
    # The threshold is 1 + sigma * z_{1 - delta}, so a partition with a single
    # record passes with probability delta no matter how large the budget is.
    # Each partition therefore gets two records, which puts its count a full
    # unit above the threshold while the noise stddev is only 1e-3.
    data = np.array([1, 2, 3, 4, 5] * 2)
    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
        self.rng, data, gdp_budget=1e6, delta=0.1
    )
    self.assertCountEqual(selected, [1, 2, 3, 4, 5])

  def test_zero_budget_raises(self):
    data = np.array([1, 2, 3])
    # Both zero and negative values must be rejected for either parameter.
    for gdp_budget in (0.0, -0.1):
      with self.assertRaises(ValueError):
        primitives.select_partitions_gaussian_thresholding(
            self.rng, data, gdp_budget=gdp_budget, delta=1e-5
        )
    for delta in (0.0, -0.001):
      with self.assertRaises(ValueError):
        primitives.select_partitions_gaussian_thresholding(
            self.rng, data, gdp_budget=1.0, delta=delta
        )

  def test_rare_items_not_selected(self):
    # One item with many occurrences, another with just 1.
    # With moderate budget and tight delta, the rare item should be dropped.
    data = np.array([1] * 100 + [2])
    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
        self.rng, data, gdp_budget=0.5, delta=1e-6
    )
    self.assertIn(1, selected)
    self.assertNotIn(2, selected)

  def test_string_data_type(self):
    data = np.array(["a", "b", "a", "a", "c", "a", "c"])
    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
        self.rng, data, gdp_budget=10.0, delta=1e-5
    )
    # Require a non-empty selection first; the type check below would pass
    # vacuously on an empty result.
    self.assertIn("a", selected)
    self.assertTrue(all(isinstance(p, str) for p in selected))


class GaussianHistogramTest(absltest.TestCase):

def setUp(self):
Expand Down
Loading