Skip to content

Commit f35e8e1

Browse files
committed
Merge pull request #6 from Neverdane/feature/sqs-bulk-messages
Feature/sqs bulk messages
2 parents d052491 + a9914db commit f35e8e1

9 files changed

Lines changed: 147 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Queue Client Changelog
22

3+
## v1.0.1
4+
5+
- Send SQS messages in batch
6+
37
## v1.0.0
48

59
- Initial commit

src/Adapter/AbstractAdapter.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace ReputationVIP\QueueClient\Adapter;
4+
5+
use ReputationVIP\QueueClient\QueueClientInterface;
6+
7+
class AbstractAdapter
8+
{
9+
10+
/**
11+
* @param string $queueName
12+
* @param array $messages
13+
* @param string $priority
14+
*
15+
* @return QueueClientInterface
16+
*/
17+
public function addMessages($queueName, $messages, $priority = null)
18+
{
19+
foreach ($messages as $message) {
20+
$this->addMessage($queueName, $message, $priority);
21+
}
22+
23+
return $this;
24+
}
25+
26+
/**
27+
* @param string $queueName
28+
* @param mixed $message
29+
* @param string $priority
30+
*
31+
* @return AdapterInterface
32+
*/
33+
public function addMessage($queueName, $message, $priority = null)
34+
{
35+
return $this;
36+
}
37+
38+
}

src/Adapter/FileAdapter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use Symfony\Component\Finder\Finder;
1313
use Symfony\Component\Finder\SplFileInfo;
1414

15-
class FileAdapter implements AdapterInterface
15+
class FileAdapter extends AbstractAdapter implements AdapterInterface
1616
{
1717
const QUEUE_FILE_EXTENSION = 'queue';
1818
const MAX_NB_MESSAGES = 10;

src/Adapter/MemoryAdapter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use ReputationVIP\QueueClient\PriorityHandler\StandardPriorityHandler;
88
use SplQueue;
99

10-
class MemoryAdapter implements AdapterInterface
10+
class MemoryAdapter extends AbstractAdapter implements AdapterInterface
1111
{
1212
const MAX_TIME_IN_FLIGHT = 30;
1313

src/Adapter/NullAdapter.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use ReputationVIP\QueueClient\PriorityHandler\PriorityHandlerInterface;
66
use ReputationVIP\QueueClient\PriorityHandler\StandardPriorityHandler;
77

8-
class NullAdapter implements AdapterInterface
8+
class NullAdapter extends AbstractAdapter implements AdapterInterface
99
{
1010

1111
/** @var PriorityHandlerInterface $priorityHandler */
@@ -23,14 +23,6 @@ public function __construct(PriorityHandlerInterface $priorityHandler = null)
2323
$this->priorityHandler = $priorityHandler;
2424
}
2525

26-
/**
27-
* @inheritdoc
28-
*/
29-
public function addMessage($queueName, $message, $priority = null)
30-
{
31-
return $this;
32-
}
33-
3426
/**
3527
* @inheritdoc
3628
*/

src/Adapter/SQSAdapter.php

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use ReputationVIP\QueueClient\PriorityHandler\PriorityHandlerInterface;
99
use ReputationVIP\QueueClient\PriorityHandler\StandardPriorityHandler;
1010

11-
class SQSAdapter implements AdapterInterface
11+
class SQSAdapter extends AbstractAdapter implements AdapterInterface
1212
{
1313
/**
1414
* @var SqsClient
@@ -19,6 +19,7 @@ class SQSAdapter implements AdapterInterface
1919
private $priorityHandler;
2020

2121
const MAX_NB_MESSAGES = 10;
22+
const SENT_MESSAGES_BATCH_SIZE = 10;
2223
const PRIORITY_SEPARATOR = '-';
2324

2425
/**
@@ -51,6 +52,51 @@ public function __construct(SqsClient $sqsClient, PriorityHandlerInterface $prio
5152
return $this;
5253
}
5354

55+
/**
56+
* @inheritdoc
57+
*/
58+
public function addMessages($queueName, $messages, $priority = null)
59+
{
60+
if (null === $priority) {
61+
$priority = $this->priorityHandler->getDefault();
62+
}
63+
64+
if (empty($queueName)) {
65+
throw new InvalidArgumentException('Parameter queueName empty or not defined.');
66+
}
67+
68+
$batchMessages = [];
69+
$batchesCount = 0;
70+
$blockCounter = 0;
71+
72+
foreach ($messages as $index => $message) {
73+
if (empty($message)) {
74+
throw new InvalidArgumentException('Parameter message empty or not defined.');
75+
}
76+
$messageData = [
77+
'Id' => (string) $index,
78+
'MessageBody' => serialize($message)
79+
];
80+
if ($blockCounter >= self::SENT_MESSAGES_BATCH_SIZE) {
81+
$blockCounter = 0;
82+
$batchesCount++;
83+
} else {
84+
$blockCounter++;
85+
}
86+
$batchMessages[$batchesCount][] = $messageData;
87+
}
88+
89+
foreach ($batchMessages as $messages) {
90+
$queueUrl = $this->sqsClient->getQueueUrl(['QueueName' => $this->getQueueNameWithPrioritySuffix($queueName, $priority)])->get('QueueUrl');
91+
$this->sqsClient->sendMessageBatch([
92+
'QueueUrl' => $queueUrl,
93+
'Entries' => $messages,
94+
]);
95+
}
96+
97+
return $this;
98+
}
99+
54100
/**
55101
* @inheritdoc
56102
*

src/QueueClient.php

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,10 @@ public function __construct(AdapterInterface $adapter = null)
5353
*/
5454
public function addMessage($queueName, $message, $priority = null)
5555
{
56-
$queues = $this->resolveAliasQueueName($queueName);
57-
if (is_array($queues)) {
58-
foreach ($queues as $queue) {
59-
$this->adapter->addMessage($queue, $message, $priority);
60-
}
61-
} else {
62-
$this->adapter->addMessage($queues, $message, $priority);
56+
$queues = (array) $this->resolveAliasQueueName($queueName);
57+
58+
foreach ($queues as $queue) {
59+
$this->adapter->addMessage($queue, $message, $priority);
6360
}
6461

6562
return $this;
@@ -70,8 +67,10 @@ public function addMessage($queueName, $message, $priority = null)
7067
*/
7168
public function addMessages($queueName, $messages, $priority = null)
7269
{
73-
foreach ($messages as $message) {
74-
$this->addMessage($queueName, $message, $priority);
70+
$queues = (array) $this->resolveAliasQueueName($queueName);
71+
72+
foreach ($queues as $queue) {
73+
$this->adapter->addMessages($queue, $messages, $priority);
7574
}
7675

7776
return $this;

tests/units/Adapter/NullAdapter.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ public function testNullAdapterAddMessage()
1313
$this->given($NullAdapter)->class($NullAdapter->addMessage('testQueue', 'test Message one'))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
1414
}
1515

16+
public function testNullAdapterAddMessages()
17+
{
18+
$NullAdapter = new \ReputationVIP\QueueClient\Adapter\NullAdapter();
19+
$this->given($NullAdapter)->class($NullAdapter->addMessages('testQueue', ['test Message one']))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
20+
}
21+
1622
public function testNullAdapterGetMessages()
1723
{
1824
$NullAdapter = new \ReputationVIP\QueueClient\Adapter\NullAdapter();

tests/units/Adapter/SQSAdapter.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,47 @@ public function testSQSAdapterAddMessage()
4848
->class($SQSAdapter->addMessage('testQueue', 'test message'))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
4949
}
5050

51+
public function testSQSAdapterAddMessages()
52+
{
53+
$this->mockGenerator->orphanize('__construct');
54+
$this->mockGenerator->shuntParentClassCalls();
55+
$mockSqsClient = new \mock\Aws\Sqs\SqsClient;
56+
$mockQueueUrlModel = new \mock\Guzzle\Service\Resource\Model;
57+
$SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient);
58+
59+
$mockSqsClient->getMockController()->getQueueUrl = function () use($mockQueueUrlModel) {
60+
return $mockQueueUrlModel;
61+
};
62+
$mockSqsClient->getMockController()->sendMessageBatch = function () {
63+
};
64+
$this->given($SQSAdapter)
65+
->class($SQSAdapter->addMessages('testQueue', array_fill(0, 11, 'test message')))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
66+
}
67+
68+
public function testSQSAdapterAddMessagesWithEmptyMessage()
69+
{
70+
$this->mockGenerator->orphanize('__construct');
71+
$this->mockGenerator->shuntParentClassCalls();
72+
$mockSqsClient = new \mock\Aws\Sqs\SqsClient;
73+
$SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient);
74+
75+
$this->exception(function() use($SQSAdapter) {
76+
$SQSAdapter->addMessages('testQueue', ['test message', '']);
77+
});
78+
}
79+
80+
public function testSQSAdapterAddMessagesWithEmptyQueueName()
81+
{
82+
$this->mockGenerator->orphanize('__construct');
83+
$this->mockGenerator->shuntParentClassCalls();
84+
$mockSqsClient = new \mock\Aws\Sqs\SqsClient;
85+
$SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient);
86+
87+
$this->exception(function() use($SQSAdapter) {
88+
$SQSAdapter->addMessages('', ['']);
89+
});
90+
}
91+
5192
public function testSQSAdapterGetMessagesWithEmptyQueueName()
5293
{
5394
$this->mockGenerator->orphanize('__construct');

0 commit comments

Comments
 (0)