Skip to content

Commit d64f5ea

Browse files
committed
WIP
1 parent 61b2844 commit d64f5ea

File tree

7 files changed

+193
-9
lines changed

7 files changed

+193
-9
lines changed

src/CloudTasksConnector.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,23 @@
22

33
namespace Stackkit\LaravelGoogleCloudTasksQueue;
44

5+
use Google\Cloud\Tasks\V2\CloudTasksClient;
56
use Illuminate\Queue\Connectors\ConnectorInterface;
67

78
class CloudTasksConnector implements ConnectorInterface
89
{
910
public function connect(array $config)
1011
{
11-
return new CloudTasksQueue();
12+
return new CloudTasksQueue(
13+
$config,
14+
new CloudTasksClient($this->buildConfig())
15+
);
16+
}
17+
18+
private function buildConfig()
19+
{
20+
return [
21+
'credentials' => Config::credentials(),
22+
];
1223
}
1324
}

src/CloudTasksException.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGoogleCloudTasksQueue;
4+
5+
use Exception;
6+
7+
class CloudTasksException extends Exception
8+
{
9+
//
10+
}

src/CloudTasksJob.php

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,34 @@
22

33
namespace Stackkit\LaravelGoogleCloudTasksQueue;
44

5+
use Illuminate\Container\Container;
56
use Illuminate\Queue\Jobs\Job as LaravelJob;
67
use Illuminate\Contracts\Queue\Job as JobContract;
78

89
class CloudTasksJob extends LaravelJob implements JobContract
910
{
11+
private $job;
12+
private $attempts;
13+
14+
public function __construct($job, $attempts)
15+
{
16+
$this->job = $job;
17+
$this->attempts = $attempts;
18+
$this->container = Container::getInstance();
19+
}
20+
1021
public function getJobId()
1122
{
12-
// TODO: Implement getJobId() method.
23+
return $this->job['uuid'];
1324
}
1425

1526
public function getRawBody()
1627
{
17-
// TODO: Implement getRawBody() method.
28+
return json_encode($this->job);
1829
}
1930

2031
public function attempts()
2132
{
22-
// TODO: Implement attempts() method.
33+
return $this->attempts;
2334
}
2435
}

src/CloudTasksQueue.php

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,81 @@
22

33
namespace Stackkit\LaravelGoogleCloudTasksQueue;
44

5+
use Carbon\Carbon;
6+
use Google\Cloud\Tasks\V2\CloudTasksClient;
7+
use Google\Cloud\Tasks\V2\HttpMethod;
8+
use Google\Cloud\Tasks\V2\HttpRequest;
9+
use Google\Cloud\Tasks\V2\Task;
10+
use Google\Protobuf\Timestamp;
511
use Illuminate\Contracts\Queue\Queue as QueueContract;
612
use Illuminate\Queue\Queue as LaravelQueue;
13+
use Illuminate\Support\InteractsWithTime;
714

815
class CloudTasksQueue extends LaravelQueue implements QueueContract
916
{
17+
use InteractsWithTime;
18+
19+
private $client;
20+
private $default;
21+
22+
public function __construct(array $config, CloudTasksClient $client)
23+
{
24+
$this->client = $client;
25+
$this->default = $config['queue'];
26+
}
27+
1028
public function size($queue = null)
1129
{
1230
// TODO: Implement size() method.
1331
}
1432

1533
public function push($job, $data = '', $queue = null)
1634
{
17-
// TODO: Implement push() method.
35+
return $this->pushToCloudTasks($queue, $this->createPayload(
36+
$job, $this->getQueue($queue), $data
37+
));
1838
}
1939

2040
public function pushRaw($payload, $queue = null, array $options = [])
2141
{
22-
// TODO: Implement pushRaw() method.
42+
return $this->pushToCloudTasks($queue, $payload);
2343
}
2444

2545
public function later($delay, $job, $data = '', $queue = null)
2646
{
27-
// TODO: Implement later() method.
47+
return $this->pushToCloudTasks($queue, $this->createPayload(
48+
$job, $this->getQueue($queue), $data
49+
), $delay);
50+
}
51+
52+
protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0)
53+
{
54+
$queue = $this->getQueue($queue);
55+
$queueName = $this->client->queueName(Config::project(), Config::location(), $queue);
56+
$availableAt = $this->availableAt($delay);
57+
58+
$httpRequest = new HttpRequest();
59+
$httpRequest->setUrl(Config::handler());
60+
$httpRequest->setHttpMethod(HttpMethod::POST);
61+
$httpRequest->setBody($payload);
62+
63+
$task = new Task;
64+
$task->setHttpRequest($httpRequest);
65+
66+
if ($availableAt > time()) {
67+
$task->setScheduleTime(new Timestamp(['seconds' => $availableAt]));
68+
}
69+
70+
$this->client->createTask($queueName, $task);
2871
}
2972

3073
public function pop($queue = null)
3174
{
3275
// TODO: Implement pop() method.
3376
}
77+
78+
private function getQueue($queue = null)
79+
{
80+
return $queue ?: $this->default;
81+
}
3482
}

src/CloudTasksServiceProvider.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,27 @@
22

33
namespace Stackkit\LaravelGoogleCloudTasksQueue;
44

5+
use Illuminate\Queue\QueueManager;
6+
use Illuminate\Routing\Router;
57
use Illuminate\Support\ServiceProvider as LaravelServiceProvider;
68

79
class CloudTasksServiceProvider extends LaravelServiceProvider
810
{
9-
public function boot()
11+
public function boot(QueueManager $queue, Router $router)
1012
{
11-
$this->app['queue']->addConnector('cloudtasks', function () {
13+
$this->registerConnector($queue);
14+
$this->registerRoutes($router);
15+
}
16+
17+
private function registerConnector(QueueManager $queue)
18+
{
19+
$queue->addConnector('cloudtasks', function () {
1220
return new CloudTasksConnector;
1321
});
1422
}
23+
24+
private function registerRoutes(Router $router)
25+
{
26+
$router->post('handle-task', [TaskHandler::class, 'handle']);
27+
}
1528
}

src/Config.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGoogleCloudTasksQueue;
4+
5+
class Config
6+
{
7+
public static function credentials()
8+
{
9+
return config('queue.connections.cloudtasks.credentials');
10+
}
11+
12+
public static function project()
13+
{
14+
return config('queue.connections.cloudtasks.project');
15+
}
16+
17+
public static function location()
18+
{
19+
return config('queue.connections.cloudtasks.location');
20+
}
21+
22+
public static function handler()
23+
{
24+
return config('queue.connections.cloudtasks.handler');
25+
}
26+
}

src/TaskHandler.php

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Stackkit\LaravelGoogleCloudTasksQueue;
4+
5+
use Illuminate\Queue\Worker;
6+
use Illuminate\Queue\WorkerOptions;
7+
use Throwable;
8+
9+
class TaskHandler
10+
{
11+
/**
12+
* @throws CloudTasksException
13+
*/
14+
public function handle()
15+
{
16+
$task = $this->captureTask();
17+
18+
$this->handleTask($task);
19+
}
20+
21+
/**
22+
* @throws CloudTasksException
23+
*/
24+
private function captureTask()
25+
{
26+
$input = file_get_contents('php://input');
27+
28+
if ($input === false) {
29+
throw new CloudTasksException('Could not read incoming task');
30+
}
31+
32+
$task = json_decode($input, true, JSON_THROW_ON_ERROR);
33+
34+
if (is_null($task)) {
35+
throw new CloudTasksException('Could not decode incoming task');
36+
}
37+
38+
return $task;
39+
}
40+
41+
/**
42+
* @param $task
43+
* @throws CloudTasksException
44+
*/
45+
private function handleTask($task)
46+
{
47+
$job = new CloudTasksJob($task, request()->header('X-CloudTasks-TaskRetryCount'));
48+
49+
$worker = $this->getQueueWorker();
50+
51+
try {
52+
$worker->process('cloudtasks', $job, new WorkerOptions());
53+
} catch (Throwable $e) {
54+
throw new CloudTasksException('Failed handling task');
55+
}
56+
}
57+
58+
/**
59+
* @return Worker
60+
*/
61+
private function getQueueWorker()
62+
{
63+
return app('queue.worker');
64+
}
65+
}

0 commit comments

Comments
 (0)