我正在用 PHP 运行一个 WebSocket 服务器,它工作得很好,只有一个问题:
有时 stream_socket_accept() 会停滞 60 秒。这可能在服务器启动几秒钟后发生,也可能在服务器启动数小时后发生。我自己无法重现这种行为。
有时它会在调用 stream_socket_accept() 时停滞,有时则是在 stream_socket_accept() 返回之后、紧接着从客户端读取请求头时停滞。
更重要的是:default_socket_timeout系统范围设置为 10 秒,php.ini 显示此值。
即使有stream_socket_accept($socket,0);它也会停滞。给定的超时将被简单地忽略。
我的问题:
-
为什么它首先会停滞不前?当侦听器指示新连接时,stream_socket_accept不应该停止,不是吗?
-
为什么 fgetc 会停在连接的第一个字节上(即在读取传入连接的请求头时,紧跟在 stream_socket_accept() 之后)?
-
为什么我明明已经把超时改成了 10 秒(phpinfo() 中也是这样显示的),它却恰好停滞 60 秒(即 default_socket_timeout 的默认值)?
我的想法快用完了。
任何想法都受到高度赞赏。
这是套接字的完整代码(它也执行一些代理管理逻辑工作)。
我希望有人能发现一些东西。
<?php
// STALLING HAPPENS SOMETIMES IN LINE 52 fgetc()
// AND IN LINE 271 stream_socket_accept()
/**
*   RFC6455 protocol constants shared by the server and its agents
**/
class WS {

    //! GUID appended to the client key when computing Sec-WebSocket-Accept
    const Magic='258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

    //! Maximum number of bytes requested per socket read
    const Packet=65536;

    //@{ Mask bits for first byte of header

    //! Opcode: text frame
    const Text=0x01;
    //! Opcode: binary frame
    const Binary=0x02;
    //! Opcode: connection close
    const Close=0x08;
    //! Opcode: ping
    const Ping=0x09;
    //! Opcode: pong
    const Pong=0x0a;
    //! Low nibble of the first byte carries the opcode
    const OpCode=0x0f;
    //! High bit of the first byte marks the final fragment
    const Finale=0x80;

    //@}

    //@{ Mask bits for second byte of header

    //! Low seven bits of the second byte carry the payload length
    const Length=0x7f;

    //@}

}
//! RFC6455 server socket
class Server {

    protected
        //! Address handed to stream_socket_server()
        $addr,
        //! Stream context used for the listening socket
        $ctx,
        //! Seconds to wait in stream_select() before an idle/ping round
        $wait,
        //! Streams watched by stream_select(): the listener plus clients
        $sockets=[],
        //! Connected agents, keyed by (int)$socket
        $agents=[],
        //! Event handlers, keyed by event name
        $events=[];

    /**
    *   Read the HTTP upgrade request from a freshly accepted stream,
    *   answer the WebSocket handshake and register the client as an
    *   agent. Anything that is not a GET request carrying both the
    *   Upgrade and Sec-WebSocket-Key headers is rejected and shut down.
    *   @return NULL
    *   @param $socket resource
    **/
    function alloc($socket) {
        trace("Fetching http header...");
        // if header does not start with "GET"
        // immediately close connection
        foreach (['G','E','T'] as $get) {
            // fgetc() blocks until a byte arrives or the stream timeout
            // (default_socket_timeout unless overridden) expires
            $character=fgetc($socket);
            $metadata=stream_get_meta_data($socket);
            // this MUST NOT BE REWRITTEN!
            // unread_bytes can not be checked against 0
            if ($character==$get && !feof($socket) && $metadata['unread_bytes'] > 0)
                continue;
            else {
                trace("Error: Header does not start with GET – connection closed");
                stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
                return;
            }
        }
        // Drain whatever else PHP has already buffered for this stream
        $str="GET";
        do {
            $str.=fgetc($socket);
            $metadata=stream_get_meta_data($socket);
        } while (!feof($socket) && $metadata['unread_bytes'] > 0);
        // Get WebSocket headers
        $hdrs=[];
        // HTTP line terminator (CR LF)
        $CRLF="\r\n";
        $verb=NULL;
        $uri=NULL;
        foreach (explode($CRLF,$str) as $line)
            // Request line: "<verb> <uri> HTTP/1.x"
            if (preg_match('/^(\w+)\s(.+)\sHTTP\/1\.\d$/',
                trim($line),$match)) {
                $verb=$match[1];
                $uri=$match[2];
            }
            else
            if (preg_match('/^(.+): (.+)/',trim($line),$match))
                // Standardize header (e.g. sec-websocket-key becomes
                // Sec-Websocket-Key)
                $hdrs[
                    strtr(
                        ucwords(
                            strtolower(
                                strtr($match[1],'-',' ')
                            )
                        ),' ','-'
                    )
                ]=$match[2];
        // Both headers are required; the key is needed below to compute
        // the Sec-WebSocket-Accept value
        if (empty($hdrs['Upgrade']) ||
            empty($hdrs['Sec-Websocket-Key'])) {
            // Not a WebSocket request
            $this->write(
                $socket,
                $str='HTTP/1.1 400 Bad Request'.$CRLF.
                    'Connection: close'.$CRLF.$CRLF
            );
            stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
            return;
        }
        // Handshake
        $bytes=$this->write(
            $socket,
            $str='HTTP/1.1 101 Switching Protocols'.$CRLF.
                'Upgrade: websocket'.$CRLF.
                'Connection: Upgrade'.$CRLF.
                'Sec-WebSocket-Accept: '.
                    base64_encode(
                        sha1(
                            $hdrs['Sec-Websocket-Key'].
                            WS::Magic,
                            TRUE
                        )
                    ).$CRLF.$CRLF
        );
        if (is_int($bytes)) {
            // Connect agent to server
            $this->sockets[]=$socket;
            $this->agents[(int)$socket]=
                new Agent($this,$socket,$verb,$uri,$hdrs);
        }
        else
            stream_socket_shutdown($socket,STREAM_SHUT_RDWR);
    }

    /**
    *   Free stream socket: stop watching it, drop its agent and shut
    *   down the write side. Safe to call more than once for the same
    *   socket.
    *   @return NULL
    *   @param $socket resource
    **/
    function free($socket) {
        // array_search() returns FALSE when the socket is already gone;
        // using that as an array key would address index 0, which holds
        // the listening socket
        $key=array_search($socket,$this->sockets,TRUE);
        if ($key!==FALSE)
            unset($this->sockets[$key]);
        unset($this->agents[(int)$socket]);
        if (is_resource($socket))
            stream_socket_shutdown($socket,STREAM_SHUT_WR);
    }

    /**
    *   Read from stream socket; an empty read is reported as failure
    *   @return string|FALSE
    *   @param $socket resource
    **/
    function read($socket) {
        return is_string($str=@fread($socket,WS::Packet)) && strlen($str)?
            $str:
            FALSE;
    }

    /**
    *   Write to stream socket, looping until the whole string is sent.
    *   Returns the total number of bytes written, or FALSE as soon as a
    *   write or flush fails or makes no progress.
    *   @return int|FALSE
    *   @param $socket resource
    *   @param $str string
    **/
    function write($socket,$str) {
        $total=strlen($str);
        for ($sent=0;$sent<$total;$sent+=$bytes) {
            if (($bytes=@fwrite($socket,substr($str,$sent))) &&
                @fflush($socket))
                continue;
            return FALSE;
        }
        return $sent;
    }

    /**
    *   Return socket agents, optionally only those connected to $uri
    *   @return array
    *   @param $uri string
    **/
    function agents($uri=NULL) {
        return array_filter(
            $this->agents,
            function($val) use($uri) {
                return $uri?($val->uri()==$uri):TRUE;
            }
        );
    }

    /**
    *   Return event handlers
    *   @return array
    **/
    function events() {
        return $this->events;
    }

    /**
    *   Bind function to event handler
    *   @return object
    *   @param $event string
    *   @param $func callable
    **/
    function on($event,$func) {
        $this->events[$event]=$func;
        return $this;
    }

    /**
    *   Execute the server process: open the listener, then loop forever
    *   accepting clients, dispatching received frames to the event
    *   handlers and pinging all agents whenever a full wait interval
    *   passes without socket activity.
    *   @return object
    **/
    function run() {
        $fw=Base::instance();
        // Activate WebSocket listener
        $listen=stream_socket_server(
            $this->addr,$errno,$errstr,
            STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,
            $this->ctx
        );
        if ($listen) {
            stream_set_read_buffer($listen,WS::Packet);
            stream_set_write_buffer($listen,WS::Packet);
            // NOTE(review): these options are applied after the socket
            // is already bound; confirm SO_REUSEADDR still has the
            // intended effect at this point
            if ($socket=socket_import_stream($listen)) {
                socket_set_option(
                    $socket,
                    SOL_SOCKET,
                    SO_REUSEADDR,
                    1
                );
                socket_set_option(
                    $socket,
                    SOL_SOCKET,
                    SO_LINGER,
                    ['l_onoff'=>1,'l_linger'=>1]
                );
            }
        }
        register_shutdown_function(function() use($listen) {
            foreach ($this->sockets as $socket)
                if ($socket!=$listen)
                    $this->free($socket);
            if (is_resource($listen)) {
                stream_socket_shutdown($listen,STREAM_SHUT_RDWR);
                @fclose($listen);
            }
            if (isset($this->events['stop']) &&
                is_callable($func=$this->events['stop']))
                $func($this);
        });
        if (!$listen || $errstr) {
            user_error(
                $errstr?:'Unable to create server socket',E_USER_ERROR);
            // A custom error handler may return; never enter the loop
            // without a usable listener
            die;
        }
        if (isset($this->events['start']) &&
            is_callable($func=$this->events['start']))
            $func($this);
        $this->sockets=[$listen];
        $empty=[];
        // Time left, in seconds, until the next idle round
        $wait=$this->wait;
        while (TRUE) {
            $active=$this->sockets;
            $mark=microtime(TRUE);
            trace("Waiting for socket action...");
            // Split the remaining wait into whole seconds + microseconds
            $count=@stream_select(
                $active,$empty,$empty,
                (int)$wait,(int)round(1e6*($wait-(int)$wait))
            );
            if (is_bool($count) && $wait) {
                if (isset($this->events['error']) &&
                    is_callable($func=$this->events['error']))
                    $func($this);
                die;
            }
            if ($count) {
                // Process active connections
                foreach ($active as $socket) {
                    if (!is_resource($socket))
                        continue;
                    if ($socket==$listen) {
                        trace("New connection pending...");
                        if ($socket=stream_socket_accept($listen)) {
                            $this->alloc($socket);
                            trace("alloc() finished");
                        }
                        else {
                            trace("Connection failed...");
                            continue;
                        }
                    }
                    else {
                        $id=(int)$socket;
                        if (isset($this->agents[$id]) &&
                            $raw=$this->agents[$id]->fetch()) {
                            list($op,$data)=$raw;
                            // Dispatch
                            switch ($op & WS::OpCode) {
                            case WS::Text:
                                $data=trim($data);
                                // fall through
                            case WS::Binary:
                            case WS::Pong:
                                if (isset($this->events['receive']) &&
                                    is_callable($func=$this->events['receive']))
                                    $func($this->agents[$id],$op,$data);
                                break;
                            case WS::Ping:
                                $this->agents[$id]->send(WS::Pong);
                                break;
                            default:
                                if (isset($this->events['invalid']) &&
                                    is_callable($func=$this->events['invalid']))
                                    $func($this->agents[$id],$op,$data);
                                // fall through: unknown opcodes close
                                // the connection
                            case WS::Close:
                                $this->free($socket);
                                break;
                            }
                        }
                    }
                }
                $wait-=microtime(TRUE)-$mark;
                // Interval used up while handling traffic: start a new
                // one and force the idle round below.
                // NOTE(review): never terminates if the configured wait
                // is zero or negative
                while ($wait<1e-6) {
                    $wait+=$this->wait;
                    $count=0;
                }
            }
            if (!$count) {
                // Idle round: ping every agent, then notify the handler
                $mark=microtime(TRUE);
                foreach ($this->sockets as $socket) {
                    if (!is_resource($socket))
                        continue;
                    $id=(int)$socket;
                    if ($socket!=$listen &&
                        isset($this->agents[$id]) &&
                        is_string($this->agents[$id]->send(WS::Ping)) &&
                        isset($this->events['idle']) &&
                        is_callable($func=$this->events['idle']))
                        $func($this->agents[$id]);
                }
                $wait=$this->wait-microtime(TRUE)+$mark;
            }
        }
    }

    /**
    *   Instantiate object
    *   @return object
    *   @param $addr string
    *   @param $ctx resource
    *   @param $wait int
    **/
    function __construct($addr,$ctx=NULL,$wait=60) {
        $this->addr=$addr;
        $this->ctx=$ctx?:stream_context_create();
        $this->wait=$wait;
        $this->events=[];
    }

}
//! RFC6455 remote socket
// Event handlers in this file attach ad-hoc properties (hash, feature,
// query, session) to agents; the attribute below permits that on
// PHP 8.2+ and is read as a plain comment by older versions.
#[\AllowDynamicProperties]
class Agent {

    protected
        //! Owning Server instance
        $server,
        //! Remote peer name as reported by stream_socket_get_name()
        $id,
        //! Client stream
        $socket,
        $flag,
        //! Request method from the handshake
        $verb,
        //! Request URI from the handshake
        $uri,
        //! Normalized request headers from the handshake
        $headers,
        //! Snapshot of the server's event handlers taken at construction
        $events,
        //! Bytes received but not yet consumed as a complete frame
        $buffer='';

    /**
    *   Return server instance
    *   @return object
    **/
    function server() {
        return $this->server;
    }

    /**
    *   Return socket ID
    *   @return string
    **/
    function id() {
        return $this->id;
    }

    /**
    *   Return request method
    *   @return string
    **/
    function verb() {
        return $this->verb;
    }

    /**
    *   Return request URI
    *   @return string
    **/
    function uri() {
        return $this->uri;
    }

    /**
    *   Return socket headers
    *   @return array
    **/
    function headers() {
        return $this->headers;
    }

    /**
    *   Frame and transmit payload as a single, final, unmasked frame.
    *   Returns the payload on success; on write failure the socket is
    *   released through the server and FALSE is returned. The 'send'
    *   event fires for every opcode except Pong and Close.
    *   @return string|FALSE
    *   @param $op int
    *   @param $data string
    **/
    function send($op,$data='') {
        $mask=WS::Finale | ($op & WS::OpCode);
        $len=strlen($data);
        if ($len<126)
            // Length fits in the 7-bit field
            $str=pack('CC',$mask,$len);
        elseif ($len<65536)
            // 126 announces a 16-bit big-endian length
            $str=pack('CCn',$mask,126,$len);
        else {
            // 127 announces a 64-bit big-endian length, written as two
            // 32-bit words; the high word is always 0 on 32-bit builds
            $high=PHP_INT_SIZE>4?($len>>32):0;
            $low=PHP_INT_SIZE>4?($len&0xffffffff):$len;
            $str=pack('CCNN',$mask,127,$high,$low);
        }
        $str.=$data;
        $server=$this->server();
        if (is_bool($server->write($this->socket,$str))) {
            $server->free($this->socket);
            return FALSE;
        }
        if (!in_array($op,[WS::Pong,WS::Close]) &&
            isset($this->events['send']) &&
            is_callable($func=$this->events['send']))
            $func($this,$op,$data);
        return $data;
    }

    /**
    *   Retrieve and unmask payload. Reads once from the socket, appends
    *   to the internal buffer and returns [opcode, payload] as soon as
    *   one complete frame is buffered. Returns FALSE when the read
    *   failed (the socket is then released) or when the frame is still
    *   incomplete.
    *   @return array|FALSE
    **/
    function fetch() {
        $server=$this->server();
        if (is_bool($str=$server->read($this->socket))) {
            $server->free($this->socket);
            return FALSE;
        }
        $buf=($this->buffer.=$str);
        $size=strlen($buf);
        // Need at least the two fixed header bytes
        if ($size<2)
            return FALSE;
        $op=ord($buf[0]) & WS::OpCode;
        $len=ord($buf[1]) & WS::Length;
        $pos=2;
        if ($len==126) {
            // 16-bit extended length
            if ($size<4)
                return FALSE;
            $len=ord($buf[2])*256+ord($buf[3]);
            $pos+=2;
        }
        elseif ($len==127) {
            // 64-bit extended length
            if ($size<10)
                return FALSE;
            for ($i=0,$len=0;$i<8;$i++)
                $len=$len*256+ord($buf[$i+2]);
            $pos+=8;
        }
        // NOTE(review): the MASK bit of the second byte is never
        // inspected; a masking key is assumed to always be present
        if ($size<$pos+4)
            return FALSE;
        for ($i=0,$mask=[];$i<4;$i++)
            $mask[$i]=ord($buf[$pos+$i]);
        $pos+=4;
        if ($size<$len+$pos)
            return FALSE;
        // Keep bytes that follow this frame so the next frame is parsed
        // from its own start. They are only examined on the next call,
        // i.e. after the socket becomes readable again.
        $this->buffer=(string)substr($buf,$pos+$len);
        // Unmask payload
        for ($i=0,$data='';$i<$len;$i++)
            $data.=chr(ord($buf[$pos+$i])^$mask[$i%4]);
        return [$op,$data];
    }

    /**
    *   Destroy object; fires the 'disconnect' event
    *   @return NULL
    **/
    function __destruct() {
        if (isset($this->events['disconnect']) &&
            is_callable($func=$this->events['disconnect']))
            $func($this);
    }

    /**
    *   Instantiate object; fires the 'connect' event
    *   @return object
    *   @param $server object
    *   @param $socket resource
    *   @param $verb string
    *   @param $uri string
    *   @param $hdrs array
    **/
    function __construct($server,$socket,$verb,$uri,array $hdrs) {
        $this->server=$server;
        $this->id=stream_socket_get_name($socket,TRUE);
        $this->socket=$socket;
        $this->verb=$verb;
        $this->uri=$uri;
        $this->headers=$hdrs;
        $this->events=$server->events();
        if (isset($this->events['connect']) &&
            is_callable($func=$this->events['connect']))
            $func($this);
    }

}
/**
*   Simple console logger: prints the message prefixed with the current
*   time (H:i:s). The leading carriage return moves the cursor back to
*   column 0 so the line overwrites anything already on the current
*   terminal row.
*   @return NULL
*   @param $line string
**/
function trace($line) {
    echo "\r".date('H:i:s').' '.$line.PHP_EOL;
}
/**
*   Signal handler for a graceful exit: ends the script so that the
*   registered shutdown handler gets to run
*   @return NULL
*   @param $signal int
**/
function kill($signal) {
    exit;
}
if (PHP_SAPI!='cli') {
    // Prohibit direct HTTP access
    header('HTTP/1.1 404 Not Found');
    die;
}
// Installed only after the SAPI check so a web request gets its 404
// before any pcntl function is touched
pcntl_signal(SIGINT,'kill');
pcntl_signal(SIGTERM,'kill');
chdir(__DIR__);
require('lib/base.php');
error_reporting((E_ALL|E_STRICT)&~(E_NOTICE|E_USER_NOTICE|E_WARNING|E_USER_WARNING));
// Load .ini files
$fw=Base::instance();
$fw->
    config('app/ini/config.ini')->
    config('app/ini/dev.ini');
// Start only if no pid file exists or the recorded process is gone
if (!is_file($pid='ws.pid') ||
    !is_dir('/proc/'.file_get_contents($pid))) {
    // Override any error handler specified in .ini files
    ini_set('error_log','/dev/null');
    // The CLI SAPI reads its own php.ini, so a timeout configured for
    // the web SAPI does not apply here; set it explicitly so blocking
    // socket calls give up after 10 seconds instead of the 60-second
    // default
    ini_set('default_socket_timeout','10');
    $fw->DEBUG=2;
    $fw->ONERROR=function($fw) {
        trace($fw->get('ERROR.text'));
        foreach (explode("\n",trim($fw->get('ERROR.trace'))) as $line)
            trace($line);
    };
    // Verbose when any argument looks like -v or /v
    $fw->VERBOSE=(bool)preg_grep('/[\/-]v/',$argv);
    // Instantiate the server
    $ws=new Server(
        $fw->get('SITE.websocket'),
        stream_context_create([
            'ssl'=>$fw->get('SSL')+[
                'allow_self_signed'=>TRUE,
                'verify_peer'=>FALSE
            ]
        ])
    );
    // Intercept OpenSSL errors
    $err=FALSE;
    while (TRUE)
        if ($msg=openssl_error_string()) {
            $err=TRUE;
            trace($msg);
        }
        else
            break;
    if ($err)
        die;
    $ws->
        on('start',function($server) use($fw) {
            trace('WebSocket server started ('.$fw->get('SITE.websocket').')');
            file_put_contents('ws.pid',getmypid());
        })->
        on('error',function($server) use($fw) {
            if ($err=socket_last_error()) {
                trace(socket_strerror($err));
                socket_clear_error();
            }
            if ($err=error_get_last())
                trace($err['message']);
        })->
        on('stop',function($server) use($fw) {
            trace('Shutting down ('.$fw->get('SITE.websocket').')');
            @unlink('ws.pid');
        })->
        on('connect',function($agent) use($fw) {
            trace(
                '(0x00'.$agent->uri().') '.$agent->id().' connected '.
                '<'.(count($agent->server()->agents())+1).'>'
            );
            if ($fw->VERBOSE) {
                $hdrs=$agent->headers();
                trace(
                    $hdrs['User-Agent'].' '.
                    '[v'.$hdrs['Sec-Websocket-Version'].']'
                );
            }
            // Per-connection state used by the handlers below
            $agent->hash=dechex(crc32(file_get_contents(__FILE__)));
            $agent->feature=[];
            $agent->query='';
            $agent->session=[];
        })->
        on('disconnect',function($agent) use($fw) {
            trace('(0x08'.$agent->uri().') '.$agent->id().' disconnected');
            if ($err=socket_last_error()) {
                trace(socket_strerror($err));
                socket_clear_error();
            }
            // The URI path after the leading slash names the feature
            // class inside the WebSocket namespace
            if (preg_match('/^\/(.+)/',$agent->uri(),$match)) {
                $class='WebSocket\\'.$match[1];
                if (isset($agent->feature[$class])) {
                    $obj=$agent->feature[$class];
                    foreach ($agent->feature as $key=>$obj)
                        if (is_callable([$obj,'disconnect']))
                            $fw->call([$obj,'disconnect'],[$fw,$agent]);
                }
            }
        })->
        on('idle',function($agent) use($fw) {
            foreach ($agent->feature as $key=>$obj)
                if (is_callable([$obj,'idle']))
                    $fw->call([$obj,'idle'],[$fw,$agent]);
        })->
        on('receive',function($agent,$op,$data) use($fw) {
            switch($op) {
            case WS::Pong:
                $text='pong';
                break;
            case WS::Text:
                $data=trim($data);
                // fall through
            case WS::Binary:
                $text='data';
                break;
            default:
                $text='unknown';
                break;
            }
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' '.$text.' received'
            );
            if ($op==WS::Text && $data) {
                if ($fw->VERBOSE)
                    trace($data);
                $in=json_decode($data,TRUE);
                if (json_last_error()==JSON_ERROR_NONE &&
                    preg_match('/^\/(.+)/',$agent->uri(),$match)) {
                    $class='WebSocket\\'.$match[1];
                    if (isset($agent->feature[$class])) {
                        // Feature already instantiated for this agent:
                        // update state and route the call to it
                        if (isset($in['query']))
                            $agent->query=$in['query'];
                        if (isset($in['session']))
                            foreach ($in['session'] as $key=>$val)
                                $agent->session[$key]=$val;
                        $obj=$agent->feature[$class];
                        if (isset($in['func']) &&
                            is_callable([$obj,$in['func']]))
                            $fw->call([$obj,$in['func']],[$fw,$agent]);
                        return;
                    }
                    else
                    // First message: the nonce must verify against the
                    // PHP session id sent in the handshake cookie
                    if (isset($in['nonce']) &&
                        isset($agent->headers()['Cookie']) &&
                        preg_match(
                            '/PHPSESSID=(\w+)/',
                            $agent->headers()['Cookie'],
                            $match
                        ) &&
                        Bcrypt::instance()->
                            verify($match[1],'$2y$12$'.$in['nonce'])) {
                        if (isset($in['session']))
                            foreach ($in['session'] as $key=>$val)
                                $agent->session[$key]=$val;
                        if (empty($agent->feature[$class]))
                            $agent->feature[$class]=new $class($fw,$agent);
                        return;
                    }
                    else
                        trace(
                            '(0x00'.$agent->uri().') '.$agent->id().' '.
                            'authentication failed');
                }
            }
        })->
        on('send',function($agent,$op,$data) use($fw) {
            switch($op) {
            case WS::Ping:
                $text='ping';
                break;
            case WS::Text:
                $data=trim($data);
                // fall through
            case WS::Binary:
                $text='data';
                break;
            default:
                $text='unknown';
                break;
            }
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' '.$text.' sent'
            );
            if ($op==WS::Text && $fw->VERBOSE)
                trace($data);
        })->
        on('invalid',function($agent,$op,$data) use($fw) {
            trace(
                '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT).
                $agent->uri().') '.$agent->id().' invalid opcode'
            );
        })->
        run();
}
else
    trace('A socket server instance is already running!');
以下是对上述问题的回答:
超时设置之所以被忽略,是因为 php-cli 使用的是另一个 php.ini,我完全忘记了这一点。在脚本中用 ini_set() 显式设置 default_socket_timeout 之后,问题就解决了。