Skip to content
This repository was archived by the owner on Oct 18, 2018. It is now read-only.

Small updates #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ Laravel5 Queue
Branch 1.0 for php 5.6
Branch master for php 7.0

First of all add laravel5queue component to Yii config (console and main) like this:
First of all add laravel5queue component to Yii config (console and main):
------------------------------------------------------------------------------------
```php
'components' => ['laravel5queue' => ['class' => 'yiicod\laravel5queue\Laravel5Queue']]
```

and console command like this:
and console command:

```php
'queueWorker' => ['class' => 'yiicod\laravel5queue\commands\WorkerCommand'],
Expand Down Expand Up @@ -43,7 +43,7 @@ Note: $data - additional data to your handler
Start worker:
------------

run worker daemon with console command like this:
run worker daemon with console command:
```php
$ php yiic queueWorker start
```
Expand Down
20 changes: 16 additions & 4 deletions Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Illuminate\Queue\Events\WorkerStopping;
use Illuminate\Queue\Failed\FailedJobProviderInterface;
use Illuminate\Queue\QueueManager;
use ReflectionClass;
use Throwable;
use yiicod\laravel5queue\handlers\FatalThrowableError;

Expand Down Expand Up @@ -67,7 +66,6 @@ class Worker
* @param QueueManager $manager
* @param FailedJobProviderInterface $failer
* @param Dispatcher $events
* @return void
*/
public function __construct(QueueManager $manager, FailedJobProviderInterface $failer = null, Dispatcher $events = null)
{
Expand All @@ -85,6 +83,7 @@ public function __construct(QueueManager $manager, FailedJobProviderInterface $f
* @param int $memory
* @param int $sleep
* @param int $maxTries
*
* @return array
*/
public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
Expand Down Expand Up @@ -114,6 +113,7 @@ public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128
* @param int $delay
* @param int $sleep
* @param int $maxTries
*
* @return void
*/
protected function runNextJobForDaemon($connectionName, $queue, $delay, $sleep, $maxTries)
Expand Down Expand Up @@ -153,6 +153,7 @@ protected function daemonShouldRun()
* @param int $delay
* @param int $sleep
* @param int $maxTries
*
* @return array
*/
public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
Expand All @@ -162,8 +163,8 @@ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $max
$job = $this->getNextJob($connection, $queue);

//If job equal TRUE this this is async job was run
if(true === $job){
return ;
if (true === $job) {
return;
}
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
Expand All @@ -184,6 +185,7 @@ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $max
*
* @param Queue $connection
* @param string $queue
*
* @return Job|null
*/
protected function getNextJob($connection, $queue)
Expand All @@ -206,6 +208,7 @@ protected function getNextJob($connection, $queue)
* @param Job $job
* @param int $maxTries
* @param int $delay
*
* @return array|null
*
* @throws \Throwable
Expand Down Expand Up @@ -248,6 +251,7 @@ public function process($connection, Job $job, $maxTries = 0, $delay = 0)
*
* @param string $connection
* @param Job $job
*
* @return void
*/
protected function raiseAfterJobEvent($connection, Job $job)
Expand All @@ -264,6 +268,7 @@ protected function raiseAfterJobEvent($connection, Job $job)
*
* @param string $connection
* @param Job $job
*
* @return array
*/
protected function logFailedJob($connection, Job $job)
Expand All @@ -286,6 +291,7 @@ protected function logFailedJob($connection, Job $job)
*
* @param string $connection
* @param Job $job
*
* @return void
*/
protected function raiseFailedJobEvent($connection, Job $job)
Expand All @@ -301,6 +307,7 @@ protected function raiseFailedJobEvent($connection, Job $job)
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
*
* @return bool
*/
public function memoryExceeded($memoryLimit)
Expand All @@ -324,6 +331,7 @@ public function stop()
* Sleep the script for a given number of seconds.
*
* @param int $seconds
*
* @return void
*/
public function sleep($seconds)
Expand All @@ -347,6 +355,7 @@ protected function getTimestampOfLastQueueRestart()
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
*
* @return bool
*/
protected function queueShouldRestart($lastRestart)
Expand All @@ -358,6 +367,7 @@ protected function queueShouldRestart($lastRestart)
* Set the exception handler to use in Daemon mode.
*
* @param ExceptionHandler $handler
*
* @return void
*/
public function setDaemonExceptionHandler(ExceptionHandler $handler)
Expand All @@ -369,6 +379,7 @@ public function setDaemonExceptionHandler(ExceptionHandler $handler)
* Set the cache repository implementation.
*
* @param Repository $cache
*
* @return void
*/
public function setCache(CacheContract $cache)
Expand All @@ -390,6 +401,7 @@ public function getManager()
* Set the queue manager instance.
*
* @param QueueManager $manager
*
* @return void
*/
public function setManager(QueueManager $manager)
Expand Down
27 changes: 25 additions & 2 deletions base/WorkerInterface.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
<?php

namespace yiicod\laravel5queue\base;

/**
* Interface WorkerInterface
* Main interface for workers
*
* @package yiicod\laravel5queue\base
*/
interface WorkerInterface
{
public function actionStart($connection, $queue/*, $force*/);
/**
* Start worker
*
* @param $connection
* @param $queue
*
* @return mixed
*/
public function actionStart($connection, $queue);

/**
* Stop worker
*
* @param $connection
* @param $queue
*
* @return mixed
*/
public function actionStop($connection, $queue);

}
11 changes: 6 additions & 5 deletions commands/AsyncQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
namespace yiicod\laravel5queue\commands;

use CConsoleCommand;
use Symfony\Component\Process\Process;
use Yii;
use yiicod\laravel5queue\failed\MongoFailedJobProvider;
use yiicod\laravel5queue\handlers\DaemonExceptionHandler;
use yiicod\laravel5queue\jobs\MongoJob;
use yiicod\laravel5queue\Worker;

/**
Expand All @@ -33,14 +31,19 @@ protected function worker()
{
$queueManager = Yii::app()->laravel5queue->connect();

// automatically send every new message to available log routes
Yii::getLogger()->autoFlush = 1;
// when sending a message to log routes, also notify them to dump the message
// into the corresponding persistent storage (e.g. DB, email)
Yii::getLogger()->autoDump = true;

$worker = new Worker($queueManager->getQueueManager(), new MongoFailedJobProvider(Yii::app()->mongodb, 'YiiJobsFailed'));
$worker->setDaemonExceptionHandler(new DaemonExceptionHandler());
return $worker;
}

/**
* Process the job
*
*/
protected function processJob($connectionName, $id)
{
Expand All @@ -54,8 +57,6 @@ protected function processJob($connectionName, $id)
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (!is_null($job)) {
// $sleep = max($job->getDatabaseJob()->available_at - time(), 0);var_dump($sleep);die;
// sleep($sleep);
return $worker->process(
$manager->getName($connectionName), $job, 1, 0
);
Expand Down
32 changes: 16 additions & 16 deletions commands/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,25 @@ class WorkerCommand extends CConsoleCommand implements WorkerInterface

/**
* Default action. Starts daemon.
*
* @param string $connection
* @param string $queue
*/
public function actionStart($connection, $queue/*, $force = 0*/)
public function actionStart($connection = 'default', $queue = 'default')
{
$this->queue = $queue;
$this->connection = $connection;
$this->createDaemon(/*$force*/);
$this->createDaemon();
}

/**
* Stops daemon.
*
* Close server and close all connections.
*
* @param string $connection
* @param string $queue
*/
public function actionStop($connection, $queue)
public function actionStop($connection = 'default', $queue = 'default')
{
$this->queue = $queue;
$this->connection = $connection;
Expand All @@ -91,9 +96,9 @@ public function actionStop($connection, $queue)
* Creates daemon.
* Check is daemon already run and if false then starts daemon and update lock file.
*/
protected function createDaemon(/*$force = false*/)
protected function createDaemon()
{
if (true === $this->isAlreadyRunning(/*(bool)$force*/)) {
if (true === $this->isAlreadyRunning()) {
echo sprintf("[%s] is running already.\n", $this->daemonName);
Yii::app()->end();
}
Expand All @@ -108,12 +113,13 @@ protected function createDaemon(/*$force = false*/)
$this->addPid($pid);
}

$this->worker();

echo sprintf("[%s] running with PID: %s\n", $this->daemonName, $pid);
$this->worker();
}


/**
* Stop daemon if exists.
*/
protected function stopDaemon()
{
if (file_exists($this->getPidsFilePath())) {
Expand Down Expand Up @@ -151,19 +157,13 @@ protected function isAlreadyRunning(/*$force = false*/)
}
}

// if ($force && count($runingPids) >= $this->threads) {
// $result = true;
// } elseif ($force && count($runingPids) < $this->threads) {
// $result = false;
// }


return $result;
}


/**
* Add pid
*
* @param $pid
*/
protected function addPid($pid)
Expand Down
45 changes: 25 additions & 20 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
{
"name": "yiicod/laravel5queue",
"type": "yii-extension",
"license": "New BSD License",
"authors": [
{
"name": "Alexey Orlov",
"email": "aaorlov88@gmail.com",
"role": "Developer"
"name": "yiicod/laravel5queue",
"type": "yii-extension",
"license": "New BSD License",
"authors": [
{
"name": "Alexey Orlov",
"email": "aaorlov88@gmail.com",
"role": "Developer"
},
{
"name": "Virchenko Maksim",
"email": "muslim1992@gmail.com",
"role": "Developer"
}
],
"require": {
"illuminate/queue": "5.2.*",
"illuminate/encryption": "5.2.*",
"paragonie/random_compat": "^1.2",
"jeremeamia/superclosure": "^2.0",
"symfony/process": "~2.3|~3.0"
},
{
"name": "Virchenko Maksim",
"email": "muslim1992@gmail.com",
"role": "Developer"
"autoload": {
"psr-4": {
"yiicod\\laravel5queue\\": "/"
}
}
],
"require": {
"illuminate/queue": "^5.2",
"illuminate/encryption": "^5.2",
"paragonie/random_compat": "^1.2",
"jeremeamia/superclosure": "^2.0",
"symfony/process": "~2.3|~3.0"
}
}
2 changes: 1 addition & 1 deletion connectors/AsyncMongoConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class AsyncMongoConnector implements ConnectorInterface
* Create a new connector instance.
*
* @param \Illuminate\Database\ConnectionResolverInterface $connection
* @return void
*/
public function __construct($connection)
{
Expand All @@ -36,6 +35,7 @@ public function __construct($connection)
* Establish a queue connection.
*
* @param array $config
*
* @return Queue
*/
public function connect(array $config)
Expand Down
Loading