From b7794b8b4bea4f2ce4dda734e561d8fada875a21 Mon Sep 17 00:00:00 2001 From: nicdavidson Date: Sat, 17 Jan 2026 22:53:50 +0000 Subject: [PATCH] Add configurable timeout and failure handling for Logstash connections - Add configurable connection timeout (default 2 seconds) to prevent blocking when Logstash is unavailable - Add on_failure option with three modes: - 'ignore': Request proceeds, log message is lost (default, preserves existing behavior) - 'fail_request': Returns 503 error when logging fails (for compliance requirements) - 'fallback_file': Request proceeds, log message written to Laravel log - Fix Monolog 3.x compatibility: Handle Level enum in addition to integers - Add database migration for new timeout and on_failure config fields - Properly close socket connections in finally block - Add detailed debug/warning logging for connection diagnostics --- ...imeout_fields_to_logstash_config_table.php | 33 ++++++ src/Components/GelfLogger.php | 45 ++++++-- src/Components/HttpLogger.php | 27 ++++- src/Components/NetworkLogger.php | 107 ++++++++++++++++-- src/Components/TcpLogger.php | 4 +- src/Components/UdpLogger.php | 4 +- src/Models/LogstashConfig.php | 34 +++++- src/Services/Logstash.php | 19 +++- 8 files changed, 240 insertions(+), 33 deletions(-) create mode 100644 database/migrations/2026_01_17_000000_add_timeout_fields_to_logstash_config_table.php diff --git a/database/migrations/2026_01_17_000000_add_timeout_fields_to_logstash_config_table.php b/database/migrations/2026_01_17_000000_add_timeout_fields_to_logstash_config_table.php new file mode 100644 index 0000000..dcb79f9 --- /dev/null +++ b/database/migrations/2026_01_17_000000_add_timeout_fields_to_logstash_config_table.php @@ -0,0 +1,33 @@ +float('timeout')->default(2.0)->after('protocol'); + $t->string('on_failure')->default('ignore')->after('timeout'); + }); + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::table('logstash_config', function (Blueprint $t) { + $t->dropColumn('timeout'); + $t->dropColumn('on_failure'); + }); + } +} diff --git a/src/Components/GelfLogger.php b/src/Components/GelfLogger.php index a93a9e4..28a1ce9 100644 --- a/src/Components/GelfLogger.php +++ b/src/Components/GelfLogger.php @@ -84,25 +84,54 @@ public function send($message) throw new \InvalidArgumentException('Message is not a GelfMessage.'); } + $startTime = microtime(true); + $_url = $this->protocol . '://' . $this->host . ':' . $this->port; + $_sock = null; + try { if (false === ($_chunks = $this->prepareMessage($message))) { - return false; + return $this->handleFailure("Failed to prepare GELF message", $message->toJson()); } - $_url = $this->protocol . '://' . $this->host . ':' . $this->port; - $_sock = stream_socket_client($_url); + \Illuminate\Support\Facades\Log::debug("Logstash GELF: Connecting to {$_url} (timeout: {$this->timeout}s)"); + + $_sock = @stream_socket_client( + $_url, + $errno, + $errstr, + $this->timeout + ); + + if (!$_sock) { + $elapsed = round(microtime(true) - $startTime, 3); + \Illuminate\Support\Facades\Log::warning("Logstash GELF: Connection failed to {$_url} after {$elapsed}s - [$errno] $errstr"); + return $this->handleFailure("Connection failed: [$errno] $errstr", $message->toJson()); + } + + stream_set_timeout($_sock, (int)$this->timeout, (int)(($this->timeout - (int)$this->timeout) * 1000000)); foreach ($_chunks as $_chunk) { if (!fwrite($_sock, $_chunk)) { - return false; + $elapsed = round(microtime(true) - $startTime, 3); + \Illuminate\Support\Facades\Log::warning("Logstash GELF: Write failed to {$_url} after {$elapsed}s"); + return $this->handleFailure("Write failed", $message->toJson()); } } + + $elapsed = round(microtime(true) - $startTime, 3); + \Illuminate\Support\Facades\Log::debug("Logstash GELF: Successfully sent to {$_url} in {$elapsed}s"); + + return true; + } catch (\Exception $_ex) { - // Failure is not an option - return false; + $elapsed = round(microtime(true) - $startTime, 3); + \Illuminate\Support\Facades\Log::error("Logstash GELF: Exception after {$elapsed}s - " . $_ex->getMessage()); + return $this->handleFailure($_ex->getMessage(), $message->toJson()); + } finally { + if (is_resource($_sock)) { + fclose($_sock); + } } - - return true; } /** diff --git a/src/Components/HttpLogger.php b/src/Components/HttpLogger.php index 4b6ef05..32e0c88 100644 --- a/src/Components/HttpLogger.php +++ b/src/Components/HttpLogger.php @@ -2,6 +2,7 @@ namespace DreamFactory\Core\Logger\Components; use DreamFactory\Core\Utility\Curl; +use Illuminate\Support\Facades\Log; use Psr\Log\LoggerInterface; class HttpLogger extends NetworkLogger implements LoggerInterface @@ -16,9 +17,9 @@ class HttpLogger extends NetworkLogger implements LoggerInterface const PROTOCOL = 'http'; /** {@inheritdoc} */ - public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT) + public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null) { - parent::__construct($host, $port, static::PROTOCOL); + parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure); } /** @@ -28,9 +29,27 @@ public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PO */ public function send($message) { + $startTime = microtime(true); $url = $this->protocol . '://' . $this->host . ':' . $this->port; - $result = Curl::post($url, $message); - return ('ok' === $result) ? true : false; + try { + Log::debug("Logstash HTTP: Posting to {$url} (timeout: {$this->timeout}s)"); + + $result = Curl::post($url, $message, [], $this->timeout); + $elapsed = round(microtime(true) - $startTime, 3); + + if ('ok' === $result) { + Log::debug("Logstash HTTP: Successfully sent to {$url} in {$elapsed}s"); + return true; + } + + Log::warning("Logstash HTTP: Unexpected response from {$url} after {$elapsed}s: " . print_r($result, true)); + return $this->handleFailure("Unexpected response: " . print_r($result, true), $message); + + } catch (\Exception $ex) { + $elapsed = round(microtime(true) - $startTime, 3); + Log::error("Logstash HTTP: Exception after {$elapsed}s - " . $ex->getMessage()); + return $this->handleFailure($ex->getMessage(), $message); + } } } \ No newline at end of file diff --git a/src/Components/NetworkLogger.php b/src/Components/NetworkLogger.php index 41cf9a8..aedd0a3 100644 --- a/src/Components/NetworkLogger.php +++ b/src/Components/NetworkLogger.php @@ -2,7 +2,9 @@ namespace DreamFactory\Core\Logger\Components; use DreamFactory\Core\Exceptions\BadRequestException; +use DreamFactory\Core\Exceptions\ServiceUnavailableException; use Illuminate\Support\Facades\Log; +use Monolog\Level; use Monolog\Logger; abstract class NetworkLogger @@ -15,6 +17,14 @@ abstract class NetworkLogger * @const integer Port that logstash listens on */ const DEFAULT_PORT = 12202; + /** + * @const float Default connection timeout in seconds + */ + const DEFAULT_TIMEOUT = 2.0; + /** + * @const string Default failure behavior: 'ignore', 'fail_request', 'fallback_file' + */ + const DEFAULT_ON_FAILURE = 'ignore'; /** * @var string Logstash host */ @@ -27,6 +37,14 @@ abstract class NetworkLogger * @var string Communication protocol */ protected $protocol; + /** + * @var float Connection timeout in seconds + */ + protected $timeout; + /** + * @var string Behavior on failure: 'ignore', 'fail_request', 'fallback_file' + */ + protected $onFailure; /** * NetworkLogger constructor. @@ -34,12 +52,16 @@ abstract class NetworkLogger * @param $host * @param $port * @param string $protocol + * @param float $timeout Connection timeout in seconds + * @param string $onFailure Behavior on failure: 'ignore', 'fail_request', 'fallback_file' */ - public function __construct($host, $port, $protocol = 'udp') + public function __construct($host, $port, $protocol = 'udp', $timeout = null, $onFailure = null) { $this->host = $host; $this->port = $port; $this->protocol = $protocol; + $this->timeout = $timeout ?? static::DEFAULT_TIMEOUT; + $this->onFailure = $onFailure ?? static::DEFAULT_ON_FAILURE; } /** {@inheritdoc} */ @@ -47,13 +69,19 @@ public function log($level, string|\Stringable $message, array $context = []): v { try { $levelVal = Logger::toMonologLevel($level); - if(!is_int($levelVal)){ + // Monolog 3.x returns Level enum, Monolog 2.x returns int + if ($levelVal instanceof Level) { + $levelVal = $levelVal->value; + } elseif (!is_int($levelVal)) { throw new BadRequestException('Unknown log level [' . $level . ']'); } $context['_message'] = $message; $context['_level'] = $levelVal; $this->send(json_encode($context, JSON_UNESCAPED_SLASHES)); + } catch (ServiceUnavailableException $exception) { + // Re-throw to block request when on_failure='fail_request' + throw $exception; } catch (\Throwable $exception) { Log::error($exception->getMessage()); Log::error($exception->getTraceAsString()); @@ -113,23 +141,84 @@ public function debug(string|\Stringable $message, array $context = []): void * @param string|\Stringable $message * * @return bool + * @throws ServiceUnavailableException */ public function send(string|\Stringable $message) { + $startTime = microtime(true); + $_url = $this->protocol . '://' . $this->host . ':' . $this->port; + $_sock = null; + try { - $_url = $this->protocol . '://' . $this->host . ':' . $this->port; - $_sock = stream_socket_client($_url); + Log::debug("Logstash: Connecting to {$_url} (timeout: {$this->timeout}s)"); + + $_sock = @stream_socket_client( + $_url, + $errno, + $errstr, + $this->timeout + ); + + if (!$_sock) { + $elapsed = round(microtime(true) - $startTime, 3); + Log::warning("Logstash: Connection failed to {$_url} after {$elapsed}s - [$errno] $errstr"); + return $this->handleFailure("Connection failed: [$errno] $errstr", $message); + } + + // Set write timeout to match connection timeout + stream_set_timeout($_sock, (int)$this->timeout, (int)(($this->timeout - (int)$this->timeout) * 1000000)); if (!fwrite($_sock, $message)) { - return false; + $elapsed = round(microtime(true) - $startTime, 3); + Log::warning("Logstash: Write failed to {$_url} after {$elapsed}s"); + return $this->handleFailure("Write failed", $message); } + + $elapsed = round(microtime(true) - $startTime, 3); + Log::debug("Logstash: Successfully sent to {$_url} in {$elapsed}s"); + + return true; + + } catch (ServiceUnavailableException $ex) { + // Re-throw to block request when on_failure='fail_request' + throw $ex; } catch (\Exception $ex) { - Log::error($ex->getMessage()); + $elapsed = round(microtime(true) - $startTime, 3); + Log::error("Logstash: Exception after {$elapsed}s - " . $ex->getMessage()); Log::error($ex->getTraceAsString()); - // Failure is not an option - return false; + return $this->handleFailure($ex->getMessage(), $message); + } finally { + if (is_resource($_sock)) { + fclose($_sock); + } } + } - return true; + /** + * Handle send failure based on configured behavior + * + * @param string $reason Failure reason + * @param string|\Stringable $message The message that failed to send + * @return bool + * @throws ServiceUnavailableException + */ + protected function handleFailure($reason, $message) + { + switch ($this->onFailure) { + case 'fail_request': + throw new ServiceUnavailableException( + "Logging service unavailable: $reason. Request blocked per logging policy." + ); + + case 'fallback_file': + Log::warning("Logstash: Falling back to file logging - $reason"); + Log::info("Logstash fallback message: " . (is_string($message) ? $message : json_encode($message))); + return true; // Don't block request, message is preserved in file log + + case 'ignore': + default: + Log::debug("Logstash: Ignoring failure - $reason"); + return false; // Don't block request, message is lost + } } } \ No newline at end of file diff --git a/src/Components/TcpLogger.php b/src/Components/TcpLogger.php index dbce7eb..24bda13 100644 --- a/src/Components/TcpLogger.php +++ b/src/Components/TcpLogger.php @@ -11,8 +11,8 @@ class TcpLogger extends NetworkLogger implements LoggerInterface const PROTOCOL = 'tcp'; /** {@inheritdoc} */ - public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT) + public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null) { - parent::__construct($host, $port, static::PROTOCOL); + parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure); } } \ No newline at end of file diff --git a/src/Components/UdpLogger.php b/src/Components/UdpLogger.php index e364f55..c98fcc5 100644 --- a/src/Components/UdpLogger.php +++ b/src/Components/UdpLogger.php @@ -11,8 +11,8 @@ class UdpLogger extends NetworkLogger implements LoggerInterface const PROTOCOL = 'udp'; /** {@inheritdoc} */ - public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT) + public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null) { - parent::__construct($host, $port, static::PROTOCOL); + parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure); } } \ No newline at end of file diff --git a/src/Models/LogstashConfig.php b/src/Models/LogstashConfig.php index d3870a5..9c31e71 100644 --- a/src/Models/LogstashConfig.php +++ b/src/Models/LogstashConfig.php @@ -20,13 +20,14 @@ class LogstashConfig extends BaseServiceConfigModel protected $table = 'logstash_config'; /** @var array */ - protected $fillable = ['service_id', 'host', 'port', 'protocol', 'context']; + protected $fillable = ['service_id', 'host', 'port', 'protocol', 'timeout', 'on_failure', 'context']; /** @var array */ protected $casts = [ - 'service_id' => 'integer', - 'port' => 'integer', - 'context' => 'array' + 'service_id' => 'integer', + 'port' => 'integer', + 'timeout' => 'float', + 'context' => 'array' ]; protected static function formatMaps(&$maps, $incoming = true) @@ -194,6 +195,31 @@ protected static function prepareConfigSchemaField(array &$schema) $schema['label'] = 'Protocol/Format'; $schema['description'] = 'Network protocol/format that Logstash input is configured for.'; break; + case 'timeout': + $schema['label'] = 'Connection Timeout'; + $schema['default'] = 2.0; + $schema['description'] = 'Maximum time in seconds to wait for connection to Logstash. Lower values prevent request blocking if Logstash is unavailable.'; + break; + case 'on_failure': + $schema['type'] = 'picklist'; + $schema['default'] = 'ignore'; + $schema['values'] = [ + [ + 'label' => 'Ignore (let request proceed, log is lost)', + 'name' => 'ignore' + ], + [ + 'label' => 'Fail Request (return 503 error)', + 'name' => 'fail_request' + ], + [ + 'label' => 'Fallback to File (write to Laravel log)', + 'name' => 'fallback_file' + ] + ]; + $schema['label'] = 'On Failure'; + $schema['description'] = 'Behavior when Logstash connection fails. "Ignore" allows requests to proceed without logging. "Fail Request" blocks requests if logging fails (for compliance). "Fallback to File" writes to local Laravel log.'; + break; case 'context': $schema['type'] = 'multi_picklist'; $schema['columns'] = 3; diff --git a/src/Services/Logstash.php b/src/Services/Logstash.php index 4e5788a..6f78102 100644 --- a/src/Services/Logstash.php +++ b/src/Services/Logstash.php @@ -4,6 +4,7 @@ use DreamFactory\Core\Exceptions\InternalServerErrorException; use DreamFactory\Core\Logger\Components\GelfLogger; use DreamFactory\Core\Logger\Components\HttpLogger; +use DreamFactory\Core\Logger\Components\NetworkLogger; use DreamFactory\Core\Logger\Components\TcpLogger; use DreamFactory\Core\Logger\Components\UdpLogger; use Arr; @@ -30,26 +31,36 @@ class Logstash extends BaseService protected function setLogger($config) { $protocol = Arr::get($config, 'protocol'); + $timeout = Arr::get($config, 'timeout', NetworkLogger::DEFAULT_TIMEOUT); + $onFailure = Arr::get($config, 'on_failure', NetworkLogger::DEFAULT_ON_FAILURE); if (static::GELF === $protocol) { $this->logger = new GelfLogger( Arr::get($config, 'host', GelfLogger::DEFAULT_HOST), - Arr::get($config, 'port', GelfLogger::DEFAULT_PORT) + Arr::get($config, 'port', GelfLogger::DEFAULT_PORT), + $timeout, + $onFailure ); } elseif (static::UDP === $protocol) { $this->logger = new UdpLogger( Arr::get($config, 'host', UdpLogger::DEFAULT_HOST), - Arr::get($config, 'port', UdpLogger::DEFAULT_PORT) + Arr::get($config, 'port', UdpLogger::DEFAULT_PORT), + $timeout, + $onFailure ); } elseif (static::TCP === $protocol) { $this->logger = new TcpLogger( Arr::get($config, 'host', TcpLogger::DEFAULT_HOST), - Arr::get($config, 'port', TcpLogger::DEFAULT_PORT) + Arr::get($config, 'port', TcpLogger::DEFAULT_PORT), + $timeout, + $onFailure ); } elseif (static::HTTP === $protocol) { $this->logger = new HttpLogger( Arr::get($config, 'host', HttpLogger::DEFAULT_HOST), - Arr::get($config, 'port', HttpLogger::DEFAULT_PORT) + Arr::get($config, 'port', HttpLogger::DEFAULT_PORT), + $timeout, + $onFailure ); } else { throw new InternalServerErrorException('Unknown Logstash network protocol: [' . $protocol . ']');