Guava EventBus dispatching



我正在使用Guava的EventBus来启动一些处理和报告结果。这里有一个非常简单的可编译示例:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
public class Test {
public static class InitiateProcessing { }
public static class ProcessingStarted { }
public static class ProcessingResults { }
public static class ProcessingFinished { }
public static EventBus bus = new EventBus();
@Subscribe
public void receiveStartRequest(InitiateProcessing evt) {
System.out.println("Got processing request - starting processing");
bus.post(new ProcessingStarted());
System.out.println("Generating results");
bus.post(new ProcessingResults());
System.out.println("Generating more results");
bus.post(new ProcessingResults());
bus.post(new ProcessingFinished());
}
@Subscribe
public void processingStarted(ProcessingStarted evt) {
System.out.println("Processing has started");
}
@Subscribe
public void resultsReceived(ProcessingResults evt) {
System.out.println("got results");
}
@Subscribe
public void processingComplete(ProcessingFinished evt) {
System.out.println("Processing has completed");
}

public static void main(String[] args) {
Test t = new Test();
bus.register(t);
bus.post(new InitiateProcessing());
}
}

我使用这些事件作为其他软件组件的一种反应方式,为这个处理做准备。例如,他们可能必须在处理之前保存当前状态,然后再恢复。

我希望这个程序的输出是:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed

相反,实际输出是:

Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed

本应指示处理已开始的事件实际上发生在实际处理("生成结果")之后。

看了源代码之后,我明白了为什么它会这样。以下是EventBus的相关源代码。

/**
* Drain the queue of events to be dispatched. As the queue is being drained,
* new events may be posted to the end of the queue.
*/
void dispatchQueuedEvents() {
// don't dispatch if we're already dispatching, that would allow reentrancy
// and out-of-order events. Instead, leave the events to be dispatched
// after the in-progress dispatch is complete.
if (isDispatching.get()) {
return;
}
// dispatch event (omitted)

发生的情况是,由于我已经在调度顶级InitiateProcessing事件,所以其余的事件只是被推到队列的末尾。我希望它的行为类似于.NET事件,在.NET事件中,在所有处理程序完成之前,调用事件不会返回。

我不太明白这个实现的原因。当然,事件是有序的,但周围代码的顺序会被完全扭曲。

有没有什么方法可以让总线按照描述的方式运行并产生所需的输出?我确实在Javadocs中读到

EventBus保证它不会从同时使用多个线程,除非该方法明确允许通过带有@AllowConcurrentEvents注释。

但我认为这不适用于这里——我在单线程应用程序中看到了这个问题。

编辑

这里出现问题的原因是我在订阅服务器中使用post。由于事件总线是不可重入的,所以这些"子帖子"会排队,并在第一个处理程序完成后进行处理。我可以注释掉EventBus源代码中的if (isDispatching.get()) { return; }部分,一切都如我所料——所以真正的问题是,我这样做引入了哪些潜在的问题?设计师们似乎做出了一个认真的决定,不允许再次进入。

EventBus通常遵循这样一个原则,即将事件发布到总线的代码不应该关心订阅者对事件做了什么或什么时候做,除了遵守事件发布的顺序之外(在同步事件总线的情况下)。

如果您希望在方法过程中的特定时间调用特定方法,并且希望确保这些方法在方法继续之前完成(正如您在示例中所做的那样),为什么不直接调用这些方法呢?当您使用事件总线时,您明确地将代码与响应给定事件时发生的事情分开。这在很多情况下都是可取的,也是EventBus存在的主要原因,但它似乎不是您想要的。

我试图总结Guava的EventBus事件传递行为:

如果事件E1t1发布,则会通知所有订阅者。如果其中一个订阅者在其@Subscribe方法中发布了一个事件本身(稍后),则"新"事件E2将被排入队列,然后发送。之后的意思是:在所有@Subscribe之后,E1的方法从t1返回。

将这种"级联"事件发布与广度优先树遍历进行比较。

这似乎是EventBus的明确选择设计。

我知道这个问题已经4年了,但我今天刚刚遇到同样的问题。有一个简单的(和反直觉的)改变来获得你想要的行为。每https://stackoverflow.com/a/53136251/1296767,可以将AsyncEventBus与DirectExecutor:一起使用

public static EventBus bus = new AsyncEventBus(MoreExecutors.newDirectExecutorService());

通过以上更改运行测试代码,结果正是您想要的:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed

在通知所有"订阅者"之前,向EventBus发布消息不会返回。。那些订阅者可能还没有开始执行。这意味着,当第一个bus.post返回时,您可以继续下一个post,而不会有任何中间的订阅者开始处理。

public void post(Object event)将事件发布到所有注册的订阅用户。此方法将在事件发生后成功返回已发布给所有订阅者,不考虑任何例外由订阅者抛出。如果没有订阅用户事件的类,并且事件还不是DeadEvent,它将是包装在DeadEvent中并重新发布。

参数:event-要发布的事件。

最新更新