Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;

class AddTimeoutFieldsToLogstashConfigTable extends Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::table('logstash_config', function (Blueprint $t) {
$t->float('timeout')->default(2.0)->after('protocol');
$t->string('on_failure')->default('ignore')->after('timeout');
});
}

/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::table('logstash_config', function (Blueprint $t) {
$t->dropColumn('timeout');
$t->dropColumn('on_failure');
});
}
}
45 changes: 37 additions & 8 deletions src/Components/GelfLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,54 @@ public function send($message)
throw new \InvalidArgumentException('Message is not a GelfMessage.');
}

$startTime = microtime(true);
$_url = $this->protocol . '://' . $this->host . ':' . $this->port;
$_sock = null;

try {
if (false === ($_chunks = $this->prepareMessage($message))) {
return false;
return $this->handleFailure("Failed to prepare GELF message", $message->toJson());
}

$_url = $this->protocol . '://' . $this->host . ':' . $this->port;
$_sock = stream_socket_client($_url);
\Illuminate\Support\Facades\Log::debug("Logstash GELF: Connecting to {$_url} (timeout: {$this->timeout}s)");

$_sock = @stream_socket_client(
$_url,
$errno,
$errstr,
$this->timeout
);

if (!$_sock) {
$elapsed = round(microtime(true) - $startTime, 3);
\Illuminate\Support\Facades\Log::warning("Logstash GELF: Connection failed to {$_url} after {$elapsed}s - [$errno] $errstr");
return $this->handleFailure("Connection failed: [$errno] $errstr", $message->toJson());
}

stream_set_timeout($_sock, (int)$this->timeout, (int)(($this->timeout - (int)$this->timeout) * 1000000));

foreach ($_chunks as $_chunk) {
if (!fwrite($_sock, $_chunk)) {
return false;
$elapsed = round(microtime(true) - $startTime, 3);
\Illuminate\Support\Facades\Log::warning("Logstash GELF: Write failed to {$_url} after {$elapsed}s");
return $this->handleFailure("Write failed", $message->toJson());
}
}

$elapsed = round(microtime(true) - $startTime, 3);
\Illuminate\Support\Facades\Log::debug("Logstash GELF: Successfully sent to {$_url} in {$elapsed}s");

return true;

} catch (\Exception $_ex) {
// Failure is not an option
return false;
$elapsed = round(microtime(true) - $startTime, 3);
\Illuminate\Support\Facades\Log::error("Logstash GELF: Exception after {$elapsed}s - " . $_ex->getMessage());
return $this->handleFailure($_ex->getMessage(), $message->toJson());
} finally {
if (is_resource($_sock)) {
fclose($_sock);
}
}

return true;
}

/**
Expand Down
27 changes: 23 additions & 4 deletions src/Components/HttpLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace DreamFactory\Core\Logger\Components;

use DreamFactory\Core\Utility\Curl;
use Illuminate\Support\Facades\Log;
use Psr\Log\LoggerInterface;

class HttpLogger extends NetworkLogger implements LoggerInterface
Expand All @@ -16,9 +17,9 @@ class HttpLogger extends NetworkLogger implements LoggerInterface
const PROTOCOL = 'http';

/** {@inheritdoc} */
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT)
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null)
{
parent::__construct($host, $port, static::PROTOCOL);
parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure);
}

/**
Expand All @@ -28,9 +29,27 @@ public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PO
*/
public function send($message)
{
$startTime = microtime(true);
$url = $this->protocol . '://' . $this->host . ':' . $this->port;
$result = Curl::post($url, $message);

return ('ok' === $result) ? true : false;
try {
Log::debug("Logstash HTTP: Posting to {$url} (timeout: {$this->timeout}s)");

$result = Curl::post($url, $message, [], $this->timeout);
$elapsed = round(microtime(true) - $startTime, 3);

if ('ok' === $result) {
Log::debug("Logstash HTTP: Successfully sent to {$url} in {$elapsed}s");
return true;
}

Log::warning("Logstash HTTP: Unexpected response from {$url} after {$elapsed}s: " . print_r($result, true));
return $this->handleFailure("Unexpected response: " . print_r($result, true), $message);

} catch (\Exception $ex) {
$elapsed = round(microtime(true) - $startTime, 3);
Log::error("Logstash HTTP: Exception after {$elapsed}s - " . $ex->getMessage());
return $this->handleFailure($ex->getMessage(), $message);
}
}
}
107 changes: 98 additions & 9 deletions src/Components/NetworkLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
namespace DreamFactory\Core\Logger\Components;

use DreamFactory\Core\Exceptions\BadRequestException;
use DreamFactory\Core\Exceptions\ServiceUnavailableException;
use Illuminate\Support\Facades\Log;
use Monolog\Level;
use Monolog\Logger;

abstract class NetworkLogger
Expand All @@ -15,6 +17,14 @@ abstract class NetworkLogger
* @const integer Port that logstash listens on
*/
const DEFAULT_PORT = 12202;
/**
* @const float Default connection timeout in seconds
*/
const DEFAULT_TIMEOUT = 2.0;
/**
* @const string Default failure behavior: 'ignore', 'fail_request', 'fallback_file'
*/
const DEFAULT_ON_FAILURE = 'ignore';
/**
* @var string Logstash host
*/
Expand All @@ -27,33 +37,51 @@ abstract class NetworkLogger
* @var string Communication protocol
*/
protected $protocol;
/**
* @var float Connection timeout in seconds
*/
protected $timeout;
/**
* @var string Behavior on failure: 'ignore', 'fail_request', 'fallback_file'
*/
protected $onFailure;

/**
* NetworkLogger constructor.
*
* @param $host
* @param $port
* @param string $protocol
* @param float $timeout Connection timeout in seconds
* @param string $onFailure Behavior on failure: 'ignore', 'fail_request', 'fallback_file'
*/
public function __construct($host, $port, $protocol = 'udp')
public function __construct($host, $port, $protocol = 'udp', $timeout = null, $onFailure = null)
{
$this->host = $host;
$this->port = $port;
$this->protocol = $protocol;
$this->timeout = $timeout ?? static::DEFAULT_TIMEOUT;
$this->onFailure = $onFailure ?? static::DEFAULT_ON_FAILURE;
}

/** {@inheritdoc} */
public function log($level, string|\Stringable $message, array $context = []): void
{
try {
$levelVal = Logger::toMonologLevel($level);
if(!is_int($levelVal)){
// Monolog 3.x returns Level enum, Monolog 2.x returns int
if ($levelVal instanceof Level) {
$levelVal = $levelVal->value;
} elseif (!is_int($levelVal)) {
throw new BadRequestException('Unknown log level [' . $level . ']');
}
$context['_message'] = $message;
$context['_level'] = $levelVal;

$this->send(json_encode($context, JSON_UNESCAPED_SLASHES));
} catch (ServiceUnavailableException $exception) {
// Re-throw to block request when on_failure='fail_request'
throw $exception;
} catch (\Throwable $exception) {
Log::error($exception->getMessage());
Log::error($exception->getTraceAsString());
Expand Down Expand Up @@ -113,23 +141,84 @@ public function debug(string|\Stringable $message, array $context = []): void
* @param string|\Stringable $message
*
* @return bool
* @throws ServiceUnavailableException
*/
public function send(string|\Stringable $message)
{
$startTime = microtime(true);
$_url = $this->protocol . '://' . $this->host . ':' . $this->port;
$_sock = null;

try {
$_url = $this->protocol . '://' . $this->host . ':' . $this->port;
$_sock = stream_socket_client($_url);
Log::debug("Logstash: Connecting to {$_url} (timeout: {$this->timeout}s)");

$_sock = @stream_socket_client(
$_url,
$errno,
$errstr,
$this->timeout
);

if (!$_sock) {
$elapsed = round(microtime(true) - $startTime, 3);
Log::warning("Logstash: Connection failed to {$_url} after {$elapsed}s - [$errno] $errstr");
return $this->handleFailure("Connection failed: [$errno] $errstr", $message);
}

// Set write timeout to match connection timeout
stream_set_timeout($_sock, (int)$this->timeout, (int)(($this->timeout - (int)$this->timeout) * 1000000));

if (!fwrite($_sock, $message)) {
return false;
$elapsed = round(microtime(true) - $startTime, 3);
Log::warning("Logstash: Write failed to {$_url} after {$elapsed}s");
return $this->handleFailure("Write failed", $message);
}

$elapsed = round(microtime(true) - $startTime, 3);
Log::debug("Logstash: Successfully sent to {$_url} in {$elapsed}s");

return true;

} catch (ServiceUnavailableException $ex) {
// Re-throw to block request when on_failure='fail_request'
throw $ex;
} catch (\Exception $ex) {
Log::error($ex->getMessage());
$elapsed = round(microtime(true) - $startTime, 3);
Log::error("Logstash: Exception after {$elapsed}s - " . $ex->getMessage());
Log::error($ex->getTraceAsString());
// Failure is not an option
return false;
return $this->handleFailure($ex->getMessage(), $message);
} finally {
if (is_resource($_sock)) {
fclose($_sock);
}
}
}

return true;
/**
* Handle send failure based on configured behavior
*
* @param string $reason Failure reason
* @param string|\Stringable $message The message that failed to send
* @return bool
* @throws ServiceUnavailableException
*/
protected function handleFailure($reason, $message)
{
switch ($this->onFailure) {
case 'fail_request':
throw new ServiceUnavailableException(
"Logging service unavailable: $reason. Request blocked per logging policy."
);

case 'fallback_file':
Log::warning("Logstash: Falling back to file logging - $reason");
Log::info("Logstash fallback message: " . (is_string($message) ? $message : json_encode($message)));
return true; // Don't block request, message is preserved in file log

case 'ignore':
default:
Log::debug("Logstash: Ignoring failure - $reason");
return false; // Don't block request, message is lost
}
}
}
4 changes: 2 additions & 2 deletions src/Components/TcpLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class TcpLogger extends NetworkLogger implements LoggerInterface
const PROTOCOL = 'tcp';

/** {@inheritdoc} */
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT)
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null)
{
parent::__construct($host, $port, static::PROTOCOL);
parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure);
}
}
4 changes: 2 additions & 2 deletions src/Components/UdpLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class UdpLogger extends NetworkLogger implements LoggerInterface
const PROTOCOL = 'udp';

/** {@inheritdoc} */
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT)
public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT, $timeout = null, $onFailure = null)
{
parent::__construct($host, $port, static::PROTOCOL);
parent::__construct($host, $port, static::PROTOCOL, $timeout, $onFailure);
}
}
34 changes: 30 additions & 4 deletions src/Models/LogstashConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ class LogstashConfig extends BaseServiceConfigModel
protected $table = 'logstash_config';

/** @var array */
protected $fillable = ['service_id', 'host', 'port', 'protocol', 'context'];
protected $fillable = ['service_id', 'host', 'port', 'protocol', 'timeout', 'on_failure', 'context'];

/** @var array */
protected $casts = [
'service_id' => 'integer',
'port' => 'integer',
'context' => 'array'
'service_id' => 'integer',
'port' => 'integer',
'timeout' => 'float',
'context' => 'array'
];

protected static function formatMaps(&$maps, $incoming = true)
Expand Down Expand Up @@ -194,6 +195,31 @@ protected static function prepareConfigSchemaField(array &$schema)
$schema['label'] = 'Protocol/Format';
$schema['description'] = 'Network protocol/format that Logstash input is configured for.';
break;
case 'timeout':
$schema['label'] = 'Connection Timeout';
$schema['default'] = 2.0;
$schema['description'] = 'Maximum time in seconds to wait for connection to Logstash. Lower values prevent request blocking if Logstash is unavailable.';
break;
case 'on_failure':
$schema['type'] = 'picklist';
$schema['default'] = 'ignore';
$schema['values'] = [
[
'label' => 'Ignore (let request proceed, log is lost)',
'name' => 'ignore'
],
[
'label' => 'Fail Request (return 503 error)',
'name' => 'fail_request'
],
[
'label' => 'Fallback to File (write to Laravel log)',
'name' => 'fallback_file'
]
];
$schema['label'] = 'On Failure';
$schema['description'] = 'Behavior when Logstash connection fails. "Ignore" allows requests to proceed without logging. "Fail Request" blocks requests if logging fails (for compliance). "Fallback to File" writes to local Laravel log.';
break;
case 'context':
$schema['type'] = 'multi_picklist';
$schema['columns'] = 3;
Expand Down
Loading