Skip to content

Commit 045fde1

Browse files
authored
Merge pull request #15 from gcarlev/fix-job-cleanup
Fix job cleanup
2 parents 6cce646 + e9ad5d9 commit 045fde1

10 files changed

Lines changed: 118 additions & 56 deletions

Command/CleanUpCommand.php

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace JMS\JobQueueBundle\Command;
44

5+
use Doctrine\DBAL\Exception;
6+
use Doctrine\ORM\Exception\ORMException;
7+
use Doctrine\ORM\OptimisticLockException;
58
use Doctrine\Persistence\ManagerRegistry;
69
use Doctrine\DBAL\Connection;
710
use Doctrine\ORM\EntityManager;
@@ -37,6 +40,12 @@ protected function configure()
3740
;
3841
}
3942

43+
/**
44+
* @throws OptimisticLockException
45+
* @throws \Throwable
46+
* @throws ORMException
47+
* @throws Exception
48+
*/
4049
protected function execute(InputInterface $input, OutputInterface $output): int
4150
{
4251
/** @var EntityManager $em */
@@ -49,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
4958
return 0;
5059
}
5160

52-
private function collectStaleJobs(EntityManager $em)
61+
private function collectStaleJobs(EntityManager $em): void
5362
{
5463
foreach ($this->findStaleJobs($em) as $job) {
5564
if ($job->isRetried()) {
@@ -63,7 +72,7 @@ private function collectStaleJobs(EntityManager $em)
6372
/**
6473
* @return Job[]
6574
*/
66-
private function findStaleJobs(EntityManager $em)
75+
private function findStaleJobs(EntityManager $em): iterable
6776
{
6877
$excludedIds = array(-1);
6978

@@ -88,7 +97,13 @@ private function findStaleJobs(EntityManager $em)
8897
} while ($job !== null);
8998
}
9099

91-
private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInterface $input)
100+
/**
101+
* @throws OptimisticLockException
102+
* @throws \Throwable
103+
* @throws ORMException
104+
* @throws Exception
105+
*/
106+
private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInterface $input): void
92107
{
93108
$incomingDepsSql = $con->getDatabasePlatform()->modifyLimitQuery("SELECT 1 FROM jms_job_dependencies WHERE dest_job_id = :id", 1);
94109

@@ -99,7 +114,7 @@ private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInt
99114
$count++;
100115

101116
$result = $con->executeQuery($incomingDepsSql, array('id' => $job->getId()));
102-
if ($result->fetchColumn() !== false) {
117+
if ($result->fetchOne() !== false) {
103118
$em->transactional(function() use ($em, $job) {
104119
$this->resolveDependencies($em, $job);
105120
$em->remove($job);
@@ -118,6 +133,9 @@ private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInt
118133
$em->flush();
119134
}
120135

136+
/**
137+
* @throws Exception
138+
*/
121139
private function resolveDependencies(EntityManager $em, Job $job)
122140
{
123141
// If this job has failed, or has otherwise not succeeded, we need to set the
@@ -137,7 +155,7 @@ private function resolveDependencies(EntityManager $em, Job $job)
137155
}
138156
}
139157

140-
$em->getConnection()->executeUpdate("DELETE FROM jms_job_dependencies WHERE dest_job_id = :id", array('id' => $job->getId()));
158+
$em->getConnection()->executeStatement("DELETE FROM jms_job_dependencies WHERE dest_job_id = :id", array('id' => $job->getId()));
141159
}
142160

143161
private function findExpiredJobs(EntityManager $em, InputInterface $input)

Command/RunCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,13 @@ private function checkRunningJobs()
285285

286286
if ( ! empty($newOutput)) {
287287
$event = new NewOutputEvent($data['job'], $newOutput, NewOutputEvent::TYPE_STDOUT);
288-
$this->dispatcher->dispatch('jms_job_queue.new_job_output', $event);
288+
$this->dispatcher->dispatch($event, 'jms_job_queue.new_job_output');
289289
$newOutput = $event->getNewOutput();
290290
}
291291

292292
if ( ! empty($newErrorOutput)) {
293293
$event = new NewOutputEvent($data['job'], $newErrorOutput, NewOutputEvent::TYPE_STDERR);
294-
$this->dispatcher->dispatch('jms_job_queue.new_job_output', $event);
294+
$this->dispatcher->dispatch($event, 'jms_job_queue.new_job_output');
295295
$newErrorOutput = $event->getNewOutput();
296296
}
297297

@@ -352,7 +352,7 @@ private function checkRunningJobs()
352352
private function startJob(Job $job)
353353
{
354354
$event = new StateChangeEvent($job, Job::STATE_RUNNING);
355-
$this->dispatcher->dispatch('jms_job_queue.job_state_change', $event);
355+
$this->dispatcher->dispatch($event, 'jms_job_queue.job_state_change');
356356
$newState = $event->getNewState();
357357

358358
if (Job::STATE_CANCELED === $newState) {

Command/ScheduleCommand.php

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
namespace JMS\JobQueueBundle\Command;
44

5+
use Doctrine\DBAL\Exception;
6+
use Doctrine\ORM\Exception\NotSupported;
7+
use Doctrine\ORM\Exception\ORMException;
8+
use Doctrine\ORM\NonUniqueResultException;
9+
use Doctrine\ORM\NoResultException;
10+
use Doctrine\ORM\OptimisticLockException;
511
use Doctrine\Persistence\ManagerRegistry;
612
use Doctrine\ORM\EntityManager;
713
use Doctrine\ORM\Query;
@@ -111,14 +117,19 @@ private function scheduleJobs(OutputInterface $output, array $jobSchedulers, arr
111117
}
112118
}
113119

114-
private function acquireLock($commandName, \DateTime $lastRunAt)
120+
/**
121+
* @throws NonUniqueResultException
122+
* @throws Exception
123+
* @throws NoResultException
124+
*/
125+
private function acquireLock($commandName, \DateTime $lastRunAt): array
115126
{
116127
/** @var EntityManager $em */
117128
$em = $this->registry->getManagerForClass(CronJob::class);
118129
$con = $em->getConnection();
119130

120131
$now = new \DateTime();
121-
$affectedRows = $con->executeUpdate(
132+
$affectedRows = $con->executeStatement(
122133
"UPDATE jms_cron_jobs SET lastRunAt = :now WHERE command = :command AND lastRunAt = :lastRunAt",
123134
array(
124135
'now' => $now,
@@ -144,7 +155,7 @@ private function acquireLock($commandName, \DateTime $lastRunAt)
144155
return array(false, $cronJob->getLastRunAt());
145156
}
146157

147-
private function populateJobSchedulers()
158+
private function populateJobSchedulers(): array
148159
{
149160
$schedulers = [];
150161
foreach ($this->schedulers as $scheduler) {
@@ -166,7 +177,12 @@ private function populateJobSchedulers()
166177
return $schedulers;
167178
}
168179

169-
private function populateJobsLastRunAt(EntityManager $em, array $jobSchedulers)
180+
/**
181+
* @throws OptimisticLockException
182+
* @throws NotSupported
183+
* @throws ORMException
184+
*/
185+
private function populateJobsLastRunAt(EntityManager $em, array $jobSchedulers): array
170186
{
171187
$jobsLastRunAt = array();
172188

Console/Application.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private function saveDebugInformation(\Exception $ex = null)
8383
return;
8484
}
8585

86-
$this->getConnection()->executeUpdate(
86+
$this->getConnection()->executeStatement(
8787
"UPDATE jms_jobs SET stackTrace = :trace, memoryUsage = :memoryUsage, memoryUsageReal = :memoryUsageReal WHERE id = :id",
8888
array(
8989
'id' => $jobId,

Entity/CronJob.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public function getCommand()
3131
return $this->command;
3232
}
3333

34-
public function getLastRunAt()
34+
public function getLastRunAt(): \DateTime
3535
{
3636
return $this->lastRunAt;
3737
}

Entity/Listener/ManyToAnyListener.php

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
<?php
22

33
namespace JMS\JobQueueBundle\Entity\Listener;
4-
use Doctrine\ORM\Event\LifecycleEventArgs;
4+
use Doctrine\Common\Util\ClassUtils;
5+
use Doctrine\DBAL\Exception;
6+
use Doctrine\DBAL\Schema\SchemaException;
7+
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;
8+
use Doctrine\Persistence\Event\LifecycleEventArgs;
9+
use Doctrine\Persistence\Mapping\MappingException;
510
use JMS\JobQueueBundle\Entity\Job;
611
use Doctrine\Persistence\ManagerRegistry;
712

@@ -27,55 +32,65 @@ public function __construct(ManagerRegistry $registry)
2732
$this->ref->setAccessible(true);
2833
}
2934

30-
public function postLoad(\Doctrine\ORM\Event\LifecycleEventArgs $event)
35+
public function postLoad(LifecycleEventArgs $event): void
3136
{
32-
$entity = $event->getEntity();
33-
if ( ! $entity instanceof \JMS\JobQueueBundle\Entity\Job) {
37+
$entity = $event->getObject();
38+
if ( ! $entity instanceof Job) {
3439
return;
3540
}
3641

3742
$this->ref->setValue($entity, new PersistentRelatedEntitiesCollection($this->registry, $entity));
3843
}
3944

40-
public function preRemove(LifecycleEventArgs $event)
45+
/**
46+
* @throws Exception
47+
*/
48+
public function preRemove(LifecycleEventArgs $event): void
4149
{
42-
$entity = $event->getEntity();
50+
$entity = $event->getObject();
4351
if ( ! $entity instanceof Job) {
4452
return;
4553
}
4654

47-
$con = $event->getEntityManager()->getConnection();
48-
$con->executeUpdate("DELETE FROM jms_job_related_entities WHERE job_id = :id", array(
55+
$con = $event->getObjectManager()->getConnection();
56+
$con->executeStatement("DELETE FROM jms_job_related_entities WHERE job_id = :id", array(
4957
'id' => $entity->getId(),
5058
));
5159
}
5260

53-
public function postPersist(\Doctrine\ORM\Event\LifecycleEventArgs $event)
61+
/**
62+
* @throws Exception
63+
*/
64+
public function postPersist(LifecycleEventArgs $event): void
5465
{
55-
$entity = $event->getEntity();
56-
if ( ! $entity instanceof \JMS\JobQueueBundle\Entity\Job) {
66+
$entity = $event->getObject();
67+
if ( ! $entity instanceof Job) {
5768
return;
5869
}
5970

60-
$con = $event->getEntityManager()->getConnection();
71+
$con = $event->getObjectManager()->getConnection();
6172
foreach ($this->ref->getValue($entity) as $relatedEntity) {
62-
$relClass = \Doctrine\Common\Util\ClassUtils::getClass($relatedEntity);
73+
$relClass = ClassUtils::getClass($relatedEntity);
6374
$relId = $this->registry->getManagerForClass($relClass)->getMetadataFactory()->getMetadataFor($relClass)->getIdentifierValues($relatedEntity);
6475
asort($relId);
6576

6677
if ( ! $relId) {
6778
throw new \RuntimeException('The identifier for the related entity "'.$relClass.'" was empty.');
6879
}
6980

70-
$con->executeUpdate("INSERT INTO jms_job_related_entities (job_id, related_class, related_id) VALUES (:jobId, :relClass, :relId)", array(
81+
$con->executeStatement("INSERT INTO jms_job_related_entities (job_id, related_class, related_id) VALUES (:jobId, :relClass, :relId)", array(
7182
'jobId' => $entity->getId(),
7283
'relClass' => $relClass,
7384
'relId' => json_encode($relId),
7485
));
7586
}
7687
}
7788

78-
public function postGenerateSchema(\Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs $event)
89+
/**
90+
* @throws SchemaException
91+
* @throws MappingException
92+
*/
93+
public function postGenerateSchema(GenerateSchemaEventArgs $event): void
7994
{
8095
$schema = $event->getSchema();
8196

0 commit comments

Comments
 (0)