Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ define require_phar
@[ -f ./$(1) ] || wget -q $(2) -O ./$(1) && chmod +x $(1);
endef

lint: ## Lint source code
lint: lint-php lint-psr2 lint-squiz

.PHONY: lint-php
Expand All @@ -29,6 +30,7 @@ lint-squiz:
./phpcs.phar --standard=Squiz,./ruleset.xml --colors -w -s --warning-severity=0 $(SOURCE_CODE_PATHS)


test: ## Execute all tests suites TDD and BDD
test: test-tdd test-bdd

.PHONY: test-tdd
Expand All @@ -39,26 +41,29 @@ test-tdd:
test-bdd:
./vendor/bin/phpspec run --format=pretty -v

cover:
cover: ## Generate coverage report
./vendor/bin/phpunit --coverage-html $(COVERAGE_PATH) test

deps:
deps: ## Install dependencies
$(call require_phar,composer.phar,$(COMPOSER_PHAR))
./composer.phar install --no-dev

dev-deps:
dev-deps: ## Install development dependencies
$(call require_phar,composer.phar,$(COMPOSER_PHAR))
./composer.phar install

dist-clean:
dist-clean: ## Clean developer files, dependenciers and temporary files
rm -rf $(CLEAN_PATHS)

docker-nats:
docker-nats: ## Start NATS container ( for testing purposes )
docker run --rm -p 8222:8222 -p 4222:4222 -d --name nats-main nats

phpdoc:
phpdoc: ## Generate phpdoc API documentation
$(call require_phar,phpdoc.phar,$(PHPDOCUMENTOR_PHAR_URL))
./phpdoc.phar -d ./src/ -t $(API_DOCS_PATH) --template=checkstyle --template=responsive-twig

serve-phpdoc:
serve-phpdoc: ## Serve phpdoc API documentation at http;://localhost:8000
cd $(API_DOCS_PATH) && php -S localhost:8000 && cd ../..

include Makefile.help.mk

5 changes: 5 additions & 0 deletions Makefile.help.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.PHONY: help
help: ## Show this help ( default )
@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'

.DEFAULT_GOAL := help
44 changes: 30 additions & 14 deletions src/Nats/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ public function isConnected()
* @param string $address Server url string.
* @param float $timeout Number of seconds until the connect() system call should timeout.
*
* @return resource
* @throws \Exception Exception raised if connection fails.
* @return resource
*/
private function getStream($address, $timeout)
private function getStream($address, $timeout, $context)
{
$errno = null;
$errstr = null;
Expand All @@ -242,7 +242,8 @@ function () {
return true;
}
);
$fp = stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT);

$fp = stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context);
restore_error_handler();

if ($fp === false) {
Expand Down Expand Up @@ -315,8 +316,8 @@ public function __construct(ConnectionOptions $options = null)
*
* @param string $payload Message data.
*
* @return void
* @throws \Exception Raises if fails sending data.
* @return void
*/
private function send($payload)
{
Expand Down Expand Up @@ -394,8 +395,8 @@ private function handlePING()
*
* @param string $line Message command from Nats.
*
* @return void
* @throws Exception If subscription not found.
* @return void
* @codeCoverageIgnore
*/
private function handleMSG($line)
Expand All @@ -408,7 +409,7 @@ private function handleMSG($line)
if (count($parts) === 5) {
$length = trim($parts[4]);
$subject = $parts[3];
} else if (count($parts) === 4) {
} elseif (count($parts) === 4) {
$length = trim($parts[3]);
$subject = $parts[1];
}
Expand Down Expand Up @@ -443,19 +444,34 @@ public function connect($timeout = null)
}

$this->timeout = $timeout;
$this->streamSocket = $this->getStream($this->options->getAddress(), $timeout);
$this->streamSocket = $this->getStream(
$this->options->getAddress(), $timeout, $this->options->getStreamContext());
$this->setStreamTimeout($timeout);

$msg = 'CONNECT '.$this->options;
$this->send($msg);
$connectResponse = $this->receive();
$infoResponse = $this->receive();

if ($this->isErrorResponse($connectResponse) === true) {
throw Exception::forFailedConnection($connectResponse);
if ($this->isErrorResponse($infoResponse) === true) {
throw Exception::forFailedConnection($infoResponse);
} else {
$this->processServerInfo($connectResponse);
$this->processServerInfo($infoResponse);
if ($this->serverInfo->isTLSRequired()) {
set_error_handler(
function ($errno, $errstr, $errfile, $errline) {
restore_error_handler();
throw Exception::forFailedConnection($errstr);
});

if (!stream_socket_enable_crypto(
$this->streamSocket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT)) {
throw Exception::forFailedConnection('Error negotiating crypto');
}

restore_error_handler();
}
}

$msg = 'CONNECT '.$this->options;
$this->send($msg);
$this->ping();
$pingResponse = $this->receive();

Expand Down Expand Up @@ -560,9 +576,9 @@ public function unsubscribe($sid, $quantity = null)
* @param string $payload Message data.
* @param string $inbox Message inbox.
*
* @throws Exception If subscription not found.
* @return void
*
* @throws Exception If subscription not found.
*/
public function publish($subject, $payload = null, $inbox = null)
{
Expand Down
35 changes: 35 additions & 0 deletions src/Nats/ConnectionOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ class ConnectionOptions
*/
private $reconnect = true;

/**
* Stream context to use.
*
* @var resource
*/
private $streamContext = null;

/**
* Allows to define parameters which can be set by passing them to the class constructor.
*
Expand All @@ -97,6 +104,7 @@ class ConnectionOptions
'verbose',
'pedantic',
'reconnect',
'streamContext',
];


Expand All @@ -120,6 +128,9 @@ class ConnectionOptions
*/
public function __construct($options = null)
{
//Default stream context
$this->streamContext = stream_context_get_default();

if (empty($options) === false) {
$this->initialize($options);
}
Expand Down Expand Up @@ -420,6 +431,30 @@ public function setReconnect($reconnect)
return $this;
}

/**
* Get stream context.
*
* @return resource
*/
public function getStreamContext()
{
return $this->streamContext;
}

/**
* Set stream context.
*
* @param resource $streamContext Stream context.
*
* @return $this
*/
public function setStreamContext($streamContext)
{
$this->streamContext = $streamContext;

return $this;
}

/**
* Set the connection options.
*
Expand Down
6 changes: 3 additions & 3 deletions src/Nats/ServerInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public function __construct($connectionResponse)
$this->setPort($data['port']);
$this->setVersion($data['version']);
$this->setGoVersion($data['go']);
$this->setAuthRequired($data['auth_required']);
$this->setTLSRequired($data['tls_required']);
$this->setTLSVerify($data['tls_verify']);
$this->setAuthRequired(isset($data['auth_required']) ? $data['auth_required'] : false);
$this->setTLSRequired(isset($data['tls_required']) ? $data['tls_required'] : false);
$this->setTLSVerify(isset($data['tls_verify']) ? $data['tls_verify'] : false);
$this->setMaxPayload($data['max_payload']);

if (version_compare($data['version'], '1.1.0') === -1) {
Expand Down
Loading