是否可以使用 Phalcon 既提供页面又作为 websockets 服务器运行?
在构建我的 Web 应用程序时,最好在同一框架内进行所有服务器端编码。
如果可能,您能否解释如何设置它或提供教程链接?
如果不可能,您可以推荐一个为这两种目的而设计的基于 PHP 的框架吗?
谢谢!
Phalcon CLI 任务
<?php
namespace ProjectCliTasks;
use ZMQ,
PDO,
ReactZMQContext as ZMQContext,
ReactEventLoopFactory as EventLoopFactory,
RatchetServerIoServer,
RatchetHttpHttpServer,
RatchetWebSocketWsServer,
RatchetSessionSessionProvider,
SymfonyComponentHttpFoundationSessionStorageHandler,
WebirdCliTaskBase,
WebirdCliChat;
/**
* Task for websocket
*
*/
class ServiceTask extends TaskBase
{
public function mainAction(array $params)
{
echo "The default action inside of the ", CURRENT_TASK, " task is not configuredn";
}
public function websocketListenAction(array $params)
{
// $this->ensureRunningAsWebUser();
$opts = $params['opts'];
$config = $this->config;
$wsPort = (isset($opts['wsport'])) ? $opts['wsport'] : $config->app->wsPort;
$zmqPort = (isset($opts['zmqport'])) ? $opts['zmqport'] : $config->app->zmqPort;
$loop = EventLoopFactory::create();
$chat = new Chat();
$chat->setDI($this->getDI());
// Listen for the web server to make a ZeroMQ push after an ajax request
// $context = new ZMQContext($loop);
// $pull = $context->getSocket(ZMQ::SOCKET_PULL);
// $pull->bind("tcp://127.0.0.1:${zmqPort}"); // Binding to 127.0.0.1 means the only client that can connect is itself
// $pull->on('message', [$chat, 'onUserJoin']);
$wsServer = new WsServer($chat);
$ioServer = IoServer::factory(
new HttpServer($wsServer),
$wsPort
);
echo "websocket listening on port $wsPort in " . ENVIRONMENT . " moden";
$ioServer->run();
}
}
瑞奇的聊天应用程序
<?php
namespace WebirdCli;
use PhalconDIInjectable as DIInjectable,
RatchetMessageComponentInterface,
RatchetConnectionInterface,
WebirdDatabaseSessionReader;
/**
* Basic chat logic for a Ratchet application
*/
class Chat extends DIInjectable implements MessageComponentInterface
{
protected $clients;
/**
* Class constructor
*/
public function __construct()
{
$this->clients = new SplObjectStorage();
}
/**
* Connection open function
*
* @param RatchetConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn)
{
try {
echo "New connection! ({$conn->resourceId})n";
$cookies = $conn->WebSocket->request->getCookies();
if (! array_key_exists('PHPSESSID', $cookies)) {
echo "Connection Rejected: Session Cookie was not present.n";
return $conn->close();
}
$sessionId = $cookies['PHPSESSID'];
$sessionReader = $this->getDI()->getSessionReader();
if ($sessionReader->read($sessionId) === false) {
echo "Connection Rejected: Session could not be found.n";
return $conn->close();
}
if (($identity = $sessionReader->get('auth-identity')) === false) {
echo "Connection Rejected: session auth-identity data is not present.n";
return $conn->close();
}
if (!isset($identity['role'])) {
echo "Connection Rejected: session user role data is not present.n";
return $conn->close();
}
$role = $identity['role'];
$acl = $this->getDI()->getAcl();
if (!$this->acl->isAllowed($role, 'websocket', 'open')) {
echo "Connection Rejected: user does not have permission to open a websocket.n";
return $conn->close();
}
} catch (Exception $e) {
echo $e->getMessage() . "n";
}
// var_export($sessionReader->get('id'));
// var_export($sess);
// Store the new connection to send messages to later
$this->clients->attach($conn, $sessionId);
}
/**
* Receives a message when registered in the websocket server
*
* @param RatchetConnectionInterface $from
* @param string $msg
*/
public function onMessage(ConnectionInterface $from, $msg) {
$numRecv = $this->clients->count() - 1;
// echo $from->Session->getName() . "n";
echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "n"
, $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');
foreach ($this->clients as $client) {
if ($from !== $client) {
// The sender is not the receiver, send to each client connected
$client->send($msg);
}
}
}
/**
* Handle closing of a connection
*
* @param RatchetConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn) {
// The connection is closed, remove it, as we can no longer send it messages
$this->clients->detach($conn);
echo "Connection {$conn->resourceId} has disconnectedn";
}
/**
* Handles exceptions in the application
*
* @param RatchetConnectionInterface $from
* @param Exception $e
*/
public function onError(ConnectionInterface $conn, Exception $e) {
echo "An error has occurred: {$e->getMessage()}n";
$conn->close();
}
}
只读数据库会话读取器。
<?php
namespace Webird;
use PhalconDb;
/**
* Read-only session access
*
*/
class DatabaseSessionReader
{
private $options;
private $data;
/**
* {@inheritdoc}
*
* @param array $options
* @throws PhalconSessionException
*/
public function __construct($options = null)
{
if (!isset($options['db'])) {
throw new Exception("The parameter 'db' is required");
}
if (!isset($options['unique_id'])) {
throw new Exception("The parameter unique_id is required");
}
if (!isset($options['db_table'])) {
throw new Exception("The parameter 'db_table' is required");
}
if (!isset($options['db_id_col'])) {
throw new Exception("The parameter 'db_id_col' is required");
}
if (!isset($options['db_data_col'])) {
throw new Exception("The parameter 'db_data_col' is required");
}
if (!isset($options['db_time_col'])) {
throw new Exception("The parameter 'db_time_col' is required");
}
$this->options = $options;
$this->data = false;
}
protected function getOptions()
{
return $this->options;
}
/**
* {@inheritdoc}
* @param string $sessionId
* @return string
*/
public function read($sessionId)
{
$options = $this->getOptions();
$row = $options['db']->fetchOne(
sprintf(
'SELECT %s FROM %s WHERE %s = ?',
$options['db']->escapeIdentifier($options['db_data_col']),
$options['db']->escapeIdentifier($options['db_table']),
$options['db']->escapeIdentifier($options['db_id_col'])
),
Db::FETCH_NUM,
[$sessionId]
);
$this->data = (empty($row[0])) ? false : $this->unserialize_php($row[0]);
return ($this->data !== false);
}
public function has($key)
{
if (!is_string($key)) {
throw new Exception('The key must be a string');
}
if ($this->data == false) {
return false;
}
$uniqueId = $this->getOptions()['unique_id'];
return (array_key_exists("{$uniqueId}{$key}", $this->data));
}
public function get($key)
{
if (!$this->has($key)) {
return false;
}
$uniqueId = $this->getOptions()['unique_id'];
return $this->data["{$uniqueId}{$key}"];
}
private function unserialize_php($session_data)
{
$return_data = array();
$offset = 0;
while ($offset < strlen($session_data)) {
if (!strstr(substr($session_data, $offset), "|")) {
throw new Exception("invalid data, remaining: " . substr($session_data, $offset));
}
$pos = strpos($session_data, "|", $offset);
$num = $pos - $offset;
$varname = substr($session_data, $offset, $num);
$offset += $num + 1;
$data = unserialize(substr($session_data, $offset));
$return_data[$varname] = $data;
$offset += strlen(serialize($data));
}
return $return_data;
}
}
这是用于设置会话读取器的 DI。
$di->set('sessionReader', function() use ($di) {
$config = $di->get('config');
$connection = $di->get('db');
$sessionReader = new DatabaseSessionReader([
'db' => $connection,
'unique_id' => $config->session->unique_id,
'db_table' => $config->session->db_table,
'db_id_col' => $config->session->db_id_col,
'db_data_col' => $config->session->db_data_col,
'db_time_col' => $config->session->db_time_col,
'uniqueId' => $config->session->unique_id
]);
return $sessionReader;
});
参考: https://forum.phalconphp.com/discussion/3539/phalcon-websockets