-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_async_base_model.py
More file actions
156 lines (117 loc) · 4.95 KB
/
test_async_base_model.py
File metadata and controls
156 lines (117 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# ruff: noqa: S101, S311, PLR2004
import random
from collections.abc import AsyncGenerator
from typing import Any
import pytest
from dotenv import load_dotenv
from pydantic import BaseModel
from contiguity import AsyncBase, InvalidKeyError, ItemConflictError, ItemNotFoundError, QueryResponse
from tests import random_string
load_dotenv()
class TestItemModel(BaseModel):
key: str = "test_key"
field1: int = random.randint(1, 1000)
field2: str = random_string()
field3: int = 1
field4: int = 0
field5: list[str] = ["foo", "bar"]
field6: list[int] = [1, 2]
field7: dict[str, str] = {"foo": "bar"}
@pytest.fixture
async def base() -> AsyncGenerator[AsyncBase[TestItemModel], Any]:
base = AsyncBase("test_base_model", item_type=TestItemModel)
for item in (await base.query()).items:
await base.delete(item.key)
yield base
for item in (await base.query()).items:
await base.delete(item.key)
def test_bad_base_name() -> None:
with pytest.raises(ValueError, match="invalid Base name ''"):
AsyncBase("", item_type=TestItemModel)
async def test_bad_key(base: AsyncBase[TestItemModel]) -> None:
with pytest.raises(InvalidKeyError):
await base.get("")
with pytest.raises(InvalidKeyError):
await base.delete("")
with pytest.raises(InvalidKeyError):
await base.update({"foo": "bar"}, key="")
async def test_get(base: AsyncBase[TestItemModel]) -> None:
item = TestItemModel()
await base.insert(item)
fetched_item = await base.get("test_key")
assert fetched_item == item
async def test_get_nonexistent(base: AsyncBase[TestItemModel]) -> None:
with pytest.warns(DeprecationWarning):
assert await base.get("nonexistent_key") is None
async def test_get_default(base: AsyncBase[TestItemModel]) -> None:
for default_item in (None, "foo", 42, TestItemModel()):
fetched_item = await base.get("nonexistent_key", default=default_item)
assert fetched_item == default_item
async def test_delete(base: AsyncBase[TestItemModel]) -> None:
item = TestItemModel()
await base.insert(item)
await base.delete("test_key")
with pytest.warns(DeprecationWarning):
assert await base.get("test_key") is None
async def test_insert(base: AsyncBase[TestItemModel]) -> None:
item = TestItemModel()
inserted_item = await base.insert(item)
assert inserted_item == item
async def test_insert_existing(base: AsyncBase[TestItemModel]) -> None:
item = TestItemModel()
await base.insert(item)
with pytest.raises(ItemConflictError):
await base.insert(item)
async def test_put(base: AsyncBase[TestItemModel]) -> None:
items = [TestItemModel(key=f"test_key_{i}") for i in range(3)]
for _ in range(2):
response = await base.put(*items)
assert response == items
async def test_put_empty(base: AsyncBase[TestItemModel]) -> None:
items = []
response = await base.put(*items)
assert response == items
async def test_put_too_many(base: AsyncBase[TestItemModel]) -> None:
items = [TestItemModel(key=f"test_key_{i}") for i in range(base.PUT_LIMIT + 1)]
with pytest.raises(ValueError, match=f"cannot put more than {base.PUT_LIMIT} items at a time"):
await base.put(*items)
async def test_update(base: AsyncBase[TestItemModel]) -> None:
item = TestItemModel()
await base.insert(item)
updated_item = await base.update(
{
"field1": base.util.trim(),
"field2": "updated_value",
"field3": base.util.increment(2),
"field4": base.util.increment(-2),
"field5": base.util.append("baz"),
"field6": base.util.prepend([3, 4]),
},
key="test_key",
)
assert updated_item == TestItemModel(
key="test_key",
field1=updated_item.field1,
field2="updated_value",
field3=item.field3 + 2,
field4=item.field4 - 2,
field5=[*item.field5, "baz"],
field6=[3, 4, *item.field6],
field7=item.field7,
)
async def test_update_nonexistent(base: AsyncBase[TestItemModel]) -> None:
with pytest.raises(ItemNotFoundError):
await base.update({"foo": "bar"}, key=random_string())
async def test_update_empty(base: AsyncBase[TestItemModel]) -> None:
with pytest.raises(ValueError, match="no updates provided"):
await base.update({}, key="test_key")
async def test_query_empty(base: AsyncBase[TestItemModel]) -> None:
items = [TestItemModel(key=f"test_key_{i}", field1=i) for i in range(5)]
await base.put(*items)
response = await base.query()
assert response == QueryResponse(count=5, last_key=None, items=items)
async def test_query(base: AsyncBase[TestItemModel]) -> None:
items = [TestItemModel(key=f"test_key_{i}", field1=i) for i in range(5)]
await base.put(*items)
response = await base.query({"field1?gt": 1})
assert response == QueryResponse(count=3, last_key=None, items=[item for item in items if item.field1 > 1])