要求:
- 我将消息分为不同的类型,例如
Type1, Type2 ... Type100
. - 我想并行执行不同类型的消息。假设在 10 个线程中,但必须逐个执行相同类型的所有消息。执行顺序无关紧要。
- 一旦线程完成
TypeX
的所有消息。它应该开始处理另一种类型。
我经历了不同的答案: 他们中的大多数建议执行器服务来处理多线程。 假设我们创建执行器服务,例如
ExecutorService executorService = Executors.newFixedThreadPool(10);
但是一旦我们使用executorService.submit(runnableMessage);
提交消息
我们无法控制将特定类型的消息分配给特定线程。
溶液:
创建单线程执行程序数组
ExecutorService[] pools = new ExecutorService[10];
并最初传递类型 1、类型 2 的消息...类型10 然后,如果任何执行器已完成执行,则将 Type11 分配给它并继续执行,直到处理所有类型。
有没有更好的方法?
类似于具有多个队列的执行器服务,我可以在其中将每种类型的消息推送到不同的队列?
我建议看看Akka。 它们提供了一个更适合此用例的Actor框架。 除了定义你自己的 ExecutorService 接口实现之外,JDK 提供的默认实现并不能给人那么多的调度控制。
创建ExecutionServices的硬编码数组不会非常动态或健壮,特别是因为每个ExecutionService将有一个线程池。 可以用哈希映射替换数组,然后将其放在 ExecutionService 的自定义实现后面,这样做的好处是对调用者隐藏这些细节,但它不会解决拥有如此多线程池的线程浪费。
在 Akka 中,每个 Actor 都有自己的与之关联的消息队列。 每个Actor有效地在自己的线程中运行,一次处理其队列中的每条消息。 Akka 将管理多个 Actor 之间的线程共享。 因此,如果您要为每个消息类型创建一个Actor,然后将消息与这些Actor一起排队,那么您将获得的目标是让每个消息类型一次最多由一个线程处理,同时仅由一个线程池支持。
技术演示:
马文对阿卡的依赖。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>
Java 8 代码。 复制并粘贴到 Java 文件中,然后在 IDE 中运行 main 方法。
package com.softwaremosaic.demos.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ActorDemo {
public static void main( String[] args ) throws InterruptedException {
// The following partitioner will spread the requests over
// multiple actors, which I chose to demonstrate the technique.
// You will need to change it to one that better maps the the
// jobs to your use case. Remember that jobs that get mapped
// to the same key, will get executed in serial (probably
// but not necessarily) by the same thread.
ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );
for ( int i=0; i<100; i++ ) {
int id = i;
exectorService.submit( () -> System.out.println("JOB " + id) );
}
exectorService.shutdown();
exectorService.awaitTermination( 1, TimeUnit.MINUTES );
System.out.println( "DONE" );
}
}
class ActorExecutionService extends AbstractExecutorService {
private final ActorSystem actorSystem;
private final Function<Runnable, String> partitioner;
private final ConcurrentHashMap<String,ActorRef> actors = new ConcurrentHashMap<>();
public ActorExecutionService( Function<Runnable,String> partitioner ) {
this.actorSystem = ActorSystem.create("demo");
this.partitioner = partitioner;
}
public void execute( Runnable command ) {
String partitionKey = partitioner.apply( command );
ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );
actorRef.tell( command, actorRef );
}
private ActorRef createNewActor( String partitionKey ) {
return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
}
public void shutdown() {
actorSystem.terminate();
}
public List<Runnable> shutdownNow() {
actorSystem.terminate();
try {
awaitTermination( 1, TimeUnit.MINUTES );
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
return Collections.emptyList();
}
public boolean isShutdown() {
return actorSystem.isTerminated();
}
public boolean isTerminated() {
return actorSystem.isTerminated();
}
public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
actorSystem.awaitTermination();
return actorSystem.isTerminated();
}
}
class ExecutionServiceActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Runnable) {
((Runnable) message).run();
} else {
unhandled(message);
}
}
}
注意上面的代码将以未定义的顺序打印 1-100。 由于批处理(Akka 这样做是为了获得额外的性能优势),订单看起来主要是串行的。 但是,当不同的线程穿插在工作中时,您会看到数字的一些随机性。 每个作业运行的时间越长,分配给 Akka 线程池的线程就越多,使用的分区键越多,底层 CPU 内核越多,序列可能变得越随机。
更简单的解决方案可能是:
而不是使每条消息都可运行。 我们可以根据其类型创建群组消息:
例如,我们为类型1的所有消息创建组1
class MessageGroup implements Runnable {
String type;
String List<Message> messageList;
@Override
public void run() {
for(Message message : MessageList) {
message.process();
}
}
}
我们可以使用固定线程创建通常的执行器服务,例如
ExecutorService executorService = Executors.newFixedThreadPool(10);
我们可以提交一组消息,而不是提交单个消息,例如
executorService.submit(runnableGroup);
并且每个组将在同一线程中按顺序执行相同类型的消息。
这是我最基本的示例,说明它的外观。 您创建一个包含 10 个由其"Typ"寻址的 ArrayDeques 的映射。 此外,您还启动了 10 个计划执行程序。 每个队列最初等待 5 秒,然后每 200 毫秒轮询一次其队列。 在此当前示例中,输出将始终为"TypeX: null 的当前消息队列",因为队列均为空。
但是您现在可以打开它并将消息传递到匹配队列中。该服务将每 200 毫秒获取一次,并用它做任何您想做的事情。 当您使用队列时,消息的处理方式也会自动排序。
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Messages {
public static void main(String[] args) {
Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
long initialDelay = 5000;
long period = 200;
// create 10 Queues, indexed by the type
// create 10 executor-services, focused on their message queue
for(int i=1; i<11; i++) {
String type = "Type" + i;
Runnable task = () -> System.out.println(
"current message of " + type + ": " + messages.get(type).poll()
);
messages.put(type, new ArrayDeque<String>());
service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
}
}
}
每种消息类型保留一个执行程序服务
是的,您在问题末尾提到的一系列执行器服务是正确的想法。
但是,让我们让它成为一种更易于管理的Map
。
Map < MessageType , ExecutorService >
让我们为您的消息类型定义一个枚举。
enum MessageType { Type01, Type02, Type03 }
还有地图。
Map < MessageType , ExecutorService > messageTypeExecutorMap = new EnumMap<>( MessageType.class ) ;
使用每种消息类型的执行程序服务填充映射。您希望逐个处理每个消息类型集合,因此请使用单线程执行程序服务。
for( MessageType messageType : MessageType.values )
{
messageTypeExecutorMap.put( messageType , Executors.newSingleThreadExecutor() ) ;
}
若要提交消息进行处理,请按消息类型检索执行程序服务。
messageTypeExecutorMap
.get( task.getMessageType() )
.submit( task ) ;
保持剩余的遗嘱执行人服务并为工作做好准备并没有错。如果它们不执行任务,则在内存或 CPU 方面占用的开销很小。只需确保部署计算机上有足够的内核来支持您希望同时执行的最大数量的消息类型。