-
-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathfactories.py
More file actions
148 lines (132 loc) · 5.43 KB
/
factories.py
File metadata and controls
148 lines (132 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from copy import deepcopy
from typing import Any
from typing import Optional
from typing import cast
from jsonschema._format import FormatChecker
from jsonschema.protocols import Validator
from jsonschema_path import SchemaPath
from openapi_core.validation.schemas.datatypes import FormatValidatorsDict
from openapi_core.validation.schemas.validators import SchemaValidator
class SchemaValidatorsFactory:
    """Build ``SchemaValidator`` objects around a jsonschema validator class.

    The factory stores a default validator class, an optional stricter
    validator class and a base format checker.  ``create`` combines them
    with per-call format validators to produce a validator for one schema.
    """

    # Keywords whose value is traversed as a list of subschemas.
    _SUBSCHEMA_LIST_KEYWORDS = (
        "allOf",
        "anyOf",
        "oneOf",
        "prefixItems",
    )
    # Keywords whose value is traversed as a single subschema; values that
    # are not dicts (e.g. booleans) are skipped by the traversal.
    _SUBSCHEMA_KEYWORDS = (
        "items",
        "contains",
        "if",
        "then",
        "else",
        "not",
        "propertyNames",
        "additionalProperties",
        "unevaluatedProperties",
        "unevaluatedItems",
        "contentSchema",
    )
    # Keywords whose value is traversed as a mapping of names to subschemas.
    _SUBSCHEMA_MAP_KEYWORDS = (
        "$defs",
        "definitions",
        "patternProperties",
        "dependentSchemas",
    )

    def __init__(
        self,
        schema_validator_class: type[Validator],
        strict_schema_validator_class: Optional[type[Validator]] = None,
        format_checker: Optional[FormatChecker] = None,
    ):
        """Store the validator classes and resolve the base format checker.

        Args:
            schema_validator_class: Validator class used by default.
            strict_schema_validator_class: Validator class used when
                ``create`` is called with ``strict_additional_properties``.
                ``None`` means strict mode is not supported.
            format_checker: Base format checker.  When ``None``, the
                ``FORMAT_CHECKER`` attribute of ``schema_validator_class``
                is used instead.

        Raises:
            ValueError: If no format checker was passed and
                ``schema_validator_class.FORMAT_CHECKER`` is ``None`` too.
        """
        self.schema_validator_class = schema_validator_class
        self.strict_schema_validator_class = strict_schema_validator_class
        if format_checker is None:
            format_checker = self.schema_validator_class.FORMAT_CHECKER
        # Raise explicitly rather than ``assert``: assertions are stripped
        # under ``python -O``, which would let ``None`` be stored here and
        # later handed to the validator class as its format checker.
        if format_checker is None:
            raise ValueError(
                "A format checker is required: none was passed and the "
                "schema validator class does not provide FORMAT_CHECKER."
            )
        self.format_checker = format_checker

    def get_format_checker(
        self,
        format_validators: Optional[FormatValidatorsDict] = None,
        extra_format_validators: Optional[FormatValidatorsDict] = None,
    ) -> FormatChecker:
        """Return the format checker to use for a single validator.

        Args:
            format_validators: When ``None``, the result starts from a deep
                copy of the factory's base checker.  Otherwise the result
                starts from ``FormatChecker([])`` and only these validators
                are registered on it.
            extra_format_validators: Validators registered on top of the
                starting checker in both cases.

        Returns:
            A checker object distinct from ``self.format_checker``.
        """
        format_checker: FormatChecker
        if format_validators is None:
            # Work on a copy: ``_add_validators`` mutates the checker it is
            # given, and the base checker must stay as configured.
            format_checker = deepcopy(cast(FormatChecker, self.format_checker))
        else:
            format_checker = self._add_validators(
                FormatChecker([]), format_validators
            )
        return self._add_validators(format_checker, extra_format_validators)

    def _add_validators(
        self,
        base_format_checker: FormatChecker,
        format_validators: Optional[FormatValidatorsDict] = None,
    ) -> FormatChecker:
        """Register ``format_validators`` on ``base_format_checker``.

        Each ``name -> check`` pair is registered through
        ``base_format_checker.checks(name)``.  The checker is modified in
        place and the same object is returned; ``None`` registers nothing.
        """
        if format_validators is not None:
            for name, check in format_validators.items():
                base_format_checker.checks(name)(check)
        return base_format_checker

    def create(
        self,
        schema: SchemaPath,
        format_validators: Optional[FormatValidatorsDict] = None,
        extra_format_validators: Optional[FormatValidatorsDict] = None,
        strict_additional_properties: bool = False,
        enforce_properties_required: bool = False,
    ) -> SchemaValidator:
        """Create a ``SchemaValidator`` for ``schema``.

        Args:
            schema: Schema path to resolve and validate against.
            format_validators: Passed to ``get_format_checker``.
            extra_format_validators: Passed to ``get_format_checker``.
            strict_additional_properties: Use the strict validator class
                instead of the default one.
            enforce_properties_required: Validate against a modified deep
                copy of the resolved schema in which every object with a
                non-empty ``properties`` mapping requires all of its
                properties except those marked ``writeOnly: true``.

        Returns:
            A ``SchemaValidator`` wrapping ``schema`` and the configured
            jsonschema validator instance.

        Raises:
            ValueError: If ``strict_additional_properties`` is set but the
                factory has no strict validator class.
        """
        validator_class: type[Validator] = self.schema_validator_class
        if strict_additional_properties:
            if self.strict_schema_validator_class is None:
                raise ValueError(
                    "Strict additional properties validation is not supported "
                    "by this factory."
                )
            validator_class = self.strict_schema_validator_class

        format_checker = self.get_format_checker(
            format_validators, extra_format_validators
        )

        with schema.resolve() as resolved:
            schema_value = resolved.contents
            if enforce_properties_required:
                schema_value = self._build_required_properties_schema(
                    schema_value
                )
            jsonschema_validator = validator_class(
                schema_value,
                _resolver=resolved.resolver,
                format_checker=format_checker,
            )

        return SchemaValidator(schema, jsonschema_validator)

    def _build_required_properties_schema(self, schema_value: Any) -> Any:
        """Return a deep copy of ``schema_value`` with properties required.

        The input is never modified; all rewriting happens on the copy.
        """
        updated = deepcopy(schema_value)
        self._set_required_properties(updated)
        return updated

    def _set_required_properties(self, schema: Any) -> None:
        """Recursively overwrite ``required`` in ``schema``, in place.

        For every dict that has a non-empty ``properties`` dict,
        ``required`` is replaced by the names of all properties that are
        not write-only.  The walk then descends into the property schemas
        and into the subschemas found under the keywords listed in the
        class-level keyword tuples.  Values that are not dicts are ignored.
        """
        if not isinstance(schema, dict):
            return

        properties = schema.get("properties")
        if isinstance(properties, dict) and properties:
            # Any existing ``required`` list is replaced, not merged.
            schema["required"] = [
                name
                for name, property_schema in properties.items()
                if not self._is_write_only_property(property_schema)
            ]
            for property_schema in properties.values():
                self._set_required_properties(property_schema)

        for keyword in self._SUBSCHEMA_LIST_KEYWORDS:
            subschemas = schema.get(keyword)
            if isinstance(subschemas, list):
                for subschema in subschemas:
                    self._set_required_properties(subschema)

        for keyword in self._SUBSCHEMA_KEYWORDS:
            # A missing keyword yields None, which the guard above skips.
            self._set_required_properties(schema.get(keyword))

        for keyword in self._SUBSCHEMA_MAP_KEYWORDS:
            subschema_map = schema.get(keyword)
            if isinstance(subschema_map, dict):
                for subschema in subschema_map.values():
                    self._set_required_properties(subschema)

    def _is_write_only_property(self, property_schema: Any) -> bool:
        """Return True only for a dict whose ``writeOnly`` is exactly True."""
        return (
            isinstance(property_schema, dict)
            and property_schema.get("writeOnly") is True
        )