|
12 | 12 | sizeof, |
13 | 13 | ) |
14 | 14 | from types import SimpleNamespace |
15 | | -from typing import Any, Callable |
| 15 | +from typing import TYPE_CHECKING, Any, Callable |
16 | 16 | from unittest.mock import Mock |
17 | 17 | from weakref import finalize |
18 | 18 |
|
| 19 | +if TYPE_CHECKING: |
| 20 | + from collections.abc import Generator |
| 21 | + |
19 | 22 | import pytest |
20 | 23 |
|
21 | 24 | from mss.exception import ScreenShotError |
@@ -224,6 +227,77 @@ def visual_validation_env(monkeypatch: pytest.MonkeyPatch) -> _VisualValidationH |
224 | 227 | return _VisualValidationHarness(monkeypatch) |
225 | 228 |
|
226 | 229 |
|
| 230 | +#### intern_atom tests |
| 231 | + |
| 232 | + |
| 233 | +class TestInternAtom: |
| 234 | + """Tests for xcb.intern_atom and the _ATOM_CACHE mechanism.""" |
| 235 | + |
| 236 | + @pytest.fixture(autouse=True) |
| 237 | + def setup_intern_atom(self) -> Generator[None, None, None]: |
| 238 | + self.conn, _ = xcb.connect() |
| 239 | + yield |
| 240 | + xcb.disconnect(self.conn) |
| 241 | + |
| 242 | + def _mock_xcb_intern_atom(self, monkeypatch: pytest.MonkeyPatch, atom_value: int) -> Mock: |
| 243 | + """Patch LIB.xcb.xcb_intern_atom to return a fake reply with the given atom value.""" |
| 244 | + fake_reply = SimpleNamespace(atom=SimpleNamespace(value=atom_value)) |
| 245 | + fake_cookie = Mock() |
| 246 | + fake_cookie.reply.return_value = fake_reply |
| 247 | + mock = Mock(return_value=fake_cookie) |
| 248 | + monkeypatch.setattr(xcb.LIB.xcb, "xcb_intern_atom", mock) |
| 249 | + return mock |
| 250 | + |
| 251 | + def test_predefined_atom_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 252 | + mock = self._mock_xcb_intern_atom(monkeypatch, 0) |
| 253 | + atom = xcb.intern_atom(self.conn, "PRIMARY") |
| 254 | + assert atom == xcb.Atom(1) |
| 255 | + mock.assert_not_called() |
| 256 | + |
| 257 | + def test_cache_miss_calls_xcb_and_caches_result(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 258 | + mock = self._mock_xcb_intern_atom(monkeypatch, 100) |
| 259 | + cache_key = addressof(self.conn) |
| 260 | + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") |
| 261 | + assert atom == xcb.Atom(100) |
| 262 | + mock.assert_called_once() |
| 263 | + assert xcb._ATOM_CACHE[cache_key]["_NET_WM_NAME"] == xcb.Atom(100) |
| 264 | + |
| 265 | + def test_cache_hit_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 266 | + mock = self._mock_xcb_intern_atom(monkeypatch, 0) |
| 267 | + # xcb.connect() in setup_intern_atom guarantees a cache entry for self.conn. |
| 268 | + xcb._ATOM_CACHE[addressof(self.conn)]["_NET_WM_NAME"] = xcb.Atom(100) |
| 269 | + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") |
| 270 | + assert atom == xcb.Atom(100) |
| 271 | + mock.assert_not_called() |
| 272 | + |
| 273 | + def test_only_if_exists_returns_none_when_missing(self) -> None: |
| 274 | + atom = xcb.intern_atom(self.conn, "_MSS_TEST_NONEXISTENT_ATOM_12345", only_if_exists=True) |
| 275 | + assert atom is None |
| 276 | + |
| 277 | + def test_raises_when_missing_and_not_only_if_exists(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 278 | + # Exercises the "shouldn't be possible" code path where the server returns 0 with only_if_exists=False. |
| 279 | + self._mock_xcb_intern_atom(monkeypatch, 0) |
| 280 | + with pytest.raises(xcb.XError, match="X server failed to intern atom"): |
| 281 | + xcb.intern_atom(self.conn, "_NET_NONEXISTENT") |
| 282 | + |
| 283 | + def test_pointer_connection_uses_correct_cache_key(self) -> None: |
| 284 | + atom = xcb.intern_atom(pointer(self.conn), "_NET_WM_NAME") |
| 285 | + assert atom is not None |
| 286 | + assert addressof(self.conn) in xcb._ATOM_CACHE |
| 287 | + |
| 288 | + |
| 289 | +def test_atom_cache_lifecycle() -> None: |
| 290 | + """connect() initializes and disconnect() clears the per-connection atom cache entry.""" |
| 291 | + before = set(xcb._ATOM_CACHE) |
| 292 | + conn, _ = xcb.connect() |
| 293 | + cache_key = addressof(conn) |
| 294 | + assert cache_key in xcb._ATOM_CACHE |
| 295 | + assert xcb._ATOM_CACHE[cache_key] == {} |
| 296 | + xcb.disconnect(conn) |
| 297 | + assert cache_key not in xcb._ATOM_CACHE |
| 298 | + assert set(xcb._ATOM_CACHE) == before |
| 299 | + |
| 300 | + |
227 | 301 | def test_xgetimage_visual_validation_accepts_default_setup(visual_validation_env: _VisualValidationHarness) -> None: |
228 | 302 | visual_validation_env.reset() |
229 | 303 | mss_instance = xgetimage.MSS() |
|
0 commit comments