Skip to content

Commit 6928908

Browse files
committed
[PERF] bus: do not use registry to dispatch messages
When there are more databases than what the LRU cache can hold, the gevent worker struggles to keep notifications flowing. Each outgoing message requires a registry, which is highly inefficient. This commit replaces the call to `bus.bus@_poll` by a direct SQL query thus removing the need to build a registry. closes odoo#235746 Signed-off-by: Julien Castiaux (juc) <juc@odoo.com>
1 parent d7f5264 commit 6928908

File tree

3 files changed

+30
-30
lines changed

3 files changed

+30
-30
lines changed

addons/bus/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# -*- coding: utf-8 -*-
22
from . import models
3+
from . import session_helpers
34
from . import controllers
45
from . import websocket

addons/bus/models/bus.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from odoo import api, fields, models
1515
from odoo.service.server import CommonServer
1616
from odoo.tools import json_default, SQL
17-
from odoo.tools.constants import GC_UNLINK_LIMIT
1817
from odoo.tools.misc import OrderedSet
1918

2019
_logger = logging.getLogger(__name__)
@@ -41,12 +40,37 @@ def get_notify_payload_max_length(default=8000):
4140
NOTIFY_PAYLOAD_MAX_LENGTH = get_notify_payload_max_length()
4241

4342

43+
def fetch_bus_notifications(cr, channels, last=0, ignore_ids=None):
44+
"""Fetch notifications from the bus table.
45+
46+
:param cr: Database cursor.
47+
:param channels: List of channels for which notifications should be fetched.
48+
May contain channel names, model instances, or (model, string) tuples.
49+
:param last: The ID of the last fetched notification. Defaults to 0.
50+
:param ignore_ids: IDs to exclude.
51+
:return: List of notifications.
52+
53+
"""
54+
conditions = [
55+
SQL("channel IN %s", tuple(json_dump(channel_with_db(cr.dbname, c)) for c in channels)),
56+
SQL("create_date > %s", fields.Datetime.now() - datetime.timedelta(seconds=TIMEOUT))
57+
if last == 0
58+
else SQL("id > %s", last),
59+
]
60+
if ignore_ids:
61+
conditions.append(SQL("id NOT IN %s", tuple(ignore_ids)))
62+
where = SQL(" AND ").join(conditions)
63+
cr.execute(SQL("SELECT id, message FROM bus_bus WHERE %s ORDER BY id", where))
64+
return [{"id": r[0], "message": json.loads(r[1])} for r in cr.fetchall()]
65+
66+
4467
# ---------------------------------------------------------
4568
# Bus
4669
# ---------------------------------------------------------
4770
def json_dump(v):
4871
return json.dumps(v, separators=(',', ':'), default=json_default)
4972

73+
5074
def hashable(key):
5175
if isinstance(key, list):
5276
key = tuple(key)
@@ -164,25 +188,7 @@ def notify():
164188

165189
@api.model
166190
def _poll(self, channels, last=0, ignore_ids=None):
167-
# first poll return the notification in the 'buffer'
168-
if last == 0:
169-
timeout_ago = fields.Datetime.now() - datetime.timedelta(seconds=TIMEOUT)
170-
domain = [('create_date', '>', timeout_ago)]
171-
else: # else returns the unread notifications
172-
domain = [('id', '>', last)]
173-
if ignore_ids:
174-
domain.append(("id", "not in", ignore_ids))
175-
channels = [json_dump(channel_with_db(self.env.cr.dbname, c)) for c in channels]
176-
domain.append(('channel', 'in', channels))
177-
notifications = self.sudo().search_read(domain, ["message"])
178-
# list of notification to return
179-
result = []
180-
for notif in notifications:
181-
result.append({
182-
'id': notif['id'],
183-
'message': json.loads(notif['message']),
184-
})
185-
return result
191+
return fetch_bus_notifications(self.env.cr, channels, last, ignore_ids)
186192

187193
def _bus_last_id(self):
188194
last = self.env['bus.bus'].search([], order='id desc', limit=1)
@@ -193,12 +199,6 @@ def _bus_last_id(self):
193199
# Dispatcher
194200
# ---------------------------------------------------------
195201

196-
class BusSubscription:
197-
def __init__(self, channels, last):
198-
self.last_notification_id = last
199-
self.channels = channels
200-
201-
202202
class ImDispatch(threading.Thread):
203203
def __init__(self):
204204
super().__init__(daemon=True, name=f'{__name__}.Bus')

addons/bus/websocket.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from odoo.service.server import CommonServer
3434
from odoo.tools import config
3535

36-
from .models.bus import dispatch
36+
from .models.bus import dispatch, fetch_bus_notifications
3737
from .session_helpers import check_session, new_env
3838

3939
_logger = logging.getLogger(__name__)
@@ -754,9 +754,8 @@ def _process_control_command(self, command, data):
754754
def _dispatch_bus_notifications(self):
755755
self._waiting_for_dispatch = False
756756
with acquire_cursor(self._session.db) as cr:
757-
env = new_env(cr, self._session)
758-
notifications = env['bus.bus']._poll(
759-
self._channels, self._last_notif_sent_id, [n[0] for n in self._notif_history]
757+
notifications = fetch_bus_notifications(
758+
cr, self._channels, self._last_notif_sent_id, [n[0] for n in self._notif_history]
760759
)
761760
if not notifications:
762761
return

0 commit comments

Comments
 (0)