Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,18 @@ def evaluate_pre_deterministic_guardrail(
self,
input_data: dict[str, Any],
guardrail: DeterministicGuardrail,
*,
agent_input: dict[str, Any] | None = None,
) -> GuardrailValidationResult:
"""Evaluate deterministic guardrail rules against input data (pre-execution)."""
"""Evaluate deterministic guardrail rules against input data (pre-execution).

Args:
input_data: Tool input data being validated.
guardrail: The deterministic guardrail to evaluate.
agent_input: The agent's validated input parameters. Available only
in pre-execution; rules with ``FieldSource.AGENT_INPUT`` resolve
their values from this dict.
"""
# Check if guardrail contains any output-dependent rules
has_output_rule = self._has_output_dependent_rule(guardrail, [ApplyTo.OUTPUT])

Expand All @@ -46,6 +56,7 @@ def evaluate_pre_deterministic_guardrail(
input_data=input_data,
output_data={},
guardrail=guardrail,
agent_input=agent_input,
)

@traced("evaluate_post_deterministic_guardrails", run_type="uipath")
Expand Down Expand Up @@ -116,6 +127,7 @@ def _evaluate_deterministic_guardrail(
input_data: dict[str, Any],
output_data: dict[str, Any],
guardrail: DeterministicGuardrail,
agent_input: dict[str, Any] | None = None,
) -> GuardrailValidationResult:
"""Evaluate deterministic guardrail rules against input and output data.

Expand All @@ -125,11 +137,17 @@ def _evaluate_deterministic_guardrail(

for rule in guardrail.rules:
if isinstance(rule, WordRule):
passed, reason = evaluate_word_rule(rule, input_data, output_data)
passed, reason = evaluate_word_rule(
rule, input_data, output_data, agent_input
)
elif isinstance(rule, NumberRule):
passed, reason = evaluate_number_rule(rule, input_data, output_data)
passed, reason = evaluate_number_rule(
rule, input_data, output_data, agent_input
)
elif isinstance(rule, BooleanRule):
passed, reason = evaluate_boolean_rule(rule, input_data, output_data)
passed, reason = evaluate_boolean_rule(
rule, input_data, output_data, agent_input
)
elif isinstance(rule, UniversalRule):
passed, reason = evaluate_universal_rule(rule, output_data)
else:
Expand Down
43 changes: 37 additions & 6 deletions packages/uipath-core/src/uipath/core/guardrails/_evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def get_fields_from_selector(
field_selector: AllFieldsSelector | SpecificFieldsSelector,
input_data: dict[str, Any],
output_data: dict[str, Any],
agent_input: dict[str, Any] | None = None,
) -> list[tuple[Any, FieldReference]]:
"""Get field values and their references based on the field selector."""
fields: list[tuple[Any, FieldReference]] = []
Expand All @@ -159,6 +160,14 @@ def get_fields_from_selector(
FieldReference(path=key, source=FieldSource.OUTPUT),
)
)
if FieldSource.AGENT_INPUT in field_selector.sources and agent_input:
for key, value in agent_input.items():
fields.append(
(
value,
FieldReference(path=key, source=FieldSource.AGENT_INPUT),
)
)
elif isinstance(field_selector, SpecificFieldsSelector):
# For specific fields, extract values based on field references
for field_ref in field_selector.fields:
Expand All @@ -167,6 +176,10 @@ def get_fields_from_selector(
data = input_data
elif field_ref.source == FieldSource.OUTPUT:
data = output_data
elif field_ref.source == FieldSource.AGENT_INPUT:
if agent_input is None:
continue
data = agent_input
else:
# Unknown source, skip this field
continue
Expand All @@ -185,7 +198,12 @@ def format_guardrail_passed_validation_result_message(
rule_description: str | None,
) -> str:
"""Format a guardrail validation result message following the standard pattern."""
source = "Input" if field_ref.source == FieldSource.INPUT else "Output"
if field_ref.source == FieldSource.INPUT:
source = "Input"
elif field_ref.source == FieldSource.AGENT_INPUT:
source = "Agent input"
else:
source = "Output"

if rule_description:
return (
Expand All @@ -211,10 +229,15 @@ def get_validated_conditions_description(


def evaluate_word_rule(
rule: WordRule, input_data: dict[str, Any], output_data: dict[str, Any]
rule: WordRule,
input_data: dict[str, Any],
output_data: dict[str, Any],
agent_input: dict[str, Any] | None = None,
) -> tuple[bool, str]:
"""Evaluate a word rule against input and output data."""
fields = get_fields_from_selector(rule.field_selector, input_data, output_data)
fields = get_fields_from_selector(
rule.field_selector, input_data, output_data, agent_input
)
if not fields:
return True, "No fields to validate"

Expand Down Expand Up @@ -256,10 +279,15 @@ def evaluate_word_rule(


def evaluate_number_rule(
rule: NumberRule, input_data: dict[str, Any], output_data: dict[str, Any]
rule: NumberRule,
input_data: dict[str, Any],
output_data: dict[str, Any],
agent_input: dict[str, Any] | None = None,
) -> tuple[bool, str]:
"""Evaluate a number rule against input and output data."""
fields = get_fields_from_selector(rule.field_selector, input_data, output_data)
fields = get_fields_from_selector(
rule.field_selector, input_data, output_data, agent_input
)
if not fields:
return True, "No fields to validate"

Expand Down Expand Up @@ -304,9 +332,12 @@ def evaluate_boolean_rule(
rule: BooleanRule,
input_data: dict[str, Any],
output_data: dict[str, Any],
agent_input: dict[str, Any] | None = None,
) -> tuple[bool, str]:
"""Evaluate a boolean rule against input and output data."""
fields = get_fields_from_selector(rule.field_selector, input_data, output_data)
fields = get_fields_from_selector(
rule.field_selector, input_data, output_data, agent_input
)
if not fields:
return True, "No fields to validate"

Expand Down
31 changes: 31 additions & 0 deletions packages/uipath-core/src/uipath/core/guardrails/guardrails.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class FieldSource(str, Enum):

INPUT = "input"
OUTPUT = "output"
AGENT_INPUT = "agentInput"


class ApplyTo(str, Enum):
Expand Down Expand Up @@ -239,3 +240,33 @@ class DeterministicGuardrail(BaseGuardrail):
rules: list[Rule]

model_config = ConfigDict(populate_by_name=True, extra="allow")

@field_validator("rules")
@classmethod
def _agent_input_is_pre_execution_only(cls, rules: list[Rule]) -> list[Rule]:
# Guardrails are dispatched pre or post based on whether ANY rule has an
# OUTPUT-dependent reference; agent_input is not threaded into post, so a
# guardrail mixing agent_input with output sources would silently no-op
# the agent_input rules in post. Reject at config load.
sources: set[FieldSource] = set()
has_output_universal = False
for rule in rules:
if isinstance(rule, (WordRule, NumberRule, BooleanRule)):
selector = rule.field_selector
if isinstance(selector, SpecificFieldsSelector):
sources.update(f.source for f in selector.fields)
elif isinstance(selector, AllFieldsSelector):
sources.update(selector.sources)
elif isinstance(rule, UniversalRule):
if rule.apply_to in (ApplyTo.OUTPUT, ApplyTo.INPUT_AND_OUTPUT):
has_output_universal = True

if FieldSource.AGENT_INPUT in sources and (
FieldSource.OUTPUT in sources or has_output_universal
):
raise ValueError(
"A guardrail referencing the 'agentInput' field source cannot "
"also have output-dependent rules. agent_input is available "
"only in pre-execution."
)
return rules
Original file line number Diff line number Diff line change
Expand Up @@ -1674,3 +1674,180 @@ def test_word_rule_on_wrapped_array_output_passes_when_no_match(
guardrail=guardrail,
)
assert result.result == GuardrailValidationResultType.PASSED


class TestAgentInputFieldSource:
"""AGENT_INPUT field source: rules can reference the agent's validated
input parameters during pre-execution evaluation."""

@staticmethod
def _user_identity_guardrail() -> DeterministicGuardrail:
return DeterministicGuardrail(
id="block-non-admin-callers",
name="Block non-admin callers",
description="Caller-context RBAC: only 'admin' role may proceed",
enabled_for_evals=True,
guardrail_type="custom",
selector=GuardrailSelector(
scopes=[GuardrailScope.TOOL], match_names=["any_tool"]
),
rules=[
WordRule(
rule_type="word",
field_selector=SpecificFieldsSelector(
selector_type="specific",
fields=[
FieldReference(path="role", source=FieldSource.AGENT_INPUT)
],
),
detects_violation=lambda s: s != "admin",
rule_description="role must equal 'admin'",
),
],
)

def test_pre_evaluation_passes_when_agent_input_matches(
self, service: DeterministicGuardrailsService
) -> None:
result = service.evaluate_pre_deterministic_guardrail(
input_data={"target": "anything"},
guardrail=self._user_identity_guardrail(),
agent_input={"role": "admin"},
)
assert result.result == GuardrailValidationResultType.PASSED

def test_pre_evaluation_fails_when_agent_input_violates(
self, service: DeterministicGuardrailsService
) -> None:
result = service.evaluate_pre_deterministic_guardrail(
input_data={"target": "anything"},
guardrail=self._user_identity_guardrail(),
agent_input={"role": "viewer"},
)
assert result.result == GuardrailValidationResultType.VALIDATION_FAILED

def test_pre_evaluation_silent_pass_when_agent_input_omitted(
self, service: DeterministicGuardrailsService
) -> None:
# No agent_input passed → field selector resolves to no values → rule
# passes with "No fields to validate". This matches existing behavior
# for missing INPUT fields and avoids breaking older callers that don't
# yet pass agent_input.
result = service.evaluate_pre_deterministic_guardrail(
input_data={"target": "anything"},
guardrail=self._user_identity_guardrail(),
)
assert result.result == GuardrailValidationResultType.PASSED

def test_pre_evaluation_with_all_fields_selector(
self, service: DeterministicGuardrailsService
) -> None:
guardrail = DeterministicGuardrail(
id="dry-run-mode",
name="Block writes in dry-run mode",
enabled_for_evals=True,
guardrail_type="custom",
selector=GuardrailSelector(scopes=[GuardrailScope.TOOL]),
rules=[
BooleanRule(
rule_type="boolean",
field_selector=AllFieldsSelector(
selector_type="all",
sources=[FieldSource.AGENT_INPUT],
),
detects_violation=lambda b: b is True,
rule_description="dry_run must not be true",
),
],
)
result_blocked = service.evaluate_pre_deterministic_guardrail(
input_data={},
guardrail=guardrail,
agent_input={"dry_run": True},
)
assert result_blocked.result == GuardrailValidationResultType.VALIDATION_FAILED

result_allowed = service.evaluate_pre_deterministic_guardrail(
input_data={},
guardrail=guardrail,
agent_input={"dry_run": False},
)
assert result_allowed.result == GuardrailValidationResultType.PASSED

def test_validator_rejects_agent_input_with_output_in_same_rule(
self,
) -> None:
with pytest.raises(ValueError, match="agent_input is available only in pre"):
DeterministicGuardrail(
id="bad",
name="bad",
enabled_for_evals=True,
guardrail_type="custom",
rules=[
WordRule(
rule_type="word",
field_selector=SpecificFieldsSelector(
selector_type="specific",
fields=[
FieldReference(
path="role", source=FieldSource.AGENT_INPUT
),
FieldReference(
path="result", source=FieldSource.OUTPUT
),
],
),
detects_violation=lambda s: False,
),
],
)

def test_validator_rejects_agent_input_paired_with_output_universal_rule(
self,
) -> None:
with pytest.raises(ValueError, match="agent_input is available only in pre"):
DeterministicGuardrail(
id="bad",
name="bad",
enabled_for_evals=True,
guardrail_type="custom",
rules=[
WordRule(
rule_type="word",
field_selector=SpecificFieldsSelector(
selector_type="specific",
fields=[
FieldReference(
path="role", source=FieldSource.AGENT_INPUT
)
],
),
detects_violation=lambda s: False,
),
UniversalRule(
rule_type="always",
apply_to=ApplyTo.OUTPUT,
),
],
)

def test_post_evaluation_does_not_receive_agent_input(
self, service: DeterministicGuardrailsService
) -> None:
# A guardrail whose only rule references AGENT_INPUT has no
# output-dependent rule, so post-evaluation short-circuits to PASSED
# without consulting agent_input. This documents that agent_input is
# not threaded into post by design.
result = service.evaluate_post_deterministic_guardrail(
input_data={"target": "anything"},
output_data={"some": "output"},
guardrail=self._user_identity_guardrail(),
)
assert result.result == GuardrailValidationResultType.PASSED
assert result.reason == "No rules to apply for output data."

def test_field_reference_normalizes_pascalcase_agent_input(self) -> None:
# JSON config commonly uses PascalCase ("AgentInput"); the source
# field validator decapitalizes to the camelCase enum value.
ref = FieldReference(path="role", source="AgentInput") # type: ignore[arg-type]
assert ref.source == FieldSource.AGENT_INPUT
Loading