Skip to content
This repository was archived by the owner on Nov 30, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Created by .ignore support plugin (hsz.mobi)
.idea
composer.phar
vendor
composer.lock
139 changes: 101 additions & 38 deletions amqp.php
Original file line number Diff line number Diff line change
@@ -1,21 +1,66 @@
<?php

/* Include namespaced code only if PhpAmqpLib available */
if(class_exists('PhpAmqpLib\Connection\AMQPConnection'))
class AMQPCapabilities
{
require_once('amqplibconnector.php');
require_once('amqplibconnectorssl.php');
protected $loader;

}
public function __construct()
{
// Try to load Composer's loader

/* Include only if predis available */
if(class_exists('Predis\Autoloader'))
{
require_once('redisconnector.php');
}
// Load as module within external application
$this->loader = @include(dirname(__FILE__) . "/../../autoload.php");
if($this->loader == null)
{
// Load with local composer for testing
$this->loader = @include('vendor/autoload.php');
}

/* Including the PECL connector never fails */
require_once('amqppeclconnector.php');
// If there is now loader, fail.
if($this->loader == null)
{
throw new CeleryException("Composer not installed");
}
}

public function TestLib($library)
{
$hasLib = $this->loader->findFile($library);
return ($hasLib !== false);
}

public function TestAndLoadPhpAmqpLib()
{
$hasLib = $this->TestLib('PhpAmqpLib\Connection\AMQPStreamConnection');
if($hasLib)
{
require_once('amqplibconnector.php');
require_once('amqplibconnectorssl.php');
}
return $hasLib;
}

public function TestAndLoadPredis()
{
$hasLib = $this->TestLib('Predis\Autoloader');
if($hasLib)
{
/* Include only if predis available */
require_once('redisconnector.php');
}
return $hasLib;
}

public function TestAndLoadPECL()
{
$hasLib = class_exists('AMQPConnection') && extension_loaded('amqp');
if($hasLib)
{
require_once('amqppeclconnector.php');
}
return $hasLib;
}
}

/**
* Abstraction for AMQP client libraries
Expand Down Expand Up @@ -46,26 +91,42 @@ static function GetConcrete($name = false)
*/
static function GetConcreteByName($name)
{
if($name == 'pecl')
{
return new PECLAMQPConnector();
}
elseif($name == 'php-amqplib')
{
return new AMQPLibConnector();
}
elseif($name == 'php-amqplib-ssl')
{
return new AMQPLibConnectorSsl();
}
elseif($name == 'redis')
{
return new RedisConnector();
}
else
{
throw new Exception('Unknown extension name ' . $name);
$caps = new AMQPCapabilities();

switch($name) {
case 'pecl':
if($caps->TestAndLoadPECL() === true)
{
return new PECLAMQPConnector();
}
break;

case 'php-amqplib':
case 'php-amqplib-ssl':
if($caps->TestAndLoadPhpAmqpLib() === true)
{
if($name === 'php-amqplib-ssl')
{
return new AMQPLibConnectorSsl();
}
else
{
return new AMQPLibConnector();
}
}
break;

case 'redis':
if($caps->TestAndLoadPredis() === true)
{
return new RedisConnector();
}
break;

default:
throw new CeleryException('Unknown extension name ' . $name);
}
throw new CeleryException('AMQP extension ' . $name . ' is not installed properly using Composer');
}

/**
Expand All @@ -74,22 +135,24 @@ static function GetConcreteByName($name)
*/
static function GetBestInstalledExtensionName($ssl = false)
{
if($ssl === true) //pecl doesn't support ssl
$caps = new AMQPCapabilities();
$hasPhpAmqpLib = $caps->TestAndLoadPhpAmqpLib();
$hasPECL = $caps->TestAndLoadPECL();

if($ssl === true && $hasPhpAmqpLib === true) //pecl doesn't support ssl
{
return 'php-amqplib-ssl';
}
elseif(class_exists('AMQPConnection') && extension_loaded('amqp'))
elseif($hasPECL === true)
{
return 'pecl';
}
elseif(class_exists('PhpAmqpLib\Connection\AMQPConnection'))
elseif($hasPhpAmqpLib === true)
{
return 'php-amqplib';
}
else
{
return 'unknown';
}

throw new CeleryException('You must install at least one AMQP extension using Composer');
}

/**
Expand Down
4 changes: 2 additions & 2 deletions amqplibconnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

require_once('amqp.php');

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
Expand Down Expand Up @@ -70,7 +70,7 @@ class AMQPLibConnector extends AbstractAMQPConnector

function GetConnectionObject($details)
{
return new AMQPConnection(
return new AMQPStreamConnection(
$details['host'],
$details['port'],
$details['login'],
Expand Down
46 changes: 24 additions & 22 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@
"description": "PHP client for Celery task queue",
"keywords": ["celery", "amqp", "task", "cron", "queue", "redis", "python"],
"homepage": "https://github.com/gjedeer/celery-php/",
"support": {
"issues": "https://github.com/gjedeer/celery-php/issues",
"docs": "https://github.com/gjedeer/celery-php/"
"license": "BSD-2-Clause",
"authors": [
{
"name": "GDR!",
"email": "info@massivescale.net",
"homepage": "http://massivescale.net/",
"role": "Developer"
}
],
"require": {
"videlalvaro/php-amqplib": ">=2.4.0",
"predis/predis": ">=0.8.5"
},
"autoload": {
"classmap": ["celery.php"]
},
"extra": {
"branch-alias": {
"dev-master": "2.1-dev"
"support": {
"issues": "https://github.com/gjedeer/celery-php/issues",
"docs": "https://github.com/gjedeer/celery-php/",
"license": "BSD-2-Clause",
"authors": [
{
"name": "GDR!",
"email": "info@massivescale.net",
"homepage": "http://massivescale.net/",
"role": "Developer"
}
],
"suggest": {
"videlalvaro/php-amqplib": ">=2.4.0",
"predis/predis": ">=0.8.5",
"ext-amqp": ">=1.0.0"
},
"autoload": {
"classmap": ["celery.php"]
},
"extra": {
"branch-alias": {
"dev-master": "2.1-dev"
}
}
}
}