如何模仿SimpMessagingTemplate.convertAndSendToUser使用RabbitTempla



所以我一直在阅读关于Spring Message Relay (Spring Messaging的东西)与RabbitMQ代理的能力。我想达到的目标如下:

有一个服务(1),它作为rabbitmq和浏览器之间的消息中继。现在可以正常工作了。我使用的是MessageBrokerRegistry。enableStompBrokerRelay .

在后端有另一个服务(2),它将消息发送到RabbitMQ上的已知队列,并将该消息路由到特定的用户。作为发送者,我想控制消息传递给谁。

通常,你会使用SimpMessagingTemplate来做这件事。问题是,虽然,消息的来源实际上没有访问该模板,因为它不充当中继,它不使用websockets,它不持有队列名称到会话id的映射。

我能想到的一种方法是,在服务1上编写一个简单的类,它将监听所有队列并使用simp模板转发它们。但是我觉得这不是一个理想的方法,我觉得可能已经有一种方法可以使用Spring来实现它。

你能告诉我吗?

这个问题让我想到了我所面临的同样的困境。我已经开始使用自定义UserDestinationResolver,它达到一致的主题命名方案,该方案仅使用用户名,而不使用默认解析器使用的会话ID。

让我在JS中订阅"/user/exchange/amq"。但是通过一个普通的RabbitMQ应用程序发送到"/exchange/amqp.direct/users.me"。

当前时间

最新的源代码在这里,我将它"注册"为我现有的@Configuration类中的@Bean。

这是自定义的UserDestinationResolver本身:

public class ConsistentUserDestinationResolver implements UserDestinationResolver {
    private static final Pattern USER_DEST_PREFIXING_PATTERN =
            Pattern.compile("/user/(?<name>.+?)/(?<routing>.+)/(?<dest>.+?)");
    private static final Pattern USER_AUTHENTICATED_PATTERN =
            Pattern.compile("/user/(?<routing>.*)/(?<dest>.+?)");
    @Override
    public UserDestinationResult resolveDestination(Message<?> message) {
        SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
        final String destination = accessor.getDestination();
        final String authUser = accessor.getUser() != null ? accessor.getUser().getName() : null;
        if (destination != null) {
            if (SimpMessageType.SUBSCRIBE.equals(accessor.getMessageType()) ||
                    SimpMessageType.UNSUBSCRIBE.equals(accessor.getMessageType())) {
                if (authUser != null) {
                    final Matcher authMatcher = USER_AUTHENTICATED_PATTERN.matcher(destination);
                    if (authMatcher.matches()) {
                        String result = String.format("/%s/users.%s.%s",
                                authMatcher.group("routing"), authUser, authMatcher.group("dest"));
                        UserDestinationResult userDestinationResult =
                                new UserDestinationResult(destination, Collections.singleton(result), result, authUser);
                        return userDestinationResult;
                    }
                }
            }
            else if (accessor.getMessageType().equals(SimpMessageType.MESSAGE)) {
                final Matcher prefixMatcher = USER_DEST_PREFIXING_PATTERN.matcher(destination);
                if (prefixMatcher.matches()) {
                    String user = prefixMatcher.group("name");
                    String result = String.format("/%s/users.%s.%s",
                            prefixMatcher.group("routing"), user, prefixMatcher.group("dest"));
                    UserDestinationResult userDestinationResult =
                            new UserDestinationResult(destination, Collections.singleton(result), result, user);
                    return userDestinationResult;
                }
            }
        }
        return null;
    }
}

相关内容

  • 没有找到相关文章

最新更新