Skip to content

Commit 59c6971

Browse files
committed
Initial commit
0 parents  commit 59c6971

File tree

6 files changed

+409
-0
lines changed

6 files changed

+409
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/vendor/
2+
composer.lock
3+
.idea/

composer.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "stackkit/laravel-google-pubsub-queue",
3+
"license": "MIT",
4+
"authors": [
5+
{
6+
"name": "Marick van Tuil",
7+
"email": "info@marickvantuil.nl"
8+
}
9+
],
10+
"require": {
11+
"ext-json": "*",
12+
"google/cloud-pubsub": "^1.24"
13+
},
14+
"require-dev": {
15+
"mockery/mockery": "^1.2",
16+
"orchestra/testbench": "^3.5 || ^3.6 || ^3.7 || ^3.8 || ^4.0 || ^5.0",
17+
"symfony/console": "^4.4|^5.0"
18+
},
19+
"autoload": {
20+
"psr-4": {
21+
"Stackkit\\LaravelGooglePubSubQueue\\": "src/"
22+
}
23+
},
24+
"extra": {
25+
"laravel": {
26+
"providers": [
27+
"Stackkit\\LaravelGooglePubSubQueue\\LaravelGooglePubSubQueueServiceProvider"
28+
]
29+
}
30+
}
31+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGooglePubSubQueue;
4+
5+
use Illuminate\Support\Facades\Route;
6+
use Illuminate\Support\ServiceProvider;
7+
use Google\Cloud\PubSub\PubSubClient;
8+
9+
class LaravelGooglePubSubQueueServiceProvider extends ServiceProvider
10+
{
11+
/**
12+
* Perform post-registration booting of services.
13+
*
14+
* @return void
15+
*/
16+
public function boot()
17+
{
18+
$this->app['queue']->addConnector('pubsub', function () {
19+
return new PubSubConnector();
20+
});
21+
22+
Route::post('pubsub-wake', function () {
23+
$client = new PubSubClient([
24+
"driver" => "pubsub",
25+
"connection" => "default",
26+
"queue" => "projects/test-marick/topics/geencijfer-prd-queue",
27+
"project_id" => "test-marick",
28+
"retries" => 3,
29+
"request_timeout" => 60,
30+
"subscriber" => "projects/test-marick/subscriptions/geencijfer-prd-queue-subscription",
31+
"keyFilePath" => "/var/www/gcloud-key.json",
32+
]);
33+
34+
$message = $client->consume(request()->toArray());
35+
36+
logger(base64_decode($message->data()));
37+
});
38+
}
39+
40+
/**
41+
* Register the service provider.
42+
*
43+
* @return void
44+
*/
45+
public function register()
46+
{
47+
// $this->commands([
48+
// SendEmailsCommand::class,
49+
// ]);
50+
}
51+
}

src/PubSubConnector.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGooglePubSubQueue;
4+
5+
use Google\Cloud\PubSub\PubSubClient;
6+
use Illuminate\Queue\Connectors\ConnectorInterface;
7+
8+
class PubSubConnector implements ConnectorInterface
9+
{
10+
public function connect(array $config)
11+
{
12+
return new PubSubQueue(
13+
new PubSubClient($config),
14+
$config['queue'] ,
15+
$config['subscriber']
16+
);
17+
}
18+
}

src/PubSubJob.php

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGooglePubSubQueue;
4+
5+
use Google\Cloud\PubSub\Message;
6+
use Illuminate\Container\Container;
7+
use Illuminate\Contracts\Queue\Job as JobContract;
8+
use Illuminate\Queue\Jobs\Job;
9+
use Stackkit\LaravelGooglePubSubQueue\PubSubQueue;
10+
11+
class PubSubJob extends Job implements JobContract
12+
{
13+
/**
14+
* The PubSub queue.
15+
*
16+
* @var \Stackkit\LaravelGooglePubSubQueue\PubSubQueue
17+
*/
18+
protected $pubsub;
19+
20+
/**
21+
* The job instance.
22+
*
23+
* @var array
24+
*/
25+
protected $job;
26+
27+
/**
28+
* Create a new job instance.
29+
*
30+
* @param \Illuminate\Container\Container $container
31+
* @param \Stackkit\LaravelGooglePubSubQueue\PubSubQueue $sqs
32+
* @param \Google\Cloud\PubSub\Message $job
33+
* @param string $connectionName
34+
* @param string $queue
35+
*/
36+
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $queue)
37+
{
38+
$this->pubsub = $pubsub;
39+
$this->job = $job;
40+
$this->queue = $queue;
41+
$this->container = $container;
42+
$this->connectionName = $connectionName;
43+
44+
$this->decoded = $this->payload();
45+
}
46+
47+
/**
48+
* Get the job identifier.
49+
*
50+
* @return string
51+
*/
52+
public function getJobId()
53+
{
54+
return $this->decoded['id'] ?? null;
55+
}
56+
57+
/**
58+
* Get the raw body of the job.
59+
*
60+
* @return string
61+
*/
62+
public function getRawBody()
63+
{
64+
return base64_decode($this->job->data());
65+
}
66+
67+
/**
68+
* Get the number of times the job has been attempted.
69+
*
70+
* @return int
71+
*/
72+
public function attempts()
73+
{
74+
return ((int) $this->job->attribute('attempts') ?? 0) + 1;
75+
}
76+
77+
/**
78+
* Release the job back into the queue.
79+
*
80+
* @param int $delay
81+
* @return void
82+
*/
83+
public function release($delay = 0)
84+
{
85+
parent::release($delay);
86+
87+
$attempts = $this->attempts();
88+
$this->pubsub->republish(
89+
$this->job,
90+
$this->queue,
91+
['attempts' => (string) $attempts],
92+
$delay
93+
);
94+
}
95+
}

0 commit comments

Comments
 (0)