Skip to content

Commit 195124a

Browse files
committed
[tests] Add WebSocket consumer tests #677
Implement tests for RadiusBatchConsumer to verify authentication, authorization, and group messaging logic. Increases coverage for consumers.py to 84%. Fixes #677
1 parent 33bbd55 commit 195124a

4 files changed

Lines changed: 161 additions & 10 deletions

File tree

.github/workflows/ci.yml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ jobs:
3131
- python-version: "3.13"
3232
django-version: django~=4.2.0
3333

34+
permissions:
35+
pull-requests: write
36+
contents: read
37+
3438
steps:
3539
- uses: actions/checkout@v6
3640
with:
@@ -77,8 +81,10 @@ jobs:
7781
pip install ${{ matrix.django-version }}
7882
7983
- name: Start InfluxDB and Redis container
80-
run: docker compose up -d influxdb redis
81-
84+
run: |
85+
docker compose up -d influxdb redis
86+
until curl -sI "http://localhost:8086/ping"; do sleep 1; done
87+
docker compose exec -T influxdb influx -execute 'CREATE DATABASE openwisp2_test'
8288
- name: QA checks
8389
run: |
8490
./run-qa-checks

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ version: "3"
33
services:
44
influxdb:
55
image: influxdb:1.8-alpine
6+
platform: linux/amd64
67
volumes:
78
- influxdb-data:/var/lib/influxdb
89
ports:

openwisp_radius/consumers.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from asgiref.sync import sync_to_async
22
from channels.generic.websocket import AsyncJsonWebsocketConsumer
3-
from django.core.exceptions import ObjectDoesNotExist
43

54
from .utils import load_model
65

@@ -12,13 +11,9 @@ def _user_can_access_batch(self, user, batch_id):
1211
if user.is_superuser:
1312
return RadiusBatch.objects.filter(pk=batch_id).exists()
1413
# For non-superusers, check their managed organizations
15-
try:
16-
RadiusBatch.objects.filter(
17-
pk=batch_id, organization__in=user.organizations_managed
18-
).exists()
19-
return True
20-
except ObjectDoesNotExist:
21-
return False
14+
return RadiusBatch.objects.filter(
15+
pk=batch_id, organization__in=user.organizations_managed
16+
).exists()
2217

2318
async def connect(self):
2419
self.batch_id = self.scope["url_route"]["kwargs"]["batch_id"]
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from asgiref.sync import async_to_sync
2+
from channels.routing import URLRouter
3+
from channels.testing import WebsocketCommunicator
4+
from django.contrib.auth import get_user_model
5+
from django.test import TransactionTestCase
6+
from django.urls import re_path
7+
8+
from openwisp_users.tests.utils import TestOrganizationMixin
9+
10+
from ..consumers import RadiusBatchConsumer
11+
from ..utils import load_model
12+
from . import CreateRadiusObjectsMixin
13+
14+
User = get_user_model()
15+
RadiusBatch = load_model("RadiusBatch")
16+
17+
application = URLRouter(
18+
[
19+
re_path(
20+
r"^ws/radius/batch/(?P<batch_id>[^/]+)/$",
21+
RadiusBatchConsumer.as_asgi(),
22+
),
23+
]
24+
)
25+
26+
27+
class TestRadiusBatchConsumer(
28+
CreateRadiusObjectsMixin, TestOrganizationMixin, TransactionTestCase
29+
):
30+
31+
TEST_PASSWORD = "test_password" # noqa: S105
32+
33+
def _create_test_data(self):
34+
org = self._create_org()
35+
user = self._create_admin(password=self.TEST_PASSWORD)
36+
batch = self._create_radius_batch(
37+
name="test-batch",
38+
strategy="prefix",
39+
prefix="test-",
40+
organization=org,
41+
)
42+
return org, user, batch
43+
44+
def test_websocket_connect_superuser(self):
45+
_, user, batch = self._create_test_data()
46+
47+
async def test():
48+
communicator = WebsocketCommunicator(
49+
application,
50+
f"/ws/radius/batch/{batch.pk}/",
51+
)
52+
communicator.scope["user"] = user
53+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
54+
55+
connected, _ = await communicator.connect()
56+
assert connected is True
57+
await communicator.disconnect()
58+
59+
async_to_sync(test)()
60+
61+
def test_websocket_connect_staff_with_permission(self):
62+
org, _, batch = self._create_test_data()
63+
staff_user = self._create_administrator(
64+
organizations=[org], password=self.TEST_PASSWORD
65+
)
66+
67+
async def test():
68+
communicator = WebsocketCommunicator(
69+
application,
70+
f"/ws/radius/batch/{batch.pk}/",
71+
)
72+
communicator.scope["user"] = staff_user
73+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
74+
75+
connected, _ = await communicator.connect()
76+
assert connected is True
77+
await communicator.disconnect()
78+
79+
async_to_sync(test)()
80+
81+
def test_websocket_reject_unauthenticated(self):
82+
_, _, batch = self._create_test_data()
83+
84+
async def test():
85+
communicator = WebsocketCommunicator(
86+
application,
87+
f"/ws/radius/batch/{batch.pk}/",
88+
)
89+
from django.contrib.auth.models import AnonymousUser
90+
91+
communicator.scope["user"] = AnonymousUser()
92+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
93+
94+
connected, _ = await communicator.connect()
95+
assert connected is False
96+
97+
async_to_sync(test)()
98+
99+
def test_websocket_reject_non_staff(self):
100+
_, _, batch = self._create_test_data()
101+
regular_user = self._create_user(is_staff=False, password=self.TEST_PASSWORD)
102+
103+
async def test():
104+
communicator = WebsocketCommunicator(
105+
application,
106+
f"/ws/radius/batch/{batch.pk}/",
107+
)
108+
communicator.scope["user"] = regular_user
109+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
110+
111+
connected, _ = await communicator.connect()
112+
assert connected is False
113+
114+
async_to_sync(test)()
115+
116+
def test_websocket_reject_no_permission(self):
117+
_, _, batch = self._create_test_data()
118+
119+
staff_user = self._create_user(is_staff=True, password=self.TEST_PASSWORD)
120+
121+
async def test():
122+
communicator = WebsocketCommunicator(
123+
application,
124+
f"/ws/radius/batch/{batch.pk}/",
125+
)
126+
communicator.scope["user"] = staff_user
127+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
128+
129+
connected, _ = await communicator.connect()
130+
assert connected is False
131+
132+
async_to_sync(test)()
133+
134+
def test_websocket_group_connection(self):
135+
_, user, batch = self._create_test_data()
136+
137+
async def test():
138+
communicator = WebsocketCommunicator(
139+
application,
140+
f"/ws/radius/batch/{batch.pk}/",
141+
)
142+
communicator.scope["user"] = user
143+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
144+
145+
connected, _ = await communicator.connect()
146+
assert connected is True
147+
await communicator.disconnect()
148+
149+
async_to_sync(test)()

0 commit comments

Comments
 (0)