Skip to content

Commit 26213dd

Browse files
authored
Merge pull request #153 from splitio/feature/impressionsDedupeV2_integration
Feature/impressions dedupe v2 integration
2 parents 9c66c71 + cbaa415 commit 26213dd

32 files changed

+1348
-598
lines changed

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
import io.split.client.impressions.ImpressionListener;
5+
import io.split.client.impressions.ImpressionsManager;
56
import io.split.integrations.IntegrationsConfig;
67
import org.apache.http.HttpHost;
78

@@ -24,6 +25,7 @@ public class SplitClientConfig {
2425
private final int _segmentsRefreshRate;
2526
private final int _impressionsRefreshRate;
2627
private final int _impressionsQueueSize;
28+
private final ImpressionsManager.Mode _impressionsMode;
2729
private final int _metricsRefreshRate;
2830
private final int _connectionTimeout;
2931
private final int _readTimeout;
@@ -64,6 +66,7 @@ private SplitClientConfig(String endpoint,
6466
int segmentsRefreshRate,
6567
int impressionsRefreshRate,
6668
int impressionsQueueSize,
69+
ImpressionsManager.Mode impressionsMode,
6770
int metricsRefreshRate,
6871
int connectionTimeout,
6972
int readTimeout,
@@ -93,6 +96,7 @@ private SplitClientConfig(String endpoint,
9396
_segmentsRefreshRate = segmentsRefreshRate;
9497
_impressionsRefreshRate = impressionsRefreshRate;
9598
_impressionsQueueSize = impressionsQueueSize;
99+
_impressionsMode = impressionsMode;
96100
_metricsRefreshRate = metricsRefreshRate;
97101
_connectionTimeout = connectionTimeout;
98102
_readTimeout = readTimeout;
@@ -158,6 +162,8 @@ public int impressionsQueueSize() {
158162
return _impressionsQueueSize;
159163
}
160164

165+
public ImpressionsManager.Mode impressionsMode() { return _impressionsMode; }
166+
161167
public int metricsRefreshRate() {
162168
return _metricsRefreshRate;
163169
}
@@ -250,8 +256,9 @@ public static final class Builder {
250256
private boolean _eventsEndpointSet = false;
251257
private int _featuresRefreshRate = 60;
252258
private int _segmentsRefreshRate = 60;
253-
private int _impressionsRefreshRate = 30;
259+
private int _impressionsRefreshRate = -1; // use -1 to identify lack of a user submitted value & handle in build()
254260
private int _impressionsQueueSize = 30000;
261+
private ImpressionsManager.Mode _impressionsMode = ImpressionsManager.Mode.OPTIMIZED;
255262
private int _connectionTimeout = 15000;
256263
private int _readTimeout = 15000;
257264
private int _numThreadsForSegmentFetch = 2;
@@ -380,6 +387,11 @@ public Builder impressionsRefreshRate(int seconds) {
380387
return this;
381388
}
382389

390+
public Builder impressionsMode(ImpressionsManager.Mode mode) {
391+
_impressionsMode = mode;
392+
return this;
393+
}
394+
383395
/**
384396
* The impression listener captures the which key saw what treatment ("on", "off", etc)
385397
* at what time. This log is periodically pushed back to split endpoint.
@@ -671,8 +683,13 @@ public SplitClientConfig build() {
671683
throw new IllegalArgumentException("segmentsRefreshRate must be >= 30: " + _segmentsRefreshRate);
672684
}
673685

674-
if (_impressionsRefreshRate <= 0) {
675-
throw new IllegalArgumentException("impressionsRefreshRate must be > 0: " + _impressionsRefreshRate);
686+
switch (_impressionsMode) {
687+
case OPTIMIZED:
688+
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 300 : Math.max(60, _impressionsRefreshRate);
689+
break;
690+
case DEBUG:
691+
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 60 : _impressionsRefreshRate;
692+
break;
676693
}
677694

678695
if (_eventFlushIntervalInMillis < 1000) {
@@ -734,6 +751,7 @@ public SplitClientConfig build() {
734751
_segmentsRefreshRate,
735752
_impressionsRefreshRate,
736753
_impressionsQueueSize,
754+
_impressionsMode,
737755
_metricsRefreshRate,
738756
_connectionTimeout,
739757
_readTimeout,

client/src/main/java/io/split/client/SplitClientImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import io.split.client.dtos.Event;
88
import io.split.client.exceptions.ChangeNumberExceptionWrapper;
99
import io.split.client.impressions.Impression;
10-
import io.split.client.impressions.ImpressionListener;
10+
import io.split.client.impressions.ImpressionsManager;
11+
import io.split.client.impressions.ImpressionsManagerImpl;
1112
import io.split.engine.SDKReadinessGates;
1213
import io.split.engine.experiments.ParsedCondition;
1314
import io.split.engine.experiments.ParsedSplit;
@@ -50,7 +51,7 @@ public final class SplitClientImpl implements SplitClient {
5051

5152
private final SplitFactory _container;
5253
private final SplitFetcher _splitFetcher;
53-
private final ImpressionListener _impressionListener;
54+
private final ImpressionsManager _impressionManager;
5455
private final Metrics _metrics;
5556
private final SplitClientConfig _config;
5657
private final EventClient _eventClient;
@@ -59,22 +60,22 @@ public final class SplitClientImpl implements SplitClient {
5960

6061
public SplitClientImpl(SplitFactory container,
6162
SplitFetcher splitFetcher,
62-
ImpressionListener impressionListener,
63+
ImpressionsManager impressionManager,
6364
Metrics metrics,
6465
EventClient eventClient,
6566
SplitClientConfig config,
6667
SDKReadinessGates gates) {
6768
_container = container;
6869
_splitFetcher = splitFetcher;
69-
_impressionListener = impressionListener;
70+
_impressionManager = impressionManager;
7071
_metrics = metrics;
7172
_eventClient = eventClient;
7273
_config = config;
7374
_gates = gates;
7475

7576
checkNotNull(gates);
7677
checkNotNull(_splitFetcher);
77-
checkNotNull(_impressionListener);
78+
checkNotNull(_impressionManager);
7879
}
7980

8081
@Override
@@ -222,7 +223,7 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching
222223
private void recordStats(String matchingKey, String bucketingKey, String split, long start, String result,
223224
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
224225
try {
225-
_impressionListener.log(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226+
_impressionManager.track(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226227
_metrics.time(operation, System.currentTimeMillis() - start);
227228
} catch (Throwable t) {
228229
_log.error("Exception", t);

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import com.google.common.collect.Multiset;
55
import io.split.client.impressions.AsynchronousImpressionListener;
66
import io.split.client.impressions.ImpressionListener;
7-
import io.split.client.impressions.ImpressionsManager;
7+
import io.split.client.impressions.ImpressionsManagerImpl;
88
import io.split.client.interceptors.AddSplitHeadersFilter;
99
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
1010
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
@@ -51,6 +51,7 @@
5151
import java.util.List;
5252
import java.util.Random;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.stream.Collectors;
5455

5556
public class SplitFactoryImpl implements SplitFactory {
5657
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
@@ -168,43 +169,21 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
168169

169170
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates);
170171

171-
// Impressions
172-
final ImpressionsManager splitImpressionListener = ImpressionsManager.instance(httpclient, config);
173172

174173
List<ImpressionListener> impressionListeners = new ArrayList<>();
175-
impressionListeners.add(splitImpressionListener);
176-
177174
// Setup integrations
178175
if (config.integrationsConfig() != null) {
176+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
177+
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
178+
.collect(Collectors.toCollection(() -> impressionListeners));
179179

180-
// asynchronous impressions listeners
181-
List<IntegrationsConfig.ImpressionListenerWithMeta> asyncListeners = config
182-
.integrationsConfig()
183-
.getImpressionsListeners(IntegrationsConfig.Execution.ASYNC);
184-
185-
for (IntegrationsConfig.ImpressionListenerWithMeta listener : asyncListeners) {
186-
AsynchronousImpressionListener wrapper = AsynchronousImpressionListener
187-
.build(listener.listener(), listener.queueSize());
188-
impressionListeners.add(wrapper);
189-
}
190-
191-
// synchronous impressions listeners
192-
List<IntegrationsConfig.ImpressionListenerWithMeta> syncListeners = config
193-
.integrationsConfig()
194-
.getImpressionsListeners(IntegrationsConfig.Execution.SYNC);
195-
for (IntegrationsConfig.ImpressionListenerWithMeta listener: syncListeners) {
196-
impressionListeners.add(listener.listener());
197-
198-
}
180+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
181+
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
182+
.collect(Collectors.toCollection(() -> impressionListeners));
199183
}
200184

201-
final ImpressionListener impressionListener;
202-
if (impressionListeners.size() > 1) {
203-
// since there are more than just the default integration, let's federate and add them all.
204-
impressionListener = new ImpressionListener.FederatedImpressionListener(impressionListeners);
205-
} else {
206-
impressionListener = splitImpressionListener;
207-
}
185+
// Impressions
186+
final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl.instance(httpclient, config, impressionListeners);
208187

209188
CachedMetrics cachedMetrics = new CachedMetrics(httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
210189
final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
@@ -223,12 +202,12 @@ public void run() {
223202
_log.info("Successful shutdown of segment fetchers");
224203
splitFetcherProvider.close();
225204
_log.info("Successful shutdown of splits");
205+
impressionsManager.close();
206+
_log.info("Successful shutdown of impressions manager");
226207
uncachedFireAndForget.close();
227208
_log.info("Successful shutdown of metrics 1");
228209
cachedFireAndForgetMetrics.close();
229210
_log.info("Successful shutdown of metrics 2");
230-
impressionListener.close();
231-
_log.info("Successful shutdown of ImpressionListener");
232211
httpclient.close();
233212
_log.info("Successful shutdown of httpclient");
234213
eventClient.close();
@@ -253,7 +232,7 @@ public void run() {
253232

254233
_client = new SplitClientImpl(this,
255234
splitFetcherProvider.getFetcher(),
256-
impressionListener,
235+
impressionsManager,
257236
cachedFireAndForgetMetrics,
258237
eventClient,
259238
config,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.split.client.dtos;
2+
3+
import com.google.gson.annotations.SerializedName;
4+
import io.split.client.impressions.ImpressionCounter;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import java.util.stream.Collectors;
9+
10+
public class ImpressionCount {
11+
12+
private static final String FIELD_PER_FEATURE_COUNTS = "pf";
13+
14+
@SerializedName(FIELD_PER_FEATURE_COUNTS)
15+
public final List<CountPerFeature> perFeature;
16+
17+
public ImpressionCount(List<CountPerFeature> cs) {
18+
perFeature = cs;
19+
}
20+
21+
public static ImpressionCount fromImpressionCounterData(Map<ImpressionCounter.Key, Integer> raw) {
22+
return new ImpressionCount(raw.entrySet().stream()
23+
.map(e -> new CountPerFeature(e.getKey().featureName(), e.getKey().timeFrame(), e.getValue()))
24+
.collect(Collectors.toList()));
25+
}
26+
27+
@Override
28+
public int hashCode() {
29+
return Objects.hash(perFeature);
30+
}
31+
32+
@Override
33+
public boolean equals(Object o) {
34+
if (this == o) return true;
35+
if (o == null || getClass() != o.getClass()) return false;
36+
37+
ImpressionCount c = (ImpressionCount) o;
38+
return Objects.equals(perFeature, c.perFeature);
39+
}
40+
41+
public static class CountPerFeature {
42+
43+
private static final String FIELD_FEATURE = "f";
44+
private static final String FIELD_TIMEFRAME = "m";
45+
private static final String FIELD_COUNT = "rc";
46+
47+
@SerializedName(FIELD_FEATURE)
48+
public final String feature;
49+
50+
@SerializedName(FIELD_TIMEFRAME)
51+
public final long timeframe;
52+
53+
@SerializedName(FIELD_COUNT)
54+
public final int count;
55+
56+
public CountPerFeature(String f, long t, int c) {
57+
feature = f;
58+
timeframe = t;
59+
count = c;
60+
}
61+
62+
@Override
63+
public int hashCode() {
64+
return Objects.hash(feature, timeframe, count);
65+
}
66+
67+
@Override
68+
public boolean equals(Object o) {
69+
if (this == o) return true;
70+
if (o == null || getClass() != o.getClass()) return false;
71+
72+
CountPerFeature c = (CountPerFeature) o;
73+
return Objects.equals(feature, c.feature) && Objects.equals(timeframe, c.timeframe) &&
74+
Objects.equals(count, c.count);
75+
}
76+
}
77+
}

client/src/main/java/io/split/client/dtos/KeyImpression.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,43 @@
11
package io.split.client.dtos;
22

33

4+
import com.google.gson.annotations.SerializedName;
5+
import io.split.client.impressions.Impression;
6+
7+
import java.util.Objects;
8+
49
public class KeyImpression {
5-
public String feature;
10+
11+
/* package private */ static final String FIELD_KEY_NAME = "k";
12+
/* package private */ static final String FIELD_BUCKETING_KEY = "b";
13+
/* package private */ static final String FIELD_TREATMENT = "t";
14+
/* package private */ static final String FIELD_LABEL = "r";
15+
/* package private */ static final String FIELD_TIME = "m";
16+
/* package private */ static final String FIELD_CHANGE_NUMBER = "c";
17+
/* package private */ static final String FIELD_PREVIOUS_TIME = "pt";
18+
19+
public transient String feature; // Non-serializable
20+
21+
@SerializedName(FIELD_KEY_NAME)
622
public String keyName;
23+
24+
@SerializedName(FIELD_BUCKETING_KEY)
725
public String bucketingKey;
26+
27+
@SerializedName(FIELD_TREATMENT)
828
public String treatment;
29+
30+
@SerializedName(FIELD_LABEL)
931
public String label;
32+
33+
@SerializedName(FIELD_TIME)
1034
public long time;
35+
36+
@SerializedName(FIELD_CHANGE_NUMBER)
1137
public Long changeNumber; // can be null if there is no changeNumber
12-
public Long pt;
38+
39+
@SerializedName(FIELD_PREVIOUS_TIME)
40+
public Long previousTime;
1341

1442
@Override
1543
public boolean equals(Object o) {
@@ -19,7 +47,7 @@ public boolean equals(Object o) {
1947
KeyImpression that = (KeyImpression) o;
2048

2149
if (time != that.time) return false;
22-
if (feature != null ? !feature.equals(that.feature) : that.feature != null) return false;
50+
if (!Objects.equals(feature, that.feature)) return false;
2351
if (!keyName.equals(that.keyName)) return false;
2452
if (!treatment.equals(that.treatment)) return false;
2553

@@ -39,4 +67,16 @@ public int hashCode() {
3967
result = 31 * result + (int) (time ^ (time >>> 32));
4068
return result;
4169
}
70+
71+
public static KeyImpression fromImpression(Impression i) {
72+
KeyImpression ki = new KeyImpression();
73+
ki.feature = i.split();
74+
ki.keyName = i.key();
75+
ki.bucketingKey = i.bucketingKey();
76+
ki.time = i.time();
77+
ki.changeNumber = i.changeNumber();
78+
ki.treatment = i.treatment();
79+
ki.label = i.appliedRule();
80+
return ki;
81+
}
4282
}

0 commit comments

Comments
 (0)