我已经成功地使用ExecutorService
实现来管理线程池。
现在我发现了一种情况,使用数十个并发线程将大型数据对象插入数据仓库。数据可能来自不同的来源。当两个或多个线程插入来自同一源的数据时,死锁以一定的速率发生。这些死锁是很容易理解的,只有在没有两个线程同时使用来自同一源的数据时才能避免。.
我读了关于ExecutorService
接口的内部,发现抽象类AbstractExecutorService
可以是一个很好的起点。我还认为,重写newTaskFor
方法可能足以达到目的。
我有点失去了如何标签与源的名称提交的线程,以后选择一个好的一个在newTaskFor
基于标签。其主要思想是,只有当提交的线程的标签在正在运行的线程中唯一时,才运行该线程。在任何时候,运行线程的数量当然会受到最小值(最大池大小,当前标签多样性)的限制。
有什么帮助吗?提前谢谢。
executor框架的目的是减轻您自己操作线程的负担。
显然,您想要串行地处理来自任何一个源的输入,一次一个。要实现这一点,只需定义多个执行器服务,每个执行器服务专用于一个特定的数据源。并使每个执行器都是单线程的。
ExecutorService executorService_A = Executors.newSingleThreadExecutor() ;
ExecutorService executorService_B = Executors.newSingleThreadExecutor() ;
ExecutorService executorService_C = Executors.newSingleThreadExecutor() ;
将数据源"A"的任务提交给executorService_A
,将数据源"B"的任务提交给executorService_B
,依此类推
我不认为许多,甚至数百个单线程执行器服务有什么问题……除非你有几个核心和多个同时执行任务。如果是这种情况,那么合并到更少的执行器服务,一个用于多个输入,仍然是单线程的。为源A、B和c提供一个"ABC"执行器服务。为源X、Y和z提供另一个"XYZ"执行器服务。
您可以通过使用Map< String , ExecutorService >
来管理哪个源到哪个执行器服务的跟踪,其中映射的键是源名称,映射的值是执行器服务。
Map< String , ExecutorService > map = new HashMap<>() ;
map.put( "A" , Executors.newSingleThreadExecutor() ) ; // A, B, & C share the same single-threaded executor service.
map.put( "B" , Objects.requireNonNull( map.get( "A" ) ) ) ;
map.put( "C" , Objects.requireNonNull( map.get( "A" ) ) ) ;
map.put( "X" , Executors.newSingleThreadExecutor() ) ; // X, Y, & Z share the same single-threaded executor service.
map.put( "Y" , Objects.requireNonNull( map.get( "X" ) ) ) ;
map.put( "Z" , Objects.requireNonNull( map.get( "X" ) ) ) ;
为安全起见,我将使该地图不可修改。
map = Map.copyOf( map ) ;