diff options
Diffstat (limited to 'intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php')
| -rw-r--r-- | intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php new file mode 100644 index 0000000..ee83553 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php @@ -0,0 +1,436 @@ +<?php + +namespace Pheanstalk; + +/** + * Pheanstalk is a PHP client for the beanstalkd workqueue. + * The Pheanstalk class is a simple facade for the various underlying components. + * + * @see http://github.com/kr/beanstalkd + * @see http://xph.us/software/beanstalkd/ + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Pheanstalk implements PheanstalkInterface +{ + const VERSION = '3.0.2'; + + private $_connection; + private $_using = PheanstalkInterface::DEFAULT_TUBE; + private $_watching = array(PheanstalkInterface::DEFAULT_TUBE => true); + + /** + * @param string $host + * @param int $port + * @param int $connectTimeout + * @param bool $connectPersistent + */ + public function __construct($host, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false) + { + $this->setConnection(new Connection($host, $port, $connectTimeout, $connectPersistent)); + } + + /** + * {@inheritdoc} + */ + public function setConnection(Connection $connection) + { + $this->_connection = $connection; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getConnection() + { + return $this->_connection; + } + + // ---------------------------------------- + + /** + * {@inheritdoc} + */ + public function bury($job, $priority = PheanstalkInterface::DEFAULT_PRIORITY) + { + $this->_dispatch(new Command\BuryCommand($job, $priority)); + } + + /** + * {@inheritdoc} + */ + public function delete($job) + { + $this->_dispatch(new Command\DeleteCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function ignore($tube) + { + if (isset($this->_watching[$tube])) { + $this->_dispatch(new Command\IgnoreCommand($tube)); + unset($this->_watching[$tube]); + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function kick($max) + { + $response = $this->_dispatch(new Command\KickCommand($max)); + + return $response['kicked']; + } + + /** + * {@inheritdoc} + */ + public function kickJob($job) + { + $this->_dispatch(new Command\KickJobCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function listTubes() + { + return (array) $this->_dispatch( + new Command\ListTubesCommand() + ); + } + + /** + * {@inheritdoc} + */ + public function listTubesWatched($askServer = false) + { + if ($askServer) { + $response = (array) $this->_dispatch( + new Command\ListTubesWatchedCommand() + ); + $this->_watching = array_fill_keys($response, true); + } + + return array_keys($this->_watching); + } + + /** + * {@inheritdoc} + */ + public function listTubeUsed($askServer = false) + { + if ($askServer) { + $response = $this->_dispatch( + new Command\ListTubeUsedCommand() + ); + $this->_using = $response['tube']; + } + + return $this->_using; + } + + /** + * {@inheritdoc} + */ + public function pauseTube($tube, $delay) + { + $this->_dispatch(new Command\PauseTubeCommand($tube, $delay)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function resumeTube($tube) + { + // Pause a tube with zero delay will resume the tube + $this->pauseTube($tube, 0); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function peek($jobId) + { + $response = $this->_dispatch( + new Command\PeekCommand($jobId) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekReady($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_READY) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekDelayed($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekBuried($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function put( + $data, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY, + $ttr = PheanstalkInterface::DEFAULT_TTR + ) + { + $response = $this->_dispatch( + new Command\PutCommand($data, $priority, $delay, $ttr) + ); + + return $response['id']; + } + + /** + * {@inheritdoc} + */ + public function putInTube( + $tube, + $data, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY, + $ttr = PheanstalkInterface::DEFAULT_TTR + ) + { + $this->useTube($tube); + + return $this->put($data, $priority, $delay, $ttr); + } + + /** + * {@inheritdoc} + */ + public function release( + $job, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY + ) + { + $this->_dispatch( + new Command\ReleaseCommand($job, $priority, $delay) + ); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function reserve($timeout = null) + { + $response = $this->_dispatch( + new Command\ReserveCommand($timeout) + ); + + $falseResponses = array( + Response::RESPONSE_DEADLINE_SOON, + Response::RESPONSE_TIMED_OUT, + ); + + if (in_array($response->getResponseName(), $falseResponses)) { + return false; + } else { + return new Job($response['id'], $response['jobdata']); + } + } + + /** + * {@inheritdoc} + */ + public function reserveFromTube($tube, $timeout = null) + { + $this->watchOnly($tube); + + return $this->reserve($timeout); + } + + /** + * {@inheritdoc} + */ + public function statsJob($job) + { + return $this->_dispatch(new Command\StatsJobCommand($job)); + } + + /** + * {@inheritdoc} + */ + public function statsTube($tube) + { + return $this->_dispatch(new Command\StatsTubeCommand($tube)); + } + + /** + * {@inheritdoc} + */ + public function stats() + { + return $this->_dispatch(new Command\StatsCommand()); + } + + /** + * {@inheritdoc} + */ + public function touch($job) + { + $this->_dispatch(new Command\TouchCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function useTube($tube) + { + if ($this->_using != $tube) { + $this->_dispatch(new Command\UseCommand($tube)); + $this->_using = $tube; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function watch($tube) + { + if (!isset($this->_watching[$tube])) { + $this->_dispatch(new Command\WatchCommand($tube)); + $this->_watching[$tube] = true; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function watchOnly($tube) + { + $this->watch($tube); + + $ignoreTubes = array_diff_key($this->_watching, array($tube => true)); + foreach ($ignoreTubes as $ignoreTube => $true) { + $this->ignore($ignoreTube); + } + + return $this; + } + + // ---------------------------------------- + + /** + * Dispatches the specified command to the connection object. + * + * If a SocketException occurs, the connection is reset, and the command is + * re-attempted once. + * + * @param Command $command + * @return Response + */ + private function _dispatch($command) + { + try { + $response = $this->_connection->dispatchCommand($command); + } catch (Exception\SocketException $e) { + $this->_reconnect(); + $response = $this->_connection->dispatchCommand($command); + } + + return $response; + } + + /** + * Creates a new connection object, based on the existing connection object, + * and re-establishes the used tube and watchlist. + */ + private function _reconnect() + { + $new_connection = new Connection( + $this->_connection->getHost(), + $this->_connection->getPort(), + $this->_connection->getConnectTimeout() + ); + + $this->setConnection($new_connection); + + if ($this->_using != PheanstalkInterface::DEFAULT_TUBE) { + $tube = $this->_using; + $this->_using = null; + $this->useTube($tube); + } + + foreach ($this->_watching as $tube => $true) { + if ($tube != PheanstalkInterface::DEFAULT_TUBE) { + unset($this->_watching[$tube]); + $this->watch($tube); + } + } + + if (!isset($this->_watching[PheanstalkInterface::DEFAULT_TUBE])) { + $this->ignore(PheanstalkInterface::DEFAULT_TUBE); + } + } +} |
