stream_socket_accept() and fgetc() stall "randomly" in an otherwise working WebSocket server



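I'm running a WebSocket server in PHP, and it does its job well, except for one thing:

Sometimes stream_socket_accept() stalls for 60 seconds. This can happen a few seconds after the server starts, or hours later. I can't reproduce the behavior myself.

Sometimes it stalls inside the call to stream_socket_accept() itself; sometimes it stalls right after stream_socket_accept() has returned, while reading the headers from the client.

What's more: default_socket_timeout is set to 10 seconds system-wide, and php.ini shows this value.

It stalls even with stream_socket_accept($socket,0); the given timeout is simply ignored.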
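
For reference, here is a minimal sketch of how the timeout argument is expected to behave when no client ever connects. It assumes PHP CLI and a free loopback port; the 0.2 second value is only for illustration. When the second argument is omitted (as in the run() loop of the code further down), the limit comes from default_socket_timeout as seen by the running process.

```php
<?php
// Listen on an ephemeral loopback port (hypothetical test setup).
$listen = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
if ($listen === false) {
    fwrite(STDERR, "listen failed: $errstr ($errno)" . PHP_EOL);
    exit(1);
}

// No client connects, so accept can only return by timing out.
$start = microtime(true);
$conn = @stream_socket_accept($listen, 0.2); // explicit 0.2 s timeout
$elapsed = microtime(true) - $start;

// Without the second argument, the limit would come from
// ini_get('default_socket_timeout') of the running SAPI instead.
printf(
    "accepted=%s elapsed=%.2fs ini default=%ss" . PHP_EOL,
    var_export($conn !== false, true),
    $elapsed,
    ini_get('default_socket_timeout')
);

fclose($listen);
```

If the elapsed time tracks the explicit argument here but not in the real server, the value the server process actually uses is worth checking.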

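My questions:

  1. Why does it stall in the first place? stream_socket_accept() shouldn't block once the listener signals a new connection, should it?

  2. Why does fgetc() stall on the first byte of the connection (while retrieving the headers of the incoming connection, right after stream_socket_accept())?

  3. Why does it stall for exactly 60 seconds (the stock value of default_socket_timeout) when I have definitely changed it to 10 seconds (as shown in phpinfo())?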
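
Regarding the fgetc() case: a blocking read on a socket stream waits up to the stream's read timeout, which starts out as default_socket_timeout, so a client that connects and then sends nothing can hold the read for that long. The sketch below bounds the read per connection with stream_set_timeout(). The loopback listener, the silent client, and the 0.2 second value are assumptions made only for this illustration; this is not a claim about what the clients in the original setup were doing.

```php
<?php
// Hypothetical loopback setup: a listener plus a client that stays silent.
$listen = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
if ($listen === false) {
    fwrite(STDERR, "listen failed: $errstr ($errno)" . PHP_EOL);
    exit(1);
}
$addr = stream_socket_get_name($listen, false); // local address with port
$client = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
$conn = stream_socket_accept($listen, 1);
if ($client === false || $conn === false) {
    fwrite(STDERR, 'could not set up the loopback connection' . PHP_EOL);
    exit(1);
}

// Bound every read on this connection to 0.2 s instead of the ini default.
stream_set_timeout($conn, 0, 200000);

$start = microtime(true);
$char = fgetc($conn); // the client sends nothing, so this read times out
$elapsed = microtime(true) - $start;
$meta = stream_get_meta_data($conn);

printf(
    "char=%s timed_out=%s elapsed=%.2fs" . PHP_EOL,
    var_export($char, true),
    var_export($meta['timed_out'], true),
    $elapsed
);

fclose($conn);
fclose($client);
fclose($listen);
```

Checking the timed_out flag after a failed read makes it possible to tell a silent client apart from one that closed the connection.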

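I'm running out of ideas.

Any ideas are highly appreciated.

Here is the complete code for the socket server (it also does some agent management logic).

I hope someone can spot something.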

<?php
// STALLING SOMETIMES HAPPENS AT fgetc() IN Server::alloc()
// AND AT stream_socket_accept() IN Server::run()
class WS {
    const
        //! UUID magic string
        Magic='258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
        //! Packet size
        Packet=65536;
    //@{ Mask bits for first byte of header
    const
        Text=0x01,
        Binary=0x02,
        Close=0x08,
        Ping=0x09,
        Pong=0x0a,
        OpCode=0x0f,
        Finale=0x80;
    //@}
    //@{ Mask bits for second byte of header
    const
        Length=0x7f;
    //@}
}
//! RFC6455 server socket
class Server {
    protected
        $addr,
        $ctx,
        $wait,
        $sockets,
        $agents=[],
        $events=[];
    /**
    *   Allocate stream socket
    *   @return NULL
    *   @param $socket resource
    **/
    function alloc($socket) {
        trace("Fetching http header...");        
        // if header does not start with "GET"
        // immediately close connection
        foreach( ['G','E','T'] as $get) {
            $character=fgetc($socket);
            $metadata=stream_get_meta_data($socket);
            // this MUST NOT BE REWRITTEN!
            // unread_bytes can not be checked against 0
            if ($character==$get && !feof($socket) && $metadata['unread_bytes'] > 0)
                continue;
            else {
                trace("Error: Header does not start with GET – connection closed");
                stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
                return;
            }
        }
        $str="GET";
        do {
            $str.=fgetc($socket);
            $metadata=stream_get_meta_data($socket);
        } while (!feof($socket) && $metadata['unread_bytes'] > 0);
        // Get WebSocket headers
        $hdrs=[];
        $CRLF="\r\n";
        $verb=NULL;
        $uri=NULL;
        foreach (explode($CRLF,$str) as $line)
            if (preg_match('/^(\w+)\s(.+)\sHTTP\/1\.\d$/',
                trim($line),$match)) {
                $verb=$match[1];
                $uri=$match[2];
            }
            else
            if (preg_match('/^(.+): (.+)/',trim($line),$match))
                // Standardize header
                $hdrs[
                    strtr(
                        ucwords(
                            strtolower(
                                strtr($match[1],'-',' ')
                            )
                        ),' ','-'
                    )
                ]=$match[2];
        if (empty($hdrs['Upgrade']) &&
            empty($hdrs['Sec-Websocket-Key'])) {
            // Not a WebSocket request
            $this->write(
                $socket,
                $str='HTTP/1.1 400 Bad Request'.$CRLF.
                    'Connection: close'.$CRLF.$CRLF
            );
            stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
            // 1 @fclose($socket);
            return;
        }
        // Handshake
        $bytes=$this->write(
            $socket,
            $str='HTTP/1.1 101 Switching Protocols'.$CRLF.
                'Upgrade: websocket'.$CRLF.
                'Connection: Upgrade'.$CRLF.
                'Sec-WebSocket-Accept: '.
                    base64_encode(
                        sha1(
                            $hdrs['Sec-Websocket-Key'].
                            WS::Magic,
                            TRUE
                        )
                    ).$CRLF.$CRLF
        );
        if (is_int($bytes)) {
            // Connect agent to server
            $this->sockets[]=$socket;
            $this->agents[(int)$socket]=
                new Agent($this,$socket,$verb,$uri,$hdrs);
        }
        else
            stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
    }
    /**
    *   Free stream socket
    *   @return bool
    *   @param $socket resource
    **/
    function free($socket) {
        unset($this->sockets[array_search($socket,$this->sockets)]);
        unset($this->agents[(int)$socket]);
        stream_socket_shutdown($socket,STREAM_SHUT_WR);
        // 1 @fclose($socket);
    }
    /**
    *   Read from stream socket
    *   @return string|FALSE
    *   @param $socket resource
    **/
    function read($socket) {
        return is_string($str=@fread($socket,WS::Packet)) && strlen($str)?
            $str:
            FALSE;
    }
    /**
    *   Write to stream socket
    *   @return int|FALSE
    *   @param $socket resource
    *   @param $str string
    **/
    function write($socket,$str) {
        for ($i=0,$bytes=0;$i<strlen($str);$i+=$bytes) {
            if (($bytes=@fwrite($socket,substr($str,$i))) &&
                @fflush($socket))
                continue;
            return FALSE;
        }
        return $bytes;
    }
    /**
    *   Return socket agents
    *   @return array
    *   @param $uri string
    ***/
    function agents($uri=NULL) {
        return array_filter(
            $this->agents,
            function($val) use($uri) {
                return $uri?($val->uri()==$uri):TRUE;
            }
        );
    }
    /**
    *   Return event handlers
    *   @return array
    **/
    function events() {
        return $this->events;
    }
    /**
    *   Bind function to event handler
    *   @return object
    *   @param $event string
    *   @param $func callable
    **/
    function on($event,$func) {
        $this->events[$event]=$func;
        return $this;
    }
    /**
    *   Execute the server process
    *   @return object
    **/
    function run() {
        $fw=Base::instance();
        // Activate WebSocket listener
        $listen=stream_socket_server(
            $this->addr,$errno,$errstr,
            STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,
            $this->ctx
        );
        // stream_set_timeout($listen,0);
        stream_set_read_buffer($listen,WS::Packet);
        stream_set_write_buffer($listen,WS::Packet);
        $socket=socket_import_stream($listen);
        socket_set_option(
            $socket,
            SOL_SOCKET,
            SO_REUSEADDR,
            1
        );
        socket_set_option(
            $socket,
            SOL_SOCKET,
            SO_LINGER,
            ['l_onoff'=>1,'l_linger'=>1]
        );
        register_shutdown_function(function() use($listen) {
            foreach ($this->sockets as $socket)
                if ($socket!=$listen)
                    $this->free($socket);
            stream_socket_shutdown($listen,STREAM_SHUT_RDWR);
            @fclose($listen);
            if (isset($this->events['stop']) &&
                is_callable($func=$this->events['stop']))
                $func($this);
        });
        if ($errstr)
            user_error($errstr,E_USER_ERROR);
        if (isset($this->events['start']) &&
            is_callable($func=$this->events['start']))
            $func($this);
        $this->sockets=[$listen];
        $empty=[];
        $wait=$this->wait;
        while (TRUE) {
            $active=$this->sockets;
            $mark=microtime(TRUE);
            trace("Waiting for socket action...");
            $count=@stream_select(
                $active,$empty,$empty,(int)$wait,round(1e6*($wait-(int)$wait))
            );
            if (is_bool($count) && $wait) {
                if (isset($this->events['error']) &&
                    is_callable($func=$this->events['error']))
                    $func($this);
                die;
            }
            if ($count) {
                // Process active connections
                foreach ($active as $socket) {
                    if (!is_resource($socket))
                        continue;                       
                    if ($socket==$listen) {
                        trace("New connection pending...");
                        if ($socket=stream_socket_accept($listen)) {
                            $this->alloc($socket);
                            trace("alloc() finished");
                        }
                        else {
                            trace("Connection failed...");                      
                            continue;                           
                        }
                    }
                    else {
                        $id=(int)$socket;
                        if (isset($this->agents[$id]) &&
                            $raw=$this->agents[$id]->fetch()) {
                            list($op,$data)=$raw;
                            // Dispatch
                            switch ($op & WS::OpCode) {
                            case WS::Text:
                                $data=trim($data);
                            case WS::Binary:
                            case WS::Pong:
                                if (isset($this->events['receive']) &&
                                    is_callable($func=$this->events['receive']))
                                    $func($this->agents[$id],$op,$data);
                                break;
                            case WS::Ping:
                                $this->agents[$id]->send(WS::Pong);
                                break;
                            default:
                                if (isset($this->events['invalid']) &&
                                    is_callable($func=$this->events['invalid']))
                                    $func($this->agents[$id],$op,$data);
                            case WS::Close:
                                $this->free($socket);
                                break;
                            }
                        }
                    }
                }
                $wait-=microtime(TRUE)-$mark;
                while ($wait<1e-6) {
                    $wait+=$this->wait;
                    $count=0;
                }
            }
            if (!$count) {
                $mark=microtime(TRUE);
                foreach ($this->sockets as $socket) {
                    if (!is_resource($socket))
                        continue;
                    $id=(int)$socket;
                    if ($socket!=$listen &&
                        isset($this->agents[$id]) &&
                        is_string($this->agents[$id]->send(WS::Ping)) &&
                        isset($this->events['idle']) &&
                        is_callable($func=$this->events['idle']))
                        $func($this->agents[$id]);
                }
                $wait=$this->wait-microtime(TRUE)+$mark;
            }
        }
    }
    /**
    *   Instantiate object
    *   @return object
    *   @param $addr string
    *   @param $ctx resource
    *   @param $wait int
    **/
    function __construct($addr,$ctx=NULL,$wait=60) {
        $this->addr=$addr;
        $this->ctx=$ctx?:stream_context_create();
        $this->wait=$wait;
        $this->events=[];
    }
}
//! RFC6455 remote socket
class Agent {
    protected
        $server,
        $id,
        $socket,
        $flag,
        $verb,
        $uri,
        $headers,
        $events,
        $buffer='';
    /**
    *   Return server instance
    *   @return object
    **/
    function server() {
        return $this->server;
    }
    /**
    *   Return socket ID
    *   @return string
    **/
    function id() {
        return $this->id;
    }
    /**
    *   Return request method
    *   @return string
    **/
    function verb() {
        return $this->verb;
    }
    /**
    *   Return request URI
    *   @return string
    **/
    function uri() {
        return $this->uri;
    }
    /**
    *   Return socket headers
    *   @return string
    **/
    function headers() {
        return $this->headers;
    }
    /**
    *   Frame and transmit payload
    *   @return string|FALSE
    *   @param $socket resource
    *   @param $op int
    *   @param $payload string
    **/
    function send($op,$data='') {
        $mask=WS::Finale | $op & WS::OpCode;
        $len=strlen($data);
        $str='';
        if ($len<126)
            $str=pack('CC',$mask,$len);
        else
        if ($len>125 && $len<65536)
            $str=pack('CCn',$mask,126,$len);
        else
        if ($len>65535)
            // 64-bit length as two big-endian 32-bit words (NN needs two values)
            $str=pack('CCNN',$mask,127,$len>>32,$len&0xffffffff);
        $str.=$data;
        $server=$this->server();
        if (is_bool($server->write($this->socket,$str))) {
            $server->free($this->socket);
            return FALSE;
        }
        if (!in_array($op,[WS::Pong,WS::Close]) &&
            isset($this->events['send']) &&
            is_callable($func=$this->events['send']))
            $func($this,$op,$data);
        return $data;
    }
    /**
    *   Retrieve and unmask payload
    *   @return array|FALSE
    **/
    function fetch() {
        // Unmask payload
        $server=$this->server();
        if (is_bool($str=$server->read($this->socket))) {
            $server->free($this->socket);
            return FALSE;
        }
        $buf=($this->buffer.=$str);
        $op=ord($buf[0]) & WS::OpCode;
        $len=ord($buf[1]) & WS::Length;
        $pos=2;
        if ($len==126) {
            $len=ord($buf[2])*256+ord($buf[3]);
            $pos+=2;
        }
        else
        if ($len==127) {
            for ($i=0,$len=0;$i<8;$i++)
                $len=$len*256+ord($buf[$i+2]);
            $pos+=8;
        }
        for ($i=0,$mask=[];$i<4;$i++)
            $mask[$i]=ord($buf[$pos+$i]);
        $pos+=4;
        if (strlen($buf)<$len+$pos)
            return FALSE;
        $this->buffer='';
        for ($i=0,$data='';$i<$len;$i++)
            $data.=chr(ord($buf[$pos+$i])^$mask[$i%4]);
        return [$op,$data];
    }
    /**
    *   Destroy object
    *   @return NULL
    **/
    function __destruct() {
        if (isset($this->events['disconnect']) &&
            is_callable($func=$this->events['disconnect']))
            $func($this);
    }
    /**
    *   Instantiate object
    *   @return object
    *   @param $server object
    *   @param $socket resource
    *   @param $verb string
    *   @param $uri string
    *   @param $hdrs array
    **/
    function __construct($server,$socket,$verb,$uri,array $hdrs) {
        $this->server=$server;
        $this->id=stream_socket_get_name($socket,TRUE);
        $this->socket=$socket;
        $this->verb=$verb;
        $this->uri=$uri;
        $this->headers=$hdrs;
        $this->events=$server->events();
        if (isset($this->events['connect']) &&
            is_callable($func=$this->events['connect']))
            $func($this);
    }
}
/**
*   Simple console logger
*   @return NULL
*   @param $line string
**/
function trace($line) {
    echo "\r".date('H:i:s').' '.$line.PHP_EOL;
}
/**
*   Process handler for graceful exit (routed to registered shutdown handler)
*   @return NULL
**/
function kill($signal) {
    die;
}
pcntl_signal(SIGINT,'kill');
pcntl_signal(SIGTERM,'kill');
if (PHP_SAPI!='cli') {
    // Prohibit direct HTTP access
    header('HTTP/1.1 404 Not Found');
    die;
}
chdir(__DIR__);
require('lib/base.php');
error_reporting((E_ALL|E_STRICT)&~(E_NOTICE|E_USER_NOTICE|E_WARNING|E_USER_WARNING));
// Load .ini files
$fw=Base::instance();
$fw->
    config('app/ini/config.ini')->
    config('app/ini/dev.ini');
if (!is_file($pid='ws.pid') ||
    !is_dir('/proc/'.file_get_contents($pid))) {
    // Override any error handler specified in .ini files
    ini_set('error_log','/dev/null');
    $fw->DEBUG=2;
    $fw->ONERROR=function($fw) {
        trace($fw->get('ERROR.text'));
        foreach (explode("\n",trim($fw->get('ERROR.trace'))) as $line)
            trace($line);
    };
    $fw->VERBOSE=(bool)preg_grep('/[\/-]v/',$argv);
    // Instantiate the server
    $ws=new Server(
        $fw->get('SITE.websocket'),
        stream_context_create([
            'ssl'=>$fw->get('SSL')+[
                'allow_self_signed'=>TRUE,
                'verify_peer'=>FALSE
            ]
        ])
    );
    // Intercept OpenSSL errors
    $err=FALSE;
    while (TRUE)
        if ($msg=openssl_error_string()) {
            $err=TRUE;
            trace($msg);
        }
        else
            break;
    if ($err)
        die;
    $ws->
        on('start',function($server) use($fw) {
            trace('WebSocket server started ('.$fw->get('SITE.websocket').')');
            file_put_contents('ws.pid',getmypid());
        })->
        on('error',function($server) use($fw) {
            if ($err=socket_last_error()) {
                trace(socket_strerror($err));
                socket_clear_error();
            }
            if ($err=error_get_last())
                trace($err['message']);
        })->
        on('stop',function($server) use($fw) {
            trace('Shutting down ('.$fw->get('SITE.websocket').')');
            @unlink('ws.pid');
        })->
        on('connect',function($agent) use($fw) {
            trace(
                '(0x00'.$agent->uri().') '.$agent->id().' connected '.
                '<'.(count($agent->server()->agents())+1).'>'
            );
            if ($fw->VERBOSE) {
                $hdrs=$agent->headers();
                trace(
                    $hdrs['User-Agent'].' '.
                    '[v'.$hdrs['Sec-Websocket-Version'].']'
                );
            }
            $agent->hash=dechex(crc32(file_get_contents(__FILE__)));
            $agent->feature=[];
            $agent->query='';
            $agent->session=[];
        })->
        on('disconnect',function($agent) use($fw) {
            trace('(0x08'.$agent->uri().') '.$agent->id().' disconnected');
            if ($err=socket_last_error()) {
                trace(socket_strerror($err));
                socket_clear_error();
            }
            if (preg_match('/^\/(.+)/',$agent->uri(),$match)) {
                $class='WebSocket\\'.$match[1];
                if (isset($agent->feature[$class])) {
                    $obj=$agent->feature[$class];
                    foreach ($agent->feature as $key=>$obj)
                        if (is_callable([$obj,'disconnect']))
                            $fw->call([$obj,'disconnect'],[$fw,$agent]);
                }
            }
        })->
        on('idle',function($agent) use($fw) {
            foreach ($agent->feature as $key=>$obj)
                if (is_callable([$obj,'idle']))
                    $fw->call([$obj,'idle'],[$fw,$agent]);
        })->
        on('receive',function($agent,$op,$data) use($fw) {
            switch($op) {
            case WS::Pong:
                $text='pong';
                break;
            case WS::Text:
                $data=trim($data);
            case WS::Binary:
                $text='data';
                break;
            default:
                $text='unknown';
                break;
            }
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' '.$text.' received'
            );
            if ($op==WS::Text && $data) {
                if ($fw->VERBOSE)
                    trace($data);
                $in=json_decode($data,TRUE);
                if (json_last_error()==JSON_ERROR_NONE &&
                    preg_match('/^\/(.+)/',$agent->uri(),$match)) {
                    $class='WebSocket\\'.$match[1];
                    if (isset($agent->feature[$class])) {
                        if (isset($in['query']))
                            $agent->query=$in['query'];
                        if (isset($in['session']))
                            foreach ($in['session'] as $key=>$val)
                                $agent->session[$key]=$val;
                        $obj=$agent->feature[$class];
                        if (isset($in['func']) &&
                            is_callable([$obj,$in['func']]))
                            $fw->call([$obj,$in['func']],[$fw,$agent]);
                        return;
                    }
                    else
                    if (isset($in['nonce']) &&
                        isset($agent->headers()['Cookie']) &&
                        preg_match(
                            '/PHPSESSID=(\w+)/',
                            $agent->headers()['Cookie'],
                            $match
                        ) &&
                        Bcrypt::instance()->
                            verify($match[1],'$2y$12$'.$in['nonce'])) {
                        if (isset($in['session']))
                            foreach ($in['session'] as $key=>$val)
                                $agent->session[$key]=$val;
                        if (empty($agent->feature[$class]))
                            $agent->feature[$class]=new $class($fw,$agent);
                        return;
                    }
                    else
                        trace(
                            '(0x00'.$agent->uri().') '.$agent->id().' '.
                            'authentication failed');
                }
            }
        })->
        on('send',function($agent,$op,$data) use($fw) {
            switch($op) {
            case WS::Ping:
                $text='ping';
                break;
            case WS::Text:
                $data=trim($data);
            case WS::Binary:
                $text='data';
                break;
            default:
                $text='unknown';
                break;
            }
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' '.$text.' sent'
            );
            if ($op==WS::Text && $fw->VERBOSE)
                trace($data);
        })->
        on('invalid',function($agent,$op,$data) use($fw) {
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' invalid opcode'
            );
        })->
        run();
}
else
    trace('A socket server instance is already running!');

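Answer to the question above:

The timeout was ignored because php-cli, of course, uses a different php.ini. I had completely forgotten about that. A simple ini_set did the trick.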
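
A minimal sketch of that fix, assuming it runs near the top of the server script before any socket is opened. The value 10 mirrors the setting mentioned in the question. The two diagnostic lines are an addition for illustration: they show which php.ini the current SAPI actually loaded.

```php
<?php
// Show which configuration this process is really using. The CLI often
// loads a different php.ini than the web server module or FPM.
echo 'SAPI: ', PHP_SAPI, PHP_EOL;
echo 'Loaded php.ini: ', php_ini_loaded_file() ?: '(none)', PHP_EOL;

// Override the timeout for this process only, independent of php.ini.
ini_set('default_socket_timeout', '10');

echo 'default_socket_timeout: ', ini_get('default_socket_timeout'), PHP_EOL;
```

Running `php --ini` on the command line lists the configuration files the CLI picks up, which is a quick way to compare against what phpinfo() reports in the browser.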
