Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
Release History
---------------

2.13.8 (2026-03-20)
+++++++++++++++++++

**Improvements**

- BetdaqExecution updated to have two thread pools (place/update and cancel)

2.13.7 (2026-03-20)
+++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion flumine/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "flumine"
__description__ = "Betting trading framework"
__url__ = "https://github.com/betcode-org/flumine"
__version__ = "2.13.7"
__version__ = "2.13.8"
__author__ = "Liam Pauling"
__license__ = "MIT"
2 changes: 1 addition & 1 deletion flumine/baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, client: BaseClient = None):
# order execution class
self.simulated_execution = SimulatedExecution(self)
self.betfair_execution = BetfairExecution(self)
self.betdaq_execution = BetdaqExecution(self, 1)
self.betdaq_execution = BetdaqExecution(self)

# add client
if client:
Expand Down
45 changes: 45 additions & 0 deletions flumine/execution/betdaqexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import requests
from typing import Callable
from betdaq import BetdaqError
from concurrent.futures import ThreadPoolExecutor

from .. import config
from ..clients import VenueType
from .baseexecution import BaseExecution
from ..exceptions import OrderExecutionError
Expand All @@ -20,10 +22,53 @@ class BetdaqExecution(BaseExecution):

'A Punter can not issue more than one order API (PlaceSingleOrder,
PlaceGroupOrder or ChangeOrder) at the same time'

Therefore this class holds two thread pools, one for
place/update (single) and one for cancel (multi).
"""

VENUE = VenueType.BETDAQ

def __init__(self, flumine, max_workers: int = config.max_execution_workers):
super().__init__(flumine, max_workers=max_workers)
# Single thread pool for place/update
self._thread_pool_single = ThreadPoolExecutor(max_workers=1)
# Multi worker thread pool for cancel
self._thread_pool_multi = ThreadPoolExecutor(max_workers=self._max_workers)

def handler(self, order_package: BaseOrderPackage):
"""Handles order_package, capable of place, cancel,
replace and update.
"""
http_session = self._get_http_session()
if order_package.package_type == OrderPackageType.PLACE:
func = self.execute_place
thread_pool = self._thread_pool_single
elif order_package.package_type == OrderPackageType.CANCEL:
func = self.execute_cancel
thread_pool = self._thread_pool_multi
elif order_package.package_type == OrderPackageType.UPDATE:
func = self.execute_update
thread_pool = self._thread_pool_single
elif order_package.package_type == OrderPackageType.REPLACE:
raise NotImplementedError()
else:
raise NotImplementedError()
thread_pool.submit(func, order_package, http_session)
logger.info(
"Thread pool submit",
extra={
"trading_function": func.__name__,
"session": http_session,
"latency": round(order_package.elapsed_seconds, 4),
"order_package": order_package.info,
"thread_pool": {
"num_threads": len(thread_pool._threads),
"work_queue_size": thread_pool._work_queue.qsize(),
},
},
)

def execute_place(
self, order_package: BaseOrderPackage, http_session: requests.Session
) -> None:
Expand Down
44 changes: 44 additions & 0 deletions tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,50 @@ def setUp(self) -> None:

def test_init(self):
self.assertEqual(self.execution.VENUE, VenueType.BETDAQ)
self.assertIsNotNone(self.execution._thread_pool_single)
self.assertIsNotNone(self.execution._thread_pool_multi)

@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution._get_http_session")
@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution.execute_place")
def test_handler_place(self, mock_execute_place, mock__get_http_session):
mock_execute_place.__name__ = "execute_place"
mock_order_package = mock.Mock(elapsed_seconds=1)
mock_order_package.package_type = OrderPackageType.PLACE
mock_thread_pool = mock.Mock(_threads=())
self.execution._thread_pool_single = mock_thread_pool
self.execution.handler(mock_order_package)
mock_thread_pool.submit.assert_called_with(
mock_execute_place, mock_order_package, mock__get_http_session()
)
mock__get_http_session.assert_called_with()

@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution._get_http_session")
@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution.execute_update")
def test_handler_update(self, mock_execute_update, mock__get_http_session):
mock_execute_update.__name__ = "execute_update"
mock_order_package = mock.Mock(elapsed_seconds=1)
mock_order_package.package_type = OrderPackageType.UPDATE
mock_thread_pool = mock.Mock(_threads=())
self.execution._thread_pool_single = mock_thread_pool
self.execution.handler(mock_order_package)
mock_thread_pool.submit.assert_called_with(
mock_execute_update, mock_order_package, mock__get_http_session()
)
mock__get_http_session.assert_called_with()

@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution._get_http_session")
@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution.execute_cancel")
def test_handler_cancel(self, mock_execute_cancel, mock__get_http_session):
mock_execute_cancel.__name__ = "execute_cancel"
mock_order_package = mock.Mock(elapsed_seconds=1)
mock_order_package.package_type = OrderPackageType.CANCEL
mock_thread_pool = mock.Mock(_threads=())
self.execution._thread_pool_multi = mock_thread_pool
self.execution.handler(mock_order_package)
mock_thread_pool.submit.assert_called_with(
mock_execute_cancel, mock_order_package, mock__get_http_session()
)
mock__get_http_session.assert_called_with()

@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution._order_logger")
@mock.patch("flumine.execution.betdaqexecution.BetdaqExecution._execution_helper")
Expand Down
Loading