CompletableFuture: execute some threads in parallel and some in series



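I need to perform some tasks. Some tasks are independent, and some depend on the successful execution of other tasks. Independent tasks can run in parallel for better performance. I call these tasks services. The column link tells which services execute in series and which in parallel. The column order describes the execution order that a set of defined services will follow. For the example below, services A and B should run in parallel. If they have executed successfully, service C will execute. Note that service C does not directly depend on the output of its previous services, but it must run after they have executed successfully, because during its execution service C needs some data produced by them. After service C executes successfully, the next service D executes, and so on; this cycle continues until all services in the list have been consumed.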

Tasks       service     link      order
Service A   01          03        1
Service B   02          03        2
Service C   03          04        3
Service D   04          05        4
Service E   05          07        5
Service F   06          07        6
Service G   07          (null)    7

Below is my code.

public void executeTransactionFlow(DataVo dataVo) throws Exception {
List<Callable<Boolean>> threadList = new ArrayList<>();
List<String> serviceIds = new ArrayList<>();
List<Future<Boolean>> futureList;
String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;
// Iterating through service flows map
for (Map<String, String> map : serviceFlowsMap) {
joinTo = map.get("link");
serviceId = map.get("service");
// A simple flag to differentiate which services should execute parallel and which in serial.
if (null == prevJoinTo) {
prevJoinTo = joinTo;
}
// Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
if (null != joinTo && joinTo.equals(prevJoinTo)) {
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
/*
* 1. Run the threads in the list
* 2. Empty the thread list
* 3. Empty serviceIds list
* 4. Set prevJoinTo
*/
else {
if (threadList.size() > 0) {
prevJoinTo = joinTo;
try {
// If list contain only 1 service then call, otherwise invokeAll
futureList = MyExecutor.executeServices(threadList, dataVo);
// During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
if (dataVo.isTimedout()) {
throw new Exception("Transaction thread is Interrupted or Timed-out");
}
// Validate service response codes and get decision in case of any failure
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
// If validatedRespCode is not 200 then do not process further
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
break;
}
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
finally {
// clear thread list and serviceIds list. It will be populated for next parallel set of threads
threadList.clear();
serviceIds.clear();
}
}
// Start preparing new thread list
// Adding current service_id into threadList after executing previous services in parallel.
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
}
// Run remaining services
if (!threadList.isEmpty()) {
try {
futureList = MyExecutor.executeServices(threadList, dataVo);
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
}
catch (Throwable e) {
throw new Exception(e.getMessage(), e);
}
}
// Check validation response code
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
}
}

/**
* This method iterates through the thread list and checks for exceptions and service responses.
* If service response is not success or if any exception has occurred then exception is thrown
*/
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
String finalResponse = "200", serviceResponse = null;
/*
* future list will be null if single service is executed (no other parallel transactions). The reason is that we do
* not use invokeAll() on single service.
*/
if (null != futureList && futureList.size() > 0) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
}
}
// Iterate through serviceIds and check responses.
for (String serviceId : serviceIds) {
serviceResponse = dataVo.getServiceResponse(serviceId);
/*
* if one of following response is found then consider it exception
*/
if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
throw new Exception("One of the service has been declined");
}
}
return finalResponse;
}

If CompletableFuture is beneficial here, how can I use it effectively?

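Also, future.get() is a blocking call. If I have 10 services executing in parallel, this future.get() will block on the others even if they finished before the service we are currently waiting on. How can I avoid this blocking?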
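
A note on the blocking concern: future.get() blocks only the thread that calls it; the other submitted services keep running on their pool threads. What is lost by calling get() in submission order is the chance to react to whichever service finishes (or fails) first. One way to get that, sketched here under the assumption that the services are plain Callables, is ExecutorCompletionService, which hands back futures in completion order. The service names and sleep durations are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    /** Submits three hypothetical services and returns their names in completion order. */
    public static List<String> runAndCollect() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            // Hypothetical services: each sleeps for a while, then returns its name.
            cs.submit(() -> { Thread.sleep(600); return "slow"; });
            cs.submit(() -> { Thread.sleep(50); return "fast"; });
            cs.submit(() -> { Thread.sleep(300); return "medium"; });

            List<String> finished = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // take() waits only for the NEXT service to finish, whichever one it is.
                // get() on that future returns at once, or rethrows the service's failure.
                finished.add(cs.take().get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCollect());
    }
}
```

Because results arrive in completion order, a failed service can be detected as soon as it fails instead of after every earlier-submitted service has been waited on.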

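I have added more detail to the problem statement, namely the order column. Services need to follow the defined order. Services A and B have order 1 and 2 respectively, but they will still execute in parallel because both have the value 03 in link. I think a dependency-graph-based approach, as @Thomas suggested in the comments, is not needed now.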

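Good question. Although it is technically possible to do this using only ExecutorService and Future, in my opinion a better approach is to use reactive programming rather than relying solely on Future, CompletableFuture, CompletionService and the like. The main reason is that the code can quickly become hard to read.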

Here is how I did it using RxJava 2.2.16 and ExecutorService:

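  1. Use ExecutorService's submit() to execute actions that do not depend on others, or whose dependencies have all completed.
  2. To know that an action has completed, use RxJava's BehaviorSubject. When an action completes, trigger step (1) for each of its dependents.
  3. Once all actions have completed, shut down the ExecutorService. For this, use another BehaviorSubject.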

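Sorry, because of the new approach I have written the whole logic in my own way. But it still revolves around the main requirement you gave. It would be good to look first at the Action model class and the createActions() method in AppRxJava. From there, you should be able to follow the code. To simulate some time consumption, I used the famous Thread.sleep() technique.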

public class AppRxJava{
/* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();
/* To listen to the completion of all tasks, so that ExecutorService may shut down. */
private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();
private ExecutorService SVC = Executors.newCachedThreadPool();
private List<Action> allActions;
public static void main( String[] args ){
new AppRxJava().start();
}
private void start() {
this.allActions = createActions();
subscribeToActionCompletions();
subscribeToSvcShutdown();
startAllActions( this.allActions );
}
private void subscribeToSvcShutdown(){
/* If all actions have been completed, shut down the ExecutorService. */
this.allActionCompletedSub.subscribe( allScheduled -> {
if( allScheduled ) {
SVC.shutdown();
try {
SVC.awaitTermination( 2, TimeUnit.SECONDS );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
private void subscribeToActionCompletions(){
this.completionSub.subscribe( complAction -> {
/* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
List<Action> deps = getDeps( complAction, this.allActions );
startAllActions( deps );
/* If all actions have got completed, raise the flag. */
if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
});
}
/* Attempts to start all actions that are present in the passed list. */
private void startAllActions( List<Action> actions ){
for( Action action : actions ) {
startAction( action, actions );
}
}
/* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
private void startAction( Action a, List<Action> list ){
if( !a.isPending() ) return;
if( !allDepsCompleted( a, allActions ) ) return;
if( a.isPending() ) {
synchronized (a.LOCK ) {
if( a.isPending() ) {
a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
SVC.submit( () -> {
try {
a.getAction().call();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
this.completionSub.onNext( a );
} );
}
}
}
}
private boolean allActionsCompleted(){
for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
return true;
}
private static boolean allDepsCompleted( Action a, List<Action> allActions ){
for( Action dep : allActions ) {
if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
}
return true;
}
/* Returns the actions that are dependent on Action <code>a</code>. */
private List<Action> getDeps( Action a, List<Action> list ){
List<Action> deps = new ArrayList<>();
for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
return deps;
}
/* Creates the action list with respective dependencies. */
private List<Action> createActions(){
List<Action> actions = new ArrayList<>();
Action a = createAction( 5000, "ServiceA", null );
Action b = createAction( 5000, "ServiceB", null );
Action c = createAction( 2000, "ServiceC", a, b );
Action d = createAction( 2000, "ServiceD", c );
Action e = createAction( 2000, "ServiceE", d );
actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
return actions;
}
private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
List<Action> deps = null;
if( dependencies != null ) {
deps = new ArrayList<>();
for( Action a : dependencies ) deps.add( a );
}
return Action.of( () -> {
System.out.println( "Service (" + name + ") started" );
try {
Thread.sleep( sleepMillis );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println( "Service (" + name + ") completed" );
return true;
}, name, deps );
}

}

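And the Action model class. It represents one action and the list of actions it depends on. (This differs slightly from your original representation, but I think either way works if you handle it accordingly.)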

public class Action{
Callable<Boolean> action;
String name;
List<Action> dependencies = new ArrayList<>();
AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
public static final Object LOCK = new Object();
private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
super();
this.action = action;
this.name = name;
if( dependencies != null ) this.dependencies = dependencies;
}
public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
return new Action( action, name, dependencies );
}
public Callable<Boolean> getAction(){
return action;
}
public String getName(){
return name;
}
public List<Action> getDependencies(){
return dependencies;
}
public boolean isCompleted(){
return this.status.get() == 2;
}
public boolean isPending(){
return this.status.get() == 0;
}
public boolean isScheduled(){
return this.status.get() == 1;
}
public void setStatus( int status ){
this.status.getAndSet( status );
}
@Override
public int hashCode(){
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals( Object obj ){
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
Action other = (Action) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equalsIgnoreCase( other.name )) return false;
return true;
}
}

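thenCombine can be used to express dependencies between CompletionStages, allowing you to execute a task after both have completed. You can then use thenApply to chain the subsequent actions: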

CompletionStage<ServiceAResponse> serviceAResponse = callServiceA();
CompletionStage<ServiceBResponse> serviceBResponse = callServiceB();

CompletionStage<ServiceEResponse> result = serviceAResponse.thenCombine(serviceBResponse, (aResponse, bResponse) -> serviceC.call(aResponse, bResponse))
.thenApply(cResponse -> serviceD.call(cResponse))
.thenApply(dResponse -> serviceE.call(dResponse));

public CompletionStage<ServiceAResponse> callServiceA() {
return CompletableFuture.supplyAsync(() -> serviceA.call());
}
public CompletionStage<ServiceBResponse> callServiceB() {
return CompletableFuture.supplyAsync(() -> serviceB.call());
}
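
To try the chain above end to end, here is a small self-contained sketch. The callA ... callE methods are hypothetical stand-ins that only build strings, so the nesting of the result shows the order in which the stages ran; real service calls would go in their place.

```java
import java.util.concurrent.CompletableFuture;

public class ThenCombineDemo {

    // Hypothetical stand-ins for the real service calls.
    static String callA() { return "a"; }
    static String callB() { return "b"; }
    static String callC(String a, String b) { return "c(" + a + "," + b + ")"; }
    static String callD(String c) { return "d(" + c + ")"; }
    static String callE(String d) { return "e(" + d + ")"; }

    /** A and B run asynchronously; C runs once both are done; D and E follow in series. */
    public static CompletableFuture<String> buildPipeline() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(ThenCombineDemo::callA);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(ThenCombineDemo::callB);
        return a.thenCombine(b, ThenCombineDemo::callC)
                .thenApply(ThenCombineDemo::callD)
                .thenApply(ThenCombineDemo::callE);
    }

    public static void main(String[] args) {
        // join() blocks until the last stage has completed.
        System.out.println(buildPipeline().join());
    }
}
```

If any stage throws, the stages after it are skipped and join() throws a CompletionException wrapping the original exception, which gives the "stop on first failure" behaviour asked for in the question.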

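I could not stop thinking about the basic problem of doing this in plain Java. So here is a modified version of my earlier answer. This answer contains both styles, RxJava and ExecutorService. It contains 3 classes: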

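  1. DependentSeriesOfActionsBase: a base class containing some reusable methods and common fields. This is only for convenience and to make the code easier to follow.
  2. DependentSeriesOfActionsCoreJava: the ExecutorService-based implementation. I use Future.get() to wait for the result of an action, the difference being that the waiting itself happens asynchronously. Look at startAction().
  3. DependentSeriesOfActionsRxJava: the earlier RxJava-based implementation.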

Code for DependentSeriesOfActionsBase

abstract class DependentSeriesOfActionsBase{
protected List<Action> allActions;
protected ExecutorService SVC = Executors.newCachedThreadPool();
protected boolean allActionsCompleted(){
for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
return true;
}
protected static boolean allDepsCompleted( Action a, List<Action> allActions ){
for( Action dep : allActions ) {
if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
}
return true;
}
/* Returns the actions that are dependent on Action <code>a</code>. */
protected List<Action> getDeps( Action a, List<Action> list ){
List<Action> deps = new ArrayList<>();
for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
return deps;
}
/* Creates the action list with respective dependencies. */
protected List<Action> createActions(){
List<Action> actions = new ArrayList<>();
Action a = createAction( 5000, "ServiceA", null );
Action b = createAction( 5000, "ServiceB", null );
Action c = createAction( 2000, "ServiceC", a, b );
Action d = createAction( 2000, "ServiceD", c );
Action e = createAction( 2000, "ServiceE", d );
actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
return actions;
}
protected Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
List<Action> deps = null;
if( dependencies != null ) {
deps = new ArrayList<>();
for( Action a : dependencies ) deps.add( a );
}
return Action.of( () -> {
System.out.println( "Service (" + name + ") started" );
try {
Thread.sleep( sleepMillis );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println( "Service (" + name + ") completed" );
return true;
}, name, deps );
}
/* Attempts to start all actions that are present in the passed list. */
protected void startAllActions( List<Action> actions ){
for( Action action : actions ) {
startAction( action, actions );
}
}
protected abstract void startAction( Action action, List<Action> actions );

protected void shutdown(){
SVC.shutdown();
try {
SVC.awaitTermination( 2, TimeUnit.SECONDS );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

Code for DependentSeriesOfActionsCoreJava

public class DependentSeriesOfActionsCoreJava extends DependentSeriesOfActionsBase{
public static void main( String[] args ){
new DependentSeriesOfActionsCoreJava().start();
}
private void start() {
this.allActions = createActions();
startAllActions( this.allActions );
}
protected void startAction( Action a, List<Action> list ){
if( !a.isPending() ) return;
if( !allDepsCompleted( a, allActions ) ) return;
if( a.isPending() ) {
synchronized (a.LOCK ) {
if( a.isPending() ) {
a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
/* Submit the action to the ExecutorService and get the handle to the Future. */
final Future<?> fut = SVC.submit( () -> a.action.call() );
/* Submit the Future.get() action to the ExecutorService and execute the dependencies when it returns. */
SVC.submit( () -> {
Object returnVal = null;
/* Wait */
try {
fut.get(); 
a.setStatus( 2 );
/* If all actions are completed, shut down the ExecutorService. */
if( allActionsCompleted() ) shutdown();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
startAllActions( getDeps( a, this.allActions ) );
} );
}
}
}
}
}

Code for DependentSeriesOfActionsRxJava

public class DependentSeriesOfActionsRxJava extends DependentSeriesOfActionsBase{
/* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();
/* To listen to the completion of all tasks, so that ExecutorService may shut down. */
private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();
public static void main( String[] args ){
new DependentSeriesOfActionsRxJava().start();
}
private void start() {
this.allActions = createActions();
subscribeToActionCompletions();
subscribeToSvcShutdown();
startAllActions( this.allActions );
}
private void subscribeToSvcShutdown(){
/* If all actions have been completed, shut down the ExecutorService. */
this.allActionCompletedSub.subscribe( allScheduled -> {
if( allScheduled ) shutdown();
});
}
private void subscribeToActionCompletions(){
this.completionSub.subscribe( complAction -> {
/* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
List<Action> deps = getDeps( complAction, this.allActions );
startAllActions( deps );
/* If all actions have got completed, raise the flag. */
if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
});
}
/* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
protected void startAction( Action a, List<Action> list ){
if( !a.isPending() ) return;
if( !allDepsCompleted( a, allActions ) ) return;
if( a.isPending() ) {
synchronized (a.LOCK ) {
if( a.isPending() ) {
a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
SVC.submit( () -> {
try {
a.getAction().call();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
this.completionSub.onNext( a );
} );
}
}
}
}
}

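So, one simple thing you can do is wrap your services in a class that makes them easier to manage. Such a wrapper should:

  • provide a name
  • maintain a list of preconditions for running it, and
  • tell you when the service run itself has finished (so that the preconditions of other services can be checked)

Here is what such a class could look like: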

class Service implements Runnable {
private final Runnable wrappedRunnable;
private final String name;
private final List<String> preconditions = new CopyOnWriteArrayList<>();
private final Consumer<String> finishedNotification;
Service(Runnable r, String name, Consumer<String> finishedNotification, String... preconditions) {
this.wrappedRunnable = r;
this.name = name;
this.finishedNotification = finishedNotification;
this.preconditions.addAll(Arrays.asList(preconditions));
}
@Override
public void run() {
wrappedRunnable.run();
finishedNotification.accept(name);
}
void preconditionFulfilled(String precondition) {
preconditions.remove(precondition);
}
boolean arePreconditionsFulfilled() {
return preconditions.isEmpty();
}
}

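The Runnable parameter is the actual service call being wrapped, the name is the administrative information from your table ("service A", "01", or whatever you like), and preconditions are the names of the other services that must finish running before this one executes.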

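Now you still need some manager that maintains the list of all services; it is also what gets notified when a service call finishes. By maintaining a simple list of services, this can actually be quite simple.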

class CallManager {
List<Service> services = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(4);
void addService(Runnable r, String serviceName, String... preconditions) {
services.add(new Service(r, serviceName, this::preconditionFulfilled, preconditions));
}
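// synchronized: services that finish early call preconditionFulfilled() from pool threads,
// and both methods iterate over and modify the same ArrayList
synchronized void run() {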
for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
Service service = serviceIterator.next();
if (service.arePreconditionsFulfilled()) {
executorService.submit(service);
serviceIterator.remove();
}
}
if (services.isEmpty()) {
executorService.shutdown();
}
}
private synchronized void preconditionFulfilled(String name) {
System.out.printf("service %s finished%n", name);
for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
Service service = serviceIterator.next();
service.preconditionFulfilled(name);
if (service.arePreconditionsFulfilled()) {
executorService.submit(service);
serviceIterator.remove();
}
}
if (services.isEmpty()) {
executorService.shutdown();
}
}
}

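So first you add all the services you want to run to this manager, then call run() on it to bootstrap the whole execution chain. This is what it could look like for your example: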

class Bootstrap {
private static final Random RANDOM = new Random();
public static void main(String[] args) {
CallManager manager = new CallManager();
manager.addService(simpleRunnable("A"), "A");
manager.addService(simpleRunnable("B"), "B");
manager.addService(simpleRunnable("C"), "C", "A", "B");
manager.addService(simpleRunnable("D"), "D", "C");
manager.addService(simpleRunnable("E"), "E", "D");
manager.addService(simpleRunnable("F"), "F");
manager.addService(simpleRunnable("G"), "G", "E", "F");
manager.run();
}
// create some simple pseudo service
private static Runnable simpleRunnable(String s) {
return () -> {
System.out.printf("running service %s%n", s);
try {
Thread.sleep(RANDOM.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}
}

You can see this running here.
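
As a variation on the CallManager idea, the precondition bookkeeping can also be handed to CompletableFuture: keep one future per service name and start each service once allOf() over its preconditions completes. The sketch below is only an illustration; the class name FutureGraph is made up, it assumes every precondition is registered before the services that need it (as in Bootstrap above), and unlike CallManager it starts services as soon as they are added rather than on a separate run() call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FutureGraph {
    // One future per service name; it completes when that service has finished.
    private final Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();

    /** Registers and schedules a service; every precondition must already be registered. */
    public void addService(Runnable r, String name, String... preconditions) {
        CompletableFuture<?>[] deps = new CompletableFuture<?>[preconditions.length];
        for (int i = 0; i < preconditions.length; i++) {
            deps[i] = futures.get(preconditions[i]);
            if (deps[i] == null) {
                throw new IllegalArgumentException("unknown precondition: " + preconditions[i]);
            }
        }
        // allOf() with no arguments is already complete, so independent services start at once.
        futures.put(name, CompletableFuture.allOf(deps).thenRunAsync(r));
    }

    /** Blocks until every registered service has finished; rethrows if one failed. */
    public void awaitAll() {
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0])).join();
    }

    /** Wires up the same A..G example as Bootstrap and returns the order of execution. */
    public static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        FutureGraph graph = new FutureGraph();
        graph.addService(() -> log.add("A"), "A");
        graph.addService(() -> log.add("B"), "B");
        graph.addService(() -> log.add("C"), "C", "A", "B");
        graph.addService(() -> log.add("D"), "D", "C");
        graph.addService(() -> log.add("E"), "E", "D");
        graph.addService(() -> log.add("F"), "F");
        graph.addService(() -> log.add("G"), "G", "E", "F");
        graph.awaitAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The exact interleaving of independent services (A, B and F here) varies between runs; only the precondition ordering is guaranteed.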

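I haven't gone through your whole code, since it appears to be mostly concurrency management, but you can implement the general concept with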

CompletableFuture.allOf(CompletableFuture.runAsync(serviceA::run),
CompletableFuture.runAsync(serviceB::run))
.thenRun(serviceC::run)
.thenRun(serviceD::run)
.thenRun(serviceE::run);

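This creates the dependencies in the table (assuming all services implement Runnable); it does not wait for the whole thing to finish executing (which you can do by appending .join() or .get()).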
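
The same pattern can be built from the service/link table instead of being hard-coded. The following is a rough sketch under the assumption stated in the question: consecutive rows that share a link value form one batch that runs in parallel, and batches run one after another. The Row class and the logging placeholder for the service call are hypothetical; note also that this sketch treats consecutive rows with a null link as one batch, which the question's code does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class LinkBatchDemo {

    /** One table row: a service id and the id it links to (null for the last service). */
    public static final class Row {
        final String service;
        final String link;
        public Row(String service, String link) { this.service = service; this.link = link; }
    }

    /** Groups consecutive rows that share the same link value into batches. */
    public static List<List<Row>> toBatches(List<Row> rows) {
        List<List<Row>> batches = new ArrayList<>();
        for (Row row : rows) {
            List<Row> last = batches.isEmpty() ? null : batches.get(batches.size() - 1);
            if (last != null && Objects.equals(last.get(0).link, row.link)) {
                last.add(row);
            } else {
                List<Row> batch = new ArrayList<>();
                batch.add(row);
                batches.add(batch);
            }
        }
        return batches;
    }

    /** Runs each batch in parallel and the batches in series; returns the execution log. */
    public static List<String> execute(List<Row> rows) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (List<Row> batch : toBatches(rows)) {
            // thenCompose starts the next batch only after the previous one succeeded.
            chain = chain.thenCompose(ignored -> {
                CompletableFuture<?>[] running = batch.stream()
                        // Placeholder for the real service call.
                        .map(row -> CompletableFuture.runAsync(() -> log.add(row.service)))
                        .toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(running);
            });
        }
        chain.join(); // throws CompletionException if a service failed; later batches are skipped
        return log;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("01", "03")); rows.add(new Row("02", "03"));
        rows.add(new Row("03", "04")); rows.add(new Row("04", "05"));
        rows.add(new Row("05", "07")); rows.add(new Row("06", "07"));
        rows.add(new Row("07", null));
        System.out.println(execute(rows));
    }
}
```

For the table in the question this yields the batches [01, 02], [03], [04], [05, 06], [07]. The order inside a parallel batch is not deterministic.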

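You did not say whether task C, which runs after tasks A and B, needs the results of A and B. Suppose it does (if not, the solution would be shorter). Then a solution using the DF4J library could look like this: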

class A extends AsyncProc {
ScalarResult<T> res = new ScalarResult<>();
@Override
protected void runAction() {
...
res.onSuccess(value);
}
}
class B extends AsyncProc {
ScalarResult<T> res = new ScalarResult<>();
@Override
protected void runAction() {
...
res.onSuccess(value);
}
}
class C extends AsyncProc {
InpScalar<T> param1 = new InpScalar<>(this);
InpScalar<T> param2 = new InpScalar<>(this);
ScalarResult<T> res = new ScalarResult<>();
@Override
protected void runAction() {
value = ... param1.current() ...param2.current()...
res.onSuccess(value);
}
}
class D extends AsyncProc {
InpScalar<T> param = new InpScalar<>(this);
ScalarResult<T> res = new ScalarResult<>();
@Override
protected void runAction() {
value = ... param.current()
res.onSuccess(value);
}
}
class E extends AsyncProc {
InpScalar<T> param = new InpScalar<>(this);
ScalarResult<T> res = new ScalarResult<>();
@Override
protected void runAction() {
value = ... param.current()
res.onSuccess(value);
}
}

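So we have declared asynchronous procedures with different numbers of parameters. Then we create instances and connect them into a dataflow graph: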

A a = new A(); a.start();
B b = new B(); b.start();
C c = new C(); c.start();
D d = new D(); d.start();
E e = new E(); e.start();
a.res.subscribe(c.param1);
b.res.subscribe(c.param2);
c.res.subscribe(d.param);
d.res.subscribe(e.param);

Finally, wait synchronously for the result of the last asynchronous procedure:

T result = e.res.get();
