如何使用CometD和Jetty制作Pub/Sub服务



我需要创建一个基于Java 8的服务,该服务提供多个客户机可以订阅的CometD通道。其思想是,当某些事件发生时,服务器可以向客户端发送通知。

我使用Jetty 9作为我的servlet容器(必须满足我的组的需求)。我一直在阅读CometD文档,并寻找一些我可以使用的例子。文档是广泛的,但没有帮助(缺乏上下文),我还没能找到一个像样的例子,我正在尝试做什么。

谁能提供一个简单的例子,在Java中创建一个可以与Jetty一起使用的发布机制?如果做不到这一点,有人能给我举个例子吗?

请建议。

CometD项目有一项突出的任务,就是把教程带回来。

服务器端股票价格教程回答了这个特殊的问题,您可以在这里找到源代码,而我们正在努力将其作为文档的一部分重新上线。

忽略一些细节,您需要编写的服务类似于教程的股票价格服务:在接收到外部事件时,该服务应该将该事件广播给订阅者。
@Service
public class StockPriceService implements StockPriceEmitter.Listener
{
    @Inject
    private BayeuxServer bayeuxServer;
    @Session
    private LocalSession sender;
    public void onUpdates(List<StockPriceEmitter.Update> updates)
    {
        for (StockPriceEmitter.Update update : updates)
        {
            // Create the channel name using the stock symbol.
            String channelName = "/stock/" + update.getSymbol().toLowerCase(Locale.ENGLISH);
            // Initialize the channel, making it persistent and lazy.
            bayeuxServer.createChannelIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
            {
                public void configureChannel(ConfigurableServerChannel channel)
                {
                    channel.setPersistent(true);
                    channel.setLazy(true);
                }
            });
            // Convert the Update business object to a CometD-friendly format.
            Map<String, Object> data = new HashMap<>(4);
            data.put("symbol", update.getSymbol());
            data.put("oldValue", update.getOldValue());
            data.put("newValue", update.getNewValue());
            // Publish to all subscribers.
            ServerChannel channel = bayeuxServer.getChannel(channelName);
            channel.publish(sender, data);
        }
    }
}

StockPriceEmitter是你的外部事件的来源,并以StockPriceEmitter.Update事件的形式发布给StockPriceEmitter.Listener

外部事件如何传递到CometD服务器是StockPriceEmitter隐藏的细节;它可以通过JMS消息,或通过轮询外部REST服务,或通过自定义网络协议,或通过轮询数据库等来完成。

重要的是,当外部事件到达时,调用StockPriceService.onUpdates(...),在那里您可以将事件转换为CometD友好的JSON格式,然后将它们发布到CometD通道。

发布到CometD通道,反过来,将消息发送到该通道的所有订阅者,通常是远程客户端,如浏览器。

CometD通道已被设置为lazy,因为这是一种避免以非常频繁的更新速率(例如,高于每秒2-4次更新)轰炸客户端的方法。您将需要根据您的特定用例来决定通道的惰性。

最新更新