From a816ac102b2b14e2ac6fd4c6c424093071462694 Mon Sep 17 00:00:00 2001 From: Mark1Z Date: Fri, 20 May 2016 16:18:18 +0300 Subject: [PATCH 1/2] update with comments --- Laravel5Queue.php | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/Laravel5Queue.php b/Laravel5Queue.php index 870c9d9..7372aaa 100644 --- a/Laravel5Queue.php +++ b/Laravel5Queue.php @@ -16,7 +16,11 @@ */ class Laravel5Queue extends CApplicationComponent { - + /** + * Available connections + * + * @var array + */ public $connections = [ 'default' => [ 'driver' => 'mongoQueue', @@ -35,14 +39,22 @@ class Laravel5Queue extends CApplicationComponent ] ]; + /** + * Encrypter private key + * + * @var string + */ public $privateKey = 'rc5lgpue80sr17nx'; + /** + * QueueManager instance + * + * @var + */ private $queueManager; /** - * Initialize - * - * @author Virchenko Maksim + * Initialize component */ public function init() { @@ -54,7 +66,6 @@ public function init() /** * Connect queue manager for mongo database * - * @author Virchenko Maksim * @return Manager */ public function connect() @@ -71,10 +82,10 @@ public function connect() //Connector to successful jobs $this->queueManager->addConnector('mongoQueue', function () { - return new MongoConnector(Yii::app()->mongodb, 'YiiJobsSuccessed'); + return new MongoConnector(Yii::app()->mongodb); }); $this->queueManager->addConnector('asyncMongoQueue', function () { - return new AsyncMongoConnector(Yii::app()->mongodb, 'YiiJobsSuccessed'); + return new AsyncMongoConnector(Yii::app()->mongodb); }); foreach ($this->connections as $name => $params) { $this->queueManager->addConnection($params, $name); @@ -90,6 +101,7 @@ public function connect() * Push new job to queue * * @author Virchenko Maksim + * * @param mixed $handler * @param array $data * @param string $queue @@ -97,17 +109,12 @@ public function connect() */ public function push($handler, $data = [], $queue = 'default', $connection = 'default') { -// if (is_callable($handler)) { -// $this->queueManager->getConnection($queue)->push($handler); -// } else { Manager::push($handler, $data, $queue, $connection); -// } } /** * Push new job to queue if this job is not exist * - * @author Virchenko Maksim * @param mixed $handler * @param array $data * @param string $queue @@ -127,6 +134,7 @@ public function pushUnique($handler, $data = [], $queue = 'default', $connection * @param mixed $data * @param string $queue * @param string $connection + * * @return mixed */ public static function bulk($jobs, $data = '', $queue = null, $connection = null) @@ -142,6 +150,7 @@ public static function bulk($jobs, $data = '', $queue = null, $connection = null * @param mixed $data * @param string $queue * @param string $connection + * * @return mixed */ public static function later($delay, $job, $data = '', $queue = null, $connection = null) From 8773bf1a8ca1065d223ce7b7cb539f42e7a9e008 Mon Sep 17 00:00:00 2001 From: Mark1Z Date: Tue, 20 Sep 2016 14:25:17 +0300 Subject: [PATCH 2/2] update phpdocs, clean code from comments, compare illuminate versions in composer --- Laravel5Queue.php | 33 ++++++---------- README.md | 6 +-- Worker.php | 20 ++++++++-- base/WorkerInterface.php | 27 ++++++++++++- commands/AsyncQueueCommand.php | 11 +++--- commands/WorkerCommand.php | 32 ++++++++-------- composer.json | 45 ++++++++++++---------- connectors/AsyncMongoConnector.php | 2 +- connectors/MongoConnector.php | 10 ++--- failed/MongoFailedJobProvider.php | 19 +++++----- handlers/FatalThrowableError.php | 7 ++++ jobs/MongoJob.php | 20 +++++----- queues/AsyncMongoQueue.php | 61 ++++++++++++++++++++++-------- queues/MongoQueue.php | 19 ++++++---- 14 files changed, 193 insertions(+), 119 deletions(-) diff --git a/Laravel5Queue.php b/Laravel5Queue.php index 7372aaa..870c9d9 100644 --- a/Laravel5Queue.php +++ b/Laravel5Queue.php @@ -16,11 +16,7 @@ */ class Laravel5Queue extends CApplicationComponent { - /** - * Available connections - * - * @var array - */ + public $connections = [ 'default' => [ 'driver' => 'mongoQueue', @@ -39,22 +35,14 @@ class Laravel5Queue extends CApplicationComponent ] ]; - /** - * Encrypter private key - * - * @var string - */ public $privateKey = 'rc5lgpue80sr17nx'; - /** - * QueueManager instance - * - * @var - */ private $queueManager; /** - * Initialize component + * Initialize + * + * @author Virchenko Maksim */ public function init() { @@ -66,6 +54,7 @@ public function init() /** * Connect queue manager for mongo database * + * @author Virchenko Maksim * @return Manager */ public function connect() @@ -82,10 +71,10 @@ public function connect() //Connector to successful jobs $this->queueManager->addConnector('mongoQueue', function () { - return new MongoConnector(Yii::app()->mongodb); + return new MongoConnector(Yii::app()->mongodb, 'YiiJobsSuccessed'); }); $this->queueManager->addConnector('asyncMongoQueue', function () { - return new AsyncMongoConnector(Yii::app()->mongodb); + return new AsyncMongoConnector(Yii::app()->mongodb, 'YiiJobsSuccessed'); }); foreach ($this->connections as $name => $params) { $this->queueManager->addConnection($params, $name); @@ -101,7 +90,6 @@ public function connect() * Push new job to queue * * @author Virchenko Maksim - * * @param mixed $handler * @param array $data * @param string $queue @@ -109,12 +97,17 @@ public function connect() */ public function push($handler, $data = [], $queue = 'default', $connection = 'default') { +// if (is_callable($handler)) { +// $this->queueManager->getConnection($queue)->push($handler); +// } else { Manager::push($handler, $data, $queue, $connection); +// } } /** * Push new job to queue if this job is not exist * + * @author Virchenko Maksim * @param mixed $handler * @param array $data * @param string $queue @@ -134,7 +127,6 @@ public function pushUnique($handler, $data = [], $queue = 'default', $connection * @param mixed $data * @param string $queue * @param string $connection - * * @return mixed */ public static function bulk($jobs, $data = '', $queue = null, $connection = null) @@ -150,7 +142,6 @@ public static function bulk($jobs, $data = '', $queue = null, $connection = null * @param mixed $data * @param string $queue * @param string $connection - * * @return mixed */ public static function later($delay, $job, $data = '', $queue = null, $connection = null) diff --git a/README.md b/README.md index b291cc0..7a25380 100644 --- a/README.md +++ b/README.md @@ -3,13 +3,13 @@ Laravel5 Queue Branch 1.0 for php 5.6 Branch master for php 7.0 -First of all add laravel5queue component to Yii config (console and main) like this: +First of all add laravel5queue component to Yii config (console and main): ------------------------------------------------------------------------------------ ```php 'components' => ['laravel5queue' => ['class' => 'yiicod\laravel5queue\Laravel5Queue']] ``` -and console command like this: +and console command: ```php 'queueWorker' => ['class' => 'yiicod\laravel5queue\commands\WorkerCommand'], @@ -43,7 +43,7 @@ Note: $data - additional data to your handler Start worker: ------------ -run worker daemon with console command like this: +run worker daemon with console command: ```php $ php yiic queueWorker start ``` diff --git a/Worker.php b/Worker.php index 1fd042e..607b556 100644 --- a/Worker.php +++ b/Worker.php @@ -14,7 +14,6 @@ use Illuminate\Queue\Events\WorkerStopping; use Illuminate\Queue\Failed\FailedJobProviderInterface; use Illuminate\Queue\QueueManager; -use ReflectionClass; use Throwable; use yiicod\laravel5queue\handlers\FatalThrowableError; @@ -67,7 +66,6 @@ class Worker * @param QueueManager $manager * @param FailedJobProviderInterface $failer * @param Dispatcher $events - * @return void */ public function __construct(QueueManager $manager, FailedJobProviderInterface $failer = null, Dispatcher $events = null) { @@ -85,6 +83,7 @@ public function __construct(QueueManager $manager, FailedJobProviderInterface $f * @param int $memory * @param int $sleep * @param int $maxTries + * * @return array */ public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0) @@ -114,6 +113,7 @@ public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128 * @param int $delay * @param int $sleep * @param int $maxTries + * * @return void */ protected function runNextJobForDaemon($connectionName, $queue, $delay, $sleep, $maxTries) @@ -153,6 +153,7 @@ protected function daemonShouldRun() * @param int $delay * @param int $sleep * @param int $maxTries + * * @return array */ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0) @@ -162,8 +163,8 @@ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $max $job = $this->getNextJob($connection, $queue); //If job equal TRUE this this is async job was run - if(true === $job){ - return ; + if (true === $job) { + return; } // If we're able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue @@ -184,6 +185,7 @@ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $max * * @param Queue $connection * @param string $queue + * * @return Job|null */ protected function getNextJob($connection, $queue) @@ -206,6 +208,7 @@ protected function getNextJob($connection, $queue) * @param Job $job * @param int $maxTries * @param int $delay + * * @return array|null * * @throws \Throwable @@ -248,6 +251,7 @@ public function process($connection, Job $job, $maxTries = 0, $delay = 0) * * @param string $connection * @param Job $job + * * @return void */ protected function raiseAfterJobEvent($connection, Job $job) @@ -264,6 +268,7 @@ protected function raiseAfterJobEvent($connection, Job $job) * * @param string $connection * @param Job $job + * * @return array */ protected function logFailedJob($connection, Job $job) @@ -286,6 +291,7 @@ protected function logFailedJob($connection, Job $job) * * @param string $connection * @param Job $job + * * @return void */ protected function raiseFailedJobEvent($connection, Job $job) @@ -301,6 +307,7 @@ protected function raiseFailedJobEvent($connection, Job $job) * Determine if the memory limit has been exceeded. * * @param int $memoryLimit + * * @return bool */ public function memoryExceeded($memoryLimit) @@ -324,6 +331,7 @@ public function stop() * Sleep the script for a given number of seconds. * * @param int $seconds + * * @return void */ public function sleep($seconds) @@ -347,6 +355,7 @@ protected function getTimestampOfLastQueueRestart() * Determine if the queue worker should restart. * * @param int|null $lastRestart + * * @return bool */ protected function queueShouldRestart($lastRestart) @@ -358,6 +367,7 @@ protected function queueShouldRestart($lastRestart) * Set the exception handler to use in Daemon mode. * * @param ExceptionHandler $handler + * * @return void */ public function setDaemonExceptionHandler(ExceptionHandler $handler) @@ -369,6 +379,7 @@ public function setDaemonExceptionHandler(ExceptionHandler $handler) * Set the cache repository implementation. * * @param Repository $cache + * * @return void */ public function setCache(CacheContract $cache) @@ -390,6 +401,7 @@ public function getManager() * Set the queue manager instance. * * @param QueueManager $manager + * * @return void */ public function setManager(QueueManager $manager) diff --git a/base/WorkerInterface.php b/base/WorkerInterface.php index 5b303e5..521b5ef 100644 --- a/base/WorkerInterface.php +++ b/base/WorkerInterface.php @@ -1,10 +1,33 @@ laravel5queue->connect(); + // automatically send every new message to available log routes + Yii::getLogger()->autoFlush = 1; + // when sending a message to log routes, also notify them to dump the message + // into the corresponding persistent storage (e.g. DB, email) + Yii::getLogger()->autoDump = true; + $worker = new Worker($queueManager->getQueueManager(), new MongoFailedJobProvider(Yii::app()->mongodb, 'YiiJobsFailed')); $worker->setDaemonExceptionHandler(new DaemonExceptionHandler()); return $worker; @@ -40,7 +44,6 @@ protected function worker() /** * Process the job - * */ protected function processJob($connectionName, $id) { @@ -54,8 +57,6 @@ protected function processJob($connectionName, $id) // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if (!is_null($job)) { -// $sleep = max($job->getDatabaseJob()->available_at - time(), 0);var_dump($sleep);die; -// sleep($sleep); return $worker->process( $manager->getName($connectionName), $job, 1, 0 ); diff --git a/commands/WorkerCommand.php b/commands/WorkerCommand.php index 3f6dcc2..5fffcf6 100644 --- a/commands/WorkerCommand.php +++ b/commands/WorkerCommand.php @@ -63,20 +63,25 @@ class WorkerCommand extends CConsoleCommand implements WorkerInterface /** * Default action. Starts daemon. + * + * @param string $connection + * @param string $queue */ - public function actionStart($connection, $queue/*, $force = 0*/) + public function actionStart($connection = 'default', $queue = 'default') { $this->queue = $queue; $this->connection = $connection; - $this->createDaemon(/*$force*/); + $this->createDaemon(); } /** * Stops daemon. - * * Close server and close all connections. + * + * @param string $connection + * @param string $queue */ - public function actionStop($connection, $queue) + public function actionStop($connection = 'default', $queue = 'default') { $this->queue = $queue; $this->connection = $connection; @@ -91,9 +96,9 @@ public function actionStop($connection, $queue) * Creates daemon. * Check is daemon already run and if false then starts daemon and update lock file. */ - protected function createDaemon(/*$force = false*/) + protected function createDaemon() { - if (true === $this->isAlreadyRunning(/*(bool)$force*/)) { + if (true === $this->isAlreadyRunning()) { echo sprintf("[%s] is running already.\n", $this->daemonName); Yii::app()->end(); } @@ -108,12 +113,13 @@ protected function createDaemon(/*$force = false*/) $this->addPid($pid); } - $this->worker(); - echo sprintf("[%s] running with PID: %s\n", $this->daemonName, $pid); + $this->worker(); } - + /** + * Stop daemon if exists. + */ protected function stopDaemon() { if (file_exists($this->getPidsFilePath())) { @@ -151,19 +157,13 @@ protected function isAlreadyRunning(/*$force = false*/) } } -// if ($force && count($runingPids) >= $this->threads) { -// $result = true; -// } elseif ($force && count($runingPids) < $this->threads) { -// $result = false; -// } - - return $result; } /** * Add pid + * * @param $pid */ protected function addPid($pid) diff --git a/composer.json b/composer.json index a48f0fc..8d33a65 100644 --- a/composer.json +++ b/composer.json @@ -1,24 +1,29 @@ { - "name": "yiicod/laravel5queue", - "type": "yii-extension", - "license": "New BSD License", - "authors": [ - { - "name": "Alexey Orlov", - "email": "aaorlov88@gmail.com", - "role": "Developer" + "name": "yiicod/laravel5queue", + "type": "yii-extension", + "license": "New BSD License", + "authors": [ + { + "name": "Alexey Orlov", + "email": "aaorlov88@gmail.com", + "role": "Developer" + }, + { + "name": "Virchenko Maksim", + "email": "muslim1992@gmail.com", + "role": "Developer" + } + ], + "require": { + "illuminate/queue": "5.2.*", + "illuminate/encryption": "5.2.*", + "paragonie/random_compat": "^1.2", + "jeremeamia/superclosure": "^2.0", + "symfony/process": "~2.3|~3.0" }, - { - "name": "Virchenko Maksim", - "email": "muslim1992@gmail.com", - "role": "Developer" + "autoload": { + "psr-4": { + "yiicod\\laravel5queue\\": "/" + } } - ], - "require": { - "illuminate/queue": "^5.2", - "illuminate/encryption": "^5.2", - "paragonie/random_compat": "^1.2", - "jeremeamia/superclosure": "^2.0", - "symfony/process": "~2.3|~3.0" - } } \ No newline at end of file diff --git a/connectors/AsyncMongoConnector.php b/connectors/AsyncMongoConnector.php index 7ff387e..d5941f8 100644 --- a/connectors/AsyncMongoConnector.php +++ b/connectors/AsyncMongoConnector.php @@ -25,7 +25,6 @@ class AsyncMongoConnector implements ConnectorInterface * Create a new connector instance. * * @param \Illuminate\Database\ConnectionResolverInterface $connection - * @return void */ public function __construct($connection) { @@ -36,6 +35,7 @@ public function __construct($connection) * Establish a queue connection. * * @param array $config + * * @return Queue */ public function connect(array $config) diff --git a/connectors/MongoConnector.php b/connectors/MongoConnector.php index 16fc2c2..ce3bce2 100644 --- a/connectors/MongoConnector.php +++ b/connectors/MongoConnector.php @@ -8,7 +8,7 @@ /** * Connector for laravel queue to mongodb - * + * * @author Virchenko Maksim */ class MongoConnector implements ConnectorInterface @@ -16,15 +16,14 @@ class MongoConnector implements ConnectorInterface /** * Database connections. - * + * */ protected $connection; /** * Create a new connector instance. * - * @param \Illuminate\Database\ConnectionResolverInterface $connection - * @return void + * @param \Illuminate\Database\ConnectionResolverInterface $connection */ public function __construct($connection) { @@ -34,7 +33,8 @@ public function __construct($connection) /** * Establish a queue connection. * - * @param array $config + * @param array $config + * * @return Queue */ public function connect(array $config) diff --git a/failed/MongoFailedJobProvider.php b/failed/MongoFailedJobProvider.php index b241bbb..0f44715 100644 --- a/failed/MongoFailedJobProvider.php +++ b/failed/MongoFailedJobProvider.php @@ -30,10 +30,8 @@ class MongoFailedJobProvider implements FailedJobProviderInterface /** * Create a new database failed job provider. * - * @param \Illuminate\Database\ConnectionResolverInterface $resolver - * @param string $database - * @param string $table - * @return void + * @param string $database + * @param string $table */ public function __construct($database, $table) { @@ -44,9 +42,10 @@ public function __construct($database, $table) /** * Log a failed job into storage. * - * @param string $connection - * @param string $queue - * @param string $payload + * @param string $connection + * @param string $queue + * @param string $payload + * * @return void */ public function log($connection, $queue, $payload) @@ -75,7 +74,8 @@ public function all() /** * Get a single failed job. * - * @param mixed $id + * @param mixed $id + * * @return array */ public function find($id) @@ -86,7 +86,8 @@ public function find($id) /** * Delete a single failed job from storage. * - * @param mixed $id + * @param mixed $id + * * @return bool */ public function forget($id) diff --git a/handlers/FatalThrowableError.php b/handlers/FatalThrowableError.php index b7070ec..86165be 100644 --- a/handlers/FatalThrowableError.php +++ b/handlers/FatalThrowableError.php @@ -12,6 +12,13 @@ */ class FatalThrowableError extends Exception { + /** + * FatalThrowableError constructor. + * + * @param string $e + * @param int $code + * @param Exception|null $previous + */ public function __construct($e, $code = 0, Exception $previous = null) { $this->message = $e->getMessage(); diff --git a/jobs/MongoJob.php b/jobs/MongoJob.php index f606db3..97e1c74 100644 --- a/jobs/MongoJob.php +++ b/jobs/MongoJob.php @@ -9,7 +9,7 @@ /** * MongoJob for laravel queue - * + * * @author Virchenko Maksim */ class MongoJob extends Job implements JobContract @@ -32,11 +32,10 @@ class MongoJob extends Job implements JobContract /** * Create a new job instance. * - * @param Container $container - * @param MongoQueue $database - * @param \StdClass $job - * @param string $queue - * @return void + * @param Container $container + * @param MongoQueue $database + * @param \StdClass $job + * @param string $queue */ public function __construct(Container $container, MongoQueue $database, $job, $queue) { @@ -66,13 +65,14 @@ public function delete() { parent::delete(); - $this->database->deleteReserved($this->queue, (string) $this->job->_id); + $this->database->deleteReserved($this->queue, (string)$this->job->_id); } /** * Release the job back into the queue. * - * @param int $delay + * @param int $delay + * * @return void */ public function release($delay = 0) @@ -91,7 +91,7 @@ public function release($delay = 0) */ public function attempts() { - return (int) $this->job->attempts; + return (int)$this->job->attempts; } /** @@ -101,7 +101,7 @@ public function attempts() */ public function getJobId() { - return (string) $this->job->_id; + return (string)$this->job->_id; } /** diff --git a/queues/AsyncMongoQueue.php b/queues/AsyncMongoQueue.php index c58aea7..50d8561 100644 --- a/queues/AsyncMongoQueue.php +++ b/queues/AsyncMongoQueue.php @@ -1,11 +1,16 @@ startProcess($id); + //$this->startProcess($id); return $id; } @@ -70,12 +75,13 @@ public function push($job, $data = '', $queue = null) * @param string $payload * @param string $queue * @param array $options + * * @return mixed */ public function pushRaw($payload, $queue = null, array $options = array()) { $id = parent::pushRaw($payload, $queue, $options); - $this->startProcess($id); + //$this->startProcess($id); return $id; } @@ -93,7 +99,7 @@ public function pushRaw($payload, $queue = null, array $options = array()) public function later($delay, $job, $data = '', $queue = null) { $id = parent::later($delay, $job, $data, $queue); - $this->startProcess($id); + //$this->startProcess($id); return $id; } @@ -102,6 +108,7 @@ public function later($delay, $job, $data = '', $queue = null) * Pop the next job off of the queue. * * @param string $queue + * * @return Job|null */ public function pop($queue = null) @@ -122,11 +129,26 @@ public function pop($queue = null) return null; } + /** + * Check if process can run + * + * @return bool + */ protected function canRunProcess() { - return $this->database->{$this->table}->count(['reserved' => 1]) < 10; + return $this->database->{$this->table}->count(['reserved' => 1]) < $this->limit; } + /** + * Push work to database + * + * @param DateTime|int $delay + * @param null|string $queue + * @param string $payload + * @param int $attempts + * + * @return string + */ protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) { // if ($this->canRunProcess()) { @@ -141,7 +163,7 @@ protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) // 'created_at' => $this->getTime(), // ]); // } else { - $result = parent::pushToDatabase($delay, $queue, $payload, $attempts); + $result = parent::pushToDatabase($delay, $queue, $payload, $attempts); // } return (string)$result->getInsertedId(); } @@ -149,8 +171,9 @@ protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) /** * Get the next available job for the queue. * - * @param string|null $queue - * @return \StdClass|null + * @param $id + * + * @return null|\StdClass */ public function getJobFromId($id) { @@ -162,10 +185,7 @@ public function getJobFromId($id) /** * Make a Process for the Artisan command for the job id. * - * @param int $jobId - * @param int $delay - * - * @return void + * @param $id */ public function startProcess($id) { @@ -177,7 +197,7 @@ public function startProcess($id) $process = new Process($command, $cwd); $process->run(); - }else{ + } else { sleep(1); } } @@ -185,8 +205,7 @@ public function startProcess($id) /** * Get the Artisan command as a string for the job id. * - * @param int $jobId - * @param int $delay + * @param $id * * @return string */ @@ -220,11 +239,23 @@ protected function getPhpBinary() return trim($path . ' ' . $args); } + /** + * Get path for yiic + * + * @return mixed + */ protected function getYiicPath() { return \Yii::getPathOfAlias($this->yiicAlias); } + /** + * Get background cmd command + * + * @param $cmd + * + * @return string + */ protected function getBackgroundCommand($cmd) { if (defined('PHP_WINDOWS_VERSION_BUILD')) { diff --git a/queues/MongoQueue.php b/queues/MongoQueue.php index 136938e..bb40911 100644 --- a/queues/MongoQueue.php +++ b/queues/MongoQueue.php @@ -49,11 +49,10 @@ class MongoQueue extends Queue implements QueueContract /** * Create a new database queue instance. * - * @param MongoDB connection $mongo + * @param $mongo * @param string $table * @param string $default * @param int $expire - * @return void */ public function __construct($mongo, $table, $default = 'default', $expire = 60) { @@ -226,16 +225,19 @@ protected function getNextAvailableJob($queue) ->findOneAndUpdate([ 'queue' => $this->getQueue($queue), 'reserved' => 0, - '$or' => [ - ['reserved_at' => null], - ['reserved_at' => ['$lte' => $this->getTime()]], - ], + 'reserved_at' => null, +// [ + 'available_at' => ['$lte' => $this->getTime()] +// ] +// '$or' => [ +// ['reserved_at' => null], +// ['reserved_at' => ['$lte' => $this->getTime()]], +// ], ], [ '$set' => [ 'reserved' => 0, 'reserved_at' => null, ], - '$inc' => ['attempts' => 1], ], [ 'sort' => ['id' => 1], ]); @@ -255,7 +257,8 @@ protected function markJobAsReserved($id) '$set' => [ 'reserved' => 1, 'reserved_at' => $this->getTime(), - ] + ], + //'$inc' => ['attempts' => 1], ]); }