Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
Release History
---------------

2.13.6 (2026-03-17)
+++++++++++++++++++

**Improvements**

- live_orders threading Event added to Markets
- complete orders logic executed on each market per update

2.13.5 (2026-03-16)
+++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion flumine/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "flumine"
__description__ = "Betting trading framework"
__url__ = "https://github.com/betcode-org/flumine"
__version__ = "2.13.5"
__version__ = "2.13.6"
__author__ = "Liam Pauling"
__license__ = "MIT"
14 changes: 14 additions & 0 deletions flumine/baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def _process_market_books(self, event: events.MarketBookEvent) -> None:
utils.call_strategy_error_handling(
strategy.process_market_book, market, market_book
)
if self.markets.live_orders:
self.markets.live_orders_event.set()
else:
self.markets.live_orders_event.clear()

def _process_sports_data(self, event: events.SportsDataEvent) -> None:
for sports_data in event.event:
Expand Down Expand Up @@ -205,6 +209,10 @@ def _process_sports_data(self, event: events.SportsDataEvent) -> None:
utils.call_strategy_error_handling(
strategy.process_sports_data, market, sports_data
)
if self.markets.live_orders:
self.markets.live_orders_event.set()
else:
self.markets.live_orders_event.clear()

def process_order_package(self, order_package) -> None:
"""Execute through client."""
Expand Down Expand Up @@ -309,6 +317,12 @@ def _process_current_orders(self, event: events.CurrentOrdersEvent) -> None:
)
for market in self.markets:
if market.closed is False and market.blotter.active:
# complete orders if required
for order in market.blotter.live_orders:
if order.complete:
if order in market.blotter.live_orders:
market.blotter.complete_order(order)
# loop strategies
for strategy in self.strategies:
strategy_orders = market.blotter.strategy_orders(strategy)
if strategy_orders:
Expand Down
2 changes: 2 additions & 0 deletions flumine/markets/markets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import threading
from typing import Iterator, Optional
from collections import defaultdict

Expand All @@ -12,6 +13,7 @@ class Markets:
def __init__(self):
self._markets = {} # marketId: <Market>
self.events = defaultdict(list) # eventId: [<Market>, ]
self.live_orders_event = threading.Event()

def add_market(self, market_id: str, market: Market) -> None:
if market_id in self._markets:
Expand Down
13 changes: 1 addition & 12 deletions flumine/order/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Loop through each current order:
order = Lookup order in market using marketId and orderId
if order is None (not present locally):
create local order using data and make executable #todo!!!
create local order using data and make executable
if order betId != current_order betId:
Get order using current_order betId due to replace request (new betId)
if order:
Expand Down Expand Up @@ -71,11 +71,6 @@ def process_current_orders(
continue
# process order status
process_current_order(order, current_order, log_control)
# complete order if required
if order.complete:
market = markets.markets[order.market_id]
if order in market.blotter.live_orders:
market.blotter.complete_order(order)


def process_current_order(order: BaseOrder, current_order, log_control) -> None:
Expand Down Expand Up @@ -162,12 +157,6 @@ def process_betdaq_current_orders(
continue
# process order status
process_betdaq_current_order(order, current_order)
# complete order if required
if order.complete:
for market in markets:
if order in market.blotter.live_orders:
market.blotter.complete_order(order)
break


def process_betdaq_current_order(order: BaseOrder, current_order) -> None:
Expand Down
3 changes: 2 additions & 1 deletion flumine/streams/betdaqorderpolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def run(self) -> None:
if self.flumine.markets.live_orders:
time.sleep(self.streaming_timeout)
else:
time.sleep(SNAP_DELTA)
# immediately exits sleep if an order is placed
self.flumine.markets.live_orders_event.wait(timeout=SNAP_DELTA)

logger.info(
f"Stopped BetdaqOrderPolling '{self.stream_id}'",
Expand Down
6 changes: 6 additions & 0 deletions tests/test_baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def test__process_current_orders(
mock_market = mock.Mock(closed=False)
mock_market.blotter.active = True
mock_market.blotter.strategy_orders.return_value = [mock_order]
mock_market.blotter.live_orders = [mock_order]
self.base_flumine.markets = [mock_market]
mock_strategy = mock.Mock()
self.base_flumine.strategies = [mock_strategy]
Expand All @@ -456,6 +457,7 @@ def test__process_current_orders(
mock_call_process_orders_error_handling.assert_called_with(
mock_strategy, mock_market, [mock_order]
)
mock_market.blotter.complete_order.assert_called_with(mock_order)

@mock.patch("flumine.baseflumine.utils.call_process_orders_error_handling")
@mock.patch("flumine.baseflumine.process_betdaq_current_orders")
Expand All @@ -468,6 +470,7 @@ def test__process_betdaq_current_orders(
mock_market = mock.Mock(closed=False)
mock_market.blotter.active = True
mock_market.blotter.strategy_orders.return_value = [mock_order]
mock_market.blotter.live_orders = [mock_order]
self.base_flumine.markets = [mock_market]
mock_strategy = mock.Mock()
self.base_flumine.strategies = [mock_strategy]
Expand All @@ -487,6 +490,7 @@ def test__process_betdaq_current_orders(
mock_call_process_orders_error_handling.assert_called_with(
mock_strategy, mock_market, [mock_order]
)
mock_market.blotter.complete_order.assert_called_with(mock_order)

@mock.patch("flumine.baseflumine.process_current_orders")
def test__process_current_orders_no_event(self, mock_process_current_orders):
Expand All @@ -502,6 +506,7 @@ def test__process_current_orders_callback(
mock_market = mock.Mock(closed=False)
mock_market.blotter.active = True
mock_market.blotter.strategy_orders.return_value = [mock_order]
mock_market.blotter.live_orders = [mock_order]
self.base_flumine.markets = [mock_market]
mock_strategy = mock.Mock()
self.base_flumine.strategies = [mock_strategy]
Expand All @@ -522,6 +527,7 @@ def test__process_current_orders_callback(
mock_call_process_orders_error_handling.assert_called_with(
mock_strategy, mock_market, [mock_order]
)
mock_market.blotter.complete_order.assert_called_with(mock_order)

def test__process_custom_event(self):
mock_market = mock.Mock()
Expand Down
3 changes: 3 additions & 0 deletions tests/test_markets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import unittest
import datetime
import threading
from unittest import mock
from collections import defaultdict

Expand All @@ -15,6 +16,8 @@ def setUp(self) -> None:
def test_init(self):
self.assertEqual(self.markets._markets, {})
self.assertEqual(self.markets.events, {})
self.assertIsInstance(self.markets.live_orders_event, threading.Event)
self.assertFalse(self.markets.live_orders_event.is_set())

def test_add_market(self):
mock_market = mock.Mock(event_id="1234")
Expand Down
2 changes: 0 additions & 2 deletions tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def test_process_current_orders_with_default_sep(self, mock_process_current_orde
mock_process_current_order.assert_called_with(
betfair_order, current_order, mock_log_control
)
self.assertEqual(market.blotter._live_orders, [])

def test_process_current_order(self):
mock_order = mock.Mock(status=OrderStatus.EXECUTABLE)
Expand Down Expand Up @@ -149,7 +148,6 @@ def test_process_betdaq_current_orders(self, mock_process_betdaq_current_order):
mock_process_betdaq_current_order.assert_called_with(
betdaq_order, current_order
)
self.assertEqual(market.blotter._live_orders, [])

def test_process_betdaq_current_order_pending(self):
mock_order = mock.Mock(
Expand Down
Loading