创建事件调度线程安全信号量



我一直在尝试制作一个二进制信号量,它将能够安全地阻止在事件调度线程(EDT)上运行的方法的执行,而不会实际阻止线程处理更多事件。这在最初看起来可能是不可能的,但Java有一些与此相关的内置功能,但我不能完全让它工作。

用例

目前,如果您从EDT显示一个模式摆动对话框,它似乎会阻止EDT(因为显示模式对话框的方法在对话框关闭之前不会继续到下一行),但实际上有一些幕后魔法使EDT进入一个新的事件循环,该循环将继续调度事件,直到模式对话框关闭。

我的团队目前有一些应用程序从swing迁移到JavaFX的速度非常慢(这是一个有点棘手的过渡),我希望能够像显示swing模式对话框一样,从AWT事件调度线程显示模式JavaFX对话框。似乎有某种EDT安全信号灯可以满足这种用例,并可能在未来的其他用途中派上用场。

方法

java.awt.EventQueue.createSecondaryLoop()是一种创建SecondaryLoop对象的方法,然后可以使用该对象启动新的事件处理循环。当您调用SecondaryLoop.enter()时,该调用将在处理新的事件循环时被阻止(请注意,调用被阻止,但线程没有被阻止,因为它在事件处理循环中继续)。新的事件循环将继续,直到您调用SecondaryLoop.exit()(这并不完全正确,请参阅我的相关SO问题)。

因此,我创建了一个信号量,其中获取的阻塞调用导致在锁存器上等待正常线程,或者进入EDT的辅助循环。每个要获取的阻塞调用还添加了一个在释放信号量时要调用的取消阻塞操作(对于普通线程,它只是递减锁存,对于EDT,它退出辅助循环)。

这是我的代码:


import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore{
/** Operations used to unblock threads when a semaphore is released.
* Must be a stack because secondary loops have to be exited in the
* reverse of the order in which they were entered in order to unblock
* the execution of the method that entered the loop.
*/
private Stack<Runnable> releaseOperations = new Stack<>();
private boolean semaphoreAlreadyAcquired = false;

public EventDispatchThreadSafeBinarySemaphore() {
super(0);
}
@Override
public boolean isFair() {
return false;
}
@Override
public void acquire() throws InterruptedException {
Runnable blockingOperation = () -> {};
synchronized(this) {
if(semaphoreAlreadyAcquired) {
//We didn't acquire the semaphore, need to set up an operation to execute
//while we're waiting on the semaphore and an operation for another thread
//to execute in order to unblock us when the semaphore becomes available
if(EventQueue.isDispatchThread()) {
//For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
//processing AWT events.
SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();
releaseOperations.add(() -> temporaryAwtLoop.exit());
blockingOperation = () -> {
if(!temporaryAwtLoop.enter()) {
//I don't think we'll run into this, but I'm leaving this here for now for debug purposes
System.err.println("Failed to enter event loop");
}
};
}
else {
//Non-dispatch thread is a little simpler, we'll just wait on a latch
CountDownLatch blockedLatch = new CountDownLatch(1);
releaseOperations.add(() -> blockedLatch.countDown());
blockingOperation = () -> {
try {
blockedLatch.await();
} catch (InterruptedException e) {
//I'll worry about handling this better once I have the basics figured out
e.printStackTrace();
}
};
}
}
else {
semaphoreAlreadyAcquired = true;
}
}
//This part must be executed outside of the synchronized block so that we don't block
//the EDT if it tries to acquire the semaphore while this statement is blocked
blockingOperation.run();
}
@Override
public void release() {
synchronized(this) {
if(releaseOperations.size() > 0) {
//Release the last blocked thread
releaseOperations.pop().run();
}
else {
semaphoreAlreadyAcquired = false;
}
}
}
}

这是我的相关JUnit测试代码(我为太大的尺寸道歉,这是迄今为止我能想出的最小的可验证示例):

public class TestEventDispatchThreadSafeBinarySemaphore {
private static EventDispatchThreadSafeBinarySemaphore semaphore;
//See https://stackoverflow.com/questions/58192008/secondaryloop-enter-not-blocking-until-exit-is-called-on-the-edt
//for why we need this timer
private static Timer timer = new Timer(500, null);
@BeforeClass
public static void setupClass() {
timer.start();
}
@Before
public void setup() {
semaphore = new EventDispatchThreadSafeBinarySemaphore();
}
@AfterClass
public static void cleanupClass() {
timer.stop();
}
//This test passes just fine
@Test(timeout = 1000)
public void testBlockingAcquireReleaseOnEDT() throws InterruptedException {
semaphore.acquire();
CountDownLatch edtCodeStarted = new CountDownLatch(1);
CountDownLatch edtCodeFinished = new CountDownLatch(1);
SwingUtilities.invokeLater(() -> {
//One countdown to indicate that this has begun running
edtCodeStarted.countDown();
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//This countdown indicates that it has finished running
edtCodeFinished.countDown();
});
//Ensure that the code on the EDT has started
edtCodeStarted.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
//Ensure that things can still run on the EDT
CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());
//If we get past this line, then we know that the EDT is live even though the 
//code in the invokeLater call is blocked
edtActiveCheckingLatch.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
semaphore.release();
//If we get past this line, then the code on the EDT got past the semaphore
edtCodeFinished.await();
}
//This test fails intermittently, but so far only after the previous test was run first
@Test(timeout = 10000)
public void testConcurrentAcquiresOnEDT() throws InterruptedException {
int numThreads =100;
CountDownLatch doneLatch = new CountDownLatch(numThreads);
try {
semaphore.acquire();
//Queue up a bunch of threads to acquire and release the semaphore
//as soon as it becomes available
IntStream.range(0, numThreads)
.parallel()
.forEach((threadNumber) -> 
SwingUtilities.invokeLater(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
//Count down the latch to indicate that the thread terminated
doneLatch.countDown();
}
})
);
semaphore.release();
doneLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

问题

testConcurrentAcquiresOnEDT有时通过,有时失败。我相信我知道为什么。我深入研究了Java源代码,在WaitDispatchSupport(SecondaryLoop的具体实现)中,循环基本上继续调度事件,直到清除一个名为keepBlockingEDT的标志。它将在事件之间对此进行检查。当我调用exit时,它将清除该标志并发送一个事件来唤醒事件队列,以防它正在等待更多事件。然而,它不会导致enter()方法立即退出(我认为无论如何都不可能)。

以下是死锁的结果:

  • 主线程获取信号量
  • EDT线程试图获取信号量,但信号量已被获取,因此:
    • 创建一个新的辅助循环
    • 创建一个Runnable,它将退出新的辅助循环并将其推送到releaseOperations堆栈
    • 进入辅助循环,导致执行阻塞(注意,最后一步必须在synchronized块之外
  • 主线程释放信号量,从而导致以下情况发生:
    • 弹出releaseOperations堆栈,它在辅助循环上调用exit
    • exit调用将该辅助循环的keepBlockingEDT标志设置为false
  • 回到EDT,它刚刚完成了对keepBlockingEDT标志的检查(就在它被设置为false之前),它正在获取下一个事件
  • 事实证明,下一个事件是另一个阻塞信号量的可运行事件,因此它试图获取它
  • 这将在原始SecondaryLoop之上创建另一个SecondaryLoop并输入
  • 在这一点上,原始SecondaryLoopkeepBlockingEDT标志已经被清除,并且它将能够停止阻塞,除了它当前在运行第二个SecondaryLoop时被阻塞。第二个SecondaryLoop永远不会调用出口,因为现在没有人真正获得信号量,因此我们永远阻止它

我已经为此工作了几天,我想到的每一个想法都是死胡同。

我相信我有一个可能的部分解决方案,那就是不允许一次在信号量上阻塞多个线程(如果另一个线程试图获取它,我只会抛出一个IllegalStateException)。如果每个次要循环使用自己的信号量,我仍然可以有多个次要循环,但每个信号量最多会创建一个次要循环。我认为这会起作用,它将很好地满足我最可能的用例(因为大多数情况下,我只想从事件线程中显示一个JavaFX模式对话框)。我只是想知道是否还有其他人有其他想法,因为我觉得我已经快要做一些很酷的东西了,但它不太管用。

如果你有什么想法,请告诉我。"我确信这是不可能的,原因如下……"也是一个可以接受的答案。

使用Semaphore很可能不是正确的方法。您想要的是输入嵌套的事件循环,而不是使用阻塞机制。从API的阅读来看,你似乎也过于复杂了。同样,您所需要的只是在一个UI线程上输入一个嵌套的事件循环,然后在其他UI线程完成工作后退出该循环。我相信以下内容符合您的要求:

import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javafx.application.Platform;
import javax.swing.SwingUtilities;
public class Foo {
public static <T> T getOnFxAndWaitOnEdt(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier");
if (!EventQueue.isDispatchThread()) {
throw new IllegalStateException("current thread != EDT");
}
final SecondaryLoop loop = Toolkit.getDefaultToolkit()
.getSystemEventQueue()
.createSecondaryLoop();
final AtomicReference<T> valueRef = new AtomicReference<>();
Platform.runLater(() -> {
valueRef.set(supplier.get());
SwingUtilities.invokeLater(loop::exit);
});
loop.enter();
return valueRef.get();
}
public static <T> T getOnEdtAndWaitOnFx(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier");
if (!Platform.isFxApplicationThread()) {
throw new IllegalStateException(
"current thread != JavaFX Application Thread");
}
final Object key = new Object();
final AtomicReference<T> valueRef = new AtomicReference<>();
SwingUtilities.invokeLater(() -> {
valueRef.set(supplier.get());
Platform.runLater(() -> Platform.exitNestedEventLoop(key, null));
});
Platform.enterNestedEventLoop(key);
return valueRef.get();
}
}

在JavaFX9中添加了Platform#enterNestedEventLoopPlatform#exitNestedEventLoop方法,尽管在JavaFX8中有等效的内部方法。之所以使用AtomicReference,是因为在lambda表达式内部使用时,局部变量必须是final或实际上是final。然而,由于通知独立线程的方式,我不认为AtomicReference#get()#set(T)方法提供的波动性语义是严格需要的,但我使用了这些方法以防万一。

下面是一个使用上面的示例来显示事件调度线程:中的模式JavaFX对话框

Optional<T> optional = Foo.getOnFxAndWaitOnEdt(() -> {
Dialog<T> dialog = new Dialog<>();
// configure dialog...
return dialog.showAndWait();
});

上述实用程序方法用于从事件调度线程JavaFX应用程序线程

// Run on EDT
T result = CompletableFuture.supplyAysnc(/*Supplier*/, SwingUtilities::invokeLater).join();
// Run on FX thread
T result = CompletableFuture.supplyAsync(/*Supplier*/, Platform::runLater).join();

join()的调用将阻塞调用线程,因此请确保不要从任何一个UI线程调用该方法。

最新更新