Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,27 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.ContributionBoun
* the privacy unit is present in any specific group is limited by the given Differential Privacy
* budget, but there are no guarantees for being able to understand whether the privacy unit is
* present in the whole dataset. This privacy guarantee for a given privacy unit is weaker than
* the one provided by the dataset contribution bounding level.
* the one provided by the dataset contribution bounding level, but stronger than the one
* provided by the record contribution bounding level.
*
* When you use the group contribution bounding level the effective privacy unit becomes
* (privacy_unit, group_key). It means that contributions to different groups from the same
* privacy unit are treated as if they were contributions from different privacy units (users).
* At the same time, contributions of the privacy unit to the same group are treated as
* contributions from the same privacy unit (user) and therefore are bounded.
* 3. @property RECORD_LEVEL
*
* No contribution bounding is performed.
*
 * From a privacy point of view, this contribution bounding level protects the privacy unit only
* within the record. It means that the possibility of understanding whether the privacy unit is
* present in the record is limited by the given Differential Privacy budget, but there are no
* guarantees for being able to understand whether the privacy unit is present in any group or
* the whole dataset.
*
* When you use the record contribution bounding level the effective privacy unit becomes the
* whole record. It means that contributions to different records from the same privacy unit are
* treated as if they were contributions from different privacy units (users).
*/
sealed interface ContributionBoundingLevel {
data class DATASET_LEVEL(
Expand All @@ -75,18 +89,22 @@ sealed interface ContributionBoundingLevel {
require(maxContributionsPerGroup > 0) { "maxContributionsPerGroup must be positive" }
}
}

class RECORD_LEVEL : ContributionBoundingLevel
}

/**
 * Maximum number of partitions (groups) a single privacy unit may contribute to, as implied by
 * the chosen contribution bounding level.
 */
internal fun ContributionBoundingLevel.getMaxPartitionsContributed() =
  when (this) {
    // Only dataset-level bounding lets a privacy unit contribute to more than one group.
    is ContributionBoundingLevel.DATASET_LEVEL -> maxGroupsContributed
    // Group- and record-level bounding both treat each group independently.
    is ContributionBoundingLevel.GROUP_LEVEL,
    is ContributionBoundingLevel.RECORD_LEVEL -> 1
  }

/**
 * Maximum number of contributions a single privacy unit may make within one partition (group),
 * as implied by the chosen contribution bounding level.
 */
internal fun ContributionBoundingLevel.getMaxContributionsPerPartition() =
  when (this) {
    // Record-level bounding treats each record as its own privacy unit, so the cap is 1.
    is ContributionBoundingLevel.RECORD_LEVEL -> 1
    is ContributionBoundingLevel.GROUP_LEVEL -> maxContributionsPerGroup
    is ContributionBoundingLevel.DATASET_LEVEL -> maxContributionsPerGroup
  }

/**
Expand All @@ -99,4 +117,5 @@ internal fun ContributionBoundingLevel.toInternalContributionBoundingLevel() =
when (this) {
is ContributionBoundingLevel.DATASET_LEVEL -> InternalContributionBoundingLevel.DATASET_LEVEL
is ContributionBoundingLevel.GROUP_LEVEL -> InternalContributionBoundingLevel.PARTITION_LEVEL
is ContributionBoundingLevel.RECORD_LEVEL -> InternalContributionBoundingLevel.RECORD_LEVEL
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class CountCombiner(
contributions
.size()
.toLong()
.coerceInIfContributionBoundingEnabled(
.coerceInIfPerPartitionContributionBoundingEnabled(
0,
aggregationParams.maxContributionsPerPartition!!.toLong(),
aggregationParams,
Expand Down Expand Up @@ -410,8 +410,16 @@ class SumCombiner(
0.0
} else {
contributions.singleValueContributionsList
.map {
it.coerceInIfPerRecordContributionBoundingEnabled(
aggregationParams.minTotalValue!!,
aggregationParams.maxTotalValue!!,
aggregationParams,
executionMode,
)
}
.sum()
.coerceInIfContributionBoundingEnabled(
.coerceInIfPerPartitionContributionBoundingEnabled(
aggregationParams.minTotalValue!!,
aggregationParams.maxTotalValue!!,
aggregationParams,
Expand Down Expand Up @@ -656,7 +664,7 @@ class MeanCombiner(
normalizedSum =
contributions.singleValueContributionsList
.map {
it.coerceInIfContributionBoundingEnabled(
it.coerceInIfPerRecordOrPerPartitionContributionBoundingEnabled(
aggregationParams.minValue!!,
aggregationParams.maxValue!!,
aggregationParams,
Expand Down Expand Up @@ -713,8 +721,7 @@ class MeanCombiner(
* @property count the differentially private count (if requested).
*/
@Immutable
data class MeanCombinerResult(val mean: Double, val sum: Double?, val count: Double?) :
Serializable
data class MeanCombinerResult(val mean: Double, val sum: Double?, val count: Double?) : Serializable

/**
* A [Combiner] for the [MetricType.QUANTILES].
Expand Down Expand Up @@ -871,7 +878,7 @@ class VarianceCombiner(
varianceAccumulator {
val coercedValues =
contributions.singleValueContributionsList.map {
it.coerceInIfContributionBoundingEnabled(
it.coerceInIfPerRecordOrPerPartitionContributionBoundingEnabled(
aggregationParams.minValue!!,
aggregationParams.maxValue!!,
aggregationParams,
Expand Down Expand Up @@ -1140,7 +1147,7 @@ class CompoundCombiner(val combiners: Iterable<Combiner<*, *>>) :
* Clamp value to the range [minimumValue, maximumValue] if per partition contribution bounding is
* required.
*/
private fun <T : Comparable<T>> T.coerceInIfContributionBoundingEnabled(
private fun <T : Comparable<T>> T.coerceInIfPerPartitionContributionBoundingEnabled(
minimumValue: T,
maximumValue: T,
params: AggregationParams,
Expand All @@ -1154,6 +1161,44 @@ private fun <T : Comparable<T>> T.coerceInIfContributionBoundingEnabled(
}
}

/**
 * Returns this value clamped to the range [minimumValue, maximumValue] when per-record
 * contribution bounding is in effect for the given [params] and [executionMode]; otherwise
 * returns the value unchanged.
 */
private fun <T : Comparable<T>> T.coerceInIfPerRecordContributionBoundingEnabled(
  minimumValue: T,
  maximumValue: T,
  params: AggregationParams,
  executionMode: ExecutionMode,
): T =
  // Per-record bounding implies clamping of each individual value.
  if (!params.applyPerRecordBounding(executionMode)) this
  else coerceIn(minimumValue, maximumValue)

/**
 * Returns this value clamped to the range [minimumValue, maximumValue] when either per-record or
 * per-partition contribution bounding is in effect for the given [params] and [executionMode];
 * otherwise returns the value unchanged.
 */
private fun <T : Comparable<T>> T.coerceInIfPerRecordOrPerPartitionContributionBoundingEnabled(
  minimumValue: T,
  maximumValue: T,
  params: AggregationParams,
  executionMode: ExecutionMode,
): T {
  // Either bounding mode requires clamping each individual value.
  val boundingEnabled =
    params.applyPerRecordBounding(executionMode) || params.applyPerPartitionBounding(executionMode)
  return if (boundingEnabled) coerceIn(minimumValue, maximumValue) else this
}

private fun getNoisedCount(
count: Long,
aggregationParams: AggregationParams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList
import com.google.errorprone.annotations.Immutable
import com.google.privacy.differentialprivacy.pipelinedp4j.core.ContributionBoundingLevel.DATASET_LEVEL
import com.google.privacy.differentialprivacy.pipelinedp4j.core.ContributionBoundingLevel.PARTITION_LEVEL
import com.google.privacy.differentialprivacy.pipelinedp4j.core.ContributionBoundingLevel.RECORD_LEVEL
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType.COUNT
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType.MEAN
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType.PRIVACY_ID_COUNT
Expand Down Expand Up @@ -59,6 +60,20 @@ sealed interface Params {
val preThreshold: Int
}

/**
 * Whether per-record contribution bounding should be applied, taking [executionMode] into
 * account.
 *
 * Callers outside this class can use it to decide whether per-record contribution bounding is
 * needed for the combination of [contributionBoundingLevel] and [executionMode].
 *
 * Do not use this property for validating [Params]: it accounts for the execution mode, while
 * validation must always assume the production mode. [contributionBoundingLevel] reflects the
 * contribution bounding level used in production, so only its values are relevant for validation.
 */
fun Params.applyPerRecordBounding(executionMode: ExecutionMode): Boolean =
  perRecordBoundingShouldBeApplied(executionMode, this.contributionBoundingLevel)

/**
* Whether per-partition contribution bounding should be applied with respect to execution mode.
*
Expand Down Expand Up @@ -87,6 +102,18 @@ fun Params.applyPerPartitionBounding(executionMode: ExecutionMode): Boolean =
fun Params.applyPartitionsContributedBounding(executionMode: ExecutionMode): Boolean =
  partitionsContributedBoundingShouldBeApplied(executionMode, this.contributionBoundingLevel)

/**
 * Determines whether per-record contribution bounding should be applied for the given
 * [executionMode] and [contributionBoundingLevel].
 */
fun perRecordBoundingShouldBeApplied(
  executionMode: ExecutionMode,
  contributionBoundingLevel: ContributionBoundingLevel,
): Boolean {
  // Bounding is skipped entirely for execution modes that do not apply contribution bounding.
  if (!executionMode.appliesContributionBounding) return false
  // Record-level bounding is the fallback: it applies only when neither partitions-contributed
  // nor contributions-per-partition bounding is configured.
  return !(contributionBoundingLevel.withContributionsPerPartitionBounding ||
    contributionBoundingLevel.withPartitionsContributedBounding)
}

/**
* Determines whether per-partition contribution bounding should be applied given [executionMode]
* and [contributionBoundingLevel].
Expand Down Expand Up @@ -457,6 +484,11 @@ enum class ContributionBoundingLevel(
withPartitionsContributedBounding = false,
withContributionsPerPartitionBounding = true,
),
/** No contribution bounding is applied. */
RECORD_LEVEL(
withPartitionsContributedBounding = false,
withContributionsPerPartitionBounding = false,
),
}

/**
Expand Down Expand Up @@ -593,12 +625,15 @@ private fun validateBaseParams(params: Params) {
"maxPartitionsContributed must be less than ${MAX_PROCESSED_CONTRIBUTIONS_PER_PRIVACY_ID} " +
"Provided values: maxPartitionsContributed=${params.maxPartitionsContributed}."
}
// TODO: Add validation for record level - max contributions per partition must be 1.
// Contribution bounding level.
require(
params.contributionBoundingLevel != PARTITION_LEVEL ||
(params.contributionBoundingLevel == PARTITION_LEVEL && params.maxPartitionsContributed == 1)
(params.contributionBoundingLevel == PARTITION_LEVEL &&
params.maxPartitionsContributed == 1) ||
(params.contributionBoundingLevel == RECORD_LEVEL && params.maxPartitionsContributed == 1)
) {
"maxPartitionsContributed must be 1 if partition level contribution bounding is set. " +
"maxPartitionsContributed must be 1 if partition or record level contribution bounding is set. " +
"Provided value: ${params.maxPartitionsContributed}."
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,78 @@ class LocalApiTest {
assertEquals(result, expected)
}

@Test
fun run_recordLevelBounding_sumOnly_calculatesStatisticsCorrectly() {
  val inputData =
    createInputData(
      listOf(
        TestDataRow("group1", "pid1", 1.0),
        TestDataRow("group1", "pid1", 3.0),
        TestDataRow("group1", "pid2", 4.0),
      )
    )
  val groups = createPublicGroups(listOf("group1"))
  val query =
    LocalQueryBuilder.from(inputData, { it.privacyUnit }, ContributionBoundingLevel.RECORD_LEVEL())
      .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(groups))
      .aggregateValue(
        { it.value },
        ValueAggregationsBuilder().sum("sumResult"),
        ContributionBounds(totalValueBounds = Bounds(minValue = 0.0, maxValue = 3.0)),
      )
      .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)

  val result: Sequence<QueryPerGroupResult<String>> = query.run()

  // With record-level bounding each record is clamped to [0, 3] individually:
  // 1.0 + 3.0 + min(4.0, 3.0) = 7.0.
  val expected =
    listOf(
      QueryPerGroupResultWithTolerance(
        "group1",
        mapOf("sumResult" to DoubleWithTolerance(value = 7.0, tolerance = 0.5)),
        vectorAggregationResults = mapOf(),
      )
    )
  assertEquals(result, expected)
}

@Test
fun run_recordLevelBounding_meanAndVariance_calculatesStatisticsCorrectly() {
  val inputData =
    createInputData(
      listOf(
        TestDataRow("group1", "pid1", 1.0),
        TestDataRow("group1", "pid1", 3.0),
        TestDataRow("group1", "pid2", 7.0),
      )
    )
  val groups = createPublicGroups(listOf("group1"))
  val query =
    LocalQueryBuilder.from(inputData, { it.privacyUnit }, ContributionBoundingLevel.RECORD_LEVEL())
      .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(groups))
      .aggregateValue(
        { it.value },
        ValueAggregationsBuilder().mean("meanResult").variance("varianceResult"),
        ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)),
      )
      .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)

  val result: Sequence<QueryPerGroupResult<String>> = query.run()

  // Each record is clamped to [0, 3], so the values become 1.0, 3.0, 3.0.
  val expected =
    listOf(
      QueryPerGroupResultWithTolerance(
        "group1",
        mapOf(
          // (1.0 + 3.0 + 3.0) / 3 = 2.33
          "meanResult" to DoubleWithTolerance(value = 2.33, tolerance = 0.5),
          // (1^2 + 3^2 + 3^2) / 3 - ((1.0 + 3.0 + 3.0) / 3)^2 = 8/9 = 0.88(8)
          "varianceResult" to DoubleWithTolerance(value = 0.88, tolerance = 0.5),
        ),
        vectorAggregationResults = mapOf(),
      )
    )
  assertEquals(result, expected)
}

@Test
fun run_sumAndQuantiles_bothBoundTypesAreUsed_calculatesStatisticsCorrectly() {
val data =
Expand Down
Loading