Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,20 @@ public function __construct(
) {
$this->arguments = $arguments;

if (property_exists($this, 'connection')) {
$options = $this->storedWorkflow->workflowOptions();
$connection = $options->connection;

if ($connection !== null) {
$this->onConnection($connection);
} elseif (property_exists($this, 'connection')) {
$this->onConnection($this->connection);
}

if (property_exists($this, 'queue')) {
$queue = $options->queue;

if ($queue !== null) {
$this->onQueue($queue);
} elseif (property_exists($this, 'queue')) {
$this->onQueue($this->queue);
}

Expand Down Expand Up @@ -102,7 +111,7 @@ public function handle()

$this->container = App::make(Container::class);

if ($this->storedWorkflow->logs()->whereIndex($this->index)->exists()) {
if ($this->storedWorkflow->hasLogByIndex($this->index)) {
return;
}

Expand Down
21 changes: 9 additions & 12 deletions src/ActivityStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,22 @@ public static function make($activity, ...$arguments): PromiseInterface
{
$context = WorkflowStub::getContext();

$log = $context->storedWorkflow->logs()
->whereIndex($context->index)
->first();
$log = $context->storedWorkflow->findLogByIndex($context->index);

if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();

if (! $log && array_key_exists($activity, $mocks)) {
$result = $mocks[$activity];

$log = $context->storedWorkflow->logs()
->create([
'index' => $context->index,
'now' => $context->now,
'class' => $activity,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);
$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $activity,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);

WorkflowStub::recordDispatched($activity, $arguments);
}
Expand Down
9 changes: 6 additions & 3 deletions src/ChildWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public function __construct(
$connection = null,
$queue = null
) {
$connection = $connection ?? config('queue.default');
$queue = $queue ?? config('queue.connections.' . $connection . '.queue', 'default');
$connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default');
$queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config(
'queue.connections.' . $connection . '.queue',
'default'
);
$this->onConnection($connection);
$this->onQueue($queue);
}
Expand All @@ -54,7 +57,7 @@ public function handle()
$workflow = $this->parentWorkflow->toWorkflow();

try {
if ($this->parentWorkflow->logs()->whereIndex($this->index)->exists()) {
if ($this->parentWorkflow->hasLogByIndex($this->index)) {
$workflow->resume();
} else {
$workflow->next($this->index, $this->now, $this->storedWorkflow->class, $this->return);
Expand Down
32 changes: 20 additions & 12 deletions src/ChildWorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,22 @@ public static function make($workflow, ...$arguments): PromiseInterface
{
$context = WorkflowStub::getContext();

$log = $context->storedWorkflow->logs()
->whereIndex($context->index)
->first();
$log = $context->storedWorkflow->findLogByIndex($context->index);

if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();

if (! $log && array_key_exists($workflow, $mocks)) {
$result = $mocks[$workflow];

$log = $context->storedWorkflow->logs()
->create([
'index' => $context->index,
'now' => $context->now,
'class' => $workflow,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);
$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $workflow,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);

WorkflowStub::recordDispatched($workflow, $arguments);
}
Expand All @@ -58,6 +55,17 @@ public static function make($workflow, ...$arguments): PromiseInterface

$childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make($workflow);

$hasOptions = collect($arguments)
->contains(static fn ($argument): bool => $argument instanceof WorkflowOptions);

if (! $hasOptions) {
$options = new WorkflowOptions(WorkflowStub::connection(), WorkflowStub::queue());

if ($options->connection !== null || $options->queue !== null) {
$arguments[] = $options;
}
}

if ($childWorkflow->running() && ! $childWorkflow->created()) {
try {
$childWorkflow->resume();
Expand Down
9 changes: 6 additions & 3 deletions src/Exception.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ public function __construct(
$connection = null,
$queue = null
) {
$connection = $connection ?? config('queue.default');
$queue = $queue ?? config('queue.connections.' . $connection . '.queue', 'default');
$connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default');
$queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config(
'queue.connections.' . $connection . '.queue',
'default'
);
$this->onConnection($connection);
$this->onQueue($queue);
}
Expand All @@ -47,7 +50,7 @@ public function handle()
$workflow = $this->storedWorkflow->toWorkflow();

try {
if ($this->storedWorkflow->logs()->whereIndex($this->index)->exists()) {
if ($this->storedWorkflow->hasLogByIndex($this->index)) {
$workflow->resume();
} else {
$workflow->next($this->index, $this->now, self::class, $this->exception);
Expand Down
164 changes: 164 additions & 0 deletions src/Models/StoredWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
namespace Workflow\Models;

use Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Eloquent\Collection;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Prunable;
use Illuminate\Database\Eloquent\Relations\BelongsToMany;
use Illuminate\Support\Arr;
use Spatie\ModelStates\HasStates;
use Workflow\States\WorkflowContinuedStatus;
use Workflow\States\WorkflowStatus;
use Workflow\WorkflowMetadata;
use Workflow\WorkflowOptions;
use Workflow\WorkflowStub;

class StoredWorkflow extends Model
Expand Down Expand Up @@ -52,12 +56,130 @@ public function toWorkflow()
return WorkflowStub::fromStoredWorkflow($this);
}

public function workflowMetadata(): WorkflowMetadata
{
$arguments = $this->arguments;

if ($arguments === null) {
return new WorkflowMetadata([]);
}

return WorkflowMetadata::fromSerializedArguments(
\Workflow\Serializers\Serializer::unserialize($arguments)
);
}

/**
* @return array<int, mixed>
*/
public function workflowArguments(): array
{
return $this->workflowMetadata()
->arguments;
}

public function workflowOptions(): WorkflowOptions
{
return $this->workflowMetadata()
->options;
}

public function effectiveConnection(): ?string
{
$connection = $this->workflowOptions()
->connection;

if ($connection !== null) {
return $connection;
}

if (! is_string($this->class) || $this->class === '') {
return null;
}

return Arr::get(WorkflowStub::getDefaultProperties($this->class), 'connection');
}

public function effectiveQueue(): ?string
{
$queue = $this->workflowOptions()
->queue;

if ($queue !== null) {
return $queue;
}

if (! is_string($this->class) || $this->class === '') {
return null;
}

$connection = $this->effectiveConnection() ?? config('queue.default');

return Arr::get(WorkflowStub::getDefaultProperties($this->class), 'queue')
?? config('queue.connections.' . $connection . '.queue', 'default');
}

public function logs(): \Illuminate\Database\Eloquent\Relations\HasMany
{
return $this->hasMany(config('workflows.stored_workflow_log_model', StoredWorkflowLog::class))
->orderBy('id');
}

public function findLogByIndex(int $index, bool $fresh = false): ?StoredWorkflowLog
{
if ($fresh) {
$log = $this->logs()
->whereIndex($index)
->first();

if ($this->relationLoaded('logs') && $log !== null) {
/** @var Collection<int, StoredWorkflowLog> $logs */
$logs = $this->getRelation('logs');
if (! $logs->contains('id', $log->id)) {
$this->setRelation('logs', $logs->push($log)->sortBy('id')->values());
}
}

return $log;
}

if ($this->relationLoaded('logs')) {
/** @var Collection<int, StoredWorkflowLog> $logs */
$logs = $this->getRelation('logs');
return $logs->firstWhere('index', $index);
}

return $this->logs()
->whereIndex($index)
->first();
}

public function hasLogByIndex(int $index): bool
{
if ($this->relationLoaded('logs')) {
return $this->findLogByIndex($index) !== null;
}

return $this->logs()
->whereIndex($index)
->exists();
}

public function createLog(array $attributes): StoredWorkflowLog
{
/** @var StoredWorkflowLog $log */
$log = $this->logs()
->create($attributes);

if ($this->relationLoaded('logs')) {
/** @var Collection<int, StoredWorkflowLog> $logs */
$logs = $this->getRelation('logs');
$this->setRelation('logs', $logs->push($log)->sortBy('id')->values());
}

return $log;
}

public function signals(): \Illuminate\Database\Eloquent\Relations\HasMany
{
return $this->hasMany(config('workflows.stored_workflow_signal_model', StoredWorkflowSignal::class))
Expand All @@ -70,6 +192,48 @@ public function timers(): \Illuminate\Database\Eloquent\Relations\HasMany
->orderBy('id');
}

public function findTimerByIndex(int $index): ?StoredWorkflowTimer
{
if ($this->relationLoaded('timers')) {
/** @var Collection<int, StoredWorkflowTimer> $timers */
$timers = $this->getRelation('timers');
return $timers->firstWhere('index', $index);
}

return $this->timers()
->whereIndex($index)
->first();
}

public function createTimer(array $attributes): StoredWorkflowTimer
{
/** @var StoredWorkflowTimer $timer */
$timer = $this->timers()
->create($attributes);

if ($this->relationLoaded('timers')) {
/** @var Collection<int, StoredWorkflowTimer> $timers */
$timers = $this->getRelation('timers');
$this->setRelation('timers', $timers->push($timer)->sortBy('id')->values());
}

return $timer;
}

public function orderedSignals(): Collection
{
if ($this->relationLoaded('signals')) {
/** @var Collection<int, StoredWorkflowSignal> $signals */
$signals = $this->getRelation('signals');
return $signals->sortBy('created_at')
->values();
}

return $this->signals()
->orderBy('created_at')
->get();
}

public function exceptions(): \Illuminate\Database\Eloquent\Relations\HasMany
{
return $this->hasMany(config('workflows.stored_workflow_exception_model', StoredWorkflowException::class))
Expand Down
Loading