我正在使用一个composer包owlycode/streaming-bird来调用twitter流API。流API在你的应用程序和twitter之间打开一个套接字,以接收具有指定关键字的tweet。在我的例子中,关键字是"你好"。
以下是使用owlycode/流媒体鸟包的代码:
<?PHP
$oauthToken = '';
$oauthSecret = '';
$consumerKey = '';
$consumerSecret = '';
$bird = new StreamingBird($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);
$bird
->createStreamReader(StreamReader::METHOD_FILTER)
->setTrack(['hello']) // Fetch every tweet containing one of the following words
->consume(function ($tweet) { // Now we provide a callback to execute on every received tweet.
echo '------------------------' . "n";
echo $tweet['text'] . "n";
});
?>
我的问题是,当这个连接被错误关闭时,我无法知道。因此,我无法再次连接推特。
PHP中有什么东西可以根据域名搜索打开的套接字吗?
也许像
check_if_socket_open('https://stream.twitter.com/1.1/statuses/firehose.json')
注意:我不能使用socket_get_status,因为我没有socket变量。
如果您无法访问套接字,则无法检查套接字状态。
如果您正在搜索一个不涉及StreamBird代码的解决方法,那么您可以基于OwlyCodeStreamingBird
创建一个类,然后实现其connect
方法:
<?php
class MyStreamReader extends OwlyCodeStreamingBird
{
protected $stream;
protected function connect($timeout = 5, $attempts = 10)
{
return $this->stream = parent::connect($timeout, $attempts);
}
protected function isConnected()
{
return $this->stream && stream_get_meta_data($this->stream)['eof'];
}
}
class MyStreamingBird extends OwlyCodeStreamingBird
{
public function createStreamReader($method)
{
$oauth = new OwlyCodeStreamingBirdOauth($this->consumerKey,
$this->consumerSecret, $this->oauthToken, $this->oauthSecret);
return new MyStreamReader(new OwlyCodeStreamingBirdConnection(), $oauth, $method);
}
}
$bird = new MyStreamingBird($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);
$reader = $bird->createStreamReader(StreamReader::METHOD_FILTER); // ...
$reader->isConnected();
您还可以基于OwlyCodeStreamingBird
创建一个类,该类也可以访问流。但是,您必须跟踪这些流,因为这是一种工厂方法。
如果您只是在包本身中进行一个小的添加,那么看起来您毕竟可以使用socket_get_status。
这两个函数在streamreader类中,这里提供了套接字处理程序
public function consume(callable $handler)
{
$this->running = true;
while ($this->running) { /// while $this->running is true socket will try to reconnect always.
$this->consumeOnce($handler);
}
}
protected function consumeOnce(callable $handler)
{
$this->connection = $this->connect();
$lastStreamActivity = time();
$this->connection->read(function ($tweet) use (&$lastStreamActivity, $handler) {
$idle = (time() - $lastStreamActivity);
$this->monitor->stat('max_idle_time', $idle);
$this->monitor->stat('idle_time', $idle);
$this->monitor->stat('tweets', 1);
$lastStreamActivity = time();
call_user_func($handler, $tweet, $this->monitor);
});
$this->connection->close();
}
在连接类中,您有可用的套接字处理程序,因此您可以在尝试从套接字读取数据时获取套接字状态。以下是经过轻微修改的读取功能
public function read(callable $callback, $timeout = 5)
{
$this->pool = [$this->connection];
stream_set_timeout($this->connection, $timeout);
$info = stream_get_meta_data($this->connection);
while ($this->connection !== null && !feof($this->connection) && stream_select($this->pool, $fdw, $fde, $timeout) !== false && $info['timed_out']!==true) {
// @todo safeguard no tweets but connection OK. (reconnect)
$this->pool = [$this->connection];
$chunkInfo = trim(fgets($this->connection));
if (!$chunkInfo) {
continue;
}
$len = hexdec($chunkInfo) + 2;
$streamInput = '';
while (!feof($this->connection)) {
$streamInput .= fread($this->connection, $len-strlen($streamInput));
if (strlen($streamInput)>=$len) {
break;
}
}
$this->buffer .= substr($streamInput, 0, -2);
$data = json_decode($this->buffer, true);
if ($data) {
call_user_func($callback, $data);
$this->buffer = '';
}
}
}
查看StreamingBird
类的实现,您可以轻松地自己创建streamreader实例,并完全控制连接:
namespace OwlyCodeStreamingBird;
// Let's instantiate the Oauth signature handler
$oauth = new Oauth($consumerKey, $consumerSecret, $oauthToken, $oauthSecret);
// Let's create our own Connection object!
$connection = new Connection();
// And here comes our Reader
$reader = new StreamReader($connection, $oauth, StreamReader::METHOD_FILTER);
$reader->setTrack(['hello'])
->consume(function ($tweet) {
echo '------------------------' . "n";
echo $tweet['text'] . "n";
});
// Voilà
print_r(socket_get_status($connection->connection));
Connection
对象将套接字资源存储在公共属性$connection
:中
public $connection;
// ...
@$this->connection = fsockopen($host, $port, $errNo, $errStr, $timeout);