-
-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathcasters.py
More file actions
267 lines (209 loc) · 8.29 KB
/
casters.py
File metadata and controls
267 lines (209 loc) · 8.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
from typing import Any
from typing import Generic
from typing import Iterable
from typing import Mapping
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from jsonschema_path import SchemaPath
from openapi_core.casting.schemas.exceptions import CastError
from openapi_core.schema.schemas import get_properties
from openapi_core.util import BOOLEAN_FALSE_VALUES
from openapi_core.util import BOOLEAN_TRUE_VALUES
from openapi_core.util import forcebool
from openapi_core.validation.schemas.validators import SchemaValidator
class PrimitiveCaster:
    """Base caster: a callable that validates a value and then casts it.

    This base implementation accepts every value and returns it unchanged;
    subclasses override ``validate`` and/or ``cast``.
    """

    def __init__(
        self,
        schema: SchemaPath,
        schema_validator: SchemaValidator,
        schema_caster: "SchemaCaster",
    ):
        """Store the schema, its validator and the owning schema caster."""
        self.schema, self.schema_validator, self.schema_caster = (
            schema,
            schema_validator,
            schema_caster,
        )

    def __call__(self, value: Any) -> Any:
        """Run ``validate`` on *value*, then return the result of ``cast``."""
        self.validate(value)
        casted = self.cast(value)
        return casted

    def validate(self, value: Any) -> None:
        """Hook for format checks; the base class accepts everything."""
        return None

    def cast(self, value: Any) -> Any:
        """Hook for conversion; the base class is the identity."""
        return value
class AnyCaster(PrimitiveCaster):
    """Caster that resolves ``allOf`` / ``oneOf`` / ``anyOf`` subschemas."""

    def cast(self, value: Any) -> Any:
        """Cast *value* through the composition keywords present in the schema.

        ``allOf`` members are applied one after another, each receiving the
        output of the previous successful cast. After that, the first
        ``oneOf`` member (then the first ``anyOf`` member) that casts without
        error decides the return value. A member whose cast raises
        ``ValueError``, ``TypeError`` or ``CastError`` is skipped. If nothing
        succeeds, the (possibly ``allOf``-casted) value is returned.
        """
        skippable = (ValueError, TypeError, CastError)

        if "allOf" in self.schema:
            for member in self.schema / "allOf":
                # Note: `value` is rebound on every successful member. This
                # resolves ordinary overlapping types sequentially, but can
                # produce edge cases, e.g. a string casted to an int and then
                # handed to a string schema.
                try:
                    value = self.schema_caster.evolve(member).cast(value)
                except skippable:
                    continue

        # "oneOf" is consulted before "anyOf"; both are resolved greedily,
        # i.e. the first member (in array order) that casts cleanly wins.
        for keyword in ("oneOf", "anyOf"):
            if keyword not in self.schema:
                continue
            for member in self.schema / keyword:
                try:
                    return self.schema_caster.evolve(member).cast(value)
                except skippable:
                    continue

        return value
# Type produced by a concrete PrimitiveTypeCaster (e.g. int, float, bool).
PrimitiveType = TypeVar("PrimitiveType")


class PrimitiveTypeCaster(Generic[PrimitiveType], PrimitiveCaster):
    """Caster that converts by calling the class-level ``primitive_type``."""

    # Subclasses replace this placeholder with the concrete target type.
    primitive_type: Type[PrimitiveType] = NotImplemented

    def cast(self, value: Union[str, bytes]) -> PrimitiveType:
        """Return ``primitive_type(value)``; conversion errors propagate."""
        convert = self.primitive_type
        return convert(value)  # type: ignore [call-arg]
class IntegerCaster(PrimitiveTypeCaster[int]):
    """Caster whose ``primitive_type`` is the builtin ``int``."""

    # Conversion target used by the inherited ``cast``.
    primitive_type = int
class NumberCaster(PrimitiveTypeCaster[float]):
    """Caster whose ``primitive_type`` is the builtin ``float``."""

    # Conversion target used by the inherited ``cast``.
    primitive_type = float
class BooleanCaster(PrimitiveTypeCaster[bool]):
    """Caster for booleans given either as ``bool`` or as a known keyword."""

    primitive_type = bool

    def validate(self, value: Any) -> None:
        """Accept real booleans and recognised boolean strings only.

        Raises:
            ValueError: if *value* is neither a ``bool`` nor a string whose
                lower-cased form appears in ``BOOLEAN_TRUE_VALUES`` or
                ``BOOLEAN_FALSE_VALUES``.
        """
        super().validate(value)
        if isinstance(value, bool):
            return
        # Anything that is not text (int, float, list, bytes, ...) cannot be
        # matched against the keyword lists. Reject it with the same
        # ValueError instead of letting ``value.lower()`` raise an
        # AttributeError that the surrounding cast machinery does not catch.
        if not isinstance(value, str):
            raise ValueError("not a boolean format")
        if value.lower() not in BOOLEAN_TRUE_VALUES + BOOLEAN_FALSE_VALUES:
            raise ValueError("not a boolean format")

    def cast(self, value: Union[str, bytes]) -> bool:
        """Return ``forcebool(value)`` wrapped in ``primitive_type``."""
        return self.primitive_type(forcebool(value))
class ArrayCaster(PrimitiveCaster):
    """Caster that casts every element of an array with the items schema."""

    @property
    def items_caster(self) -> "SchemaCaster":
        """Build a schema caster for the ``items`` subschema.

        A new caster is created on every access.
        """
        # sometimes we don't have any schema i.e. free-form objects
        empty_schema = SchemaPath.from_dict({})
        element_schema = self.schema.get("items", empty_schema)
        return self.schema_caster.evolve(element_schema)

    def validate(self, value: Any) -> None:
        """Raise ``ValueError`` unless *value* is a non-text iterable."""
        # str and bytes are not arrays according to the OpenAPI spec
        if isinstance(value, (str, bytes)):
            raise ValueError("not an array format")
        if not isinstance(value, Iterable):
            raise ValueError("not an array format")

    def cast(self, value: list[Any]) -> list[Any]:
        """Return a new list holding each element casted by ``items_caster``."""
        cast_element = self.items_caster.cast
        return [cast_element(element) for element in value]
class ObjectCaster(PrimitiveCaster):
    """Caster that casts the properties of an object (``dict``) in place."""

    def validate(self, value: Any) -> None:
        """Raise ``ValueError`` unless *value* is a ``dict``."""
        if not isinstance(value, dict):
            raise ValueError("not an object format")

    def cast(self, value: dict[str, Any]) -> dict[str, Any]:
        """Cast the properties of *value* and return the same dict.

        Note that *value* is modified in place.
        """
        return self._cast_proparties(value)

    def evolve(self, schema: SchemaPath) -> "ObjectCaster":
        """Return a caster of the same class bound to *schema*."""
        cls = self.__class__
        return cls(
            schema,
            self.schema_validator.evolve(schema),
            self.schema_caster.evolve(schema),
        )

    def _cast_proparties(
        self, value: dict[str, Any], schema_only: bool = False
    ) -> dict[str, Any]:
        """Cast the properties of *value* according to this caster's schema.

        Args:
            value: the object to cast; it is updated in place.
            schema_only: when true, only the ``allOf`` schemas and the
                declared ``properties`` are applied and
                ``additionalProperties`` is ignored. Used for the recursive
                ``allOf`` pass.

        Returns:
            The same dict instance that was passed in.

        Raises:
            ValueError: if *value* is not a ``dict``.
        """
        # This method is also reached directly through ``evolve(...)``, i.e.
        # without ``validate`` having run, so the type check is repeated.
        if not isinstance(value, dict):
            raise ValueError("not an object format")

        all_of_schemas = self.schema_validator.iter_all_of_schemas(value)
        for all_of_schema in all_of_schemas:
            all_of_properties = self.evolve(all_of_schema)._cast_proparties(
                value, schema_only=True
            )
            value.update(all_of_properties)

        properties = get_properties(self.schema)
        for prop_name, prop_schema in properties.items():
            try:
                prop_value = value[prop_name]
            except KeyError:
                # Declared but absent properties are simply left out.
                continue
            value[prop_name] = self.schema_caster.evolve(prop_schema).cast(
                prop_value
            )

        if schema_only:
            return value

        additional_properties = self.schema.get("additionalProperties", True)
        if additional_properties is not False:
            # free-form object
            if additional_properties is True:
                additional_prop_schema = SchemaPath.from_dict(
                    {"nullable": True}
                )
            # defined schema
            else:
                additional_prop_schema = self.schema / "additionalProperties"
            additional_prop_caster = self.schema_caster.evolve(
                additional_prop_schema
            )
            # Iterate over a snapshot because entries are reassigned below.
            for prop_name, prop_value in list(value.items()):
                # Only properties NOT declared under ``properties`` are
                # "additional"; the declared ones were already cast above.
                # (The previous check, ``prop_name in value``, was always
                # true, so no additional property was ever cast.)
                if prop_name in properties:
                    continue
                value[prop_name] = additional_prop_caster.cast(prop_value)

        return value
class TypesCaster:
    """Registry mapping a schema ``type`` to the caster class handling it."""

    # Class-level fallbacks; every instance overrides them in ``__init__``.
    casters: Mapping[str, Type[PrimitiveCaster]] = {}
    multi: Optional[Type[PrimitiveCaster]] = None

    def __init__(
        self,
        casters: Mapping[str, Type[PrimitiveCaster]],
        default: Type[PrimitiveCaster],
        multi: Optional[Type[PrimitiveCaster]] = None,
    ):
        """Remember the per-type casters, the default and the multi caster."""
        self.casters, self.default, self.multi = casters, default, multi

    def get_caster(
        self,
        schema_type: Optional[Union[Iterable[str], str]],
    ) -> Type["PrimitiveCaster"]:
        """Return the caster class registered for *schema_type*.

        ``None`` selects ``default``; a non-string iterable selects ``multi``
        (``TypeError`` when no multi caster is configured); anything else is
        looked up in ``casters`` (``KeyError`` when unknown).
        """
        if schema_type is None:
            return self.default

        several_types = not isinstance(schema_type, str) and isinstance(
            schema_type, Iterable
        )
        if not several_types:
            return self.casters[schema_type]

        multi_caster = self.multi
        if multi_caster is None:
            raise TypeError("caster does not accept multiple types")
        return multi_caster
class SchemaCaster:
    """Casts a value according to one schema, delegating to a type caster."""

    def __init__(
        self,
        schema: SchemaPath,
        schema_validator: SchemaValidator,
        types_caster: TypesCaster,
    ):
        """Store the schema, its validator and the type-caster registry."""
        self.schema, self.schema_validator, self.types_caster = (
            schema,
            schema_validator,
            types_caster,
        )

    def cast(self, value: Any) -> Any:
        """Cast *value* with the caster selected by the schema's ``type``.

        ``None`` is always returned unchanged. ``ValueError`` / ``TypeError``
        raised by the type caster are re-raised as ``CastError`` chained to
        the original exception.
        """
        if value is None:
            # skip casting for nullable in OpenAPI 3.0
            nullable = (self.schema / "nullable").read_bool(default=False)
            if nullable:
                return value

        schema_type = (self.schema / "type").read_str(None)
        # The caster is resolved before the None check below, so a lookup
        # failure still surfaces for a None value on a non-nullable schema.
        type_caster = self.get_type_caster(schema_type)

        if value is None:
            return value

        try:
            casted = type_caster(value)
        except (ValueError, TypeError) as exc:
            raise CastError(value, schema_type) from exc
        return casted

    def get_type_caster(
        self,
        schema_type: Optional[Union[Iterable[str], str]],
    ) -> PrimitiveCaster:
        """Instantiate the caster class registered for *schema_type*."""
        caster_factory = self.types_caster.get_caster(schema_type)
        return caster_factory(self.schema, self.schema_validator, self)

    def evolve(self, schema: SchemaPath) -> "SchemaCaster":
        """Return a caster of the same class bound to *schema*."""
        evolved_validator = self.schema_validator.evolve(schema)
        return self.__class__(schema, evolved_validator, self.types_caster)