-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_device_cache.py
More file actions
100 lines (77 loc) · 3.17 KB
/
test_device_cache.py
File metadata and controls
100 lines (77 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import pytest
from device_cache import DeviceCache
from unittest.mock import Mock, AsyncMock
@pytest.fixture
def device_cache():
return DeviceCache(cache_duration=5)
@pytest.fixture
def mock_connection():
reader = AsyncMock(spec=asyncio.StreamReader)
writer = AsyncMock(spec=asyncio.StreamWriter)
writer.is_closing.return_value = False
return reader, writer
@pytest.mark.asyncio
async def test_store_and_get_connection(device_cache, mock_connection):
reader, writer = mock_connection
ip = "192.168.41.145"
# Store connection
await device_cache.store_connection(ip, reader, writer)
# Get connection
stored_reader, stored_writer = await device_cache.get_connection(ip)
assert stored_reader == reader
assert stored_writer == writer
@pytest.mark.asyncio
async def test_connection_invalid_returns_none(device_cache, mock_connection):
reader, writer = mock_connection
ip = "192.168.41.145"
# Make connection appear closed
writer.is_closing.return_value = True
await device_cache.store_connection(ip, reader, writer)
result = await device_cache.get_connection(ip)
assert result is None
@pytest.mark.asyncio
async def test_close_connection(device_cache, mock_connection):
reader, writer = mock_connection
ip = "192.168.41.145"
await device_cache.store_connection(ip, reader, writer)
await device_cache._close_connection(ip)
writer.close.assert_called_once()
writer.wait_closed.assert_called_once()
result = await device_cache.get_connection(ip)
assert result is None
@pytest.mark.asyncio
async def test_close_all_connections(device_cache):
# Create multiple mock connections
ips = ["192.168.41.145", "192.168.41.140"]
connections = []
for ip in ips:
reader = AsyncMock(spec=asyncio.StreamReader)
writer = AsyncMock(spec=asyncio.StreamWriter)
writer.is_closing.return_value = False
connections.append((ip, reader, writer))
await device_cache.store_connection(ip, reader, writer)
await device_cache.close_all_connections()
# Verify all connections were closed
for ip, _, writer in connections:
writer.close.assert_called_once()
writer.wait_closed.assert_called_once()
result = await device_cache.get_connection(ip)
assert result is None
@pytest.mark.asyncio
async def test_store_connection_closes_existing(device_cache, mock_connection):
reader1, writer1 = mock_connection
reader2, writer2 = AsyncMock(spec=asyncio.StreamReader), AsyncMock(spec=asyncio.StreamWriter)
writer2.is_closing.return_value = False
ip = "192.168.41.145"
# Store first connection
await device_cache.store_connection(ip, reader1, writer1)
# Store second connection
await device_cache.store_connection(ip, reader2, writer2)
# Verify first connection was closed
writer1.close.assert_called_once()
writer1.wait_closed.assert_called_once()
# Verify second connection is active
stored_reader, stored_writer = await device_cache.get_connection(ip)
assert stored_reader == reader2
assert stored_writer == writer2