diff options
Diffstat (limited to 'intern.gospeladlershof.de/vendor/pda/pheanstalk/src')
44 files changed, 2924 insertions, 0 deletions
diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command.php new file mode 100644 index 0000000..798db22 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command.php @@ -0,0 +1,61 @@ +<?php + +namespace Pheanstalk; + +/** + * A command to be sent to the beanstalkd server, and response processing logic + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Command +{ + const COMMAND_PUT = 'put'; + const COMMAND_USE = 'use'; + const COMMAND_RESERVE = 'reserve'; + const COMMAND_DELETE = 'delete'; + const COMMAND_RELEASE = 'release'; + const COMMAND_BURY = 'bury'; + const COMMAND_WATCH = 'watch'; + const COMMAND_IGNORE = 'ignore'; + const COMMAND_PEEK = 'peek'; + const COMMAND_KICK = 'kick'; + const COMMAND_STATS_JOB = 'stats-job'; + const COMMAND_STATS = 'stats'; + const COMMAND_LIST_TUBES = 'list-tubes'; + const COMMAND_LIST_TUBE_USED = 'list-tube-used'; + const COMMAND_LIST_TUBES_WATCHED = 'list-tubes-watched'; + + /** + * The command line, without trailing CRLF + * @return string + */ + public function getCommandLine(); + + /** + * Whether the command is followed by data + * @return boolean + */ + public function hasData(); + + /** + * The binary data to follow the command + * @return string + * @throws Exception\CommandException If command has no data + */ + public function getData(); + + /** + * The length of the binary data in bytes + * @return int + * @throws Exception\CommandException If command has no data + */ + public function getDataLength(); + + /** + * The response parser for the command. + * @return ResponseParser + */ + public function getResponseParser(); +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/AbstractCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/AbstractCommand.php new file mode 100644 index 0000000..d966727 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/AbstractCommand.php @@ -0,0 +1,74 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Command; +use Pheanstalk\Response; + +/** + * Common functionality for Command implementations. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +abstract class AbstractCommand + implements Command +{ + /* (non-phpdoc) + * @see Command::hasData() + */ + public function hasData() + { + return false; + } + + /* (non-phpdoc) + * @see Command::getData() + */ + public function getData() + { + throw new Exception\CommandException('Command has no data'); + } + + /* (non-phpdoc) + * @see Command::getDataLength() + */ + public function getDataLength() + { + throw new Exception\CommandException('Command has no data'); + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + // concrete implementation must either: + // a) implement ResponseParser + // b) override this getResponseParser method + return $this; + } + + /** + * The string representation of the object. + * @return string + */ + public function __toString() + { + return $this->getCommandLine(); + } + + // ---------------------------------------- + // protected + + /** + * Creates a Response for the given data + * @param array + * @return object Response + */ + protected function _createResponse($name, $data = array()) + { + return new Response\ArrayResponse($name, $data); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/BuryCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/BuryCommand.php new file mode 100644 index 0000000..d16a4bb --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/BuryCommand.php @@ -0,0 +1,62 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'bury' command. + * Puts a job into a 'buried' state, revived only by 'kick' command. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class BuryCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_job; + private $_priority; + + /** + * @param object $job Job + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + */ + public function __construct($job, $priority) + { + $this->_job = $job; + $this->_priority = $priority; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf( + 'bury %u %u', + $this->_job->getId(), + $this->_priority + ); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + '%s: Job %u is not reserved or does not exist.', + $responseLine, + $this->_job->getId() + )); + } elseif ($responseLine == Response::RESPONSE_BURIED) { + return $this->_createResponse(Response::RESPONSE_BURIED); + } else { + throw new Exception('Unhandled response: '.$responseLine); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/DeleteCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/DeleteCommand.php new file mode 100644 index 0000000..fd75843 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/DeleteCommand.php @@ -0,0 +1,53 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'delete' command. + * Permanently deletes an already-reserved job. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class DeleteCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_job; + + /** + * @param object $job Job + */ + public function __construct($job) + { + $this->_job = $job; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'delete '.$this->_job->getId(); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + 'Cannot delete job %u: %s', + $this->_job->getId(), + $responseLine + )); + } + + return $this->_createResponse($responseLine); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/IgnoreCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/IgnoreCommand.php new file mode 100644 index 0000000..d596adb --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/IgnoreCommand.php @@ -0,0 +1,54 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'ignore' command. + * Removes a tube from the watch list to reserve jobs from. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class IgnoreCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_tube; + + /** + * @param string $tube + */ + public function __construct($tube) + { + $this->_tube = $tube; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'ignore '.$this->_tube; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if (preg_match('#^WATCHING (\d+)$#', $responseLine, $matches)) { + return $this->_createResponse('WATCHING', array( + 'count' => (int) $matches[1] + )); + } elseif ($responseLine == Response::RESPONSE_NOT_IGNORED) { + throw new Exception\ServerException($responseLine . + ': cannot ignore last tube in watchlist'); + } else { + throw new Exception('Unhandled response: '.$responseLine); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickCommand.php new file mode 100644 index 0000000..83aba80 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickCommand.php @@ -0,0 +1,48 @@ +<?php + +namespace Pheanstalk\Command; + +/** + * The 'kick' command. + * Kicks buried or delayed jobs into a 'ready' state. + * If there are buried jobs, it will kick up to $max of them. + * Otherwise, it will kick up to $max delayed jobs. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class KickCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_max; + + /** + * @param int $max The maximum number of jobs to kick + */ + public function __construct($max) + { + $this->_max = (int) $max; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'kick '.$this->_max; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + list($code, $count) = explode(' ', $responseLine); + + return $this->_createResponse($code, array( + 'kicked' => (int) $count, + )); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickJobCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickJobCommand.php new file mode 100644 index 0000000..e376754 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/KickJobCommand.php @@ -0,0 +1,59 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'kick-job' command. + * Kicks a specific buried or delayed job into a 'ready' state. + * + * A variant of kick that operates with a single job. If the given job + * exists and is in a buried or delayed state, it will be moved to the + * ready queue of the the same tube where it currently belongs. + * + * @author Matthieu Napoli + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class KickJobCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_job; + + /** + * @param Job $job Pheanstalk job + */ + public function __construct($job) + { + $this->_job = $job; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'kick-job '.$this->_job->getId(); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + '%s: Job %d does not exist or is not in a kickable state.', + $responseLine, + $this->_job->getId() + )); + } elseif ($responseLine == Response::RESPONSE_KICKED) { + return $this->_createResponse(Response::RESPONSE_KICKED); + } else { + throw new Exception('Unhandled response: '.$responseLine); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubeUsedCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubeUsedCommand.php new file mode 100644 index 0000000..55b100f --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubeUsedCommand.php @@ -0,0 +1,34 @@ +<?php + +namespace Pheanstalk\Command; + +/** + * The 'list-tube-used' command. + * Returns the tube currently being used by the client. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ListTubeUsedCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'list-tube-used'; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + return $this->_createResponse('USING', array( + 'tube' => preg_replace('#^USING (.+)$#', '$1', $responseLine) + )); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesCommand.php new file mode 100644 index 0000000..f0ea5a1 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesCommand.php @@ -0,0 +1,35 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\YamlResponseParser; + +/** + * The 'list-tubes' command. + * List all existing tubes. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ListTubesCommand + extends AbstractCommand +{ + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'list-tubes'; + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + return new YamlResponseParser( + YamlResponseParser::MODE_LIST + ); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesWatchedCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesWatchedCommand.php new file mode 100644 index 0000000..d66158f --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ListTubesWatchedCommand.php @@ -0,0 +1,35 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\YamlResponseParser; + +/** + * The 'list-tubes-watched' command. + * Lists the tubes on the watchlist. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ListTubesWatchedCommand + extends AbstractCommand +{ + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'list-tubes-watched'; + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + return new YamlResponseParser( + YamlResponseParser::MODE_LIST + ); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PauseTubeCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PauseTubeCommand.php new file mode 100644 index 0000000..5483a1a --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PauseTubeCommand.php @@ -0,0 +1,62 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'pause-tube' command. + * Temporarily prevent jobs being reserved from the given tube. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class PauseTubeCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_tube; + private $_delay; + + /** + * @param string $tube The tube to pause + * @param int $delay Seconds before jobs may be reserved from this queue. + */ + public function __construct($tube, $delay) + { + $this->_tube = $tube; + $this->_delay = $delay; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf( + 'pause-tube %s %u', + $this->_tube, + $this->_delay + ); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + '%s: tube %s does not exist.', + $responseLine, + $this->_tube + )); + } elseif ($responseLine == Response::RESPONSE_PAUSED) { + return $this->_createResponse(Response::RESPONSE_PAUSED); + } else { + throw new Exception('Unhandled response: '.$responseLine); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PeekCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PeekCommand.php new file mode 100644 index 0000000..b76c7ec --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PeekCommand.php @@ -0,0 +1,93 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'peek', 'peek-ready', 'peek-delayed' and 'peek-buried' commands. + * + * The peek commands let the client inspect a job in the system. There are four + * variations. All but the first (peek) operate only on the currently used tube. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class PeekCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + const TYPE_ID = 'id'; + const TYPE_READY = 'ready'; + const TYPE_DELAYED = 'delayed'; + const TYPE_BURIED = 'buried'; + + private $_subcommands = array( + self::TYPE_READY, + self::TYPE_DELAYED, + self::TYPE_BURIED, + ); + + private $_subcommand; + private $_jobId; + + /** + * @param mixed $peekSubject Job ID or self::TYPE_* + */ + public function __construct($peekSubject) + { + if (is_int($peekSubject) || ctype_digit($peekSubject)) { + $this->_jobId = $peekSubject; + } elseif (in_array($peekSubject, $this->_subcommands)) { + $this->_subcommand = $peekSubject; + } else { + throw new Exception\CommandException(sprintf( + 'Invalid peek subject: %s', $peekSubject + )); + } + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return isset($this->_jobId) ? + sprintf('peek %u', $this->_jobId) : + sprintf('peek-%s', $this->_subcommand); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + if (isset($this->_jobId)) { + $message = sprintf( + '%s: Job %u does not exist.', + $responseLine, + $this->_jobId + ); + } else { + $message = sprintf( + "%s: There are no jobs in the '%s' status", + $responseLine, + $this->_subcommand + ); + } + + throw new Exception\ServerException($message); + } elseif (preg_match('#^FOUND (\d+) \d+$#', $responseLine, $matches)) { + return $this->_createResponse( + Response::RESPONSE_FOUND, + array( + 'id' => (int) $matches[1], + 'jobdata' => $responseData, + ) + ); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PutCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PutCommand.php new file mode 100644 index 0000000..1e61c2c --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/PutCommand.php @@ -0,0 +1,113 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; + +/** + * The 'put' command. + * Inserts a job into the client's currently used tube. + * @see UseCommand + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class PutCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_data; + private $_priority; + private $_delay; + private $_ttr; + + /** + * Puts a job on the queue + * @param string $data The job data + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + * @param int $delay Seconds to wait before job becomes ready + * @param int $ttr Time To Run: seconds a job can be reserved for + */ + public function __construct($data, $priority, $delay, $ttr) + { + $this->_data = $data; + $this->_priority = $priority; + $this->_delay = $delay; + $this->_ttr = $ttr; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf( + 'put %u %u %u %u', + $this->_priority, + $this->_delay, + $this->_ttr, + $this->getDataLength() + ); + } + + /* (non-phpdoc) + * @see Command::hasData() + */ + public function hasData() + { + return true; + } + + /* (non-phpdoc) + * @see Command::getData() + */ + public function getData() + { + return $this->_data; + } + + /* (non-phpdoc) + * @see Command::getDataLength() + */ + public function getDataLength() + { + if (function_exists('mb_strlen')) { + return mb_strlen($this->_data, "latin1"); + } else { + return strlen($this->_data); + } + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if (preg_match('#^INSERTED (\d+)$#', $responseLine, $matches)) { + return $this->_createResponse('INSERTED', array( + 'id' => (int) $matches[1] + )); + } elseif (preg_match('#^BURIED (\d)+$#', $responseLine, $matches)) { + throw new Exception(sprintf( + '%s: server ran out of memory trying to grow the priority queue data structure.', + $responseLine + )); + } elseif (preg_match('#^JOB_TOO_BIG$#', $responseLine)) { + throw new Exception(sprintf( + '%s: job data exceeds server-enforced limit', + $responseLine + )); + } elseif (preg_match('#^EXPECTED_CRLF#', $responseLine)) { + throw new Exception(sprintf( + '%s: CRLF expected', + $responseLine + )); + } else { + throw new Exception(sprintf( + 'Unhandled response: %s', + $responseLine + )); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReleaseCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReleaseCommand.php new file mode 100644 index 0000000..33161a0 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReleaseCommand.php @@ -0,0 +1,72 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'release' command. + * Releases a reserved job back onto the ready queue. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ReleaseCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_job; + private $_priority; + private $_delay; + + /** + * @param object $job Job + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + * @param int $delay Seconds to wait before job becomes ready + */ + public function __construct($job, $priority, $delay) + { + $this->_job = $job; + $this->_priority = $priority; + $this->_delay = $delay; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf( + 'release %u %u %u', + $this->_job->getId(), + $this->_priority, + $this->_delay + ); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_BURIED) { + throw new Exception\ServerException(sprintf( + 'Job %u %s: out of memory trying to grow data structure', + $this->_job->getId(), + $responseLine + )); + } + + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + 'Job %u %s: does not exist or is not reserved by client', + $this->_job->getId(), + $responseLine + )); + } + + return $this->_createResponse($responseLine); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReserveCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReserveCommand.php new file mode 100644 index 0000000..9d4ccc8 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/ReserveCommand.php @@ -0,0 +1,62 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Response; + +/** + * The 'reserve' command. + * Reserves/locks a ready job in a watched tube. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ReserveCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_timeout; + + /** + * A timeout value of 0 will cause the server to immediately return either a + * response or TIMED_OUT. A positive value of timeout will limit the amount of + * time the client will block on the reserve request until a job becomes + * available. + * + * @param int $timeout + */ + public function __construct($timeout = null) + { + $this->_timeout = $timeout; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return isset($this->_timeout) ? + sprintf('reserve-with-timeout %s', $this->_timeout) : + 'reserve'; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine === Response::RESPONSE_DEADLINE_SOON || + $responseLine === Response::RESPONSE_TIMED_OUT) + { + return $this->_createResponse($responseLine); + } + + list($code, $id) = explode(' ', $responseLine); + + return $this->_createResponse($code, array( + 'id' => (int) $id, + 'jobdata' => $responseData, + )); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsCommand.php new file mode 100644 index 0000000..907613a --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsCommand.php @@ -0,0 +1,35 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\YamlResponseParser; + +/** + * The 'stats' command. + * Statistical information about the system as a whole. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class StatsCommand + extends AbstractCommand +{ + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'stats'; + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + return new YamlResponseParser( + YamlResponseParser::MODE_DICT + ); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsJobCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsJobCommand.php new file mode 100644 index 0000000..ad8c235 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsJobCommand.php @@ -0,0 +1,45 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\YamlResponseParser; + +/** + * The 'stats-job' command. + * Gives statistical information about the specified job if it exists. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class StatsJobCommand + extends AbstractCommand +{ + private $_jobId; + + /** + * @param Job|int $job + */ + public function __construct($job) + { + $this->_jobId = is_object($job) ? $job->getId() : $job; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf('stats-job %u', $this->_jobId); + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + return new YamlResponseParser( + YamlResponseParser::MODE_DICT + ); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsTubeCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsTubeCommand.php new file mode 100644 index 0000000..d0711fa --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/StatsTubeCommand.php @@ -0,0 +1,45 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\YamlResponseParser; + +/** + * The 'stats-tube' command. + * Gives statistical information about the specified tube if it exists. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class StatsTubeCommand + extends AbstractCommand +{ + private $_tube; + + /** + * @param string $tube + */ + public function __construct($tube) + { + $this->_tube = $tube; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf('stats-tube %s', $this->_tube); + } + + /* (non-phpdoc) + * @see Command::getResponseParser() + */ + public function getResponseParser() + { + return new YamlResponseParser( + YamlResponseParser::MODE_DICT + ); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/TouchCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/TouchCommand.php new file mode 100644 index 0000000..89b9eb5 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/TouchCommand.php @@ -0,0 +1,58 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\Exception; +use Pheanstalk\Response; + +/** + * The 'touch' command. + * + * The "touch" command allows a worker to request more time to work on a job. + * This is useful for jobs that potentially take a long time, but you still want + * the benefits of a TTR pulling a job away from an unresponsive worker. A worker + * may periodically tell the server that it's still alive and processing a job + * (e.g. it may do this on DEADLINE_SOON). + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class TouchCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_job; + + /** + * @param Job $job + */ + public function __construct($job) + { + $this->_job = $job; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return sprintf('touch %u', $this->_job->getId()); + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + 'Job %u %s: does not exist or is not reserved by client', + $this->_job->getId(), + $responseLine + )); + } + + return $this->_createResponse($responseLine); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/UseCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/UseCommand.php new file mode 100644 index 0000000..5e5ba70 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/UseCommand.php @@ -0,0 +1,52 @@ +<?php + +namespace Pheanstalk\Command; + +use Pheanstalk\ResponseParser; + +/** + * The 'use' command. + * + * The "use" command is for producers. Subsequent put commands will put jobs into + * the tube specified by this command. If no use command has been issued, jobs + * will be put into the tube named "default". + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class UseCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + /** + * @var string + */ + private $_tube; + + /** + * @param string $tube The name of the tube to use + */ + public function __construct($tube) + { + $this->_tube = $tube; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'use '.$this->_tube; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + return $this->_createResponse('USING', array( + 'tube' => preg_replace('#^USING (.+)$#', '$1', $responseLine) + )); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/WatchCommand.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/WatchCommand.php new file mode 100644 index 0000000..07dca67 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Command/WatchCommand.php @@ -0,0 +1,44 @@ +<?php + +namespace Pheanstalk\Command; + +/** + * The 'watch' command. + * Adds a tube to the watchlist to reserve jobs from. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class WatchCommand + extends AbstractCommand + implements \Pheanstalk\ResponseParser +{ + private $_tube; + + /** + * @param string $tube + */ + public function __construct($tube) + { + $this->_tube = $tube; + } + + /* (non-phpdoc) + * @see Command::getCommandLine() + */ + public function getCommandLine() + { + return 'watch '.$this->_tube; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + return $this->_createResponse('WATCHING', array( + 'count' => preg_replace('#^WATCHING (.+)$#', '$1', $responseLine) + )); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Connection.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Connection.php new file mode 100644 index 0000000..5e7ccd3 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Connection.php @@ -0,0 +1,212 @@ +<?php + +namespace Pheanstalk; + +use Pheanstalk\Socket\NativeSocket; + +/** + * A connection to a beanstalkd server + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Connection +{ + const CRLF = "\r\n"; + const CRLF_LENGTH = 2; + const DEFAULT_CONNECT_TIMEOUT = 2; + + // responses which are global errors, mapped to their exception short-names + private static $_errorResponses = array( + Response::RESPONSE_OUT_OF_MEMORY => 'OutOfMemory', + Response::RESPONSE_INTERNAL_ERROR => 'InternalError', + Response::RESPONSE_DRAINING => 'Draining', + Response::RESPONSE_BAD_FORMAT => 'BadFormat', + Response::RESPONSE_UNKNOWN_COMMAND => 'UnknownCommand', + ); + + // responses which are followed by data + private static $_dataResponses = array( + Response::RESPONSE_RESERVED, + Response::RESPONSE_FOUND, + Response::RESPONSE_OK, + ); + + private $_socket; + private $_hostname; + private $_port; + private $_connectTimeout; + private $_connectPersistent; + + /** + * @param string $hostname + * @param int $port + * @param float $connectTimeout + * @param bool $connectPersistent + */ + public function __construct($hostname, $port, $connectTimeout = null, $connectPersistent = false) + { + if (is_null($connectTimeout) || !is_numeric($connectTimeout)) { + $connectTimeout = self::DEFAULT_CONNECT_TIMEOUT; + } + + $this->_hostname = $hostname; + $this->_port = $port; + $this->_connectTimeout = $connectTimeout; + $this->_connectPersistent = $connectPersistent; + } + + /** + * Sets a manually created socket, used for unit testing. + * + * @param Socket $socket + * @return $this + */ + public function setSocket(Socket $socket) + { + $this->_socket = $socket; + + return $this; + } + + /** + * @return bool + */ + public function hasSocket() + { + return isset($this->_socket); + } + + /** + * Disconnect the socket. + * Subsequent socket operations will create a new connection. + */ + public function disconnect() + { + $this->_getSocket()->disconnect(); + $this->_socket = null; + } + + /** + * @param object $command Command + * @return object Response + * @throws Exception\ClientException + */ + public function dispatchCommand($command) + { + $socket = $this->_getSocket(); + + $to_send = $command->getCommandLine().self::CRLF; + + if ($command->hasData()) { + $to_send .= $command->getData().self::CRLF; + } + + $socket->write($to_send); + + $responseLine = $socket->getLine(); + $responseName = preg_replace('#^(\S+).*$#s', '$1', $responseLine); + + if (isset(self::$_errorResponses[$responseName])) { + $exception = sprintf( + '\Pheanstalk\Exception\Server%sException', + self::$_errorResponses[$responseName] + ); + + throw new $exception(sprintf( + "%s in response to '%s'", + $responseName, + $command + )); + } + + if (in_array($responseName, self::$_dataResponses)) { + $dataLength = preg_replace('#^.*\b(\d+)$#', '$1', $responseLine); + $data = $socket->read($dataLength); + + $crlf = $socket->read(self::CRLF_LENGTH); + if ($crlf !== self::CRLF) { + throw new Exception\ClientException(sprintf( + 'Expected %u bytes of CRLF after %u bytes of data', + self::CRLF_LENGTH, + $dataLength + )); + } + } else { + $data = null; + } + + return $command + ->getResponseParser() + ->parseResponse($responseLine, $data); + } + + /** + * Returns the connect timeout for this connection. + * + * @return float + */ + public function getConnectTimeout() + { + return $this->_connectTimeout; + } + + /** + * Returns the host for this connection. + * + * @return string + */ + public function getHost() + { + return $this->_hostname; + } + + /** + * Returns the port for this connection. + * + * @return int + */ + public function getPort() + { + return $this->_port; + } + + // ---------------------------------------- + + /** + * Socket handle for the connection to beanstalkd + * + * @return Socket + * @throws Exception\ConnectionException + */ + private function _getSocket() + { + if (!isset($this->_socket)) { + $this->_socket = new NativeSocket( + $this->_hostname, + $this->_port, + $this->_connectTimeout, + $this->_connectPersistent + ); + } + + return $this->_socket; + } + + /** + * Checks connection to the beanstalkd socket + * + * @return true|false + */ + public function isServiceListening() + { + try { + $this->_getSocket(); + + return true; + } catch (Exception\ConnectionException $e) { + return false; + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception.php new file mode 100644 index 0000000..80acd6d --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk; + +/** + * An exception originating from the Pheanstalk package + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Exception + extends \Exception +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ClientException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ClientException.php new file mode 100644 index 0000000..4c991e0 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ClientException.php @@ -0,0 +1,17 @@ +<?php + +namespace Pheanstalk\Exception; + +use Pheanstalk\Exception; + +/** + * An exception originating from the beanstalkd client + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ClientException + extends Exception +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/CommandException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/CommandException.php new file mode 100644 index 0000000..3cf6bba --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/CommandException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception relating to a Command + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class CommandException + extends ClientException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ConnectionException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ConnectionException.php new file mode 100644 index 0000000..f0522c9 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ConnectionException.php @@ -0,0 +1,23 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception relating to the client connection to the beanstalkd server + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ConnectionException + extends ClientException +{ + /** + * @param int $errno The connection error code + * @param string $errstr The connection error message + */ + public function __construct($errno, $errstr) + { + parent::__construct(sprintf('Socket error %d: %s', $errno, $errstr)); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerBadFormatException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerBadFormatException.php new file mode 100644 index 0000000..e275a2b --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerBadFormatException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerBadFormatException + extends ServerException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerDrainingException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerDrainingException.php new file mode 100644 index 0000000..7e70603 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerDrainingException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerDrainingException + extends ServerException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerException.php new file mode 100644 index 0000000..2795d22 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerException.php @@ -0,0 +1,17 @@ +<?php + +namespace Pheanstalk\Exception; + +use Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerException + extends Exception +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerInternalErrorException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerInternalErrorException.php new file mode 100644 index 0000000..f851388 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerInternalErrorException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerInternalErrorException + extends ServerException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerOutOfMemoryException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerOutOfMemoryException.php new file mode 100644 index 0000000..55c56a1 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerOutOfMemoryException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerOutOfMemoryException + extends ServerException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerUnknownCommandException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerUnknownCommandException.php new file mode 100644 index 0000000..f17f2c2 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/ServerUnknownCommandException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception originating as a beanstalkd server error + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ServerUnknownCommandException + extends ServerException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/SocketException.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/SocketException.php new file mode 100644 index 0000000..508628f --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Exception/SocketException.php @@ -0,0 +1,15 @@ +<?php + +namespace Pheanstalk\Exception; + +/** + * An exception relating to the connection socket. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class SocketException + extends ClientException +{ +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Job.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Job.php new file mode 100644 index 0000000..1bd9878 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Job.php @@ -0,0 +1,49 @@ +<?php + +namespace Pheanstalk; + +/** + * A job in a beanstalkd server + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Job +{ + const STATUS_READY = 'ready'; + const STATUS_RESERVED = 'reserved'; + const STATUS_DELAYED = 'delayed'; + const STATUS_BURIED = 'buried'; + + private $_id; + private $_data; + + /** + * @param int $id The job ID + * @param string $data The job data + */ + public function __construct($id, $data) + { + $this->_id = (int) $id; + $this->_data = $data; + } + + /** + * The job ID, unique on the beanstalkd server. + * @return int + */ + public function getId() + { + return $this->_id; + } + + /** + * The job data. + * @return string + */ + public function getData() + { + return $this->_data; + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php new file mode 100644 index 0000000..ee83553 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Pheanstalk.php @@ -0,0 +1,436 @@ +<?php + +namespace Pheanstalk; + +/** + * Pheanstalk is a PHP client for the beanstalkd workqueue. + * The Pheanstalk class is a simple facade for the various underlying components. + * + * @see http://github.com/kr/beanstalkd + * @see http://xph.us/software/beanstalkd/ + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Pheanstalk implements PheanstalkInterface +{ + const VERSION = '3.0.2'; + + private $_connection; + private $_using = PheanstalkInterface::DEFAULT_TUBE; + private $_watching = array(PheanstalkInterface::DEFAULT_TUBE => true); + + /** + * @param string $host + * @param int $port + * @param int $connectTimeout + * @param bool $connectPersistent + */ + public function __construct($host, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false) + { + $this->setConnection(new Connection($host, $port, $connectTimeout, $connectPersistent)); + } + + /** + * {@inheritdoc} + */ + public function setConnection(Connection $connection) + { + $this->_connection = $connection; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getConnection() + { + return $this->_connection; + } + + // ---------------------------------------- + + /** + * {@inheritdoc} + */ + public function bury($job, $priority = PheanstalkInterface::DEFAULT_PRIORITY) + { + $this->_dispatch(new Command\BuryCommand($job, $priority)); + } + + /** + * {@inheritdoc} + */ + public function delete($job) + { + $this->_dispatch(new Command\DeleteCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function ignore($tube) + { + if (isset($this->_watching[$tube])) { + $this->_dispatch(new Command\IgnoreCommand($tube)); + unset($this->_watching[$tube]); + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function kick($max) + { + $response = $this->_dispatch(new Command\KickCommand($max)); + + return $response['kicked']; + } + + /** + * {@inheritdoc} + */ + public function kickJob($job) + { + $this->_dispatch(new Command\KickJobCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function listTubes() + { + return (array) $this->_dispatch( + new Command\ListTubesCommand() + ); + } + + /** + * {@inheritdoc} + */ + public function listTubesWatched($askServer = false) + { + if ($askServer) { + $response = (array) $this->_dispatch( + new Command\ListTubesWatchedCommand() + ); + $this->_watching = array_fill_keys($response, true); + } + + return array_keys($this->_watching); + } + + /** + * {@inheritdoc} + */ + public function listTubeUsed($askServer = false) + { + if ($askServer) { + $response = $this->_dispatch( + new Command\ListTubeUsedCommand() + ); + $this->_using = $response['tube']; + } + + return $this->_using; + } + + /** + * {@inheritdoc} + */ + public function pauseTube($tube, $delay) + { + $this->_dispatch(new Command\PauseTubeCommand($tube, $delay)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function resumeTube($tube) + { + // Pause a tube with zero delay will resume the tube + $this->pauseTube($tube, 0); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function peek($jobId) + { + $response = $this->_dispatch( + new Command\PeekCommand($jobId) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekReady($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_READY) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekDelayed($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function peekBuried($tube = null) + { + if ($tube !== null) { + $this->useTube($tube); + } + + $response = $this->_dispatch( + new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED) + ); + + return new Job($response['id'], $response['jobdata']); + } + + /** + * {@inheritdoc} + */ + public function put( + $data, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY, + $ttr = PheanstalkInterface::DEFAULT_TTR + ) + { + $response = $this->_dispatch( + new Command\PutCommand($data, $priority, $delay, $ttr) + ); + + return $response['id']; + } + + /** + * {@inheritdoc} + */ + public function putInTube( + $tube, + $data, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY, + $ttr = PheanstalkInterface::DEFAULT_TTR + ) + { + $this->useTube($tube); + + return $this->put($data, $priority, $delay, $ttr); + } + + /** + * {@inheritdoc} + */ + public function release( + $job, + $priority = PheanstalkInterface::DEFAULT_PRIORITY, + $delay = PheanstalkInterface::DEFAULT_DELAY + ) + { + $this->_dispatch( + new Command\ReleaseCommand($job, $priority, $delay) + ); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function reserve($timeout = null) + { + $response = $this->_dispatch( + new Command\ReserveCommand($timeout) + ); + + $falseResponses = array( + Response::RESPONSE_DEADLINE_SOON, + Response::RESPONSE_TIMED_OUT, + ); + + if (in_array($response->getResponseName(), $falseResponses)) { + return false; + } else { + return new Job($response['id'], $response['jobdata']); + } + } + + /** + * {@inheritdoc} + */ + public function reserveFromTube($tube, $timeout = null) + { + $this->watchOnly($tube); + + return $this->reserve($timeout); + } + + /** + * {@inheritdoc} + */ + public function statsJob($job) + { + return $this->_dispatch(new Command\StatsJobCommand($job)); + } + + /** + * {@inheritdoc} + */ + public function statsTube($tube) + { + return $this->_dispatch(new Command\StatsTubeCommand($tube)); + } + + /** + * {@inheritdoc} + */ + public function stats() + { + return $this->_dispatch(new Command\StatsCommand()); + } + + /** + * {@inheritdoc} + */ + public function touch($job) + { + $this->_dispatch(new Command\TouchCommand($job)); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function useTube($tube) + { + if ($this->_using != $tube) { + $this->_dispatch(new Command\UseCommand($tube)); + $this->_using = $tube; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function watch($tube) + { + if (!isset($this->_watching[$tube])) { + $this->_dispatch(new Command\WatchCommand($tube)); + $this->_watching[$tube] = true; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function watchOnly($tube) + { + $this->watch($tube); + + $ignoreTubes = array_diff_key($this->_watching, array($tube => true)); + foreach ($ignoreTubes as $ignoreTube => $true) { + $this->ignore($ignoreTube); + } + + return $this; + } + + // ---------------------------------------- + + /** + * Dispatches the specified command to the connection object. + * + * If a SocketException occurs, the connection is reset, and the command is + * re-attempted once. + * + * @param Command $command + * @return Response + */ + private function _dispatch($command) + { + try { + $response = $this->_connection->dispatchCommand($command); + } catch (Exception\SocketException $e) { + $this->_reconnect(); + $response = $this->_connection->dispatchCommand($command); + } + + return $response; + } + + /** + * Creates a new connection object, based on the existing connection object, + * and re-establishes the used tube and watchlist. + */ + private function _reconnect() + { + $new_connection = new Connection( + $this->_connection->getHost(), + $this->_connection->getPort(), + $this->_connection->getConnectTimeout() + ); + + $this->setConnection($new_connection); + + if ($this->_using != PheanstalkInterface::DEFAULT_TUBE) { + $tube = $this->_using; + $this->_using = null; + $this->useTube($tube); + } + + foreach ($this->_watching as $tube => $true) { + if ($tube != PheanstalkInterface::DEFAULT_TUBE) { + unset($this->_watching[$tube]); + $this->watch($tube); + } + } + + if (!isset($this->_watching[PheanstalkInterface::DEFAULT_TUBE])) { + $this->ignore(PheanstalkInterface::DEFAULT_TUBE); + } + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/PheanstalkInterface.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/PheanstalkInterface.php new file mode 100644 index 0000000..3e17dda --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/PheanstalkInterface.php @@ -0,0 +1,301 @@ +<?php + +namespace Pheanstalk; + +interface PheanstalkInterface +{ + const DEFAULT_PORT = 11300; + const DEFAULT_DELAY = 0; // no delay + const DEFAULT_PRIORITY = 1024; // most urgent: 0, least urgent: 4294967295 + const DEFAULT_TTR = 60; // 1 minute + const DEFAULT_TUBE = 'default'; + + /** + * @param Connection + * @return $this + */ + public function setConnection(Connection $connection); + + /** + * The internal connection object. + * Not required for general usage. + * + * @return Connection + */ + public function getConnection(); + + // ---------------------------------------- + + /** + * Puts a job into a 'buried' state, revived only by 'kick' command. + * + * @param Job $job + * @param int $priority + */ + public function bury($job, $priority = self::DEFAULT_PRIORITY); + + /** + * Permanently deletes a job. + * + * @param object $job Job + * @return $this + */ + public function delete($job); + + /** + * Remove the specified tube from the watchlist. + * + * Does not execute an IGNORE command if the specified tube is not in the + * cached watchlist. + * + * @param string $tube + * @return $this + */ + public function ignore($tube); + + /** + * Kicks buried or delayed jobs into a 'ready' state. + * If there are buried jobs, it will kick up to $max of them. + * Otherwise, it will kick up to $max delayed jobs. + * + * @param int $max The maximum jobs to kick + * @return int Number of jobs kicked + */ + public function kick($max); + + /** + * A variant of kick that operates with a single job. If the given job + * exists and is in a buried or delayed state, it will be moved to the + * ready queue of the the same tube where it currently belongs. + * + * @param Job $job Job + * @return $this + */ + public function kickJob($job); + + /** + * The names of all tubes on the server. + * + * @return array + */ + public function listTubes(); + + /** + * The names of the tubes being watched, to reserve jobs from. + * + * Returns the cached watchlist if $askServer is false (the default), + * or queries the server for the watchlist if $askServer is true. + * + * @param bool $askServer + * @return array + */ + public function listTubesWatched($askServer = false); + + /** + * The name of the current tube used for publishing jobs to. + * + * Returns the cached value if $askServer is false (the default), + * or queries the server for the currently used tube if $askServer + * is true. + * + * @param bool $askServer + * @return string + */ + public function listTubeUsed($askServer = false); + + /** + * Temporarily prevent jobs being reserved from the given tube. + * + * @param string $tube The tube to pause + * @param int $delay Seconds before jobs may be reserved from this queue. + * @return $this + */ + public function pauseTube($tube, $delay); + + /** + * Resume jobs for a given paused tube. + * + * @param string $tube The tube to resume + * @return $this + */ + public function resumeTube($tube); + + /** + * Inspect a job in the system, regardless of what tube it is in. + * + * @param int $jobId + * @return object Job + */ + public function peek($jobId); + + /** + * Inspect the next ready job in the specified tube. If no tube is + * specified, the currently used tube in used. + * + * @param string $tube + * @return object Job + */ + public function peekReady($tube = null); + + /** + * Inspect the shortest-remaining-delayed job in the specified tube. If no + * tube is specified, the currently used tube in used. + * + * @param string $tube + * @return object Job + */ + public function peekDelayed($tube = null); + + /** + * Inspect the next job in the list of buried jobs of the specified tube. + * If no tube is specified, the currently used tube in used. + * + * @param string $tube + * @return object Job + */ + public function peekBuried($tube = null); + + /** + * Puts a job on the queue. + * + * @param string $data The job data + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + * @param int $delay Seconds to wait before job becomes ready + * @param int $ttr Time To Run: seconds a job can be reserved for + * @return int The new job ID + */ + public function put($data, $priority = self::DEFAULT_PRIORITY, $delay = self::DEFAULT_DELAY, $ttr = self::DEFAULT_TTR); + + /** + * Puts a job on the queue using specified tube. + * + * Using this method is equivalent to calling useTube() then put(), with + * the added benefit that it will not execute the USE command if the client + * is already using the specified tube. + * + * @param string $tube The tube to use + * @param string $data The job data + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + * @param int $delay Seconds to wait before job becomes ready + * @param int $ttr Time To Run: seconds a job can be reserved for + * @return int The new job ID + */ + public function putInTube($tube, $data, $priority = self::DEFAULT_PRIORITY, $delay = self::DEFAULT_DELAY, $ttr = self::DEFAULT_TTR); + + /** + * Puts a reserved job back into the ready queue. + * + * Marks the jobs state as "ready" to be run by any client. + * It is normally used when the job fails because of a transitory error. + * + * @param object $job Job + * @param int $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent) + * @param int $delay Seconds to wait before job becomes ready + * @return $this + */ + public function release($job, $priority = self::DEFAULT_PRIORITY, $delay = self::DEFAULT_DELAY); + + /** + * Reserves/locks a ready job in a watched tube. + * + * A non-null timeout uses the 'reserve-with-timeout' instead of 'reserve'. + * + * A timeout value of 0 will cause the server to immediately return either a + * response or TIMED_OUT. A positive value of timeout will limit the amount of + * time the client will block on the reserve request until a job becomes + * available. + * + * @param int $timeout + * @return object Job + */ + public function reserve($timeout = null); + + /** + * Reserves/locks a ready job from the specified tube. + * + * A non-null timeout uses the 'reserve-with-timeout' instead of 'reserve'. + * + * A timeout value of 0 will cause the server to immediately return either a + * response or TIMED_OUT. A positive value of timeout will limit the amount of + * time the client will block on the reserve request until a job becomes + * available. + * + * Using this method is equivalent to calling watch(), ignore() then + * reserve(), with the added benefit that it will not execute uneccessary + * WATCH or IGNORE commands if the client is already watching the + * specified tube. + * + * @param string $tube + * @param int $timeout + * @return object Job + */ + public function reserveFromTube($tube, $timeout = null); + + /** + * Gives statistical information about the specified job if it exists. + * + * @param Job|int $job + * @return object + */ + public function statsJob($job); + + /** + * Gives statistical information about the specified tube if it exists. + * + * @param string $tube + * @return object + */ + public function statsTube($tube); + + /** + * Gives statistical information about the beanstalkd system as a whole. + * + * @return object + */ + public function stats(); + + /** + * Allows a worker to request more time to work on a job. + * + * This is useful for jobs that potentially take a long time, but you still want + * the benefits of a TTR pulling a job away from an unresponsive worker. A worker + * may periodically tell the server that it's still alive and processing a job + * (e.g. it may do this on DEADLINE_SOON). + * + * @param Job $job + * @return $this + */ + public function touch($job); + + /** + * Change to the specified tube name for publishing jobs to. + * This method would be called 'use' if it were not a PHP reserved word. + * + * Does not execute a USE command if the client is already using the + * specified tube. + * + * @param string $tube + * @return $this + */ + public function useTube($tube); + + /** + * Add the specified tube to the watchlist, to reserve jobs from. + * + * Does not execute a WATCH command if the client is already watching the + * specified tube. + * + * @param string $tube + * @return $this + */ + public function watch($tube); + + /** + * Adds the specified tube to the watchlist, to reserve jobs from, and + * ignores any other tubes remaining on the watchlist. + * + * @param string $tube + * @return $this + */ + public function watchOnly($tube); +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response.php new file mode 100644 index 0000000..0b2a810 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response.php @@ -0,0 +1,46 @@ +<?php + +namespace Pheanstalk; + +/** + * A response from the beanstalkd server + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Response +{ + // global error responses + const RESPONSE_OUT_OF_MEMORY = 'OUT_OF_MEMORY'; + const RESPONSE_INTERNAL_ERROR = 'INTERNAL_ERROR'; + const RESPONSE_DRAINING = 'DRAINING'; + const RESPONSE_BAD_FORMAT = 'BAD_FORMAT'; + const RESPONSE_UNKNOWN_COMMAND = 'UNKNOWN_COMMAND'; + + // command responses + const RESPONSE_INSERTED = 'INSERTED'; + const RESPONSE_BURIED = 'BURIED'; + const RESPONSE_EXPECTED_CRLF = 'EXPECTED_CRLF'; + const RESPONSE_JOB_TOO_BIG = 'JOB_TOO_BIG'; + const RESPONSE_USING = 'USING'; + const RESPONSE_DEADLINE_SOON = 'DEADLINE_SOON'; + const RESPONSE_RESERVED = 'RESERVED'; + const RESPONSE_DELETED = 'DELETED'; + const RESPONSE_NOT_FOUND = 'NOT_FOUND'; + const RESPONSE_RELEASED = 'RELEASED'; + const RESPONSE_WATCHING = 'WATCHING'; + const RESPONSE_NOT_IGNORED = 'NOT_IGNORED'; + const RESPONSE_FOUND = 'FOUND'; + const RESPONSE_KICKED = 'KICKED'; + const RESPONSE_OK = 'OK'; + const RESPONSE_TIMED_OUT = 'TIMED_OUT'; + const RESPONSE_TOUCHED = 'TOUCHED'; + const RESPONSE_PAUSED = 'PAUSED'; + + /** + * The name of the response + * @return string + */ + public function getResponseName(); +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response/ArrayResponse.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response/ArrayResponse.php new file mode 100644 index 0000000..23c03ed --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Response/ArrayResponse.php @@ -0,0 +1,69 @@ +<?php + +namespace Pheanstalk\Response; + +use Pheanstalk\Response; + +/** + * A response with an ArrayObject interface to key=>value data + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ArrayResponse + extends \ArrayObject + implements Response +{ + private $_name; + + /** + * @param string $name + * @param array $data + */ + public function __construct($name, $data) + { + $this->_name = $name; + parent::__construct($data); + } + + /* (non-phpdoc) + * @see Response::getResponseName() + */ + public function getResponseName() + { + return $this->_name; + } + + /** + * Object property access to ArrayObject data. + */ + public function __get($property) + { + $key = $this->_transformPropertyName($property); + + return isset($this[$key]) ? $this[$key] : null; + } + + /** + * Object property access to ArrayObject data. + */ + public function __isset($property) + { + $key = $this->_transformPropertyName($property); + + return isset($this[$key]); + } + + // ---------------------------------------- + + /** + * Tranform underscored property name to hyphenated array key. + * @param string + * @return string + */ + private function _transformPropertyName($propertyName) + { + return str_replace('_', '-', $propertyName); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/ResponseParser.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/ResponseParser.php new file mode 100644 index 0000000..c166330 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/ResponseParser.php @@ -0,0 +1,21 @@ +<?php + +namespace Pheanstalk; + +/** + * A parser for response data sent from the beanstalkd server + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface ResponseParser +{ + /** + * Parses raw response data into a Response object + * @param string $responseLine Without trailing CRLF + * @param string $responseData (null if no data) + * @return object Response + */ + public function parseResponse($responseLine, $responseData); +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket.php new file mode 100644 index 0000000..c55efa0 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket.php @@ -0,0 +1,41 @@ +<?php + +namespace Pheanstalk; + +/** + * A mockable wrapper around PHP "socket" or "file pointer" access. + * Only the subset of socket actions required by Pheanstalk are provided. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Socket +{ + /** + * Writes data to the socket. + * @param string $data + * @return void + */ + public function write($data); + + /** + * Reads up to $length bytes from the socket. + * + * @return string + */ + public function read($length); + + /** + * Reads up to the next new-line, or $length - 1 bytes. + * Trailing whitespace is trimmed. + * + * @param int + */ + public function getLine($length = null); + + /** + * Disconnect the socket; subsequent usage of the socket will fail. + */ + public function disconnect(); +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/NativeSocket.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/NativeSocket.php new file mode 100644 index 0000000..192dd73 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/NativeSocket.php @@ -0,0 +1,130 @@ +<?php + +namespace Pheanstalk\Socket; + +use Pheanstalk\Exception; +use Pheanstalk\Socket; + +/** + * A Socket implementation around a fsockopen() stream. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class NativeSocket implements Socket +{ + /** + * The default timeout for a blocking read on the socket + */ + const SOCKET_TIMEOUT = 1; + + /** + * Number of retries for attempted writes which return zero length. + */ + const WRITE_RETRIES = 8; + + private $_socket; + + /** + * @param string $host + * @param int $port + * @param int $connectTimeout + */ + public function __construct($host, $port, $connectTimeout, $connectPersistent) + { + if ($connectPersistent) { + $this->_socket = $this->_wrapper() + ->pfsockopen($host, $port, $errno, $errstr, $connectTimeout, $connectPersistent); + } else { + $this->_socket = $this->_wrapper() + ->fsockopen($host, $port, $errno, $errstr, $connectTimeout, $connectPersistent); + } + + if (!$this->_socket) { + throw new Exception\ConnectionException($errno, $errstr . " (connecting to $host:$port)"); + } + + $this->_wrapper() + ->stream_set_timeout($this->_socket, self::SOCKET_TIMEOUT); + } + + /* (non-phpdoc) + * @see Socket::write() + */ + public function write($data) + { + $history = new WriteHistory(self::WRITE_RETRIES); + + for ($written = 0, $fwrite = 0; $written < strlen($data); $written += $fwrite) { + $fwrite = $this->_wrapper() + ->fwrite($this->_socket, substr($data, $written)); + + $history->log($fwrite); + + if ($history->isFullWithNoWrites()) { + throw new Exception\SocketException(sprintf( + 'fwrite() failed to write data after %u tries', + self::WRITE_RETRIES + )); + } + } + } + + /* (non-phpdoc) + * @see Socket::write() + */ + public function read($length) + { + $read = 0; + $parts = ''; + + while ($read < $length && !$this->_wrapper()->feof($this->_socket)) { + $data = $this->_wrapper() + ->fread($this->_socket, $length - $read); + + if ($data === false) { + throw new Exception\SocketException('fread() returned false'); + } + + $read += strlen($data); + $parts .= $data; + } + + return $parts; + } + + /* (non-phpdoc) + * @see Socket::write() + */ + public function getLine($length = null) + { + do { + $data = isset($length) ? + $this->_wrapper()->fgets($this->_socket, $length) : + $this->_wrapper()->fgets($this->_socket); + + if ($this->_wrapper()->feof($this->_socket)) { + throw new Exception\SocketException("Socket closed by server!"); + } + } while ($data === false); + + return rtrim($data); + } + + public function disconnect() + { + $this->_wrapper()->fclose($this->_socket); + } + + // ---------------------------------------- + + /** + * Wrapper class for all stream functions. + * Facilitates mocking/stubbing stream operations in unit tests. + */ + private function _wrapper() + { + return StreamFunctions::instance(); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/StreamFunctions.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/StreamFunctions.php new file mode 100644 index 0000000..6f793f8 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/StreamFunctions.php @@ -0,0 +1,99 @@ +<?php + +namespace Pheanstalk\Socket; + +/** + * Wrapper around PHP stream functions. + * Facilitates mocking/stubbing stream operations in unit tests. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class StreamFunctions +{ + private static $_instance; + + /** + * Singleton accessor. + */ + public static function instance() + { + if (empty(self::$_instance)) { + self::$_instance = new self; + } + + return self::$_instance; + } + + /** + * Sets an alternative or mocked instance. + */ + public function setInstance($instance) + { + self::$_instance = $instance; + } + + /** + * Unsets the instance, so a new one will be created. + */ + public function unsetInstance() + { + self::$_instance = null; + } + + // ---------------------------------------- + + public function feof($handle) + { + return feof($handle); + } + + public function fgets($handle, $length = null) + { + if (isset($length)) { + return fgets($handle, $length); + } else { + return fgets($handle); + } + } + + public function fopen($filename, $mode) + { + return fopen($filename, $mode); + } + + public function fread($handle, $length) + { + return fread($handle, $length); + } + + public function fsockopen($hostname, $port = -1, &$errno = null, &$errstr = null, $timeout = null) + { + return @fsockopen($hostname, $port, $errno, $errstr, $timeout); + } + + public function pfsockopen($hostname, $port = -1, &$errno = null, &$errstr = null, $timeout = null) + { + return @pfsockopen($hostname, $port, $errno, $errstr, $timeout); + } + + public function fwrite($handle, $string, $length = null) + { + if (isset($length)) { + return fwrite($handle, $string, $length); + } else { + return fwrite($handle, $string); + } + } + + public function fclose($handle) + { + fclose($handle); + } + + public function stream_set_timeout($stream, $seconds, $microseconds = 0) + { + return stream_set_timeout($stream, $seconds, $microseconds); + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/WriteHistory.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/WriteHistory.php new file mode 100644 index 0000000..a01fe4c --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/WriteHistory.php @@ -0,0 +1,64 @@ +<?php + +namespace Pheanstalk\Socket; + +/** + * A limited history of recent socket write length/success. + * Facilitates retrying zero-length writes a limited number of times, + * avoiding infinite loops. + * + * Based on a patch from https://github.com/leprechaun + * https://github.com/pda/pheanstalk/pull/24 + * + * A bitfield could be used instead of an array for efficiency. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class WriteHistory +{ + private $_limit; + private $_data = array(); + + /** + * @param int $limit + */ + public function __construct($limit) + { + $this->_limit = $limit; + } + + /** + * Whether the history has reached its limit of entries. + */ + public function isFull() + { + return count($this->_data) >= $this->_limit; + } + + public function hasWrites() + { + return (bool) array_sum($this->_data); + } + + public function isFullWithNoWrites() + { + return $this->isFull() && !$this->hasWrites(); + } + + /** + * Logs the return value from a write call. + * Returns the input value. + */ + public function log($write) + { + if ($this->isFull()) { + array_shift($this->_data); + } + + $this->_data[] = (int) $write; + + return $write; + } +} diff --git a/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/YamlResponseParser.php b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/YamlResponseParser.php new file mode 100644 index 0000000..9f0a1d3 --- /dev/null +++ b/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/YamlResponseParser.php @@ -0,0 +1,83 @@ +<?php + +namespace Pheanstalk; + +/** + * A response parser for commands that return a subset of YAML. + * Expected response is 'OK', 'NOT_FOUND' response is also handled. + * Parser expects either a YAML list or dictionary, depending on mode. + * + * @author Paul Annesley + * @package Pheanstalk + * @license http://www.opensource.org/licenses/mit-license.php + */ +class YamlResponseParser + implements \Pheanstalk\ResponseParser +{ + const MODE_LIST = 'list'; + const MODE_DICT = 'dict'; + + private $_mode; + + /** + * @param string $mode self::MODE_* + */ + public function __construct($mode) + { + $this->_mode = $mode; + } + + /* (non-phpdoc) + * @see ResponseParser::parseResponse() + */ + public function parseResponse($responseLine, $responseData) + { + if ($responseLine == Response::RESPONSE_NOT_FOUND) { + throw new Exception\ServerException(sprintf( + 'Server reported %s', + $responseLine + )); + } + + if (!preg_match('#^OK \d+$#', $responseLine)) { + throw new Exception\ServerException(sprintf( + 'Unhandled response: %s', + $responseLine + )); + } + + $dataLines = preg_split("#[\r\n]+#", rtrim($responseData)); + if (isset($dataLines[0]) && $dataLines[0] == '---') { + array_shift($dataLines); // discard header line + } + + $data = array_map(array($this, '_mapYamlList'), $dataLines); + + if ($this->_mode == self::MODE_DICT) { + // TODO: do this better. + $array = array(); + foreach ($data as $line) { + if (!preg_match('#(\S+):\s*(.*)#', $line, $matches)) { + throw new Exception("YAML parse error for line: $line"); + } + + list(, $key, $value) = $matches; + + $array[$key] = $value; + } + $data = $array; + } + + return new Response\ArrayResponse('OK', $data); + } + + /** + * Callback for array_map to process YAML lines. + * @param string $line + * @return string + */ + private function _mapYamlList($line) + { + return ltrim($line, '- '); + } +} |
