我如何知道执行器服务何时完成,ES 上的项目是否可以重新提交到 ES



我的Java应用程序处理文件夹中的音乐文件,它旨在并行和独立地处理多个文件夹。为此,每个文件夹都由执行器服务处理,该执行器服务的最大池大小与计算机的 CPU 不匹配。

例如,如果我们有 8 个 CPU 的计算机,那么(理论上)可以同时处理八个文件夹,如果我们有一台 16 个 CPU 的计算机,那么可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将池大小设置为 3,以便在一个文件夹在 I/O 上阻塞时允许 CPU 继续执行某些操作。

但是,我们实际上不仅有一个执行器服务,我们还有多个,因为每个文件夹都可以经历多个阶段。

进程 1(使用 ExecutorService1) → 进程 2 (执行服务2) → 进程 3 (执行器服务3)

进程 1、2、3 等都实现了 Callable,并且都有自己关联的执行器服务。我们启动了一个文件加载器进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行器,对于每个 Process1 可调用对象,它将完成其工作,然后提交到不同的可调用对象,这可能是 Process2,Process3 ecetera,但我们永远不会倒退,例如 Process3 永远不会提交到 Process1。 我们实际上有 12 个进程,但任何特定的文件夹都不可能经历所有12个进程

但我意识到这是有缺陷的,因为在 16 CPU 计算机的情况下,每个 ES 的池大小可以达到 16,所以我们实际上有 48 个线程在运行,这只会导致太多的争用。

所以我要做的是让所有进程(Process1,Process2...)使用相同的ExecutorService,这样我们只工作线程匹配CPU。

但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),这在将所有内容提交到 Process0 之前不会完成,然后 Process0 上的 shutdown() 不会成功,直到所有内容都发送到 Process1 等等。

//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work

但是,如果所有内容都在同一个 ES 上,并且进程 1 正在提交到 进程 2,这将不再起作用,因为在调用 shutdown() 时,并非所有 Process1 都会提交给进程 2 的文件夹,因此它会过早关闭。

那么,当该 ES 上的任务可以提交到同一 ES 上的其他任务时,如何使用单个执行器服务检测所有工作何时完成?

还是有更好的方法?

请注意,您可能会想,他为什么不将 Process1,2 和 3 的逻辑合并到一个 Process 中。困难在于,虽然我最初按文件夹对歌曲进行分组,但有时歌曲会被分成更小的组,它们被分配到单独的进程,而不是同一个进程,实际上总共有 12 个进程。

基于肖尔姆斯思想的尝试

主线程

private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService =  SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();

使用过程代码从主分析器服务使用此函数提交新任务

public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}

看起来它正在工作,但它失败了

java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

我现在意识到我不能调用一个线程调用 future.get()(它等到完成),同时其他线程正在添加到列表中。

我同意 Shloim 的观点,这里不需要多个ExecutorService实例——只有一个(根据可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。实际上,我认为您可能不需要ExecutorService;一个简单的Executor可以完成这项工作,如果你使用信令完整性的外部机制。

我将首先构建一个类来表示整个较大的工作项。如果需要使用每个子工作项的结果,可以使用队列,但如果只想知道是否还有剩余工作要做,则只需要一个计数器。

例如,您可以执行以下操作:

public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private int pendingItems;  // guarded by monitor lock on this instance
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}
public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}
public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}
public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}
public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;
public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}
@Override
public void run() {
try {
// do some work with the file
if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}

如果担心pendingItems计数器的同步开销,可以改用AtomicInteger。然后,您需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用CountDownLatch。下面是一个示例实现:

public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}
public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}
public boolean hasPendingWork() {
return pendingItems.get() > 0;
}
public void awaitCompletion() {
latch.await();
}
}

你可以这样称呼它:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

此示例仅显示一个级别的子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的pendingItems计数器来跟踪剩余的工作量。

不要shutdown()ExecutorService。相反,请创建Callable对象并保留它们创建Future对象。 现在,您可以等待Future对象,而不是等待ExecutorService。请注意,现在您将不得不单独等待每个未来的对象,但是如果您只需要知道最后一个对象何时完成,那么您也可以以任何给定的顺序迭代它们并调用get()

任何任务都可以提交更多任务,并且需要确保将其未来对象放入将由主线程监控的队列中。

// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...
void submit(Callable<Boolean> c) {
futures.add(executor.submit(c));
}

现在,您的主线程可以开始提交任务并等待所有任务和子任务:

void mainThread() {
// add some tasks from main thread
for(int i=0 ; i<N ; ++i){
Callable<Boolean> callable = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
...
}
submit(callable);
}
Future<Boolean> head = null;
while((head=futures.poll()) != null){
try {
head.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// At this point, all of your tasks are complete including subtasks.
executor.shutdown();
executor.awaitTermination(); // should return almost immediately
}

这在理论上@DanielPrydens解决方案,但我稍微按摩了一下,以便它更清楚地显示如何解决我的特定问题

创建了一个新的类MainAnalyserService,用于处理ExecutorService的创建,并提供在提交新的可调用任务和完成任务时进行计数的功能

public class MainAnalyserService 
{
public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
protected static int BOUNDED_QUEUE_SIZE = 100;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
private static final int TIMEOUT_PER_TASK = 30;
protected  ExecutorService      executorService;
protected String threadGroup;
public MainAnalyserService(String threadGroup)
{
this.threadGroup=threadGroup;
initExecutorService();
}
protected void initExecutorService()
{
int workerSize = Runtime.getRuntime().availableProcessors();
//Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
{
workerSize = MIN_NUMBER_OF_WORKER_THREADS;
}
executorService = new TimeoutThreadPoolExecutor(workerSize,
new SongKongThreadFactory(threadGroup),
new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
TIMEOUT_PER_TASK,
TimeUnit.MINUTES);
}
public void submit(Callable<Boolean> task) //throws Exception
{
executorService.submit(task);
pendingItems.incrementAndGet();
}
public void workDone()
{
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0)
{
latch.countDown();
}
}
public void awaitCompletion() throws InterruptedException{
latch.await();
}
}

FixSongsController线程中,我们有

analyserService = new MainAnalyserService(THREAD_WORKER);
//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all initial folder submissions completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();
//Wait for all aysnc tasks to complete
analyserService.awaitCompletion();

然后任何调用的(如Process1,Process2等)调用submit()ExecutorService上提交一个新的可调用对象,然后它必须在完成后调用workDone(),所以为了确保我这样做,我在每个Process类方法的call()中添加了一个final块。

例如

public Boolean call() 
{
try
{
//do stuff
//Possibly make multiple calls to                      
FixSongsController.getAnalyserService().submit();
}
finally
{
FixSongsController.getAnalyserService().workDone();
}
}

相关内容

  • 没有找到相关文章

最新更新