From debe317b2ebae05a6b438169990bd389fe83bd3e Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 28 Jan 2026 16:41:15 +0000 Subject: [PATCH 1/3] chore: Create FDv2 File Data Source --- .gitignore | 1 + .../impl/integrations/file_data_source_v2.rb | 488 ++++++++++++++++ lib/ldclient-rb/integrations/file_data.rb | 62 +++ spec/integrations/file_data_source_v2_spec.rb | 523 ++++++++++++++++++ 4 files changed, 1074 insertions(+) create mode 100644 lib/ldclient-rb/impl/integrations/file_data_source_v2.rb create mode 100644 spec/integrations/file_data_source_v2_spec.rb diff --git a/.gitignore b/.gitignore index d1ed1a09..466b4051 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ /pkg/ /spec/reports/ /tmp/ +/vendor/bundle *.bundle *.so *.o diff --git a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb new file mode 100644 index 00000000..9e6ce094 --- /dev/null +++ b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb @@ -0,0 +1,488 @@ +# frozen_string_literal: true + +require 'ldclient-rb/impl/util' +require 'ldclient-rb/interfaces/data_system' +require 'ldclient-rb/util' + +require 'concurrent/atomics' +require 'json' +require 'yaml' +require 'pathname' +require 'thread' + +module LaunchDarkly + module Impl + module Integrations + # + # Internal implementation of both Initializer and Synchronizer protocols for file-based data. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + # This component reads feature flag and segment data from local files and provides them + # via the FDv2 protocol interfaces. Each instance implements both Initializer and Synchronizer + # protocols: + # - As an Initializer: reads files once and returns initial data + # - As a Synchronizer: watches for file changes and yields updates + # + # The files use the same format as the v1 file data source, supporting flags, flagValues, + # and segments in JSON or YAML format. + # + class FileDataSourceV2 + include LaunchDarkly::Interfaces::DataSystem::Initializer + include LaunchDarkly::Interfaces::DataSystem::Synchronizer + + # To avoid pulling in 'listen' and its transitive dependencies for people who aren't using the + # file data source or who don't need auto-updating, we only enable auto-update if the 'listen' + # gem has been provided by the host app. + @@have_listen = false + begin + require 'listen' + @@have_listen = true + rescue LoadError + # Ignored + end + + # + # Initialize the file data source. + # + # @param logger [Logger] the logger + # @param paths [Array, String] file paths to load (or a single path string) + # @param poll_interval [Float] seconds between polling checks when watching files (default: 1) + # @param force_polling [Boolean] force polling even if listen gem is available (default: false) + # + def initialize(logger, paths:, poll_interval: 1, force_polling: false) + @logger = logger + @paths = paths.is_a?(Array) ? paths : [paths] + @poll_interval = poll_interval + @force_polling = force_polling + @use_listen = @@have_listen && !@force_polling + + @closed = false + @update_queue = Queue.new + @lock = Mutex.new + @listener = nil + + @version_lock = Mutex.new + @last_version = 1 + end + + # + # Return the name of this data source. + # + # @return [String] + # + def name + 'FileDataV2' + end + + # + # Implementation of the Initializer.fetch method. 
+ # + # Reads all configured files once and returns their contents as a Basis. + # + # @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for file data) + # @return [LaunchDarkly::Result] A Result containing either a Basis or an error message + # + def fetch(selector_store) + @lock.synchronize do + if @closed + return LaunchDarkly::Result.fail('FileDataV2 source has been closed') + end + + result = load_all_to_changeset + return result unless result.success? + + change_set = result.value + basis = LaunchDarkly::Interfaces::DataSystem::Basis.new( + change_set: change_set, + persist: false, + environment_id: nil + ) + + LaunchDarkly::Result.success(basis) + end + rescue => e + @logger.error { "[LDClient] Error fetching file data: #{e.message}" } + LaunchDarkly::Result.fail("Error fetching file data: #{e.message}", e) + end + + # + # Implementation of the Synchronizer.sync method. + # + # Yields initial data from files, then continues to watch for file changes + # and yields updates when files are modified. + # + # @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for file data) + # @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses + # @return [void] + # + def sync(selector_store) + # First yield initial data + initial_result = fetch(selector_store) + unless initial_result.success? + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::OFF, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, + 0, + initial_result.error, + Time.now + ) + ) + return + end + + # Yield the initial successful state + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::VALID, + change_set: initial_result.value.change_set + ) + + # Start watching for file changes + @lock.synchronize do + @listener = start_listener unless @closed + end + + # Continue yielding updates as they arrive + until @closed + begin + # stop() will push nil to the queue to wake us up when shutting down + update = @update_queue.pop + + # Handle nil sentinel for shutdown + break if update.nil? + + # Yield the actual update + yield update + rescue => e + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::OFF, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, + 0, + "Error in file data synchronizer: #{e.message}", + Time.now + ) + ) + break + end + end + end + + # + # Stop the data source and clean up resources. + # + # @return [void] + # + def stop + @lock.synchronize do + return if @closed + @closed = true + + listener = @listener + @listener = nil + + listener&.stop + end + + # Signal shutdown to sync generator + @update_queue.push(nil) + end + + private + + # + # Load all files and build a changeset. 
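+ # Files are merged into a single full-transfer changeset; duplicate flag or segment keys across the configured files cause the load to fail.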
+ # + # @return [LaunchDarkly::Result] A Result containing either a ChangeSet or an error message + # + def load_all_to_changeset + flags_dict = {} + segments_dict = {} + + @paths.each do |path| + begin + load_file(path, flags_dict, segments_dict) + rescue => e + Impl::Util.log_exception(@logger, "Unable to load flag data from \"#{path}\"", e) + return LaunchDarkly::Result.fail("Unable to load flag data from \"#{path}\": #{e.message}", e) + end + end + + # Build a full transfer changeset + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + + # Add all flags to the changeset + flags_dict.each do |key, flag_data| + builder.add_put( + LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key, + flag_data[:version] || 1, + flag_data + ) + end + + # Add all segments to the changeset + segments_dict.each do |key, segment_data| + builder.add_put( + LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT, + key, + segment_data[:version] || 1, + segment_data + ) + end + + # Use no_selector since we don't have versioning information from files + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector) + + LaunchDarkly::Result.success(change_set) + end + + # + # Load a single file and add its contents to the provided dictionaries. + # + # @param path [String] path to the file + # @param flags_dict [Hash] dictionary to add flags to + # @param segments_dict [Hash] dictionary to add segments to + # + def load_file(path, flags_dict, segments_dict) + version = 1 + @version_lock.synchronize do + version = @last_version + @last_version += 1 + end + + parsed = parse_content(File.read(path)) + + (parsed[:flags] || {}).each do |key, flag| + flag[:version] = version + add_item(flags_dict, 'flags', flag) + end + + (parsed[:flagValues] || {}).each do |key, value| + add_item(flags_dict, 'flags', make_flag_with_value(key.to_s, value, version)) + end + + (parsed[:segments] || {}).each do |key, segment| + segment[:version] = version + add_item(segments_dict, 'segments', segment) + end + end + + # + # Parse file content as JSON or YAML. + # + # @param content [String] file content string + # @return [Hash] parsed dictionary with symbolized keys + # + def parse_content(content) + # We can use the Ruby YAML parser for both YAML and JSON (JSON is a subset of YAML and while + # not all YAML parsers handle it correctly, we have verified that the Ruby one does, at least + # for all the samples of actual flag data that we've tested). + symbolize_all_keys(YAML.safe_load(content)) + end + + # + # Recursively symbolize all keys in a hash or array. + # + # @param value [Object] the value to symbolize + # @return [Object] the value with all keys symbolized + # + def symbolize_all_keys(value) + # This is necessary because YAML.load doesn't have an option for parsing keys as symbols, and + # the SDK expects all objects to be formatted that way. + if value.is_a?(Hash) + value.map { |k, v| [k.to_sym, symbolize_all_keys(v)] }.to_h + elsif value.is_a?(Array) + value.map { |v| symbolize_all_keys(v) } + else + value + end + end + + # + # Add an item to a dictionary, checking for duplicates. + # + # @param items_dict [Hash] dictionary to add to + # @param kind_name [String] name of the kind (for error messages) + # @param item [Hash] item to add + # + def add_item(items_dict, kind_name, item) + key = item[:key].to_sym + if items_dict[key].nil? 
+ items_dict[key] = item + else + raise ArgumentError, "In #{kind_name}, key \"#{item[:key]}\" was used more than once" + end + end + + # + # Create a simple flag configuration from a key-value pair. + # + # @param key [String] flag key + # @param value [Object] flag value + # @param version [Integer] version number + # @return [Hash] flag dictionary + # + def make_flag_with_value(key, value, version) + { + key: key, + on: true, + version: version, + fallthrough: { variation: 0 }, + variations: [value], + } + end + + # + # Callback invoked when files change. + # + # Reloads all files and queues an update. + # + def on_file_change + @lock.synchronize do + return if @closed + + begin + # Reload all files + result = load_all_to_changeset + + if result.success? + # Queue a successful update + update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::VALID, + change_set: result.value + ) + @update_queue.push(update) + else + # Queue an error update + error_update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, + 0, + result.error, + Time.now + ) + ) + @update_queue.push(error_update) + end + rescue => e + @logger.error { "[LDClient] Error processing file change: #{e.message}" } + error_update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, + 0, + "Error processing file change: #{e.message}", + Time.now + ) + ) + @update_queue.push(error_update) + end + end + end + + # + # Start watching files for changes. + # + # @return [Object] auto-updater instance + # + def start_listener + resolved_paths = @paths.map do |p| + begin + Pathname.new(File.absolute_path(p)).realpath.to_s + rescue + @logger.warn { "[LDClient] Cannot watch for changes to data file \"#{p}\" because it is an invalid path" } + nil + end + end.compact + + if @use_listen + start_listener_with_listen_gem(resolved_paths) + else + FileDataSourcePollerV2.new(resolved_paths, @poll_interval, method(:on_file_change), @logger) + end + end + + # + # Start listening for file changes using the listen gem. + # + # @param resolved_paths [Array] resolved file paths to watch + # @return [Listen::Listener] the listener instance + # + def start_listener_with_listen_gem(resolved_paths) + path_set = resolved_paths.to_set + dir_paths = resolved_paths.map { |p| File.dirname(p) }.uniq + opts = { latency: @poll_interval } + l = Listen.to(*dir_paths, **opts) do |modified, added, removed| + paths = modified + added + removed + if paths.any? { |p| path_set.include?(p) } + on_file_change + end + end + l.start + l + end + end + + # + # Used internally by FileDataSourceV2 to track data file changes if the 'listen' gem is not available. 
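+ # It records each watched file's modification time and polls on a background thread at the configured interval, invoking the callback when any file changes.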
+ # + class FileDataSourcePollerV2 + def initialize(resolved_paths, interval, on_change_callback, logger) + @stopped = Concurrent::AtomicBoolean.new(false) + @on_change = on_change_callback + @logger = logger + + get_file_times = proc do + ret = {} + resolved_paths.each do |path| + begin + ret[path] = File.mtime(path) + rescue Errno::ENOENT + ret[path] = nil + end + end + ret + end + + last_times = get_file_times.call + @thread = Thread.new do + loop do + sleep interval + break if @stopped.value + + begin + new_times = get_file_times.call + changed = false + last_times.each do |path, old_time| + new_time = new_times[path] + if !new_time.nil? && new_time != old_time + changed = true + break + end + end + last_times = new_times + @on_change.call if changed + rescue => e + Impl::Util.log_exception(@logger, "Unexpected exception in FileDataSourcePollerV2", e) + end + end + end + @thread.name = "LD/FileDataSourceV2" + end + + def stop + @stopped.make_true + @thread.run # wakes it up if it's sleeping + end + end + end + end +end diff --git a/lib/ldclient-rb/integrations/file_data.rb b/lib/ldclient-rb/integrations/file_data.rb index fb85ad98..1931c05b 100644 --- a/lib/ldclient-rb/integrations/file_data.rb +++ b/lib/ldclient-rb/integrations/file_data.rb @@ -1,4 +1,5 @@ require 'ldclient-rb/impl/integrations/file_data_source' +require 'ldclient-rb/impl/integrations/file_data_source_v2' module LaunchDarkly module Integrations @@ -103,6 +104,67 @@ def self.data_source(options={}) lambda { |sdk_key, config| Impl::Integrations::FileDataSourceImpl.new(config.feature_store, config.data_source_update_sink, config.logger, options) } end + + # + # Returns a builder for the FDv2-compatible file data source. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + # This method returns a builder proc that can be used with the FDv2 data system + # configuration as both an Initializer and a Synchronizer. When used as an Initializer + # (via `fetch`), it reads files once. When used as a Synchronizer (via `sync`), it + # watches for file changes and automatically updates when files are modified. + # + # @param options [Hash] the configuration options + # @option options [Array, String] :paths The paths of the source files for loading flag data. These + # may be absolute paths or relative to the current working directory. (Required) + # @option options [Float] :poll_interval The minimum interval, in seconds, between checks for + # file modifications - used only if the native file-watching mechanism from 'listen' is not + # being used. The default value is 1 second. + # @option options [Boolean] :force_polling Force polling even if the 'listen' gem is available. + # The default value is false. 
+ # @return [Proc] a builder proc that can be used as an FDv2 initializer or synchronizer + # + # @example Using as an initializer + # file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json']) + # data_system_config = LaunchDarkly::DataSystemConfig.new( + # initializers: [file_source] + # ) + # config = LaunchDarkly::Config.new(data_system: data_system_config) + # + # @example Using as a synchronizer + # file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json']) + # data_system_config = LaunchDarkly::DataSystemConfig.new( + # synchronizer: file_source + # ) + # config = LaunchDarkly::Config.new(data_system: data_system_config) + # + # @example Using as both initializer and synchronizer + # file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json']) + # data_system_config = LaunchDarkly::DataSystemConfig.new( + # initializers: [file_source], + # synchronizer: file_source + # ) + # config = LaunchDarkly::Config.new(data_system: data_system_config) + # + def self.data_source_v2(options = {}) + paths = options[:paths] || [] + poll_interval = options[:poll_interval] || 1 + force_polling = options[:force_polling] || false + + lambda { |_sdk_key, config| + Impl::Integrations::FileDataSourceV2.new( + config.logger, + paths: paths, + poll_interval: poll_interval, + force_polling: force_polling + ) + } + end end end end diff --git a/spec/integrations/file_data_source_v2_spec.rb b/spec/integrations/file_data_source_v2_spec.rb new file mode 100644 index 00000000..4344408a --- /dev/null +++ b/spec/integrations/file_data_source_v2_spec.rb @@ -0,0 +1,523 @@ +# frozen_string_literal: true + +require "spec_helper" +require "tempfile" +require "ldclient-rb/impl/integrations/file_data_source_v2" +require "ldclient-rb/integrations/file_data" +require "ldclient-rb/interfaces/data_system" + +module LaunchDarkly + module Integrations + RSpec.describe "FileDataSourceV2" do + let(:logger) { $null_log } + + let(:all_properties_json) { <<-EOF +{ + "flags": { + "flag1": { + "key": "flag1", + "on": true, + "fallthrough": { + "variation": 2 + }, + "variations": [ "fall", "off", "on" ] + } + }, + "flagValues": { + "flag2": "value2" + }, + "segments": { + "seg1": { + "key": "seg1", + "include": ["user1"] + } + } +} +EOF + } + + let(:all_properties_yaml) { <<-EOF +--- +flags: + flag1: + key: flag1 + "on": true +flagValues: + flag2: value2 +segments: + seg1: + key: seg1 + include: ["user1"] +EOF + } + + let(:flag_only_json) { <<-EOF +{ + "flags": { + "flag1": { + "key": "flag1", + "on": true, + "fallthrough": { + "variation": 2 + }, + "variations": [ "fall", "off", "on" ] + } + } +} +EOF + } + + let(:segment_only_json) { <<-EOF +{ + "segments": { + "seg1": { + "key": "seg1", + "include": ["user1"] + } + } +} +EOF + } + + let(:flag_values_only_json) { <<-EOF +{ + "flagValues": { + "flag2": "value2" + } +} +EOF + } + + class MockSelectorStore + include LaunchDarkly::Interfaces::DataSystem::SelectorStore + + def initialize(selector) + @selector = selector + end + + def selector + @selector + end + end + + before do + @tmp_dir = Dir.mktmpdir + end + + after do + FileUtils.rm_rf(@tmp_dir) + end + + def make_temp_file(content) + file = Tempfile.new('flags', @tmp_dir) + IO.write(file, content) + file + end + + def no_selector_store + MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector) + end + + describe "initializer (fetch)" do + it "creates valid initializer" do + file = make_temp_file(all_properties_json) + + 
source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + + basis = result.value + expect(basis.persist).to eq(false) + expect(basis.environment_id).to be_nil + expect(basis.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + + # Should have 2 flags and 1 segment + changes = basis.change_set.changes + expect(changes.length).to eq(3) + + flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG } + segment_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT } + + expect(flag_changes.length).to eq(2) + expect(segment_changes.length).to eq(1) + + # Check selector is no_selector + expect(basis.change_set.selector).to eq(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector) + ensure + source.stop + end + end + + it "handles missing file" do + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: ['no-such-file.json']) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(false) + expect(result.error).to include("no-such-file.json") + ensure + source.stop + end + end + + it "handles invalid JSON" do + file = make_temp_file('{"flagValues":{') + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(false) + expect(result.error).to include("Unable to load flag data") + ensure + source.stop + end + end + + it "handles duplicate keys" do + file1 = make_temp_file(flag_only_json) + file2 = make_temp_file(flag_only_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file1.path, file2.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(false) + expect(result.error).to include("was used more than once") + ensure + source.stop + end + end + + it "loads multiple files" do + file1 = make_temp_file(flag_only_json) + file2 = make_temp_file(segment_only_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file1.path, file2.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + + changes = result.value.change_set.changes + flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG } + segment_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT } + + expect(flag_changes.length).to eq(1) + expect(segment_changes.length).to eq(1) + ensure + source.stop + end + end + + it "loads YAML" do + file = make_temp_file(all_properties_yaml) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + expect(result.value.change_set.changes.length).to eq(3) # 2 flags + 1 segment + ensure + source.stop + end + end + + it "handles flag values" do + file = make_temp_file(flag_values_only_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + + changes = result.value.change_set.changes + flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG } + expect(flag_changes.length).to eq(1) + + # Check the flag was created with 
the expected structure + flag_change = flag_changes[0] + expect(flag_change.key).to eq(:flag2) + expect(flag_change.object[:key]).to eq("flag2") + expect(flag_change.object[:on]).to eq(true) + expect(flag_change.object[:variations]).to eq(["value2"]) + ensure + source.stop + end + end + end + + describe "synchronizer (sync)" do + it "creates valid synchronizer" do + file = make_temp_file(all_properties_json) + + source = Impl::Integrations::FileDataSourceV2.new( + logger, + paths: [file.path], + force_polling: true, + poll_interval: 0.1 + ) + + updates = [] + + begin + sync_thread = Thread.new do + source.sync(no_selector_store) do |update| + updates << update + break if updates.length >= 1 + end + end + + # Wait for initial update with timeout + deadline = Time.now + 5 + while updates.empty? && Time.now < deadline + sleep 0.1 + end + + expect(updates.length).to be >= 1 + expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(updates[0].change_set).not_to be_nil + expect(updates[0].change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(updates[0].change_set.changes.length).to eq(3) + ensure + source.stop + sync_thread&.join(2) + end + end + + it "detects file changes" do + file = make_temp_file(flag_only_json) + + source = Impl::Integrations::FileDataSourceV2.new( + logger, + paths: [file.path], + force_polling: true, + poll_interval: 0.1 + ) + + updates = [] + update_received = Concurrent::Event.new + + begin + sync_thread = Thread.new do + source.sync(no_selector_store) do |update| + updates << update + update_received.set + break if updates.length >= 2 + end + end + + # Wait for initial update + expect(update_received.wait(5)).to eq(true), "Did not receive initial update" + expect(updates.length).to eq(1) + expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + + initial_flags = updates[0].change_set.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG } + expect(initial_flags.length).to eq(1) + + # Modify the file + update_received.reset + sleep 0.2 # Ensure filesystem timestamp changes + IO.write(file, segment_only_json) + + # Wait for the change to be detected + expect(update_received.wait(5)).to eq(true), "Did not receive update after file change" + expect(updates.length).to eq(2) + expect(updates[1].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + + segment_changes = updates[1].change_set.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT } + expect(segment_changes.length).to eq(1) + ensure + source.stop + sync_thread&.join(2) + end + end + + it "reports error on invalid file update" do + file = make_temp_file(flag_only_json) + + source = Impl::Integrations::FileDataSourceV2.new( + logger, + paths: [file.path], + force_polling: true, + poll_interval: 0.1 + ) + + updates = [] + update_received = Concurrent::Event.new + + begin + sync_thread = Thread.new do + source.sync(no_selector_store) do |update| + updates << update + update_received.set + break if updates.length >= 2 + end + end + + # Wait for initial update + expect(update_received.wait(5)).to eq(true), "Did not receive initial update" + expect(updates.length).to eq(1) + expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + + # Make the file invalid + update_received.reset + sleep 0.2 # Ensure filesystem timestamp changes + IO.write(file, '{"invalid json') + + # Wait for the error to be 
detected + expect(update_received.wait(5)).to eq(true), "Did not receive update after file became invalid" + expect(updates.length).to eq(2) + expect(updates[1].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(updates[1].error).not_to be_nil + ensure + source.stop + sync_thread&.join(2) + end + end + + it "can be stopped" do + file = make_temp_file(all_properties_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + updates = [] + + sync_thread = Thread.new do + source.sync(no_selector_store) do |update| + updates << update + end + end + + # Give it a moment to process initial data + sleep 0.3 + + # Stop it + source.stop + + # Thread should complete + sync_thread.join(2) + expect(sync_thread.alive?).to eq(false) + + # Should have received at least the initial update + expect(updates.length).to be >= 1 + expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + end + end + + describe "fetch after stop" do + it "returns error" do + file = make_temp_file(all_properties_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + # First fetch should work + result = source.fetch(no_selector_store) + expect(result.success?).to eq(true) + + # Stop the source + source.stop + + # Second fetch should fail + result = source.fetch(no_selector_store) + expect(result.success?).to eq(false) + expect(result.error).to include("closed") + end + end + + describe "name property" do + it "returns correct name" do + file = make_temp_file(all_properties_json) + + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path]) + + begin + expect(source.name).to eq("FileDataV2") + ensure + source.stop + end + end + end + + describe "accepts single path string" do + it "works with string instead of array" do + file = make_temp_file(flag_only_json) + + # Pass a single string instead of a list + source = Impl::Integrations::FileDataSourceV2.new(logger, paths: file.path) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + expect(result.value.change_set.changes.length).to eq(1) + ensure + source.stop + end + end + end + + describe "public API (data_source_v2)" do + it "creates builder that works as initializer" do + file = make_temp_file(all_properties_json) + + builder = FileData.data_source_v2(paths: [file.path]) + config = LaunchDarkly::Config.new(logger: logger) + + source = builder.call('sdk-key', config) + + begin + result = source.fetch(no_selector_store) + + expect(result.success?).to eq(true) + expect(result.value.change_set.changes.length).to eq(3) + ensure + source.stop + end + end + + it "creates builder that works as synchronizer" do + file = make_temp_file(all_properties_json) + + builder = FileData.data_source_v2(paths: [file.path], force_polling: true, poll_interval: 0.1) + config = LaunchDarkly::Config.new(logger: logger) + + source = builder.call('sdk-key', config) + + updates = [] + + begin + sync_thread = Thread.new do + source.sync(no_selector_store) do |update| + updates << update + break if updates.length >= 1 + end + end + + deadline = Time.now + 5 + while updates.empty? 
&& Time.now < deadline + sleep 0.1 + end + + expect(updates.length).to be >= 1 + expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + ensure + source.stop + sync_thread&.join(2) + end + end + end + end + end +end From 1ffa255736c38f887e390e4704b0bc9af3180711 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 28 Jan 2026 19:25:33 +0000 Subject: [PATCH 2/3] cleanup comments and version logic --- .../impl/integrations/file_data_source_v2.rb | 47 ++++++------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb index 9e6ce094..d054780e 100644 --- a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb +++ b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb @@ -65,9 +65,6 @@ def initialize(logger, paths:, poll_interval: 1, force_polling: false) @update_queue = Queue.new @lock = Mutex.new @listener = nil - - @version_lock = Mutex.new - @last_version = 1 end # @@ -136,7 +133,6 @@ def sync(selector_store) return end - # Yield the initial successful state yield LaunchDarkly::Interfaces::DataSystem::Update.new( state: LaunchDarkly::Interfaces::DataSource::Status::VALID, change_set: initial_result.value.change_set @@ -147,16 +143,13 @@ def sync(selector_store) @listener = start_listener unless @closed end - # Continue yielding updates as they arrive until @closed begin - # stop() will push nil to the queue to wake us up when shutting down update = @update_queue.pop - # Handle nil sentinel for shutdown + # stop() pushes nil to wake us up when shutting down break if update.nil? - # Yield the actual update yield update rescue => e yield LaunchDarkly::Interfaces::DataSystem::Update.new( @@ -213,11 +206,9 @@ def load_all_to_changeset end end - # Build a full transfer changeset builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) - # Add all flags to the changeset flags_dict.each do |key, flag_data| builder.add_put( LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, @@ -227,7 +218,6 @@ def load_all_to_changeset ) end - # Add all segments to the changeset segments_dict.each do |key, segment_data| builder.add_put( LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT, @@ -251,25 +241,19 @@ def load_all_to_changeset # @param segments_dict [Hash] dictionary to add segments to # def load_file(path, flags_dict, segments_dict) - version = 1 - @version_lock.synchronize do - version = @last_version - @last_version += 1 - end - parsed = parse_content(File.read(path)) (parsed[:flags] || {}).each do |key, flag| - flag[:version] = version + flag[:version] ||= 1 add_item(flags_dict, 'flags', flag) end (parsed[:flagValues] || {}).each do |key, value| - add_item(flags_dict, 'flags', make_flag_with_value(key.to_s, value, version)) + add_item(flags_dict, 'flags', make_flag_with_value(key.to_s, value)) end (parsed[:segments] || {}).each do |key, segment| - segment[:version] = version + segment[:version] ||= 1 add_item(segments_dict, 'segments', segment) end end @@ -281,9 +265,7 @@ def load_file(path, flags_dict, segments_dict) # @return [Hash] parsed dictionary with symbolized keys # def parse_content(content) - # We can use the Ruby YAML parser for both YAML and JSON (JSON is a subset of YAML and while - # not all YAML parsers handle it correctly, we have verified that the Ruby one does, at least - # for all the samples of actual flag data that 
we've tested). + # Ruby's YAML parser correctly handles JSON as well symbolize_all_keys(YAML.safe_load(content)) end @@ -292,10 +274,7 @@ def parse_content(content) # # @param value [Object] the value to symbolize # @return [Object] the value with all keys symbolized - # def symbolize_all_keys(value) - # This is necessary because YAML.load doesn't have an option for parsing keys as symbols, and - # the SDK expects all objects to be formatted that way. if value.is_a?(Hash) value.map { |k, v| [k.to_sym, symbolize_all_keys(v)] }.to_h elsif value.is_a?(Array) @@ -326,14 +305,13 @@ def add_item(items_dict, kind_name, item) # # @param key [String] flag key # @param value [Object] flag value - # @param version [Integer] version number # @return [Hash] flag dictionary # - def make_flag_with_value(key, value, version) + def make_flag_with_value(key, value) { key: key, on: true, - version: version, + version: 1, fallthrough: { variation: 0 }, variations: [value], } @@ -349,18 +327,15 @@ def on_file_change return if @closed begin - # Reload all files result = load_all_to_changeset if result.success? - # Queue a successful update update = LaunchDarkly::Interfaces::DataSystem::Update.new( state: LaunchDarkly::Interfaces::DataSource::Status::VALID, change_set: result.value ) @update_queue.push(update) else - # Queue an error update error_update = LaunchDarkly::Interfaces::DataSystem::Update.new( state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( @@ -435,6 +410,14 @@ def start_listener_with_listen_gem(resolved_paths) # Used internally by FileDataSourceV2 to track data file changes if the 'listen' gem is not available. # class FileDataSourcePollerV2 + # + # Initialize the file data poller. + # + # @param resolved_paths [Array] resolved file paths to watch + # @param interval [Float] polling interval in seconds + # @param on_change_callback [Proc] callback to invoke when files change + # @param logger [Logger] the logger + # def initialize(resolved_paths, interval, on_change_callback, logger) @stopped = Concurrent::AtomicBoolean.new(false) @on_change = on_change_callback From d2c06dd8d544e7a37100c1209c1421cc22fe7467 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Thu, 29 Jan 2026 15:34:01 +0000 Subject: [PATCH 3/3] address feedback --- .../impl/integrations/file_data_source_v2.rb | 29 +++++++---------- lib/ldclient-rb/integrations/file_data.rb | 32 ++++++++++++------- spec/integrations/file_data_source_v2_spec.rb | 9 ++---- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb index d054780e..eded0ad8 100644 --- a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb +++ b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb @@ -52,14 +52,11 @@ class FileDataSourceV2 # @param logger [Logger] the logger # @param paths [Array, String] file paths to load (or a single path string) # @param poll_interval [Float] seconds between polling checks when watching files (default: 1) - # @param force_polling [Boolean] force polling even if listen gem is available (default: false) # - def initialize(logger, paths:, poll_interval: 1, force_polling: false) + def initialize(logger, paths:, poll_interval: 1) @logger = logger @paths = paths.is_a?(Array) ? 
paths : [paths] @poll_interval = poll_interval - @force_polling = force_polling - @use_listen = @@have_listen && !@force_polling @closed = false @update_queue = Queue.new @@ -186,14 +183,12 @@ def stop @update_queue.push(nil) end - private - # # Load all files and build a changeset. # # @return [LaunchDarkly::Result] A Result containing either a ChangeSet or an error message # - def load_all_to_changeset + private def load_all_to_changeset flags_dict = {} segments_dict = {} @@ -240,7 +235,7 @@ def load_all_to_changeset # @param flags_dict [Hash] dictionary to add flags to # @param segments_dict [Hash] dictionary to add segments to # - def load_file(path, flags_dict, segments_dict) + private def load_file(path, flags_dict, segments_dict) parsed = parse_content(File.read(path)) (parsed[:flags] || {}).each do |key, flag| @@ -264,9 +259,9 @@ def load_file(path, flags_dict, segments_dict) # @param content [String] file content string # @return [Hash] parsed dictionary with symbolized keys # - def parse_content(content) + private def parse_content(content) # Ruby's YAML parser correctly handles JSON as well - symbolize_all_keys(YAML.safe_load(content)) + symbolize_all_keys(YAML.safe_load(content)) || {} end # @@ -274,7 +269,7 @@ def parse_content(content) # # @param value [Object] the value to symbolize # @return [Object] the value with all keys symbolized - def symbolize_all_keys(value) + private def symbolize_all_keys(value) if value.is_a?(Hash) value.map { |k, v| [k.to_sym, symbolize_all_keys(v)] }.to_h elsif value.is_a?(Array) @@ -291,7 +286,7 @@ def symbolize_all_keys(value) # @param kind_name [String] name of the kind (for error messages) # @param item [Hash] item to add # - def add_item(items_dict, kind_name, item) + private def add_item(items_dict, kind_name, item) key = item[:key].to_sym if items_dict[key].nil? items_dict[key] = item @@ -307,7 +302,7 @@ def add_item(items_dict, kind_name, item) # @param value [Object] flag value # @return [Hash] flag dictionary # - def make_flag_with_value(key, value) + private def make_flag_with_value(key, value) { key: key, on: true, @@ -322,7 +317,7 @@ def make_flag_with_value(key, value) # # Reloads all files and queues an update. 
# - def on_file_change + private def on_file_change @lock.synchronize do return if @closed @@ -368,7 +363,7 @@ def on_file_change # # @return [Object] auto-updater instance # - def start_listener + private def start_listener resolved_paths = @paths.map do |p| begin Pathname.new(File.absolute_path(p)).realpath.to_s @@ -378,7 +373,7 @@ def start_listener end end.compact - if @use_listen + if @@have_listen start_listener_with_listen_gem(resolved_paths) else FileDataSourcePollerV2.new(resolved_paths, @poll_interval, method(:on_file_change), @logger) @@ -391,7 +386,7 @@ def start_listener # @param resolved_paths [Array] resolved file paths to watch # @return [Listen::Listener] the listener instance # - def start_listener_with_listen_gem(resolved_paths) + private def start_listener_with_listen_gem(resolved_paths) path_set = resolved_paths.to_set dir_paths = resolved_paths.map { |p| File.dirname(p) }.uniq opts = { latency: @poll_interval } diff --git a/lib/ldclient-rb/integrations/file_data.rb b/lib/ldclient-rb/integrations/file_data.rb index 1931c05b..fe29f0c8 100644 --- a/lib/ldclient-rb/integrations/file_data.rb +++ b/lib/ldclient-rb/integrations/file_data.rb @@ -125,9 +125,7 @@ def self.data_source(options={}) # @option options [Float] :poll_interval The minimum interval, in seconds, between checks for # file modifications - used only if the native file-watching mechanism from 'listen' is not # being used. The default value is 1 second. - # @option options [Boolean] :force_polling Force polling even if the 'listen' gem is available. - # The default value is false. - # @return [Proc] a builder proc that can be used as an FDv2 initializer or synchronizer + # @return [FileDataSourceV2Builder] a builder that can be used as an FDv2 initializer or synchronizer # # @example Using as an initializer # file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json']) @@ -154,16 +152,26 @@ def self.data_source(options={}) def self.data_source_v2(options = {}) paths = options[:paths] || [] poll_interval = options[:poll_interval] || 1 - force_polling = options[:force_polling] || false - lambda { |_sdk_key, config| - Impl::Integrations::FileDataSourceV2.new( - config.logger, - paths: paths, - poll_interval: poll_interval, - force_polling: force_polling - ) - } + FileDataSourceV2Builder.new(paths, poll_interval) + end + end + + # + # Builder for FileDataSourceV2. 
+ # + class FileDataSourceV2Builder + def initialize(paths, poll_interval) + @paths = paths + @poll_interval = poll_interval + end + + def build(_sdk_key, config) + Impl::Integrations::FileDataSourceV2.new( + config.logger, + paths: @paths, + poll_interval: @poll_interval + ) end end end diff --git a/spec/integrations/file_data_source_v2_spec.rb b/spec/integrations/file_data_source_v2_spec.rb index 4344408a..670e9adf 100644 --- a/spec/integrations/file_data_source_v2_spec.rb +++ b/spec/integrations/file_data_source_v2_spec.rb @@ -265,7 +265,6 @@ def no_selector_store source = Impl::Integrations::FileDataSourceV2.new( logger, paths: [file.path], - force_polling: true, poll_interval: 0.1 ) @@ -302,7 +301,6 @@ def no_selector_store source = Impl::Integrations::FileDataSourceV2.new( logger, paths: [file.path], - force_polling: true, poll_interval: 0.1 ) @@ -350,7 +348,6 @@ def no_selector_store source = Impl::Integrations::FileDataSourceV2.new( logger, paths: [file.path], - force_polling: true, poll_interval: 0.1 ) @@ -475,7 +472,7 @@ def no_selector_store builder = FileData.data_source_v2(paths: [file.path]) config = LaunchDarkly::Config.new(logger: logger) - source = builder.call('sdk-key', config) + source = builder.build('sdk-key', config) begin result = source.fetch(no_selector_store) @@ -490,10 +487,10 @@ def no_selector_store it "creates builder that works as synchronizer" do file = make_temp_file(all_properties_json) - builder = FileData.data_source_v2(paths: [file.path], force_polling: true, poll_interval: 0.1) + builder = FileData.data_source_v2(paths: [file.path], poll_interval: 0.1) config = LaunchDarkly::Config.new(logger: logger) - source = builder.call('sdk-key', config) + source = builder.build('sdk-key', config) updates = []