Skip to content

Commit e0b7814

Browse files
authored
Fix middleware (#321)
1 parent 30ac201 commit e0b7814

9 files changed

Lines changed: 300 additions & 29 deletions

src/ChildWorkflow.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public function handle()
6969
public function middleware()
7070
{
7171
return [
72-
new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::ACTIVITY),
72+
new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::ACTIVITY, 0, 15),
7373
];
7474
}
7575
}

src/Middleware/WithoutOverlappingMiddleware.php

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,31 @@ public function lock($job)
9898

9999
case self::ACTIVITY:
100100
$locked = false;
101-
if ($workflowSemaphore === 0) {
102-
$job->key = $this->getActivitySemaphoreKey() . ':' . (string) Str::uuid();
103-
$locked = $this->compareAndSet(
104-
$this->getActivitySemaphoreKey(),
105-
$activitySemaphores,
106-
array_merge($activitySemaphores, [$job->key])
107-
);
108-
if ($locked) {
109-
if ($this->expiresAfter) {
110-
$this->cache->put($job->key, 1, $this->expiresAfter);
111-
} else {
112-
$this->cache->put($job->key, 1);
101+
$maxAttempts = 5;
102+
for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
103+
if ($attempt > 0) {
104+
$workflowSemaphore = (int) $this->cache->get($this->getWorkflowSemaphoreKey(), 0);
105+
if ($workflowSemaphore !== 0) {
106+
break;
107+
}
108+
$activitySemaphores = $this->cache->get($this->getActivitySemaphoreKey(), []);
109+
}
110+
if ($workflowSemaphore === 0) {
111+
$job->key = $this->getActivitySemaphoreKey() . ':' . (string) Str::uuid();
112+
$locked = $this->compareAndSet(
113+
$this->getActivitySemaphoreKey(),
114+
$activitySemaphores,
115+
array_merge($activitySemaphores, [$job->key])
116+
);
117+
if ($locked) {
118+
if ($this->expiresAfter) {
119+
$this->cache->put($job->key, 1, $this->expiresAfter);
120+
} else {
121+
$this->cache->put($job->key, 1);
122+
}
123+
break;
113124
}
125+
usleep(500);
114126
}
115127
}
116128
break;
@@ -153,7 +165,7 @@ private function unlockActivity($job): bool
153165
$retries = 0;
154166

155167
while ($retries < $maxRetries) {
156-
$lock = $this->cache->lock($this->getLockKey());
168+
$lock = $this->cache->lock($this->getLockKey(), 5);
157169

158170
if (! $lock->get()) {
159171
$retries++;
@@ -185,25 +197,35 @@ private function unlockActivity($job): bool
185197

186198
private function compareAndSet($key, $expectedValue, $newValue, $expiresAfter = 0)
187199
{
188-
$lock = $this->cache->lock($this->getLockKey());
200+
$maxRetries = 10;
201+
$retries = 0;
189202

190-
if ($lock->get()) {
191-
try {
192-
$currentValue = $this->cache->get($key, $expectedValue);
203+
while ($retries < $maxRetries) {
204+
$lock = $this->cache->lock($this->getLockKey(), 5);
193205

194-
$currentValue = is_int($expectedValue) ? (int) $currentValue : $currentValue;
206+
if ($lock->get()) {
207+
try {
208+
$currentValue = $this->cache->get($key, $expectedValue);
195209

196-
if ($currentValue === $expectedValue) {
197-
if ($expiresAfter) {
198-
$this->cache->put($key, $newValue, $expiresAfter);
199-
} else {
200-
$this->cache->put($key, $newValue);
210+
$currentValue = is_int($expectedValue) ? (int) $currentValue : $currentValue;
211+
212+
if ($currentValue === $expectedValue) {
213+
if ($expiresAfter) {
214+
$this->cache->put($key, $newValue, $expiresAfter);
215+
} else {
216+
$this->cache->put($key, $newValue);
217+
}
218+
return true;
201219
}
202-
return true;
220+
} finally {
221+
$lock->release();
203222
}
204-
} finally {
205-
$lock->release();
223+
224+
return false;
206225
}
226+
227+
$retries++;
228+
usleep(1000);
207229
}
208230

209231
return false;

src/Workflow.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class Workflow implements ShouldBeEncrypted, ShouldBeUnique, ShouldQueue
4444

4545
public int $maxExceptions = 0;
4646

47+
public $timeout = 0;
48+
4749
public $arguments;
4850

4951
public $coroutine;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Feature;
6+
7+
use RuntimeException;
8+
use Tests\Fixtures\TestStressParentWorkflow;
9+
use Tests\TestCase;
10+
use Workflow\States\WorkflowCompletedStatus;
11+
use Workflow\WorkflowStub;
12+
13+
class RaceConditionTest extends TestCase
14+
{
15+
public function testParentWorkflowWithParallelChildWorkflows(int $children = 100, int $actPerChild = 10): void
16+
{
17+
$runId = (int) now()
18+
->format('Uu');
19+
20+
$workflow = WorkflowStub::make(TestStressParentWorkflow::class);
21+
$workflow->start($runId, $children, $actPerChild);
22+
23+
$deadline = now()
24+
->addSeconds(120);
25+
26+
while ($workflow->running() && now()->lt($deadline)) {
27+
usleep(50000);
28+
$workflow->fresh();
29+
}
30+
31+
if ($workflow->running()) {
32+
throw new RuntimeException(sprintf(
33+
'Race run %d did not complete before timeout. Current status: %s',
34+
$runId,
35+
(string) $workflow->status()
36+
));
37+
}
38+
39+
$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
40+
$this->assertSame([
41+
'run_id' => $runId,
42+
'children' => $children,
43+
'activities_per_child' => $actPerChild,
44+
], $workflow->output());
45+
}
46+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Generator;
8+
use Illuminate\Support\Facades\Log;
9+
use function Workflow\activity;
10+
use Workflow\Workflow;
11+
12+
class TestStressChildWorkflow extends Workflow
13+
{
14+
public function execute(int $runId, int $childIndex, int $activitiesPerChild = 10): Generator
15+
{
16+
for ($activityIndex = 0; $activityIndex < $activitiesPerChild; $activityIndex++) {
17+
Log::info(__METHOD__ . ':' . __LINE__, [
18+
'run_id' => $runId,
19+
'workflow_id' => $this->storedWorkflow->id,
20+
'child_index' => $childIndex,
21+
'activity_index' => $activityIndex,
22+
'worker_pid' => getmypid(),
23+
]);
24+
25+
yield activity(TestStressLogActivity::class, $runId, $childIndex, $activityIndex);
26+
}
27+
28+
return true;
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Illuminate\Support\Facades\Log;
8+
use Workflow\Activity;
9+
10+
class TestStressLogActivity extends Activity
11+
{
12+
public function execute(int $runId, int $childIndex, int $activityIndex): bool
13+
{
14+
Log::info(__METHOD__ . ':' . __LINE__, [
15+
'run_id' => $runId,
16+
'child_index' => $childIndex,
17+
'activity_index' => $activityIndex,
18+
'workflow_id' => $this->workflowId(),
19+
'index' => $this->index,
20+
'worker_pid' => getmypid(),
21+
]);
22+
23+
return true;
24+
}
25+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Generator;
8+
use Illuminate\Support\Facades\Log;
9+
use function Workflow\all;
10+
use function Workflow\child;
11+
use Workflow\Workflow;
12+
13+
class TestStressParentWorkflow extends Workflow
14+
{
15+
public function execute(int $runId, int $children = 100, int $activitiesPerChild = 10): Generator
16+
{
17+
$promises = [];
18+
19+
for ($childIndex = 0; $childIndex < $children; $childIndex++) {
20+
$promises[] = child(TestStressChildWorkflow::class, $runId, $childIndex, $activitiesPerChild);
21+
}
22+
23+
Log::info(__METHOD__ . ':' . __LINE__, [
24+
'run_id' => $runId,
25+
'workflow_id' => $this->storedWorkflow->id,
26+
'worker_pid' => getmypid(),
27+
]);
28+
29+
yield all($promises);
30+
31+
Log::info(__METHOD__ . ':' . __LINE__, [
32+
'run_id' => $runId,
33+
'workflow_id' => $this->storedWorkflow->id,
34+
'worker_pid' => getmypid(),
35+
]);
36+
37+
return [
38+
'run_id' => $runId,
39+
'children' => $children,
40+
'activities_per_child' => $activitiesPerChild,
41+
];
42+
}
43+
}

tests/TestCase.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,27 @@ public static function setUpBeforeClass(): void
2525
}
2626
}
2727

28+
foreach ($_ENV as $key => $value) {
29+
if (is_string($value) && getenv($key) === false) {
30+
putenv("{$key}={$value}");
31+
}
32+
}
33+
34+
$redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null);
35+
$redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379);
36+
if ($redisHost && class_exists(\Redis::class)) {
37+
try {
38+
$redis = new \Redis();
39+
$redis->connect($redisHost, (int) $redisPort);
40+
$redis->flushDB();
41+
} catch (\Throwable $e) {
42+
// Ignore if no redis
43+
}
44+
}
45+
2846
for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) {
2947
self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']);
48+
self::$workers[$i]->disableOutput();
3049
self::$workers[$i]->start();
3150
}
3251
}
@@ -51,6 +70,18 @@ protected function setUp(): void
5170
parent::setUp();
5271

5372
Cache::flush();
73+
74+
$redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null);
75+
$redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379);
76+
if ($redisHost && class_exists(\Redis::class)) {
77+
try {
78+
$redis = new \Redis();
79+
$redis->connect($redisHost, (int) $redisPort);
80+
$redis->flushDB();
81+
} catch (\Throwable $e) {
82+
// Ignore if no redis
83+
}
84+
}
5485
}
5586

5687
protected function defineDatabaseMigrations()

0 commit comments

Comments
 (0)