Websockets with Phalcon



是否可以使用 Phalcon 既提供页面又作为 websockets 服务器运行?

在构建我的 Web 应用程序时,最好在同一框架内进行所有服务器端编码。

如果可能,您能否解释如何设置它或提供教程链接?

如果不可能,您可以推荐一个为这两种目的而设计的基于 PHP 的框架吗?

谢谢!

Phalcon CLI 任务

<?php
namespace ProjectCliTasks;
use ZMQ,
    PDO,
    ReactZMQContext as ZMQContext,
    ReactEventLoopFactory as EventLoopFactory,
    RatchetServerIoServer,
    RatchetHttpHttpServer,
    RatchetWebSocketWsServer,
    RatchetSessionSessionProvider,
    SymfonyComponentHttpFoundationSessionStorageHandler,
    WebirdCliTaskBase,
    WebirdCliChat;
/**
 * Task for websocket
 *
 */
class ServiceTask extends TaskBase
{
    public function mainAction(array $params)
    {
        echo "The default action inside of the ", CURRENT_TASK, " task is not configuredn";
    }
    public function websocketListenAction(array $params)
    {
        // $this->ensureRunningAsWebUser();
        $opts = $params['opts'];
        $config = $this->config;
        $wsPort = (isset($opts['wsport'])) ? $opts['wsport'] : $config->app->wsPort;
        $zmqPort = (isset($opts['zmqport'])) ? $opts['zmqport'] : $config->app->zmqPort;
        $loop = EventLoopFactory::create();
        $chat = new Chat();
        $chat->setDI($this->getDI());
        // Listen for the web server to make a ZeroMQ push after an ajax request
        // $context = new ZMQContext($loop);
        // $pull = $context->getSocket(ZMQ::SOCKET_PULL);
        // $pull->bind("tcp://127.0.0.1:${zmqPort}"); // Binding to 127.0.0.1 means the only client that can connect is itself
        // $pull->on('message', [$chat, 'onUserJoin']);
        $wsServer = new WsServer($chat);
        $ioServer = IoServer::factory(
            new HttpServer($wsServer),
            $wsPort
        );
        echo "websocket listening on port $wsPort in " . ENVIRONMENT . " moden";
        $ioServer->run();
    }
}

瑞奇的聊天应用程序

<?php
namespace WebirdCli;
use PhalconDIInjectable as DIInjectable,
    RatchetMessageComponentInterface,
    RatchetConnectionInterface,
    WebirdDatabaseSessionReader;
/**
 * Basic chat logic for a Ratchet application
 */
class Chat extends DIInjectable implements MessageComponentInterface
{
    protected $clients;
    /**
     * Class constructor
     */
    public function __construct()
    {
        $this->clients = new SplObjectStorage();
    }
    /**
     * Connection open function
     *
     * @param RatchetConnectionInterface  $conn
     */
    public function onOpen(ConnectionInterface $conn)
    {
try {
        echo "New connection! ({$conn->resourceId})n";
        $cookies = $conn->WebSocket->request->getCookies();
        if (! array_key_exists('PHPSESSID', $cookies)) {
            echo "Connection Rejected: Session Cookie was not present.n";
            return $conn->close();
        }
        $sessionId = $cookies['PHPSESSID'];
        $sessionReader = $this->getDI()->getSessionReader();
        if ($sessionReader->read($sessionId) === false) {
            echo "Connection Rejected: Session could not be found.n";
            return $conn->close();
        }
        if (($identity = $sessionReader->get('auth-identity')) === false) {
            echo "Connection Rejected: session auth-identity data is not present.n";
            return $conn->close();
        }
        if (!isset($identity['role'])) {
            echo "Connection Rejected: session user role data is not present.n";
            return $conn->close();
        }
        $role = $identity['role'];
        $acl = $this->getDI()->getAcl();
        if (!$this->acl->isAllowed($role, 'websocket', 'open')) {
            echo "Connection Rejected: user does not have permission to open a websocket.n";
            return $conn->close();
        }
} catch (Exception $e) {
    echo $e->getMessage() . "n";
}
        // var_export($sessionReader->get('id'));
        // var_export($sess);
        // Store the new connection to send messages to later
        $this->clients->attach($conn, $sessionId);
    }
    /**
     * Receives a message when registered in the websocket server
     *
     * @param RatchetConnectionInterface  $from
     * @param string                        $msg
    */
    public function onMessage(ConnectionInterface $from, $msg) {
        $numRecv = $this->clients->count() - 1;
        // echo $from->Session->getName() . "n";
        echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "n"
            , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');
        foreach ($this->clients as $client) {
            if ($from !== $client) {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }
    /**
     * Handle closing of a connection
     *
     * @param RatchetConnectionInterface  $conn
    */
    public function onClose(ConnectionInterface $conn) {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);
        echo "Connection {$conn->resourceId} has disconnectedn";
    }
    /**
     * Handles exceptions in the application
     *
     * @param RatchetConnectionInterface  $from
     * @param Exception                    $e
    */
    public function onError(ConnectionInterface $conn, Exception $e) {
        echo "An error has occurred: {$e->getMessage()}n";
        $conn->close();
    }
}

只读数据库会话读取器。

<?php
namespace Webird;
use PhalconDb;
/**
 * Read-only session access
 *
 */
class DatabaseSessionReader
{
    private $options;
    private $data;
    /**
     * {@inheritdoc}
     *
     * @param  array                      $options
     * @throws PhalconSessionException
     */
    public function __construct($options = null)
    {
        if (!isset($options['db'])) {
            throw new Exception("The parameter 'db' is required");
        }
        if (!isset($options['unique_id'])) {
            throw new Exception("The parameter unique_id is required");
        }
        if (!isset($options['db_table'])) {
            throw new Exception("The parameter 'db_table' is required");
        }
        if (!isset($options['db_id_col'])) {
            throw new Exception("The parameter 'db_id_col' is required");
        }
        if (!isset($options['db_data_col'])) {
            throw new Exception("The parameter 'db_data_col' is required");
        }
        if (!isset($options['db_time_col'])) {
            throw new Exception("The parameter 'db_time_col' is required");
        }
        $this->options = $options;
        $this->data = false;
    }
    protected function getOptions()
    {
        return $this->options;
    }
    /**
     * {@inheritdoc}
     * @param  string $sessionId
     * @return string
     */
    public function read($sessionId)
    {
        $options = $this->getOptions();
        $row = $options['db']->fetchOne(
            sprintf(
                'SELECT %s FROM %s WHERE %s = ?',
                $options['db']->escapeIdentifier($options['db_data_col']),
                $options['db']->escapeIdentifier($options['db_table']),
                $options['db']->escapeIdentifier($options['db_id_col'])
            ),
            Db::FETCH_NUM,
            [$sessionId]
        );
        $this->data = (empty($row[0])) ? false : $this->unserialize_php($row[0]);
        return ($this->data !== false);
    }
    public function has($key)
    {
        if (!is_string($key)) {
            throw new Exception('The key must be a string');
        }
        if ($this->data == false) {
            return false;
        }
        $uniqueId = $this->getOptions()['unique_id'];
        return (array_key_exists("{$uniqueId}{$key}", $this->data));
    }
    public function get($key)
    {
        if (!$this->has($key)) {
            return false;
        }
        $uniqueId = $this->getOptions()['unique_id'];
        return $this->data["{$uniqueId}{$key}"];
    }
    private function unserialize_php($session_data)
    {
        $return_data = array();
        $offset = 0;
        while ($offset < strlen($session_data)) {
            if (!strstr(substr($session_data, $offset), "|")) {
                throw new Exception("invalid data, remaining: " . substr($session_data, $offset));
            }
            $pos = strpos($session_data, "|", $offset);
            $num = $pos - $offset;
            $varname = substr($session_data, $offset, $num);
            $offset += $num + 1;
            $data = unserialize(substr($session_data, $offset));
            $return_data[$varname] = $data;
            $offset += strlen(serialize($data));
        }
        return $return_data;
    }
}

这是用于设置会话读取器的 DI。

$di->set('sessionReader', function() use ($di) {
    $config = $di->get('config');
    $connection = $di->get('db');
    $sessionReader = new DatabaseSessionReader([
        'db'          => $connection,
        'unique_id'    => $config->session->unique_id,
        'db_table'    => $config->session->db_table,
        'db_id_col'   => $config->session->db_id_col,
        'db_data_col' => $config->session->db_data_col,
        'db_time_col' => $config->session->db_time_col,
        'uniqueId'    => $config->session->unique_id
    ]);
    return $sessionReader;
});

参考: https://forum.phalconphp.com/discussion/3539/phalcon-websockets

相关内容

  • 没有找到相关文章

最新更新