Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
include LogStash::PluginMixins::AwsConfig::V2

PREFIX_KEY_NORMALIZE_CHARACTER = "_"
PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
PERIODIC_FILE_ROTATOR_INTERVAL_IN_SECONDS = 15
PERIODIC_STALE_SWEEPER_INTERVAL_IN_SECONDS = 60

CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new({
:min_threads => 1,
:max_threads => 2,
Expand All @@ -114,7 +116,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base

# Set the time, in MINUTES, to close the current sub_time_section of bucket.
# If you also define file_size you have a number of files related to the section and the current tag.
# If it's valued 0 and rotation_strategy is 'time' or 'size_and_time' then the plugin reaise a configuration error.
# If it's valued 0 and rotation_strategy is 'time' or 'size_and_time' then the plugin raises a configuration error.
config :time_file, :validate => :number, :default => 15

# If `restore => false` is specified and Logstash crashes, the unprocessed files are not sent into the bucket.
Expand Down Expand Up @@ -186,7 +188,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
config :rotation_strategy, :validate => ["size_and_time", "size", "time"], :default => "size_and_time"

# The common use case is to define permission on the root bucket and give Logstash full access to write its logs.
# In some circonstances you need finer grained permission on subfolder, this allow you to disable the check at startup.
# In some circumstances you need finer grained permission on subfolders; this allows you to disable the check at startup.
config :validate_credentials_on_root_bucket, :validate => :boolean, :default => true

# The number of times to retry a failed S3 upload.
Expand Down Expand Up @@ -217,7 +219,7 @@ def register
raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0"
end

@file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)
@file_repository = FileRepository.new(@tags, @encoding, @temporary_directory, @logger)

@rotation = rotation_strategy

Expand All @@ -234,7 +236,9 @@ def register

# If we need time based rotation we need to do a periodic check on the files
# to take care of files that were not updated recently
start_periodic_check if @rotation.needs_periodic?
start_periodic_file_rotator if @rotation.needs_periodic?

start_periodic_stale_sweeper
end

def multi_receive_encoded(events_and_encoded)
Expand All @@ -259,12 +263,11 @@ def multi_receive_encoded(events_and_encoded)
end

def close
stop_periodic_check if @rotation.needs_periodic?
stop_periodic_file_rotator if @rotation.needs_periodic?
stop_periodic_stale_sweeper # stop periodic stale sweeps

@logger.debug("Uploading current workspace")

@file_repository.shutdown # stop stale sweeps

# The plugin has stopped receiving new events, but we still have
# data on disk, let's make sure it gets to S3.
# If Logstash gets interrupted, the `restore_from_crash` (when set to true) method will pick up
Expand Down Expand Up @@ -319,21 +322,40 @@ def upload_options
end

private
# We start a task in the background for check for stale files and make sure we rotate them to S3 if needed.
def start_periodic_check
@logger.debug("Start periodic rotation check")
# We start a background task to check at a periodic cadence whether files need rotation.
# Rotated files are uploaded to S3 if their size > 0.
# Rotated files with size == 0 will be swept by the stale sweeper.
def start_periodic_file_rotator
@logger.debug("Start periodic file rotation check")

@periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
@logger.debug("Periodic check for stale files")
@periodic_file_rotator = Concurrent::TimerTask.new(:execution_interval => PERIODIC_FILE_ROTATOR_INTERVAL_IN_SECONDS) do
@logger.debug("Periodic check for file rotation")

rotate_if_needed(@file_repository.keys)
end

@periodic_check.execute
@periodic_file_rotator.execute
end

def start_periodic_stale_sweeper
@periodic_stale_sweeper = Concurrent::TimerTask.new(:execution_interval => PERIODIC_STALE_SWEEPER_INTERVAL_IN_SECONDS) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@logger.debug("Periodic stale sweeper check started.")
@file_repository.keys.each do | prefix_key |
@file_repository.remove_if_stale(prefix_key)
end
end

@periodic_stale_sweeper.execute
end

def stop_periodic_file_rotator
@periodic_file_rotator.shutdown
end

def stop_periodic_check
@periodic_check.shutdown
def stop_periodic_stale_sweeper
@periodic_stale_sweeper.shutdown
end

def bucket_resource
Expand Down Expand Up @@ -387,6 +409,7 @@ def rotation_strategy
def clean_temporary_file(file)
@logger.debug? && @logger.debug("Removing temporary file", :path => file.path)
file.delete!
@file_repository.stop_tracking_temp_file(file.prefix, file)
end

# The upload process will use a separate uploader/threadpool with less resource allocated to it.
Expand Down
76 changes: 34 additions & 42 deletions lib/logstash/outputs/s3/file_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ module LogStash
module Outputs
class S3
class FileRepository
DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
DEFAULT_STALE_TIME_SECS = 15 * 60
TEMP_FILE_STALE_TIME_SECS = 15 * 60
# Ensure that all access or work done
# on a factory is threadsafe
class PrefixedValue
def initialize(file_factory, stale_time)
def initialize(file_factory, stale_time, logger)
@file_factory = file_factory
@lock = Monitor.new # reentrant Mutex
@stale_time = stale_time
@is_deleted = false
@logger = logger
end

def with_lock
Expand All @@ -28,52 +28,57 @@ def with_lock
}
end

def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
def remove_staled_files
with_lock do |factory|
factory.temp_files = factory.temp_files.delete_if do |temp_file|
is_staled = temp_file.size == 0 && (Time.now - temp_file.ctime > @stale_time)
is_temp_dir_empty = false
begin
# check Dir emptiness and remove the file;
# reading the file and checking its size doesn't make sense as it will not be possible after the temp_file.size == 0 filter
temp_file.delete! if is_staled
is_temp_dir_empty = Dir.empty?(::File.join(temp_file.temp_path, temp_file.prefix)) unless is_staled
temp_file.delete! if is_temp_dir_empty
rescue => e
@logger.error("An error occurred while sweeping temp dir.", :exception => e.class, :message => e.message, :path => temp_file.path, :backtrace => e.backtrace)
end
is_staled || is_temp_dir_empty
end
# mark as deleted once we finish tracking all temp files created
@is_deleted = factory.temp_files.length == 0
end
end

def apply(prefix)
return self
end

def delete!
with_lock do |factory|
factory.current.delete!
@is_deleted = true
end
end

def deleted?
with_lock { |_| @is_deleted }
end
end

class FactoryInitializer
include java.util.function.Function
def initialize(tags, encoding, temporary_directory, stale_time)
def initialize(tags, encoding, temporary_directory, stale_time, logger)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@stale_time = stale_time
@logger = logger
end

def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time, @logger)
end
end

def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
stale_time = TEMP_FILE_STALE_TIME_SECS, logger)
# The path needs to contain the prefix so when we start
# logtash after a crash we keep the remote structure
# Logstash after a crash we keep the remote structure
@prefixed_factories = ConcurrentHashMap.new

@sweeper_interval = sweeper_interval

@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

start_stale_sweeper
@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time, logger)
end

def keys
Expand Down Expand Up @@ -146,10 +151,6 @@ def get_file(prefix_key)
nil # void return avoid leaking unsynchronized access
end

def shutdown
stop_stale_sweeper
end

def size
@prefixed_factories.size
end
Expand All @@ -162,8 +163,8 @@ def remove_if_stale(prefix_key)
# for stale detection, marking it as deleted before releasing the lock
# and causing it to become deleted from the map.
prefixed_factory.with_lock do |_|
if prefixed_factory.stale?
prefixed_factory.delete! # mark deleted to prevent reuse
prefixed_factory.remove_staled_files
if prefixed_factory.deleted?
nil # cause deletion
else
prefixed_factory # keep existing
Expand All @@ -172,20 +173,11 @@ def remove_if_stale(prefix_key)
end
end

def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@prefixed_factories.keys.each do |prefix|
remove_if_stale(prefix)
end
def stop_tracking_temp_file(prefix_key, file)
prefix_val = @prefixed_factories.get(prefix_key)
prefix_val&.with_lock do |factory|
factory.temp_files.delete(file)
end

@stale_sweeper.execute
end

def stop_stale_sweeper
@stale_sweeper.shutdown
end
end
end
Expand Down
9 changes: 7 additions & 2 deletions lib/logstash/outputs/s3/temporary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ class TemporaryFile

attr_reader :fd

def initialize(key, fd, temp_path)
def initialize(key, fd, temp_path, prefix)
@fd = fd
@key = key
@temp_path = temp_path
@prefix = prefix
@created_at = Time.now
end

Expand Down Expand Up @@ -55,6 +56,10 @@ def key
@key.gsub(/^\//, "")
end

def prefix
@prefix
end

# Each temporary file is created inside a directory named with a UUID,
# instead of deleting the file directly and having the risk of deleting other files
# we delete the root of the UUID; using a UUID also removes the risk of deleting an unwanted file, it acts as
Expand Down Expand Up @@ -82,7 +87,7 @@ def self.create_from_existing_file(file_path, temporary_folder)
end
TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"),
::File.exist?(file_path) ? ::File.open(file_path, "r") : nil, # for the nil case, file size will be 0 and upload will be ignored.
::File.join(temporary_folder, key_parts.slice(0, 1)))
::File.join(temporary_folder, key_parts.slice(0, 1)), "")
end

def self.gzip_extension
Expand Down
6 changes: 4 additions & 2 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TemporaryFileFactory
FILE_MODE = "a"
STRFTIME = "%Y-%m-%dT%H.%M"

attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current
attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current, :temp_files

def initialize(prefix, tags, encoding, temporary_directory)
@counter = 0
Expand All @@ -31,13 +31,15 @@ def initialize(prefix, tags, encoding, temporary_directory)
@encoding = encoding
@temporary_directory = temporary_directory
@lock = Mutex.new
@temp_files = Array.new

rotate!
end

def rotate!
@lock.synchronize {
@current = new_file
@temp_files.push(@current)
increment_counter
@current
}
Expand Down Expand Up @@ -86,7 +88,7 @@ def new_file
::File.open(::File.join(path, key), FILE_MODE)
end

TemporaryFile.new(key, io, path)
TemporaryFile.new(key, io, path, prefix)
end

class IOWrappedGzip
Expand Down