-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker_bg.php
More file actions
executable file
·99 lines (84 loc) · 3.44 KB
/
worker_bg.php
File metadata and controls
executable file
·99 lines (84 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/php
<?php
// CLI only: abort silently when no command line arguments are registered
// (this file should live outside the web folder anyway).
if (empty($_SERVER['argc'])) {
    exit;
}

// Bring in the Composer autoloader.
$loader = require __DIR__.'/vendor/autoload.php';

// Turn on the cycle-collecting garbage collector (PHP 5.3+).
gc_enable();

// Load the configuration. $config starts out as an empty array so it is
// always defined; config.php is presumably what fills it in.
$config = array();
$configFile = __DIR__.'/config.php';
require_once $configFile;
// Select the channel from a "--channel=<name>" command line argument.
// The first matching argument wins; "default" is used when none is given.
$channelName = 'default';
$channelOption = '--channel=';
foreach ($argv as $arg) {
    // strpos() takes the haystack first: the ARGUMENT has to start with the
    // option prefix. With the operands the other way round, an argument such
    // as "--channel=mail" can never match and the default is always used.
    if (strpos($arg, $channelOption) === 0) {
        // Cast to string: substr() may return false rather than '' on older
        // PHP versions when nothing follows the prefix.
        $channelName = (string) substr($arg, strlen($channelOption));
        break;
    }
}
// Build the CacheQueue services from the loaded configuration.
$factory = new \CacheQueue\Factory\Factory($config);
$logger = $factory->getLogger();
// NOTE(review): $connection is not used in the visible part of this script;
// it is kept because obtaining it may have side effects - confirm.
$connection = $factory->getConnection();
$worker = $factory->getWorker();

// Stop conditions for one run of this script:
// - exit after X tasks without a break
$exitAfterTasksCount = $config['general']['workerscript_bg_exitAfterTasksCount'];
// - exit after X seconds without a break
$exitAfterMoreThanSeconds = $config['general']['workerscript_bg_exitAfterMoreThanSeconds'];

// Notice thresholds:
// - log a "status" message if a single task takes longer than X seconds
$noticeSlowTaskMoreThanSeconds = $config['general']['workerscript_noticeSlowTaskMoreThanSeconds'];
// - log a "status" message if a single task's data is larger than X bytes
$noticeLargeDataMoreThanBytes = $config['general']['workerscript_noticeLargeDataMoreThanBytes'];

// Resolve the channel name to its configured value, which must be a positive
// integer; anything else is logged as an error and ends the script.
$channel = empty($config['channels'][$channelName]) ? 0 : (int) $config['channels'][$channelName];
if ($channel <= 0) {
    $logger->logError('Worker: Invalid channel "'.$channelName.'"');
    exit;
}
// Counters for this run; they are printed at the very end as "<processed>|<errors>".
$start = microtime(true);
$processed = 0;
$errors = 0;

try {
    while (true) {
        try {
            $job = $worker->getJob($channel);
            if (!$job) {
                // No job returned for this channel: we are done.
                break;
            }

            $startedAt = microtime(true);
            $result = $worker->work($job);
            $elapsed = microtime(true) - $startedAt;

            // Optional notice about large result data (skipped when the threshold is empty/0).
            if ($noticeLargeDataMoreThanBytes) {
                // Arrays are measured by their serialized length, everything else by its string length.
                $bytes = strlen(is_array($result) ? serialize($result) : $result);
                if ($bytes > $noticeLargeDataMoreThanBytes) {
                    // Render the size in base-1024 units, e.g. "1.5k" or "2.25M".
                    $units = array('', 'k', 'M', 'G', 'T');
                    $exponent = log($bytes) / log(1024);
                    $magnitude = floor($exponent);
                    $readable = round(pow(1024, $exponent - $magnitude), 2) . $units[$magnitude];
                    $logger->logNotice('Worker: Job '.$job['key'].' (task '.$job['task'].') data size is '.$readable.'.');
                }
            }

            // Notice about slow tasks.
            // NOTE(review): unlike the other thresholds this one is not skipped when it is
            // empty/0, so a 0 threshold would log every task - confirm this is intended.
            if ($elapsed > $noticeSlowTaskMoreThanSeconds) {
                $logger->logNotice('Worker: Job '.$job['key'].' (task '.$job['task'].') took '.(number_format($elapsed, 4, '.', '')).'s.');
            }
        } catch (\CacheQueue\Exception\Exception $e) {
            // CacheQueue exceptions are logged and counted; the loop carries on.
            $errors++;
            $logger->logException($e);
            unset($e);
        }

        // A job that failed with a CacheQueue exception is still counted as processed.
        $processed++;
        $now = microtime(true);

        // Leave early once the task or time budget is used up, even if jobs
        // remain, to prevent memory leaks in a long-running process.
        $taskLimitReached = $exitAfterTasksCount && $processed >= $exitAfterTasksCount;
        $timeLimitReached = $processed && $exitAfterMoreThanSeconds && $now - $start > $exitAfterMoreThanSeconds;
        if ($taskLimitReached || $timeLimitReached) {
            break;
        }
    }
} catch (\Exception $e) {
    // Any other exception ends the loop; it is logged and the summary is still printed.
    $logger->logException($e);
}

// Summary line: "<processed>|<errors>" on a new line.
echo "\n".$processed.'|'.$errors;