如何扩展AbstractExecutorService以确保线程的多样性



我已经成功地使用ExecutorService实现来管理线程池。

现在我发现了一种情况,使用数十个并发线程将大型数据对象插入数据仓库。数据可能来自不同的来源。当两个或多个线程插入来自同一源的数据时,死锁以一定的速率发生。这些死锁是很容易理解的,只有在没有两个线程同时使用来自同一源的数据时才能避免。.

我读了关于ExecutorService接口的内部,发现抽象类AbstractExecutorService可以是一个很好的起点。我还认为,重写newTaskFor方法可能足以达到目的。

我有点失去了如何标签与源的名称提交的线程,以后选择一个好的一个在newTaskFor基于标签。其主要思想是,只有当提交的线程的标签在正在运行的线程中唯一时,才运行该线程。在任何时候,运行线程的数量当然会受到最小值(最大池大小,当前标签多样性)的限制。

有什么帮助吗?提前谢谢。

executor框架的目的是减轻您自己操作线程的负担。

显然,您想要串行地处理来自任何一个源的输入,一次一个。要实现这一点,只需定义多个执行器服务,每个执行器服务专用于一个特定的数据源。并使每个执行器都是单线程的。

ExecutorService executorService_A = Executors.newSingleThreadExecutor() ;
ExecutorService executorService_B = Executors.newSingleThreadExecutor() ;
ExecutorService executorService_C = Executors.newSingleThreadExecutor() ;

将数据源"A"的任务提交给executorService_A,将数据源"B"的任务提交给executorService_B,依此类推

我不认为许多,甚至数百个单线程执行器服务有什么问题……除非你有几个核心多个同时执行任务。如果是这种情况,那么合并到更少的执行器服务,一个用于多个输入,仍然是单线程的。为源A、B和c提供一个"ABC"执行器服务。为源X、Y和z提供另一个"XYZ"执行器服务。

您可以通过使用Map< String , ExecutorService >来管理哪个源到哪个执行器服务的跟踪,其中映射的键是源名称,映射的值是执行器服务。

Map< String , ExecutorService > map = new HashMap<>() ;
map.put( "A" , Executors.newSingleThreadExecutor() ) ;       // A, B, & C share the same single-threaded executor service. 
map.put( "B" , Objects.requireNonNull( map.get( "A" ) ) ) ; 
map.put( "C" , Objects.requireNonNull( map.get( "A" ) ) ) ;
map.put( "X" , Executors.newSingleThreadExecutor() ) ;       // X, Y, & Z share the same single-threaded executor service. 
map.put( "Y" , Objects.requireNonNull( map.get( "X" ) ) ) ;
map.put( "Z" , Objects.requireNonNull( map.get( "X" ) ) ) ;

为安全起见,我将使该地图不可修改。

map = Map.copyOf( map ) ;

相关内容

  • 没有找到相关文章

最新更新