能否在 Java 嵌入式代理中以编程方式更改队列的"dead letter"处理?



背景

在高级级别上,我有一个Java应用程序,其中某些事件应该触发为当前用户采取的特定操作。然而,事件可能非常频繁,而且行动总是一样的。因此,当第一个事件发生时,我想将行动安排在不久的将来(例如5分钟)。在该时间窗口内,后续事件不应采取任何操作,因为应用程序看到已经安排了一个操作。一旦执行了预定的操作,我们就回到步骤1,下一个事件将重新开始循环。

我的想法是通过在应用程序本身中嵌入内存中的ActiveMQ实例来实现这种过滤和节流机制(我不关心队列持久性)。

我相信JMS 2.0支持延迟传递的概念,延迟消息位于"暂存队列"中,直到到达真正的目的地。然而,我也认为ActiveMQ还不支持JMS 2.0规范……所以我正在考虑通过生存时间(TTL)值和死信队列(DLQ)处理来模仿相同的行为。

基本上,我的消息生产者代码会将消息放在一个伪暂存队列上,任何消费者都不会从中提取任何东西。消息将以5分钟的TTL值放置,到期后ActiveMQ将把它们转储到DLQ中这是我的消息消费者实际消费消息的队列。

问题

我不认为我真的想从"默认"DLQ中消费,因为我不知道ActiveMQ可能会在那里转储与我的应用程序代码完全无关的其他内部内容。所以我认为我的虚拟暂存队列最好有自己的自定义DLQ。我只看到过一页ActiveMQ文档,其中讨论了DLQ配置,它只处理独立ActiveMQ安装的XML配置文件(而不是嵌入应用程序中的内存代理)。

是否可以在运行时为嵌入式ActiveMQ实例中的队列以编程方式配置自定义DLQ

如果你认为我走错了路,我也很想听听其他建议。我对JMS比AMQP更熟悉,所以我不知道使用Qpid或其他Java可嵌入的AMQP代理是否更容易。不管ApacheCamel实际上是什么(!),我相信它应该在这方面表现出色,但对于这个用例来说,学习曲线可能过于夸张了。

尽管您担心Camel可能会对这个用例造成严重的高估,但我认为ActiveMQ对您所描述的用例已经造成了严重的高估。

你希望在事件发生5分钟后安排一些事情发生,让它只消耗第一个事件,忽略第一个事件和5分钟结束之间的所有事件,对吧?为什么不从现在起通过ScheduledExecutorService或您喜欢的调度机制将您的处理方法安排5分钟,并将事件保存在HashMap<User, Event>成员变量中呢。如果在处理方法启动之前,该用户有更多的事件,您只会看到您已经存储了一个事件,而没有存储新的事件,因此您将忽略除第一个事件之外的所有事件。在处理方法结束时,从HashMap中删除此用户的事件,将存储并安排下一个要进入的事件。

运行ActiveMQ来获得这种行为似乎远远超出了您的需要。如果没有,你能解释一下原因吗?

编辑:

如果你真的走上了这条路,不要使用消息TTL使你的消息过期;只需让(一个也是唯一的)消费者将它们读取到内存中,并使用上面描述的内存内解决方案每5分钟只处理(最多)一批。要么使用带有消息选择器的单个队列,要么使用每个用户一个的动态队列。您不需要DLQ来实现延迟,即使您可以让它实现延迟,它也不会为您提供批处理所有内容的功能,因此您每5分钟只运行一次。这不是一条你想走的路,即使你想知道怎么走。

一个简单的解决方案是跟踪并发结构中的挂起操作,并使用ScheduledExecutorService来执行它们:

private static final Object RUNNING = new Object();
private final ConcurrentMap<UserId, Object> pendingActions = 
    new ConcurrentHashMap<>();
private ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

public void takeAction(final UserId id) {
    Object running = pendingActions.putIfAbsent(id, RUNNING);  // atomic
    if(running == null) {                // no pending action for this user
        ses.schedule(new Runnable() {
            @Override
            public void run() {
                doWork();
                pendingActions.remove(id);
            }
        }, 5, TimeUnit.MINUTES);
    }
}

使用Camel,使用参数为completionInterval的Aggregator组件可以很容易地实现这一点,因此每隔五分钟您就可以检查列表聚合消息是否为空,如果它没有向负责用户操作的路由发送消息并清空列表。您确实需要维护整个交换列表,只需维护状态(是否计划用户操作)。

最新更新