在 Java 中并行执行依赖任务



我需要找到一种方法在java中并行执行任务(依赖和独立)。

  1. 任务 A 和任务 C 可以独立运行。
  2. 任务 B 依赖于任务 A 的输出。

我检查了java.util.concurrent Future和Fork/Join,但看起来我们无法向任务添加依赖项。

谁能指出我正确的Java API。

在 Scala

中,这很容易做到,我认为你最好使用 Scala。这是我从这里提取的一个例子 http://danielwestheide.com/(Scala 新手指南第 16 部分:从这里去哪里) 这家伙有一个很棒的博客(我不是那个人)

让我们以一位正在煮咖啡的大律师为例。要执行的任务是:

  1. 研磨所需的咖啡豆(无前面的任务)
  2. 加热一些水(没有前面的任务)
  3. 使用研磨咖啡和热水冲泡浓缩咖啡(取决于1和2)
  4. 给一些牛奶起泡(没有前面的任务)
  5. 将泡沫牛奶和浓缩咖啡混合(取决于 3,4)

或作为一棵树:

Grind   _
Coffe    
             
Heat    ____Brew____ 
Water                _____Combine
                     /
Foam    ____________/
Milk

在使用并发 API 的 Java 中,这将是:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Barrista {
    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }
    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }
    static class Brew implements Callable<String> {
        final Future<String> grindedBeans;
        final Future<String> hotWater;
        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }
        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }
    static class FrothMilk implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }
    static class Combine implements Callable<String> {
        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }
        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }
    }
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));
        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);

        try {
            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

确保添加超时,以确保您的代码不会永远等待某些事情完成,这是通过使用 Future.get(long, TimeUnit) 完成的,然后相应地处理失败。

但是,它在scala中要好得多,就像在博客上一样:准备一些咖啡的代码看起来像这样:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

其中所有方法都返回一个未来(类型化的未来),例如 grind 将是这样的:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

对于所有实现,请查看博客,但仅此而已。您也可以轻松集成 Scala 和 Java。我真的建议在Scala而不是Java中做这种事情。Scala需要更少的代码,更简洁和事件驱动。

具有依赖项的任务的常规编程模型是数据流。简化模型,其中每个任务只有一个(尽管重复)依赖项是Actor模型。Java有很多参与者库,但数据流很少。另请参阅:哪个-actor-model-library-framework-for-java,java-pattern-for-nested-callbacks

使用 BlockingQueue。将任务 A 的输出放入队列中,任务 B 将阻塞,直到队列中有可用内容。

文档包含实现此目的的示例代码:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

Java定义了一个类CompletableFuture。

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

这就是你要找的。它有助于构建执行流。

你需要的是一个CountDownLatch。

final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();
// thread c
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();
new Thread() {
    public void run() {
        try {
            gate.await();
            // both thread a and thread c have completed
            // process thread b
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}.start();

作为替代方法,根据您的方案,您还可以使用 BlockingQueue 来实现生产者-使用者模式。请参阅文档页面上的示例。

如果任务 B 依赖于任务 A 的输出,我首先会质疑任务 B 是否真的是一个单独的任务。如果存在以下情况,则分离任务是有意义的:

  • 任务 B 在需要任务 A 的结果之前可以完成的一些非平凡工作量
  • 任务 B 是一个长期持续的过程,用于处理任务 A 的许多不同实例的输出
  • 还有一些其他任务(比如 D)也使用任务 A 的结果

假设它是一个单独的任务,那么你可以允许任务A和B共享一个BlockingQueue,以便任务A可以传递任务B的数据。

使用此库 https://github.com/familysyan/TaskOrchestration。它为您管理任务依赖项。

有一个专门用于此目的的 java 库(免责声明:我是这个库的所有者),称为 Dexecutor

这是您实现预期结果的方法,您可以在此处阅读更多相关信息

@Test
public void testDependentTaskExecution() {
    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
    executor.addDependency("A", "B");
    executor.addIndependent("C");
    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
}
private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
}
private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}
private static class SleepyTaskProvider implements TaskProvider<String, String> {
    public Task<String, String> provid(final String id) {
        return new Task<String, String>() {
            @Override
            public String execute() {
                try {
                    //Perform some task
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String result = id + "processed";
                return result;
            }
            @Override
            public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
                ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
                //Do some logic with parent result
                if ("B".equals(id) && firstParentResult.isSkipped()) {
                    return false;
                }
                return true;
            }
        };          
    }
}

最新更新