我正试图使用ExecutorService
在多个线程中处理List
中相对较大的Stream
。这个方法看起来像这样。
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
基本上,我在这里要做的是,对于Stream
中的每个List
,创建一个Runnable
并提交给ExecutorService
。它似乎运行良好。但是,我现在真正想做的是,看看是否有任何方法可以让ExecutorService
运行从Stream
中的第一个List
获得的第一个Runnable
,同时阻止其他Runnables
,直到然后继续运行其他Runnables
(并行(。真的需要一些帮助。
您可以获取第一个Runnable,执行它,然后提交其他Runnable。
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
if (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
runnable.run();
}
while (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
executorService.submit(runnable);
}
}
其中
class MyRunnable implements Runnable {
Record4<Integer, Integer, String, byte[]> record4List;
MyRunnable(Record4<Integer, Integer, String, byte[]> record4List) {
this.record4List = record4List;
}
@Override
public void run() {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
}
}
@Alexei的方法是(IMO(解决这个问题的正确方法。不要阻塞可运行程序。相反,当运行可运行文件的前提条件已经得到满足时,提交可运行文件。
有一个可运行的块和其他块的问题是,你很容易用被阻止等待另一个任务完成的任务来阻塞执行器的线程池。事实上,如果线程池是有界的,那么您甚至可能会遇到这样的情况:所有线程都处于这种状态,而执行器无法启动将解除所有线程锁定的任务。结果:死锁!
如果仍然想要阻止可运行程序(尽管有上述情况(,那么您可以使用CountDownLatch
来实现它。
-
在实例化
Runnable
s之前,创建一个初始计数器为1
的CountDownLatch
实例。此实例必须由所有Runnable
共享。 -
对一个
Runnable
进行编码,使其获取List
,对其进行处理,然后调用latch.count()
; -
对第二个
Runnable
进行编码以调用latch.await()
,然后获取并处理List
。 -
使用第一个
Runnable
提交一个任务,使用第二个提交其余任务。