summaryrefslogtreecommitdiff
path: root/intern.gospeladlershof.de/vendor/pda/pheanstalk/src/Socket/NativeSocket.php
blob: 192dd73dbf55c2f1e77f522ce9136304fb628e1f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php

namespace Pheanstalk\Socket;

use Pheanstalk\Exception;
use Pheanstalk\Socket;

/**
 * A Socket implementation around a fsockopen() stream.
 *
 * @author Paul Annesley
 * @package Pheanstalk
 * @license http://www.opensource.org/licenses/mit-license.php
 */
class NativeSocket implements Socket
{
    /**
     * The default timeout for a blocking read on the socket.
     * Passed unchanged to the wrapper's stream_set_timeout() in the constructor.
     */
    const SOCKET_TIMEOUT = 1;

    /**
     * Number of retries for attempted writes which return zero length.
     */
    const WRITE_RETRIES = 8;

    /**
     * Stream handle returned by the wrapper's fsockopen()/pfsockopen();
     * reset to null once disconnect() has closed it.
     */
    private $_socket;

    /**
     * Opens the connection and applies the default read timeout.
     *
     * @param string $host
     * @param int    $port
     * @param int    $connectTimeout
     * @param bool   $connectPersistent When truthy the connection is opened with
     *                                  pfsockopen(), otherwise with fsockopen().
     *
     * @throws Exception\ConnectionException When the wrapper returns a falsy handle.
     */
    public function __construct($host, $port, $connectTimeout, $connectPersistent)
    {
        // NOTE(review): $connectPersistent is also forwarded as a sixth argument.
        // PHP's native fsockopen()/pfsockopen() accept only five parameters, so
        // this relies on the StreamFunctions wrapper tolerating the extra value;
        // confirm against the wrapper before removing it.
        if ($connectPersistent) {
            $this->_socket = $this->_wrapper()
                ->pfsockopen($host, $port, $errno, $errstr, $connectTimeout, $connectPersistent);
        } else {
            $this->_socket = $this->_wrapper()
                ->fsockopen($host, $port, $errno, $errstr, $connectTimeout, $connectPersistent);
        }

        if (!$this->_socket) {
            throw new Exception\ConnectionException($errno, $errstr . " (connecting to $host:$port)");
        }

        $this->_wrapper()
            ->stream_set_timeout($this->_socket, self::SOCKET_TIMEOUT);
    }

    /**
     * Writes all of $data to the socket, retrying after partial or empty writes.
     *
     * @see Socket::write()
     *
     * @param string $data
     *
     * @throws Exception\SocketException When the write history reports that the
     *                                   last WRITE_RETRIES attempts wrote nothing.
     */
    public function write($data)
    {
        $history = new WriteHistory(self::WRITE_RETRIES);

        // The payload length never changes, so measure it once up front.
        $total = strlen($data);
        $written = 0;

        while ($written < $total) {
            // Resume from the first byte that has not been written yet.
            $fwrite = $this->_wrapper()
                ->fwrite($this->_socket, substr($data, $written));

            $history->log($fwrite);

            if ($history->isFullWithNoWrites()) {
                throw new Exception\SocketException(sprintf(
                    'fwrite() failed to write data after %u tries',
                    self::WRITE_RETRIES
                ));
            }

            // A false result from fwrite() counts as zero bytes written.
            $written += (int) $fwrite;
        }
    }

    /**
     * Reads from the socket until $length bytes were collected or feof() is true.
     *
     * Fewer than $length bytes are returned when end-of-file is reached first.
     *
     * @see Socket::read()
     *
     * @param int $length
     *
     * @return string
     *
     * @throws Exception\SocketException When fread() returns false.
     */
    public function read($length)
    {
        $read = 0;
        $parts = '';

        while ($read < $length && !$this->_wrapper()->feof($this->_socket)) {
            // Only ask for the bytes that are still missing.
            $data = $this->_wrapper()
                ->fread($this->_socket, $length - $read);

            if ($data === false) {
                throw new Exception\SocketException('fread() returned false');
            }

            $read += strlen($data);
            $parts .= $data;
        }

        return $parts;
    }

    /**
     * Reads one line from the socket, retrying while fgets() returns false.
     *
     * @see Socket::getLine()
     *
     * @param int|null $length Optional length passed through to fgets().
     *
     * @return string The line with trailing whitespace removed by rtrim().
     *
     * @throws Exception\SocketException When feof() is true after a read attempt,
     *                                   regardless of what fgets() returned.
     */
    public function getLine($length = null)
    {
        do {
            $data = isset($length) ?
                $this->_wrapper()->fgets($this->_socket, $length) :
                $this->_wrapper()->fgets($this->_socket);

            // End-of-file is checked after every attempt so the retry loop
            // cannot spin forever once feof() reports true.
            if ($this->_wrapper()->feof($this->_socket)) {
                throw new Exception\SocketException("Socket closed by server!");
            }
        } while ($data === false);

        return rtrim($data);
    }

    /**
     * Closes the socket. Calling it again after the socket was closed is a no-op.
     */
    public function disconnect()
    {
        if ($this->_socket) {
            $this->_wrapper()->fclose($this->_socket);

            // Drop the stale handle so a repeated call does not hand an
            // already-closed stream to fclose().
            $this->_socket = null;
        }
    }

    // ----------------------------------------

    /**
     * Wrapper class for all stream functions.
     * Facilitates mocking/stubbing stream operations in unit tests.
     */
    private function _wrapper()
    {
        return StreamFunctions::instance();
    }
}