修复和动态调整工作线程的数量



我目前正在实现一个数据容器/数据结构,该结构具有addRecord(最终Redcord记录)和close()方法。

public class Container{
Container() {
}
public void addRecord(final Record record){
// do something with the record
// add the modified it to the container
}
public void close(){
}     
}

由于为了将记录存储到容器中,需要执行几个步骤,因此我想将这些步骤外包给线程,然后再最终将它们添加到容器中。

我的问题是我不希望任何容器实例使用超过 4 个线程,并且如果我同时打开多个容器实例,则总共不超过 12 个线程应该使用。除此之外,我希望一旦我创建了第 4 个容器,其他 3 个打开的容器中的每一个都应该丢失它们的一个线程,这样所有 4 个容器都可以使用 3 个线程。

我正在研究ThreadPools和ExecutorServices。虽然设置大小为 12 的线程池很简单,但在使用 ExecutorServices 时,很难将使用的线程数限制为 4。回想一下,我最多想要 12 个线程,这就是为什么我有一个大小为 12 的静态实例,我在容器实例之间共享。

这就是我开始的

public class Container{
public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);
final List<Future<Entry>> _fRecords = new LinkedList<>();
Container() {
}
public void addRecord(final Record record){
_fRecords.add(SERVICE.submit(new Callable<Entry>{
@Override
public Entry call() throws Exception {
return record.createEntry();
}
}));
}
public void close() throws Exception {
for(final Future<Entry> f) {
f.get(); // and add the entry to the container           
}
}     
}

显然,这并不能确保单个实例最多使用 4 个线程。此外,如果另一个容器需要几个线程,我不明白如何强制这些执行器服务关闭工作线程或重新分配它们。

问题:

  1. 由于 Redord 本身是一个接口,并且 createEntry() 的运行时间取决于实际实现,我想确保不同的容器不使用相同的线程/工作线程,即,我不希望任何线程为两个不同的容器完成工作。这背后的想法是,我不希望仅处理"长时间运行"记录的容器实例被仅处理"短运行记录"的容器减慢速度。如果这在概念上没有意义(向您提问),则无需为不同的容器设置隔离的线程/工作线程。

  2. 实施我自己的执行器服务是正确的方法吗?有人可以指出我有一个有类似问题/解决方案的项目吗?否则,我想我将不得不实现自己的执行器服务并在我的容器中共享它,或者有更好的解决方案吗?

  3. 目前,我正在以关闭方法收集所有期货,因为容器必须按照记录/条目到达的顺序存储它们。显然,这需要很多时间,因此我想在《未来》完成后再做这件事。让一个额外的工人专门处理期货是否有意义,还是让我的工人做这项工作更好,即进行线程交互。

我不认为Java运行时提供了开箱即用的东西。

我编写了自己的类来满足这些需求(公共池 + 给定队列的限制)。

public static class ThreadLimitedQueue {
private final ExecutorService executorService;
private final int limit;
private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;
public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}
public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress++;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}
private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null || inProgress > limit) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}

就我而言,唯一的区别limit是恒定的,但您可以修改它。例如,您可以将int limit替换为Supplier<Integer> limitFunction,并且此函数必须提供动态限制,例如

Supplier<Integer> limitFunction = 12 / containersList.size();

只需使其更健壮(例如,如果 containersList 为空或超过 12 该怎么办)

最新更新