Skip to content

Commit 3fea6cb

Browse files
authored
Merge pull request #566 from splitio/rbs-fix-segment-initial-fetch
Rbs fix segment initial fetch
2 parents 1bd96ab + 533740b commit 3fea6cb

File tree

12 files changed

+292
-61
lines changed

12 files changed

+292
-61
lines changed

splitio/api/splits.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,20 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
3737
self._spec_version = SPEC_VERSION
3838
self._last_proxy_check_timestamp = 0
3939
self.clear_storage = False
40+
self._old_spec_since = None
4041

41-
def _check_last_proxy_check_timestamp(self):
42+
def _check_last_proxy_check_timestamp(self, since):
4243
if self._spec_version == _SPEC_1_1 and ((utctime_ms() - self._last_proxy_check_timestamp) >= _PROXY_CHECK_INTERVAL_MILLISECONDS_SS):
4344
_LOGGER.info("Switching to new Feature flag spec (%s) and fetching.", SPEC_VERSION);
4445
self._spec_version = SPEC_VERSION
46+
self._old_spec_since = since
47+
48+
def _check_old_spec_since(self, change_number):
49+
if self._spec_version == _SPEC_1_1 and self._old_spec_since is not None:
50+
since = self._old_spec_since
51+
self._old_spec_since = None
52+
return since
53+
return change_number
4554

4655

4756
class SplitsAPI(SplitsAPIBase): # pylint: disable=too-few-public-methods
@@ -77,7 +86,9 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
7786
:rtype: dict
7887
"""
7988
try:
80-
self._check_last_proxy_check_timestamp()
89+
self._check_last_proxy_check_timestamp(change_number)
90+
change_number = self._check_old_spec_since(change_number)
91+
8192
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
8293
response = self._client.get(
8394
'sdk',
@@ -145,7 +156,9 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
145156
:rtype: dict
146157
"""
147158
try:
148-
self._check_last_proxy_check_timestamp()
159+
self._check_last_proxy_check_timestamp(change_number)
160+
change_number = self._check_old_spec_since(change_number)
161+
149162
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
150163
response = await self._client.get(
151164
'sdk',

splitio/client/factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
564564

565565
synchronizers = SplitSynchronizers(
566566
SplitSynchronizer(apis['splits'], storages['splits'], storages['rule_based_segments']),
567-
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments']),
567+
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
568568
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
569569
cfg['impressionsBulkSize']),
570570
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
@@ -693,7 +693,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
693693

694694
synchronizers = SplitSynchronizers(
695695
SplitSynchronizerAsync(apis['splits'], storages['splits'], storages['rule_based_segments']),
696-
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments']),
696+
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
697697
ImpressionSynchronizerAsync(apis['impressions'], storages['impressions'],
698698
cfg['impressionsBulkSize']),
699699
EventSynchronizerAsync(apis['events'], storages['events'], cfg['eventsBulkSize']),

splitio/models/rule_based_segments.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ def get_excluded_segments(self):
152152
"""Return excluded segments"""
153153
return self._segments
154154

155+
def get_excluded_standard_segments(self):
156+
"""Return excluded segments"""
157+
to_return = []
158+
for segment in self._segments:
159+
if segment.type == SegmentType.STANDARD:
160+
to_return.append(segment.name)
161+
return to_return
162+
155163
def to_json(self):
156164
"""Return a JSON representation of this object."""
157165
return {

splitio/storage/inmemmory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def contains(self, segment_names):
233233

234234
def fetch_many(self, segment_names):
235235
return {rb_segment_name: self.get(rb_segment_name) for rb_segment_name in segment_names}
236-
236+
237237
class InMemoryRuleBasedSegmentStorageAsync(RuleBasedSegmentsStorage):
238238
"""InMemory implementation of a feature flag storage base."""
239239
def __init__(self):

splitio/sync/segment.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from splitio.util.backoff import Backoff
1111
from splitio.optional.loaders import asyncio, aiofiles
1212
from splitio.sync import util
13+
from splitio.util.storage_helper import get_standard_segment_names_in_rbs_storage, get_standard_segment_names_in_rbs_storage_async
1314
from splitio.optional.loaders import asyncio
1415

1516
_LOGGER = logging.getLogger(__name__)
@@ -22,7 +23,7 @@
2223

2324

2425
class SegmentSynchronizer(object):
25-
def __init__(self, segment_api, feature_flag_storage, segment_storage):
26+
def __init__(self, segment_api, feature_flag_storage, segment_storage, rule_based_segment_storage):
2627
"""
2728
Class constructor.
2829
@@ -39,6 +40,7 @@ def __init__(self, segment_api, feature_flag_storage, segment_storage):
3940
self._api = segment_api
4041
self._feature_flag_storage = feature_flag_storage
4142
self._segment_storage = segment_storage
43+
self._rule_based_segment_storage = rule_based_segment_storage
4244
self._worker_pool = workerpool.WorkerPool(_MAX_WORKERS, self.synchronize_segment)
4345
self._worker_pool.start()
4446
self._backoff = Backoff(
@@ -181,9 +183,12 @@ def synchronize_segments(self, segment_names = None, dont_wait = False):
181183
:rtype: bool
182184
"""
183185
if segment_names is None:
184-
segment_names = self._feature_flag_storage.get_segment_names()
186+
segment_names = set(self._feature_flag_storage.get_segment_names())
187+
segment_names.update(get_standard_segment_names_in_rbs_storage(self._rule_based_segment_storage))
185188

186189
for segment_name in segment_names:
190+
_LOGGER.debug("Adding segment name to sync worker")
191+
_LOGGER.debug(segment_name)
187192
self._worker_pool.submit_work(segment_name)
188193
if (dont_wait):
189194
return True
@@ -204,7 +209,7 @@ def segment_exist_in_storage(self, segment_name):
204209

205210

206211
class SegmentSynchronizerAsync(object):
207-
def __init__(self, segment_api, feature_flag_storage, segment_storage):
212+
def __init__(self, segment_api, feature_flag_storage, segment_storage, rule_based_segment_storage):
208213
"""
209214
Class constructor.
210215
@@ -221,6 +226,7 @@ def __init__(self, segment_api, feature_flag_storage, segment_storage):
221226
self._api = segment_api
222227
self._feature_flag_storage = feature_flag_storage
223228
self._segment_storage = segment_storage
229+
self._rule_based_segment_storage = rule_based_segment_storage
224230
self._worker_pool = workerpool.WorkerPoolAsync(_MAX_WORKERS, self.synchronize_segment)
225231
self._worker_pool.start()
226232
self._backoff = Backoff(
@@ -364,7 +370,8 @@ async def synchronize_segments(self, segment_names = None, dont_wait = False):
364370
:rtype: bool
365371
"""
366372
if segment_names is None:
367-
segment_names = await self._feature_flag_storage.get_segment_names()
373+
segment_names = set(await self._feature_flag_storage.get_segment_names())
374+
segment_names.update(await get_standard_segment_names_in_rbs_storage_async(self._rule_based_segment_storage))
368375

369376
self._jobs = await self._worker_pool.submit_work(segment_names)
370377
if (dont_wait):

splitio/sync/split.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None):
139139
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'], self._api.clear_storage)
140140

141141
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
142-
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage)
142+
segment_list.update(update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage))
143143
segment_list.update(rbs_segment_list)
144144

145145
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:

splitio/util/storage_helper.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Storage Helper."""
22
import logging
33
from splitio.models import splits
4+
from splitio.models import rule_based_segments
45

56
_LOGGER = logging.getLogger(__name__)
67

@@ -58,17 +59,29 @@ def update_rule_based_segment_storage(rule_based_segment_storage, rule_based_seg
5859
for rule_based_segment in rule_based_segments:
5960
if rule_based_segment.status == splits.Status.ACTIVE:
6061
to_add.append(rule_based_segment)
61-
segment_list.update(set(_get_segment_names(rule_based_segment.excluded.get_excluded_segments())))
62+
segment_list.update(set(rule_based_segment.excluded.get_excluded_standard_segments()))
6263
segment_list.update(rule_based_segment.get_condition_segment_names())
6364
else:
6465
if rule_based_segment_storage.get(rule_based_segment.name) is not None:
6566
to_delete.append(rule_based_segment.name)
6667

6768
rule_based_segment_storage.update(to_add, to_delete, change_number)
6869
return segment_list
70+
71+
def get_standard_segment_names_in_rbs_storage(rule_based_segment_storage):
72+
"""
73+
Retrieve a list of all standard segments names.
6974
70-
def _get_segment_names(excluded_segments):
71-
return [excluded_segment.name for excluded_segment in excluded_segments]
75+
:return: Set of segment names.
76+
:rtype: Set(str)
77+
"""
78+
segment_list = set()
79+
for rb_segment in rule_based_segment_storage.get_segment_names():
80+
rb_segment_obj = rule_based_segment_storage.get(rb_segment)
81+
segment_list.update(set(rb_segment_obj.excluded.get_excluded_standard_segments()))
82+
segment_list.update(rb_segment_obj.get_condition_segment_names())
83+
84+
return segment_list
7285

7386
async def update_feature_flag_storage_async(feature_flag_storage, feature_flags, change_number, clear_storage=False):
7487
"""
@@ -124,7 +137,7 @@ async def update_rule_based_segment_storage_async(rule_based_segment_storage, ru
124137
for rule_based_segment in rule_based_segments:
125138
if rule_based_segment.status == splits.Status.ACTIVE:
126139
to_add.append(rule_based_segment)
127-
segment_list.update(set(_get_segment_names(rule_based_segment.excluded.get_excluded_segments())))
140+
segment_list.update(set(rule_based_segment.excluded.get_excluded_standard_segments()))
128141
segment_list.update(rule_based_segment.get_condition_segment_names())
129142
else:
130143
if await rule_based_segment_storage.get(rule_based_segment.name) is not None:
@@ -133,6 +146,22 @@ async def update_rule_based_segment_storage_async(rule_based_segment_storage, ru
133146
await rule_based_segment_storage.update(to_add, to_delete, change_number)
134147
return segment_list
135148

149+
async def get_standard_segment_names_in_rbs_storage_async(rule_based_segment_storage):
150+
"""
151+
Retrieve a list of all standard segments names.
152+
153+
:return: Set of segment names.
154+
:rtype: Set(str)
155+
"""
156+
segment_list = set()
157+
segment_names = await rule_based_segment_storage.get_segment_names()
158+
for rb_segment in segment_names:
159+
rb_segment_obj = await rule_based_segment_storage.get(rb_segment)
160+
segment_list.update(set(rb_segment_obj.excluded.get_excluded_standard_segments()))
161+
segment_list.update(rb_segment_obj.get_condition_segment_names())
162+
163+
return segment_list
164+
136165
def get_valid_flag_sets(flag_sets, flag_set_filter):
137166
"""
138167
Check each flag set in given array, return it if exist in a given config flag set array, if config array is empty return all

tests/api/test_splits_api.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,41 @@ def get(sdk, splitChanges, sdk_key, extra_headers, query):
122122
assert self.query[2] == {'s': '1.3', 'since': 123, 'rbSince': -1}
123123
assert response == {"ff": {"d": [], "s": 123, "t": 456}, "rbs": {"d": [], "s": 123, "t": -1}}
124124
assert split_api.clear_storage
125+
126+
def test_using_old_spec_since(self, mocker):
127+
"""Test using old_spec_since variable."""
128+
httpclient = mocker.Mock(spec=client.HttpClient)
129+
self.counter = 0
130+
self.query = []
131+
def get(sdk, splitChanges, sdk_key, extra_headers, query):
132+
self.counter += 1
133+
self.query.append(query)
134+
if self.counter == 1:
135+
return client.HttpResponse(400, 'error', {})
136+
if self.counter == 2:
137+
return client.HttpResponse(200, '{"splits": [], "since": 123, "till": 456}', {})
138+
if self.counter == 3:
139+
return client.HttpResponse(400, 'error', {})
140+
if self.counter == 4:
141+
return client.HttpResponse(200, '{"splits": [], "since": 456, "till": 456}', {})
142+
143+
httpclient.is_sdk_endpoint_overridden.return_value = True
144+
httpclient.get = get
145+
split_api = splits.SplitsAPI(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4'), mocker.Mock())
146+
response = split_api.fetch_splits(123, -1, FetchOptions(False, None, None, None))
147+
assert response == {"ff": {"d": [], "s": 123, "t": 456}, "rbs": {"d": [], "s": -1, "t": -1}}
148+
assert self.query == [{'s': '1.3', 'since': 123, 'rbSince': -1}, {'s': '1.1', 'since': 123}]
149+
assert not split_api.clear_storage
150+
151+
time.sleep(1)
152+
splits._PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 10
153+
154+
response = split_api.fetch_splits(456, -1, FetchOptions(False, None, None, None))
155+
time.sleep(1)
156+
splits._PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 1000000
157+
assert self.query[2] == {'s': '1.3', 'since': 456, 'rbSince': -1}
158+
assert self.query[3] == {'s': '1.1', 'since': 456}
159+
assert response == {"ff": {"d": [], "s": 456, "t": 456}, "rbs": {"d": [], "s": -1, "t": -1}}
125160

126161
class SplitAPIAsyncTests(object):
127162
"""Split async API test cases."""
@@ -253,9 +288,45 @@ async def get(sdk, splitChanges, sdk_key, extra_headers, query):
253288
assert self.query == [{'s': '1.3', 'since': 123, 'rbSince': -1}, {'s': '1.1', 'since': 123}]
254289
assert not split_api.clear_storage
255290

256-
time.sleep(1)
291+
time.sleep(1)
257292
splits._PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 10
258293
response = await split_api.fetch_splits(123, -1, FetchOptions(False, None, None, None))
259294
assert self.query[2] == {'s': '1.3', 'since': 123, 'rbSince': -1}
260295
assert response == {"ff": {"d": [], "s": 123, "t": 456}, "rbs": {"d": [], "s": 123, "t": -1}}
261296
assert split_api.clear_storage
297+
298+
@pytest.mark.asyncio
299+
async def test_using_old_spec_since(self, mocker):
300+
"""Test using old_spec_since variable."""
301+
httpclient = mocker.Mock(spec=client.HttpClient)
302+
self.counter = 0
303+
self.query = []
304+
async def get(sdk, splitChanges, sdk_key, extra_headers, query):
305+
self.counter += 1
306+
self.query.append(query)
307+
if self.counter == 1:
308+
return client.HttpResponse(400, 'error', {})
309+
if self.counter == 2:
310+
return client.HttpResponse(200, '{"splits": [], "since": 123, "till": 456}', {})
311+
if self.counter == 3:
312+
return client.HttpResponse(400, 'error', {})
313+
if self.counter == 4:
314+
return client.HttpResponse(200, '{"splits": [], "since": 456, "till": 456}', {})
315+
316+
httpclient.is_sdk_endpoint_overridden.return_value = True
317+
httpclient.get = get
318+
split_api = splits.SplitsAPIAsync(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4'), mocker.Mock())
319+
response = await split_api.fetch_splits(123, -1, FetchOptions(False, None, None, None))
320+
assert response == {"ff": {"d": [], "s": 123, "t": 456}, "rbs": {"d": [], "s": -1, "t": -1}}
321+
assert self.query == [{'s': '1.3', 'since': 123, 'rbSince': -1}, {'s': '1.1', 'since': 123}]
322+
assert not split_api.clear_storage
323+
324+
time.sleep(1)
325+
splits._PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 10
326+
327+
response = await split_api.fetch_splits(456, -1, FetchOptions(False, None, None, None))
328+
time.sleep(1)
329+
splits._PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 1000000
330+
assert self.query[2] == {'s': '1.3', 'since': 456, 'rbSince': -1}
331+
assert self.query[3] == {'s': '1.1', 'since': 456}
332+
assert response == {"ff": {"d": [], "s": 456, "t": 456}, "rbs": {"d": [], "s": -1, "t": -1}}

0 commit comments

Comments
 (0)