summaryrefslogtreecommitdiff
path: root/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php
diff options
context:
space:
mode:
Diffstat (limited to 'intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php')
-rw-r--r--intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php436
1 files changed, 436 insertions, 0 deletions
diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php
new file mode 100644
index 0000000..ee83553
--- /dev/null
+++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php
@@ -0,0 +1,436 @@
+<?php
+
+namespace Pheanstalk;
+
+/**
+ * Pheanstalk is a PHP client for the beanstalkd workqueue.
+ * The Pheanstalk class is a simple facade for the various underlying components.
+ *
+ * @see http://github.com/kr/beanstalkd
+ * @see http://xph.us/software/beanstalkd/
+ *
+ * @author Paul Annesley
+ * @package Pheanstalk
+ * @license http://www.opensource.org/licenses/mit-license.php
+ */
+class Pheanstalk implements PheanstalkInterface
+{
+ const VERSION = '3.0.2';
+
+ private $_connection;
+ private $_using = PheanstalkInterface::DEFAULT_TUBE;
+ private $_watching = array(PheanstalkInterface::DEFAULT_TUBE => true);
+
+ /**
+ * @param string $host
+ * @param int $port
+ * @param int $connectTimeout
+ * @param bool $connectPersistent
+ */
+ public function __construct($host, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false)
+ {
+ $this->setConnection(new Connection($host, $port, $connectTimeout, $connectPersistent));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setConnection(Connection $connection)
+ {
+ $this->_connection = $connection;
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getConnection()
+ {
+ return $this->_connection;
+ }
+
+ // ----------------------------------------
+
+ /**
+ * {@inheritdoc}
+ */
+ public function bury($job, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
+ {
+ $this->_dispatch(new Command\BuryCommand($job, $priority));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function delete($job)
+ {
+ $this->_dispatch(new Command\DeleteCommand($job));
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ignore($tube)
+ {
+ if (isset($this->_watching[$tube])) {
+ $this->_dispatch(new Command\IgnoreCommand($tube));
+ unset($this->_watching[$tube]);
+ }
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function kick($max)
+ {
+ $response = $this->_dispatch(new Command\KickCommand($max));
+
+ return $response['kicked'];
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function kickJob($job)
+ {
+ $this->_dispatch(new Command\KickJobCommand($job));
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function listTubes()
+ {
+ return (array) $this->_dispatch(
+ new Command\ListTubesCommand()
+ );
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function listTubesWatched($askServer = false)
+ {
+ if ($askServer) {
+ $response = (array) $this->_dispatch(
+ new Command\ListTubesWatchedCommand()
+ );
+ $this->_watching = array_fill_keys($response, true);
+ }
+
+ return array_keys($this->_watching);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function listTubeUsed($askServer = false)
+ {
+ if ($askServer) {
+ $response = $this->_dispatch(
+ new Command\ListTubeUsedCommand()
+ );
+ $this->_using = $response['tube'];
+ }
+
+ return $this->_using;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function pauseTube($tube, $delay)
+ {
+ $this->_dispatch(new Command\PauseTubeCommand($tube, $delay));
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function resumeTube($tube)
+ {
+ // Pause a tube with zero delay will resume the tube
+ $this->pauseTube($tube, 0);
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function peek($jobId)
+ {
+ $response = $this->_dispatch(
+ new Command\PeekCommand($jobId)
+ );
+
+ return new Job($response['id'], $response['jobdata']);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function peekReady($tube = null)
+ {
+ if ($tube !== null) {
+ $this->useTube($tube);
+ }
+
+ $response = $this->_dispatch(
+ new Command\PeekCommand(Command\PeekCommand::TYPE_READY)
+ );
+
+ return new Job($response['id'], $response['jobdata']);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function peekDelayed($tube = null)
+ {
+ if ($tube !== null) {
+ $this->useTube($tube);
+ }
+
+ $response = $this->_dispatch(
+ new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
+ );
+
+ return new Job($response['id'], $response['jobdata']);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function peekBuried($tube = null)
+ {
+ if ($tube !== null) {
+ $this->useTube($tube);
+ }
+
+ $response = $this->_dispatch(
+ new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
+ );
+
+ return new Job($response['id'], $response['jobdata']);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function put(
+ $data,
+ $priority = PheanstalkInterface::DEFAULT_PRIORITY,
+ $delay = PheanstalkInterface::DEFAULT_DELAY,
+ $ttr = PheanstalkInterface::DEFAULT_TTR
+ )
+ {
+ $response = $this->_dispatch(
+ new Command\PutCommand($data, $priority, $delay, $ttr)
+ );
+
+ return $response['id'];
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function putInTube(
+ $tube,
+ $data,
+ $priority = PheanstalkInterface::DEFAULT_PRIORITY,
+ $delay = PheanstalkInterface::DEFAULT_DELAY,
+ $ttr = PheanstalkInterface::DEFAULT_TTR
+ )
+ {
+ $this->useTube($tube);
+
+ return $this->put($data, $priority, $delay, $ttr);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function release(
+ $job,
+ $priority = PheanstalkInterface::DEFAULT_PRIORITY,
+ $delay = PheanstalkInterface::DEFAULT_DELAY
+ )
+ {
+ $this->_dispatch(
+ new Command\ReleaseCommand($job, $priority, $delay)
+ );
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reserve($timeout = null)
+ {
+ $response = $this->_dispatch(
+ new Command\ReserveCommand($timeout)
+ );
+
+ $falseResponses = array(
+ Response::RESPONSE_DEADLINE_SOON,
+ Response::RESPONSE_TIMED_OUT,
+ );
+
+ if (in_array($response->getResponseName(), $falseResponses)) {
+ return false;
+ } else {
+ return new Job($response['id'], $response['jobdata']);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reserveFromTube($tube, $timeout = null)
+ {
+ $this->watchOnly($tube);
+
+ return $this->reserve($timeout);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function statsJob($job)
+ {
+ return $this->_dispatch(new Command\StatsJobCommand($job));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function statsTube($tube)
+ {
+ return $this->_dispatch(new Command\StatsTubeCommand($tube));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function stats()
+ {
+ return $this->_dispatch(new Command\StatsCommand());
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function touch($job)
+ {
+ $this->_dispatch(new Command\TouchCommand($job));
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function useTube($tube)
+ {
+ if ($this->_using != $tube) {
+ $this->_dispatch(new Command\UseCommand($tube));
+ $this->_using = $tube;
+ }
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function watch($tube)
+ {
+ if (!isset($this->_watching[$tube])) {
+ $this->_dispatch(new Command\WatchCommand($tube));
+ $this->_watching[$tube] = true;
+ }
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function watchOnly($tube)
+ {
+ $this->watch($tube);
+
+ $ignoreTubes = array_diff_key($this->_watching, array($tube => true));
+ foreach ($ignoreTubes as $ignoreTube => $true) {
+ $this->ignore($ignoreTube);
+ }
+
+ return $this;
+ }
+
+ // ----------------------------------------
+
+ /**
+ * Dispatches the specified command to the connection object.
+ *
+ * If a SocketException occurs, the connection is reset, and the command is
+ * re-attempted once.
+ *
+ * @param Command $command
+ * @return Response
+ */
+ private function _dispatch($command)
+ {
+ try {
+ $response = $this->_connection->dispatchCommand($command);
+ } catch (Exception\SocketException $e) {
+ $this->_reconnect();
+ $response = $this->_connection->dispatchCommand($command);
+ }
+
+ return $response;
+ }
+
+ /**
+ * Creates a new connection object, based on the existing connection object,
+ * and re-establishes the used tube and watchlist.
+ */
+ private function _reconnect()
+ {
+ $new_connection = new Connection(
+ $this->_connection->getHost(),
+ $this->_connection->getPort(),
+ $this->_connection->getConnectTimeout()
+ );
+
+ $this->setConnection($new_connection);
+
+ if ($this->_using != PheanstalkInterface::DEFAULT_TUBE) {
+ $tube = $this->_using;
+ $this->_using = null;
+ $this->useTube($tube);
+ }
+
+ foreach ($this->_watching as $tube => $true) {
+ if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
+ unset($this->_watching[$tube]);
+ $this->watch($tube);
+ }
+ }
+
+ if (!isset($this->_watching[PheanstalkInterface::DEFAULT_TUBE])) {
+ $this->ignore(PheanstalkInterface::DEFAULT_TUBE);
+ }
+ }
+}