Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"php-amqplib/php-amqplib": "^3.1.0",
"yiisoft/factory": "^1.0",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/queue": "dev-master"
"yiisoft/queue": "dev-new"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.4",
Expand Down
61 changes: 31 additions & 30 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
Expand All @@ -23,64 +25,63 @@ public function __construct(

public function withChannel(string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$new = clone $this;
$new->queueProvider = $this->queueProvider->withChannelName($channel);

return $instance;
return $new;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
* @param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
(new ExistingMessagesConsumer($channel, $this->queueProvider
->getQueueSettings()
->getName(), $this->serializer))
->consume($handlerCallback);
$queueName = $this->queueProvider->getQueueSettings()->getName();
$consumer = new ExistingMessagesConsumer(
$channel,
$queueName,
$this->serializer
);

$consumer->consume($handlerCallback);
}

/**
* @return never
*/
public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
throw new NotImplementedException(sprintf('Status check is not supported by the adapter %s.', self::class));
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
$channel = $this->queueProvider->getChannel();
$channel->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
$queueName = $this->queueProvider->getQueueSettings()->getName();

$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
->getName(),
$this->queueProvider
->getQueueSettings()
->getName(),
$queueName,
$queueName,
false,
false,
false,
Expand Down
54 changes: 0 additions & 54 deletions src/Exception/NoKeyInPayloadException.php

This file was deleted.

1 change: 1 addition & 0 deletions src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

/**
* @internal
Expand Down
64 changes: 0 additions & 64 deletions src/MessageSerializer.php

This file was deleted.

14 changes: 0 additions & 14 deletions src/MessageSerializerInterface.php

This file was deleted.

58 changes: 36 additions & 22 deletions src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,32 @@
use InvalidArgumentException;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface;
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Middleware\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\MessageHandlerInterface;
use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Queue\Middleware\Request;

final class DelayMiddleware implements DelayMiddlewareInterface
final class DelayMiddleware implements MiddlewareInterface, DelayMiddlewareInterface
{
public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
{
public function __construct(
private AdapterInterface $adapter,
private float $delayInSeconds,
private bool $forcePersistentMessages = true
) {
if (!$adapter instanceof Adapter) {
throw new InvalidArgumentException(
sprintf(
'This middleware works only with the %s. %s given.',
Adapter::class,
get_debug_type($adapter)
)
);
}
}

/**
Expand All @@ -39,28 +53,28 @@ public function getDelay(): float
return $this->delayInSeconds;
}

public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
public function process(Request $request, MessageHandlerInterface $handler): Request
{
$adapter = $request->getAdapter();
if (!$adapter instanceof Adapter) {
$type = get_debug_type($adapter);
$class = Adapter::class;
throw new InvalidArgumentException(
"This middleware works only with the $class. $type given."
);
}
$queueProvider = $this->adapter->getQueueProvider();
$originalExchangeSettings = $queueProvider->getExchangeSettings();
$delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
$queueSettings = $this->getQueueSettings(
$queueProvider->getQueueSettings(),
$originalExchangeSettings
);

$queueProvider = $adapter->getQueueProvider();
$exchangeSettings = $this->getExchangeSettings($queueProvider->getExchangeSettings());
$queueSettings = $this->getQueueSettings($queueProvider->getQueueSettings(), $queueProvider->getExchangeSettings());
$adapter = $adapter->withQueueProvider(
$adapter = $this->adapter->withQueueProvider(
$queueProvider
->withMessageProperties($this->getMessageProperties($queueProvider))
->withExchangeSettings($exchangeSettings)
->withExchangeSettings($delayedExchangeSettings)
->withQueueSettings($queueSettings)
);

return $handler->handlePush($request->withAdapter($adapter));
return $handler->handle(
$request->withQueue(
$request->getQueue()->withAdapter($adapter)
)
);
}

/**
Expand Down Expand Up @@ -104,7 +118,7 @@ private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSetting
/** @noinspection NullPointerExceptionInspection */
return $exchangeSettings
?->withName("{$exchangeSettings->getName()}.dlx")
->withAutoDelete(true)
->withAutoDelete(false)
->withType(AMQPExchangeType::TOPIC);
}
}
1 change: 1 addition & 0 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function __construct(
public function __destruct()
{
$this->channel?->close();
//unset($this->channel);
}

public function getChannel(): AMQPChannel
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private string $type = AMQPExchangeType::DIRECT,
private bool $passive = false,
private bool $durable = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $internal = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
private ?int $ticket = null
Expand Down
Loading