Skip to content

Commit c314338

Browse files
authored
chore: Add FDv2 compatible data source for testing (#347)
1 parent 2492c12 commit c314338

8 files changed

Lines changed: 1702 additions & 5 deletions

File tree

lib/ldclient-rb/config.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ class DataSystemConfig
708708
# The (optional) builder proc for FDv1-compatible fallback synchronizer
709709
#
710710
def initialize(initializers: nil, primary_synchronizer: nil, secondary_synchronizer: nil,
711-
data_store_mode: LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
711+
data_store_mode: LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
712712
@initializers = initializers
713713
@primary_synchronizer = primary_synchronizer
714714
@secondary_synchronizer = secondary_synchronizer

lib/ldclient-rb/data_system.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def initialize
1919
@primary_synchronizer = nil
2020
@secondary_synchronizer = nil
2121
@fdv1_fallback_synchronizer = nil
22-
@data_store_mode = LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY
22+
@data_store_mode = LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY
2323
@data_store = nil
2424
end
2525

@@ -205,7 +205,7 @@ def self.custom
205205
# @return [ConfigBuilder]
206206
#
207207
def self.daemon(store)
208-
custom.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY)
208+
custom.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY)
209209
end
210210

211211
#
@@ -219,7 +219,7 @@ def self.daemon(store)
219219
# @return [ConfigBuilder]
220220
#
221221
def self.persistent_store(store)
222-
default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_WRITE)
222+
default.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_WRITE)
223223
end
224224
end
225225
end

lib/ldclient-rb/impl/data_store/store.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ def get_data_store_status_provider
328328
private def send_change_events(affected_items)
329329
affected_items.each do |item|
330330
if item[:kind] == FEATURES
331-
@flag_change_broadcaster.broadcast(item[:key])
331+
@flag_change_broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(item[:key]))
332332
end
333333
end
334334
end
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
require 'concurrent/atomics'
2+
require 'ldclient-rb/impl/data_system'
3+
require 'ldclient-rb/interfaces/data_system'
4+
require 'ldclient-rb/util'
5+
require 'thread'
6+
7+
module LaunchDarkly
8+
module Impl
9+
module Integrations
10+
module TestData
11+
#
12+
# Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
13+
#
14+
# This component bridges the test data management in TestDataV2 with the FDv2 protocol
15+
# interfaces. Each instance implements both Initializer and Synchronizer protocols
16+
# and receives change notifications for dynamic updates.
17+
#
18+
class TestDataSourceV2
19+
include LaunchDarkly::Interfaces::DataSystem::Initializer
20+
include LaunchDarkly::Interfaces::DataSystem::Synchronizer
21+
22+
# @api private
23+
#
24+
# @param test_data [LaunchDarkly::Integrations::TestDataV2] the test data instance
25+
#
26+
def initialize(test_data)
27+
@test_data = test_data
28+
@closed = false
29+
@update_queue = Queue.new
30+
@lock = Mutex.new
31+
32+
# Always register for change notifications
33+
@test_data.add_instance(self)
34+
end
35+
36+
#
37+
# Return the name of this data source.
38+
#
39+
# @return [String]
40+
#
41+
def name
42+
'TestDataV2'
43+
end
44+
45+
#
46+
# Implementation of the Initializer.fetch method.
47+
#
48+
# Returns the current test data as a Basis for initial data loading.
49+
#
50+
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
51+
# @return [LaunchDarkly::Result] A Result containing either a Basis or an error message
52+
#
53+
def fetch(selector_store)
54+
begin
55+
@lock.synchronize do
56+
if @closed
57+
return LaunchDarkly::Result.fail('TestDataV2 source has been closed')
58+
end
59+
60+
# Get all current flags and segments from test data
61+
init_data = @test_data.make_init_data
62+
version = @test_data.get_version
63+
64+
# Build a full transfer changeset
65+
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
66+
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
67+
68+
# Add all flags to the changeset
69+
init_data[:flags].each do |key, flag_data|
70+
builder.add_put(
71+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
72+
key,
73+
flag_data[:version] || 1,
74+
flag_data
75+
)
76+
end
77+
78+
# Add all segments to the changeset
79+
init_data[:segments].each do |key, segment_data|
80+
builder.add_put(
81+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
82+
key,
83+
segment_data[:version] || 1,
84+
segment_data
85+
)
86+
end
87+
88+
# Create selector for this version
89+
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
90+
change_set = builder.finish(selector)
91+
92+
basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(change_set: change_set, persist: false, environment_id: nil)
93+
94+
LaunchDarkly::Result.success(basis)
95+
end
96+
rescue => e
97+
LaunchDarkly::Result.fail("Error fetching test data: #{e.message}", e)
98+
end
99+
end
100+
101+
#
102+
# Implementation of the Synchronizer.sync method.
103+
#
104+
# Yields updates as test data changes occur.
105+
#
106+
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
107+
# @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses
108+
# @return [void]
109+
#
110+
def sync(selector_store)
111+
# First yield initial data
112+
initial_result = fetch(selector_store)
113+
unless initial_result.success?
114+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
115+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
116+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
117+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
118+
0,
119+
initial_result.error,
120+
Time.now
121+
)
122+
)
123+
return
124+
end
125+
126+
# Yield the initial successful state
127+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
128+
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
129+
change_set: initial_result.value.change_set
130+
)
131+
132+
# Continue yielding updates as they arrive
133+
until @closed
134+
begin
135+
# stop() will push nil to the queue to wake us up when shutting down
136+
update = @update_queue.pop
137+
138+
# Handle nil sentinel for shutdown
139+
break if update.nil?
140+
141+
# Yield the actual update
142+
yield update
143+
rescue => e
144+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
145+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
146+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
147+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
148+
0,
149+
"Error in test data synchronizer: #{e.message}",
150+
Time.now
151+
)
152+
)
153+
break
154+
end
155+
end
156+
end
157+
158+
#
159+
# Stop the data source and clean up resources
160+
#
161+
# @return [void]
162+
#
163+
def stop
164+
@lock.synchronize do
165+
return if @closed
166+
@closed = true
167+
end
168+
169+
@test_data.closed_instance(self)
170+
# Signal shutdown to sync generator
171+
@update_queue.push(nil)
172+
end
173+
174+
#
175+
# Called by TestDataV2 when a flag is updated.
176+
#
177+
# This method converts the flag update into an FDv2 changeset and
178+
# queues it for delivery through the sync() generator.
179+
#
180+
# @param flag_data [Hash] the flag data
181+
# @return [void]
182+
#
183+
def upsert_flag(flag_data)
184+
@lock.synchronize do
185+
return if @closed
186+
187+
begin
188+
version = @test_data.get_version
189+
190+
# Build a changes transfer changeset
191+
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
192+
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)
193+
194+
# Add the updated flag
195+
builder.add_put(
196+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
197+
flag_data[:key],
198+
flag_data[:version] || 1,
199+
flag_data
200+
)
201+
202+
# Create selector for this version
203+
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
204+
change_set = builder.finish(selector)
205+
206+
# Queue the update
207+
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
208+
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
209+
change_set: change_set
210+
)
211+
212+
@update_queue.push(update)
213+
rescue => e
214+
# Queue an error update
215+
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
216+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
217+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
218+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
219+
0,
220+
"Error processing flag update: #{e.message}",
221+
Time.now
222+
)
223+
)
224+
@update_queue.push(error_update)
225+
end
226+
end
227+
end
228+
229+
#
230+
# Called by TestDataV2 when a segment is updated.
231+
#
232+
# This method converts the segment update into an FDv2 changeset and
233+
# queues it for delivery through the sync() generator.
234+
#
235+
# @param segment_data [Hash] the segment data
236+
# @return [void]
237+
#
238+
def upsert_segment(segment_data)
239+
@lock.synchronize do
240+
return if @closed
241+
242+
begin
243+
version = @test_data.get_version
244+
245+
# Build a changes transfer changeset
246+
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
247+
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)
248+
249+
# Add the updated segment
250+
builder.add_put(
251+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
252+
segment_data[:key],
253+
segment_data[:version] || 1,
254+
segment_data
255+
)
256+
257+
# Create selector for this version
258+
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
259+
change_set = builder.finish(selector)
260+
261+
# Queue the update
262+
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
263+
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
264+
change_set: change_set
265+
)
266+
267+
@update_queue.push(update)
268+
rescue => e
269+
# Queue an error update
270+
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
271+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
272+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
273+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
274+
0,
275+
"Error processing segment update: #{e.message}",
276+
Time.now
277+
)
278+
)
279+
@update_queue.push(error_update)
280+
end
281+
end
282+
end
283+
end
284+
end
285+
end
286+
end
287+
end
288+

0 commit comments

Comments
 (0)