Skip to content

Commit 6404ee9

Browse files
authored
Merge pull request #2 from santanu2310/feature
remove mongodb watcher and replaced with rabbitMQ pub/sub
2 parents 11d945f + eaf22fb commit 6404ee9

10 files changed

Lines changed: 221 additions & 72 deletions

File tree

server/app/api/friends/router.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
Friends_Status,
1414
FriendRequestOut,
1515
AddFriendMessage,
16+
FriendRequestMessage,
17+
SyncMessageType,
1618
)
1719

1820
from app.core.db import AsyncDatabase, get_async_database
@@ -98,10 +100,28 @@ async def make_friend_request(
98100
)
99101

100102
# Inserting the request into the collection
101-
await db.friend_request.insert_one(
103+
result = await db.friend_request.insert_one(
102104
request.model_dump(by_alias=True, exclude={"id"})
103105
)
104106

107+
full_user = await get_full_user(db=db, user_id=request.sender_id)
108+
user_brief = UserBrief.model_validate(full_user.model_dump())
109+
110+
message = FriendRequestMessage(
111+
type=SyncMessageType.friend_request,
112+
id=str(result.inserted_id),
113+
message=request.message,
114+
user=user_brief,
115+
status=Friends_Status.pending,
116+
created_time=request.created_at,
117+
)
118+
119+
# Send the status update to the message sender
120+
await send_message(
121+
user_ids=[request.receiver_id],
122+
message_data=message,
123+
)
124+
105125
return
106126

107127

server/app/api/sync_socket/router.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
get_call_record,
3939
list_call_record,
4040
)
41+
from .schemas import OnlineStatus
4142

4243
router = APIRouter()
4344
logger = logging.getLogger(__name__)
@@ -92,16 +93,21 @@ async def websocket_endpoint(
9293
user_id=user.id, message=SyncPacket(type=PacketType.pong)
9394
)
9495
elif packet.type == PacketType.message and packet.data:
95-
await handle_recieved_message(db, user_id=user.id, message=packet.data)
96+
await handle_recieved_message(
97+
db, user_id=user.id, message=packet.data, queue=queue_connection
98+
)
9699

97100
except WebSocketDisconnect as e:
98-
logger.error(e)
101+
logger.error(f"User disconnected: {e}")
99102
await connections.disconnect(user.id, queue_connection)
100103
return
101104

102105

103106
async def handle_recieved_message(
104-
db: AsyncDatabase, user_id: ObjectId, message: SyncSocketMessage
107+
db: AsyncDatabase,
108+
user_id: ObjectId,
109+
message: SyncSocketMessage,
110+
queue: AbstractRobustConnection,
105111
):
106112
if message.type == SyncMessageType.message_status:
107113
try:
@@ -137,6 +143,14 @@ async def handle_recieved_message(
137143

138144
if updates:
139145
await db.message.bulk_write(updates)
146+
147+
# Publish the updated messages to the queue for further processing
148+
await publish_message(
149+
connection=queue,
150+
exchange_name=settings.EXCHANGES.sync_message.value,
151+
topic=settings.TOPICS.message_status_update.value,
152+
data=msg,
153+
)
140154
except Exception as e:
141155
logger.critical(e)
142156

@@ -225,7 +239,7 @@ async def notify_online_status(
225239
user_id: ObjectId,
226240
is_online: Literal["online", "offline"],
227241
):
228-
data = {"user_id": str(user_id), "status": is_online}
242+
data = OnlineStatus(user_id=str(user_id), status=is_online)
229243
await publish_message(
230244
connection=connection,
231245
exchange_name=settings.EXCHANGES.sync_message.value,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from pydantic import BaseModel
2+
3+
4+
class OnlineStatus(BaseModel):
5+
user_id: str
6+
status: str

server/app/background_tasks/async_ops/tasks.py

Lines changed: 124 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from bson import ObjectId
12
import logging
23
from aio_pika.abc import AbstractIncomingMessage
4+
from pymongo.errors import PyMongoError
35
from app.core.config import settings
46
from app.api.sync_socket.router import send_message as send_sync_message
57
from app.core.schemas import (
@@ -11,6 +13,8 @@
1113
UserBrief,
1214
FriendRequestMessage,
1315
SyncMessageType,
16+
BrodcastMessage,
17+
Message_Status,
1418
)
1519
from app.core.db import (
1620
AsyncDatabase,
@@ -27,36 +31,36 @@
2731
logger = logging.getLogger(__name__)
2832

2933

30-
async def watch_user_updates():
31-
client = create_async_client()
32-
db = AsyncDatabase(client, settings.DATABASE_NAME)
34+
# async def watch_user_updates():
35+
# client = create_async_client()
36+
# db = AsyncDatabase(client, settings.DATABASE_NAME)
3337

34-
async with db.user_profile.watch(
35-
pipeline=[{"$match": {"operationType": "update"}}], full_document="updateLookup"
36-
) as stream:
37-
async for change in stream:
38-
try:
39-
# user_id = await db.user_profile.find_one({"_id": change['documentKey']['auth_id']})
40-
cursor = db.friends.find({"user_id": change["fullDocument"]["auth_id"]})
38+
# async with db.user_profile.watch(
39+
# pipeline=[{"$match": {"operationType": "update"}}], full_document="updateLookup"
40+
# ) as stream:
41+
# async for change in stream:
42+
# try:
43+
# # user_id = await db.user_profile.find_one({"_id": change['documentKey']['auth_id']})
44+
# cursor = db.friends.find({"user_id": change["fullDocument"]["auth_id"]})
4145

42-
# Extracting the `friends_id` values from each result document
43-
friend_ids = [doc["friend_id"] async for doc in cursor]
46+
# # Extracting the `friends_id` values from each result document
47+
# friend_ids = [doc["friend_id"] async for doc in cursor]
4448

45-
# Sending the data to the online frinds
46-
data: dict[str:any] = change["updateDescription"]["updatedFields"]
47-
data["id"] = change["fullDocument"]["auth_id"]
49+
# # Sending the data to the online frinds
50+
# data: dict[str:any] = change["updateDescription"]["updatedFields"]
51+
# data["id"] = change["fullDocument"]["auth_id"]
4852

49-
friend_update = FriendUpdateMessage.model_validate(data)
50-
await send_sync_message(friend_ids, friend_update)
53+
# friend_update = FriendUpdateMessage.model_validate(data)
54+
# await send_sync_message(friend_ids, friend_update)
5155

52-
# updating the lastupdate fo friends data
53-
await db.friends.update_many(
54-
{"friend_id": change["fullDocument"]["auth_id"]},
55-
{"$set": {"update_at": change["wallTime"]}},
56-
)
56+
# # updating the lastupdate fo friends data
57+
# await db.friends.update_many(
58+
# {"friend_id": change["fullDocument"]["auth_id"]},
59+
# {"$set": {"update_at": change["wallTime"]}},
60+
# )
5761

58-
except Exception as e:
59-
logger.error(f"Error processing user update : {e}")
62+
# except Exception as e:
63+
# logger.error(f"Error processing user update : {e}")
6064

6165

6266
@rabbit_consumer(
@@ -69,52 +73,100 @@ async def handle_online_status_update(
6973
await distribute_online_status_update(message=message, db=db)
7074

7175

72-
async def watch_message_updates():
73-
"""
74-
Watches for updates in the MessageCollection and sends message status updates
75-
to the sender when the message status changes.
76+
@rabbit_consumer(
77+
topic_name=settings.TOPICS.message_status_update.value,
78+
exchange_name=settings.EXCHANGES.sync_message.value,
79+
)
80+
async def process_message_status_updates(
81+
message: AbstractIncomingMessage, db: AsyncDatabase
82+
):
7683
"""
84+
Processes message status updates and forwards them to relevant senders.
7785
78-
client = create_async_client()
79-
db = AsyncDatabase(client, settings.DATABASE_NAME)
86+
This function:
87+
1. Decodes incoming message status updates
88+
2. Queries affected messages from database
89+
3. Groups messages by sender
90+
4. Forwards status updates to each sender
8091
81-
async with db.message.watch(
82-
pipeline=[{"$match": {"operationType": "update"}}],
83-
full_document="updateLookup",
84-
) as stream:
85-
async for change in stream:
86-
try:
87-
if not change["updateDescription"]["updatedFields"]["status"]:
92+
Args:
93+
message: Incoming RabbitMQ message containing status update payload
94+
db: Async database connection
95+
"""
96+
try:
97+
try:
98+
decoded_data = message.body.decode("utf-8")
99+
payload: MessageStatusUpdate = MessageStatusUpdate.model_validate_json(
100+
decoded_data
101+
)
102+
103+
message_ids = [ObjectId(data.message_id) for data in payload.data]
104+
105+
except (UnicodeDecodeError, ValueError) as e:
106+
logger.error(f"Failed to decode or validate message payload: {e}")
107+
return
108+
state = (
109+
"received_time"
110+
if payload.status == Message_Status.recieved.value
111+
else "seen_time"
112+
)
113+
114+
try:
115+
cursor = db.message.find(
116+
{"_id": {"$in": message_ids}},
117+
projection={"sender_id": 1, state: 1},
118+
)
119+
except PyMongoError as e:
120+
logger.error(f"Database query failed: {e}")
121+
return
122+
123+
sender_data: dict[str, list[dict[str, any]]] = {}
124+
try:
125+
async for db_message in cursor:
126+
sender_id = db_message["sender_id"]
127+
128+
# Initialize sender list if not exists
129+
if sender_id not in sender_data:
130+
sender_data[sender_id] = []
131+
132+
# Skip messages without timestamp
133+
if db_message[state] is None:
134+
logger.warning(f"Message {db_message['_id']} has null {state}")
88135
continue
89136

90-
message_id = change["documentKey"]["_id"]
91-
updated_field = change["updateDescription"]["updatedFields"]
92-
status = change["updateDescription"]["updatedFields"]["status"]
93-
94-
# Get the timestamp of the update
95-
timestamp = next(iter(updated_field.values()))
96-
97-
# Fetch the updated message from the database
98-
message = Message.model_validate(change["fullDocument"])
99-
100-
# Construct a MessageEvent Object
101-
message_event = MessageEvent(
102-
message_id=str(message_id), timestamp=timestamp
137+
# Add message data to sender's list
138+
sender_data[sender_id].append(
139+
{"timestamp": db_message[state], "message_id": db_message["_id"]}
103140
)
104141

105-
# Create a MessageStatusUpdate object to send to the user
106-
sync_message = MessageStatusUpdate(
107-
data=[message_event],
108-
status=status,
109-
)
142+
except PyMongoError as e:
143+
logger.error(f"Error iterating database cursor: {e}")
144+
return
110145

111-
# Send the status update to the message sender
146+
# send the data to the sender
147+
for sender_id, data in sender_data.items():
148+
try:
149+
data = [
150+
MessageEvent(
151+
message_id=str(msg["message_id"]), timestamp=msg["timestamp"]
152+
)
153+
for msg in data
154+
]
112155
await send_sync_message(
113-
user_ids=[message.sender_id], message_data=sync_message
156+
user_ids=[sender_id],
157+
message_data=MessageStatusUpdate(data=data, status=payload.status),
114158
)
115159

116160
except Exception as e:
117-
print(f"Error processing user update(line:147) : {e}")
161+
logger.error(f"Failed to send status update to sender {sender_id}: {e}")
162+
163+
except Exception as e:
164+
logger.error(
165+
f"Unexpected error in process_message_status_updates: {e}", exc_info=True
166+
)
167+
finally:
168+
# retry won't happen
169+
message.ack()
118170

119171

120172
@rabbit_consumer(
@@ -181,3 +233,17 @@ async def profile_media_update_confirmation(
181233
message: AbstractIncomingMessage,
182234
):
183235
await send_profilemedia_update_confirmation(data=message)
236+
237+
238+
@rabbit_consumer(
239+
topic_name=settings.TOPICS.chat_broadcast_selected.value,
240+
exchange_name=settings.EXCHANGES.sync_message.value,
241+
)
242+
async def send_message_to_users(
243+
message: AbstractIncomingMessage,
244+
):
245+
decoded_data = message.body.decode("utf-8")
246+
logger.info(f"{decoded_data=}")
247+
payload: BrodcastMessage = BrodcastMessage.model_validate_json(decoded_data)
248+
249+
await send_sync_message(user_ids=payload.ids, message_data=payload.data)

server/app/background_tasks/celery/tasks.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@
77
from app.background_tasks.celery.dependency import get_dependency_manager, Dependency
88
from app.core.config import create_celery_client, settings
99
from app.core.message_broker import publish_bloking_message
10-
from app.core.schemas import MediaType, Message, ProfileMediaUpdate, UserProfile
10+
from app.core.schemas import (
11+
MediaType,
12+
Message,
13+
ProfileMediaUpdate,
14+
UserProfile,
15+
FriendUpdateMessage,
16+
BrodcastMessage,
17+
)
1118
from app.utils import get_file_extension
1219
from botocore.exceptions import ClientError, NoCredentialsError # type: ignore
1320

21+
from .utils import list_friends_id
1422
from .services import process_image_to_aspect, send_otp_email
1523

1624
celery_app = create_celery_client()
@@ -169,6 +177,8 @@ def process_profile_media(file_id: str, user_id: str, media_type: str):
169177
return_document=ReturnDocument.BEFORE,
170178
)
171179

180+
friends_list = list_friends_id(user_id=ObjectId(user_id), db=db)
181+
172182
user_profile: UserProfile = UserProfile.model_validate(user_profile_response)
173183

174184
message = ProfileMediaUpdate(
@@ -189,6 +199,19 @@ def process_profile_media(file_id: str, user_id: str, media_type: str):
189199
topic=settings.TOPICS.media_update,
190200
data=data,
191201
)
202+
203+
# Send the updated user data to all friends
204+
data = FriendUpdateMessage(**{"id": ObjectId(user_id), media_type: new_key})
205+
payload = BrodcastMessage(ids=friends_list, data=data)
206+
207+
publish_bloking_message(
208+
connection=queue,
209+
exchange_name=settings.EXCHANGES.sync_message.value,
210+
topic=settings.TOPICS.chat_broadcast_selected.value,
211+
data=payload.model_dump_json(),
212+
)
213+
214+
logger.error(f"{payload=}")
192215
except Exception as e:
193216
logger.exception(
194217
f"Infrastructure error processing media for user {user_id}: {e}"
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from bson import ObjectId
2+
from app.core.db import SyncDatabase
3+
4+
5+
def list_friends_id(user_id: ObjectId, db: SyncDatabase) -> list[ObjectId]:
6+
friends = db.friends.find(
7+
{"user_id": user_id},
8+
projection={"friend_id": 1, "_id": 0},
9+
)
10+
friend_ids = [friend["friend_id"] for friend in friends]
11+
return friend_ids

0 commit comments

Comments
 (0)