Skip to content

Commit

Permalink
RPC Queues: Allow overriding of the reply queue (allowing reuse of an…
Browse files Browse the repository at this point in the history
… existing reply queue from another RPC queue);

Bumped PHP version required to 7.4
  • Loading branch information
AllenJB authored Sep 27, 2022
2 parents 88dbcc4 + 32cdfa0 commit 5923009
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/php.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
php-versions: ['7.3', '7.4', '8.0', '8.1']
php-versions: ['7.4', '8.0', '8.1']
steps:
- uses: actions/checkout@v2
- name: Setup PHP
Expand Down
7 changes: 5 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
}
},
"config": {
"sort-packages": true
"sort-packages": true,
"allow-plugins": {
"dealerdirect/phpcodesniffer-composer-installer": true
}
},
"require": {
"php": "^7.3|^8.0",
"php": "^7.4|^8.0",
"ext-pdo": "*",
"bunny/bunny": "^0.4.1|^0.5",
"guzzlehttp/guzzle": "^6.3|^7.0",
Expand Down
2 changes: 1 addition & 1 deletion php-compatibility.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ini name="memory_limit" value="2G" />
<config name="ignore_errors_on_exit" value="1" />
<config name="ignore_warnings_on_exit" value="1" />
<config name="testVersion" value="7.3-"/>
<config name="testVersion" value="7.4-"/>

<!-- commandline parameter equivalents -->
<!-- use the "colors" commandline parameter for colored output - not here as colors don't work well with Jenkins log -->
Expand Down
48 changes: 29 additions & 19 deletions src/Pdo/RPCQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,68 @@
namespace AllenJB\Queues\Pdo;

use AllenJB\Queues\QueueMessage;
use AllenJB\Queues\ReplyQueueInterface;
use AllenJB\Queues\RPCQueueInterface;
use React\Promise\PromiseInterface;

class RPCQueue implements RPCQueueInterface
{

/**
* @var string|null
*/
protected $correlationId;
protected ReplyQueueInterface $replyQueue;

/**
* @var ReplyQueue
*/
protected $replyQueue;
protected Queue $publishQueue;

/**
* @var Queue
*/
protected $publishQueue;
protected \PDO $pdo;

protected \DateTimeZone $dbTz;


public function __construct(string $name, \PDO $pdo, \DateTimeZone $dbTz)
{
$this->replyQueue = new ReplyQueue(null, $pdo, $dbTz);
$this->correlationId = $this->replyQueue->getCorrelationId();
$this->pdo = $pdo;
$this->dbTz = $dbTz;
$this->publishQueue = new Queue($name, $pdo, $dbTz);
}


public function publish(QueueMessage $message): PromiseInterface
{
$message = $message->withCorrelationId($this->correlationId);
$message = $message->withReplyTo($this->replyQueue->getName());
$message = $message->withCorrelationId($this->getReplyQueue()->getCorrelationId());
$message = $message->withReplyTo($this->getReplyQueue()->getName());
return $this->publishQueue->publish($message);
}


public function consume(callable $callback, float $timeoutSecs, ?float $pollIntervalSecs = 0.1): void
{
$this->replyQueue->consume($callback, $timeoutSecs);
$this->getReplyQueue()->consume($callback, $timeoutSecs);
}

public function setExpectedResponseCount(?int $count): void
{
$this->replyQueue->setExpectedResponseCount($count);
$this->getReplyQueue()->setExpectedResponseCount($count);
}


public function incrementExpectedResponseCount(int $by = 1): void
{
$this->replyQueue->incrementExpectedResponseCount($by);
$this->getReplyQueue()->incrementExpectedResponseCount($by);
}

public function setReplyQueue(ReplyQueueInterface $replyQueue): void
{
if (isset($this->replyQueue)) {
trigger_error("Reply queue already initialized! Overriding it now may cause unexpected behavior", E_USER_WARNING);
}
$this->replyQueue = $replyQueue;
}

public function getReplyQueue(): ReplyQueueInterface
{
if (! isset($this->replyQueue)) {
$this->replyQueue = new ReplyQueue(null, $this->pdo, $this->dbTz);
}
return $this->replyQueue;
}

}
4 changes: 4 additions & 0 deletions src/RPCQueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public function setExpectedResponseCount(?int $count): void;

public function incrementExpectedResponseCount(int $by = 1): void;

public function setReplyQueue(ReplyQueueInterface $replyQueue): void;

public function getReplyQueue(): ReplyQueueInterface;

}
46 changes: 27 additions & 19 deletions src/Rabbit/RPCQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,68 @@
namespace AllenJB\Queues\Rabbit;

use AllenJB\Queues\QueueMessage;
use AllenJB\Queues\ReplyQueueInterface;
use AllenJB\Queues\RPCQueueInterface;
use Bunny\Channel;
use React\Promise\PromiseInterface;

class RPCQueue implements RPCQueueInterface
{

/**
* @var ReplyQueue
*/
protected $replyQueue;
protected ReplyQueueInterface $replyQueue;

/**
* @var Queue
*/
protected $publishQueue;
protected Queue $publishQueue;

/**
* @var string|null
*/
protected $correlationId;
protected Channel $bunnyChannel;


public function __construct(string $name, Channel $bunnyChannel, int $apiPort)
{
$this->replyQueue = new ReplyQueue(null, $bunnyChannel);
$this->correlationId = $this->replyQueue->getCorrelationId();
$this->bunnyChannel = $bunnyChannel;
$this->publishQueue = new Queue($name, $bunnyChannel, $apiPort);
}


public function publish(QueueMessage $message): PromiseInterface
{
$message = $message->withCorrelationId($this->correlationId);
$message = $message->withReplyTo($this->replyQueue->getName());
$replyQueue = $this->getReplyQueue();
$message = $message->withCorrelationId($replyQueue->getCorrelationId());
$message = $message->withReplyTo($replyQueue->getName());
return $this->publishQueue->publish($message);
}


public function consume(callable $callback, float $timeoutSecs, ?float $pollIntervalSecs = null): void
{
$this->replyQueue->consume($callback, $timeoutSecs);
$this->getReplyQueue()->consume($callback, $timeoutSecs);
}


public function setExpectedResponseCount(?int $count): void
{
$this->replyQueue->setExpectedResponseCount($count);
$this->getReplyQueue()->setExpectedResponseCount($count);
}


public function incrementExpectedResponseCount(int $by = 1): void
{
$this->replyQueue->incrementExpectedResponseCount($by);
$this->getReplyQueue()->incrementExpectedResponseCount($by);
}

public function setReplyQueue(ReplyQueueInterface $replyQueue): void
{
if (isset($this->replyQueue)) {
trigger_error("Reply queue already initialized! Overriding it now may cause unexpected behavior", E_USER_WARNING);
}
$this->replyQueue = $replyQueue;
}

public function getReplyQueue(): ReplyQueueInterface
{
if (! isset($this->replyQueue)) {
$this->replyQueue = new ReplyQueue(null, $this->bunnyChannel);
}
return $this->replyQueue;
}


Expand Down

0 comments on commit 5923009

Please sign in to comment.