Skip to content

Commit b3f3f36

Browse files
authored
Merge pull request #565 from splitio/rbs-old-spec-localhost
Rbs old spec localhost
2 parents 2de48b9 + 3fea6cb commit b3f3f36

34 files changed

+1833
-231
lines changed

splitio/api/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def set_telemetry_data(self, metric_name, telemetry_runtime_producer):
133133
self._metric_name = metric_name
134134

135135
def is_sdk_endpoint_overridden(self):
136-
return self._urls['sdk'] == SDK_URL
136+
return self._urls['sdk'] != SDK_URL
137137

138138
def _get_headers(self, extra_headers, sdk_key):
139139
headers = _build_basic_headers(sdk_key)

splitio/api/splits.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1010
from splitio.util.time import utctime_ms
1111
from splitio.spec import SPEC_VERSION
12+
from splitio.sync import util
1213

1314
_LOGGER = logging.getLogger(__name__)
1415
_SPEC_1_1 = "1.1"
@@ -36,15 +37,20 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
3637
self._spec_version = SPEC_VERSION
3738
self._last_proxy_check_timestamp = 0
3839
self.clear_storage = False
40+
self._old_spec_since = None
3941

40-
def _convert_to_new_spec(self, body):
41-
return {"ff": {"d": body["splits"], "s": body["since"], "t": body["till"]},
42-
"rbs": {"d": [], "s": -1, "t": -1}}
43-
44-
def _check_last_proxy_check_timestamp(self):
42+
def _check_last_proxy_check_timestamp(self, since):
4543
if self._spec_version == _SPEC_1_1 and ((utctime_ms() - self._last_proxy_check_timestamp) >= _PROXY_CHECK_INTERVAL_MILLISECONDS_SS):
4644
_LOGGER.info("Switching to new Feature flag spec (%s) and fetching.", SPEC_VERSION);
4745
self._spec_version = SPEC_VERSION
46+
self._old_spec_since = since
47+
48+
def _check_old_spec_since(self, change_number):
49+
if self._spec_version == _SPEC_1_1 and self._old_spec_since is not None:
50+
since = self._old_spec_since
51+
self._old_spec_since = None
52+
return since
53+
return change_number
4854

4955

5056
class SplitsAPI(SplitsAPIBase): # pylint: disable=too-few-public-methods
@@ -80,7 +86,9 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
8086
:rtype: dict
8187
"""
8288
try:
83-
self._check_last_proxy_check_timestamp()
89+
self._check_last_proxy_check_timestamp(change_number)
90+
change_number = self._check_old_spec_since(change_number)
91+
8492
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
8593
response = self._client.get(
8694
'sdk',
@@ -91,7 +99,7 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
9199
)
92100
if 200 <= response.status_code < 300:
93101
if self._spec_version == _SPEC_1_1:
94-
return self._convert_to_new_spec(json.loads(response.body))
102+
return util.convert_to_new_spec(json.loads(response.body))
95103

96104
self.clear_storage = self._last_proxy_check_timestamp != 0
97105
self._last_proxy_check_timestamp = 0
@@ -148,7 +156,9 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
148156
:rtype: dict
149157
"""
150158
try:
151-
self._check_last_proxy_check_timestamp()
159+
self._check_last_proxy_check_timestamp(change_number)
160+
change_number = self._check_old_spec_since(change_number)
161+
152162
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
153163
response = await self._client.get(
154164
'sdk',
@@ -159,7 +169,7 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
159169
)
160170
if 200 <= response.status_code < 300:
161171
if self._spec_version == _SPEC_1_1:
162-
return self._convert_to_new_spec(json.loads(response.body))
172+
return util.convert_to_new_spec(json.loads(response.body))
163173

164174
self.clear_storage = self._last_proxy_check_timestamp != 0
165175
self._last_proxy_check_timestamp = 0

splitio/client/factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
564564

565565
synchronizers = SplitSynchronizers(
566566
SplitSynchronizer(apis['splits'], storages['splits'], storages['rule_based_segments']),
567-
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments']),
567+
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
568568
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
569569
cfg['impressionsBulkSize']),
570570
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
@@ -693,7 +693,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
693693

694694
synchronizers = SplitSynchronizers(
695695
SplitSynchronizerAsync(apis['splits'], storages['splits'], storages['rule_based_segments']),
696-
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments']),
696+
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
697697
ImpressionSynchronizerAsync(apis['impressions'], storages['impressions'],
698698
cfg['impressionsBulkSize']),
699699
EventSynchronizerAsync(apis['events'], storages['events'], cfg['eventsBulkSize']),

splitio/engine/evaluator.py

Lines changed: 53 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
from splitio.models.grammar.condition import ConditionType
77
from splitio.models.grammar.matchers.misc import DependencyMatcher
88
from splitio.models.grammar.matchers.keys import UserDefinedSegmentMatcher
9-
from splitio.models.grammar.matchers.rule_based_segment import RuleBasedSegmentMatcher
9+
from splitio.models.grammar.matchers import RuleBasedSegmentMatcher
10+
from splitio.models.rule_based_segments import SegmentType
1011
from splitio.optional.loaders import asyncio
1112

1213
CONTROL = 'control'
13-
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'segment_rbs_memberships', 'segment_rbs_conditions'])
14+
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'rbs_segments'])
1415

1516
_LOGGER = logging.getLogger(__name__)
1617

@@ -114,47 +115,24 @@ def context_for(self, key, feature_names):
114115
:rtype: EvaluationContext
115116
"""
116117
pending = set(feature_names)
118+
pending_rbs = set()
117119
splits = {}
120+
rb_segments = {}
118121
pending_memberships = set()
119-
pending_rbs_memberships = set()
120-
while pending:
122+
while pending or pending_rbs:
121123
fetched = self._flag_storage.fetch_many(list(pending))
122-
features = filter_missing(fetched)
123-
splits.update(features)
124-
pending = set()
125-
for feature in features.values():
126-
cf, cs, crbs = get_dependencies(feature)
127-
pending.update(filter(lambda f: f not in splits, cf))
128-
pending_memberships.update(cs)
129-
pending_rbs_memberships.update(crbs)
130-
131-
rbs_segment_memberships = {}
132-
rbs_segment_conditions = {}
133-
key_membership = False
134-
segment_memberhsip = False
135-
for rbs_segment in pending_rbs_memberships:
136-
rbs_segment_obj = self._rbs_segment_storage.get(rbs_segment)
137-
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())
138-
139-
key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
140-
segment_memberhsip = False
141-
for segment_name in rbs_segment_obj.excluded.get_excluded_segments():
142-
if self._segment_storage.segment_contains(segment_name, key):
143-
segment_memberhsip = True
144-
break
145-
146-
rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
147-
if not (segment_memberhsip or key_membership):
148-
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})
149-
124+
fetched_rbs = self._rbs_segment_storage.fetch_many(list(pending_rbs))
125+
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
126+
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)
127+
150128
return EvaluationContext(
151129
splits,
152130
{ segment: self._segment_storage.segment_contains(segment, key)
153131
for segment in pending_memberships
154132
},
155-
rbs_segment_memberships,
156-
rbs_segment_conditions
133+
rb_segments
157134
)
135+
158136

159137
class AsyncEvaluationDataFactory:
160138

@@ -173,60 +151,36 @@ async def context_for(self, key, feature_names):
173151
:rtype: EvaluationContext
174152
"""
175153
pending = set(feature_names)
154+
pending_rbs = set()
176155
splits = {}
156+
rb_segments = {}
177157
pending_memberships = set()
178-
pending_rbs_memberships = set()
179-
while pending:
158+
while pending or pending_rbs:
180159
fetched = await self._flag_storage.fetch_many(list(pending))
181-
features = filter_missing(fetched)
182-
splits.update(features)
183-
pending = set()
184-
for feature in features.values():
185-
cf, cs, crbs = get_dependencies(feature)
186-
pending.update(filter(lambda f: f not in splits, cf))
187-
pending_memberships.update(cs)
188-
pending_rbs_memberships.update(crbs)
189-
190-
rbs_segment_memberships = {}
191-
rbs_segment_conditions = {}
192-
key_membership = False
193-
segment_memberhsip = False
194-
for rbs_segment in pending_rbs_memberships:
195-
rbs_segment_obj = await self._rbs_segment_storage.get(rbs_segment)
196-
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())
197-
198-
key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
199-
segment_memberhsip = False
200-
for segment_name in rbs_segment_obj.excluded.get_excluded_segments():
201-
if await self._segment_storage.segment_contains(segment_name, key):
202-
segment_memberhsip = True
203-
break
204-
205-
rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
206-
if not (segment_memberhsip or key_membership):
207-
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})
160+
fetched_rbs = await self._rbs_segment_storage.fetch_many(list(pending_rbs))
161+
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
162+
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)
208163

209164
segment_names = list(pending_memberships)
210165
segment_memberships = await asyncio.gather(*[
211166
self._segment_storage.segment_contains(segment, key)
212167
for segment in segment_names
213168
])
169+
214170
return EvaluationContext(
215171
splits,
216172
dict(zip(segment_names, segment_memberships)),
217-
rbs_segment_memberships,
218-
rbs_segment_conditions
173+
rb_segments
219174
)
220175

221-
222-
def get_dependencies(feature):
176+
def get_dependencies(object):
223177
"""
224178
:rtype: tuple(list, list)
225179
"""
226180
feature_names = []
227181
segment_names = []
228182
rbs_segment_names = []
229-
for condition in feature.conditions:
183+
for condition in object.conditions:
230184
for matcher in condition.matchers:
231185
if isinstance(matcher,RuleBasedSegmentMatcher):
232186
rbs_segment_names.append(matcher._rbs_segment_name)
@@ -239,3 +193,34 @@ def get_dependencies(feature):
239193

240194
def filter_missing(features):
241195
return {k: v for (k, v) in features.items() if v is not None}
196+
197+
def get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships):
198+
pending = set()
199+
pending_rbs = set()
200+
for feature in features.values():
201+
cf, cs, crbs = get_dependencies(feature)
202+
pending.update(filter(lambda f: f not in splits, cf))
203+
pending_memberships.update(cs)
204+
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))
205+
206+
for rb_segment in rbsegments.values():
207+
cf, cs, crbs = get_dependencies(rb_segment)
208+
pending.update(filter(lambda f: f not in splits, cf))
209+
pending_memberships.update(cs)
210+
for excluded_segment in rb_segment.excluded.get_excluded_segments():
211+
if excluded_segment.type == SegmentType.STANDARD:
212+
pending_memberships.add(excluded_segment.name)
213+
else:
214+
pending_rbs.update(filter(lambda f: f not in rb_segments, [excluded_segment.name]))
215+
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))
216+
217+
return pending, pending_memberships, pending_rbs
218+
219+
def update_objects(fetched, fetched_rbs, splits, rb_segments):
220+
features = filter_missing(fetched)
221+
rbsegments = filter_missing(fetched_rbs)
222+
splits.update(features)
223+
rb_segments.update(rbsegments)
224+
225+
return features, rbsegments, splits, rb_segments
226+

splitio/models/grammar/matchers/rule_based_segment.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Rule based segment matcher classes."""
22
from splitio.models.grammar.matchers.base import Matcher
3+
from splitio.models.rule_based_segments import SegmentType
34

45
class RuleBasedSegmentMatcher(Matcher):
56

@@ -29,20 +30,42 @@ def _match(self, key, attributes=None, context=None):
2930
if self._rbs_segment_name == None:
3031
return False
3132

32-
# Check if rbs segment has exclusions
33-
if context['ec'].segment_rbs_memberships.get(self._rbs_segment_name):
33+
rb_segment = context['ec'].rbs_segments.get(self._rbs_segment_name)
34+
35+
if key in rb_segment.excluded.get_excluded_keys():
36+
return False
37+
38+
if self._match_dep_rb_segments(rb_segment.excluded.get_excluded_segments(), key, attributes, context):
3439
return False
3540

36-
for parsed_condition in context['ec'].segment_rbs_conditions.get(self._rbs_segment_name):
37-
if parsed_condition.matches(key, attributes, context):
38-
return True
39-
40-
return False
41+
return self._match_conditions(rb_segment.conditions, key, attributes, context)
4142

4243
def _add_matcher_specific_properties_to_json(self):
4344
"""Return UserDefinedSegment specific properties."""
4445
return {
4546
'userDefinedSegmentMatcherData': {
4647
'segmentName': self._rbs_segment_name
4748
}
48-
}
49+
}
50+
51+
def _match_conditions(self, rbs_segment_conditions, key, attributes, context):
52+
for parsed_condition in rbs_segment_conditions:
53+
if parsed_condition.matches(key, attributes, context):
54+
return True
55+
56+
return False
57+
58+
def _match_dep_rb_segments(self, excluded_rb_segments, key, attributes, context):
59+
for excluded_rb_segment in excluded_rb_segments:
60+
if excluded_rb_segment.type == SegmentType.STANDARD:
61+
if context['ec'].segment_memberships[excluded_rb_segment.name]:
62+
return True
63+
else:
64+
excluded_segment = context['ec'].rbs_segments.get(excluded_rb_segment.name)
65+
if key in excluded_segment.excluded.get_excluded_keys():
66+
return False
67+
68+
if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context):
69+
return True
70+
71+
return self._match_conditions(excluded_segment.conditions, key, attributes, context)

0 commit comments

Comments
 (0)