Java中跨不同任务但路径不同的多线程



我有一个队列,生产者将在其中提交任务。每个任务都有一个有效负载和一个文件路径。在使用者端,我有一个执行器线程池,它应该从队列中提取任务并将这些任务分配给线程。以下是需要遵守的一些限制:

  1. 不能有多个线程在同一文件路径上工作
  2. 我们希望为特定文件路径添加的任务按顺序完成

下面是一个问题陈述的例子。假设以下是队列的状态。这里T1是线程名称,括号中包含文件路径
head -> [T1(A),T2(B),T3(C),T4(A),T5(B),T6(A),T7(A),T8(C)] <- tail

因此,我们可以让三个线程同时处理文件路径ABC的任务。但是,以下不应发生。这里,文件路径A有两个线程同时处理它
Pool1-Thread1-T4(A)
Pool1-Thread2-T5(B)
Pool1-Thread3-T6(A)
文件路径的数量可以在10k-50k之间的任意范围内

为了解决这个问题,我提出的方法是使用两个映射,即用于任务队列的Map<String, Queue<Tasks>>和用于令牌的Map<String, AtomicBoolean>,这两个映射都具有针对文件路径的密钥。单个正在运行的使用者线程,它将在Tasks Map上不断迭代,并且在为该路径创建线程之前,需要为该文件路径获取一个令牌(布尔值应为true(。一旦线程完成,它就会返回Token(通过将Boolean标记回true(。

问题

  • 这是这个用例的最佳方法,还是我们可以做得更好和/或更简单
  • 这种方法中是否有任何多线程问题可能会在以后困扰我

编辑

  1. 明确如何完成任务。我将有一个正在运行的使用者线程,它将在Tasks Map上不断迭代。如果它发现任何任务和令牌可用,它将从执行器生成一个新线程
  2. 任务本身很小,但传入任务的吞吐量会很高。这里的文件路径代表了克隆的git repo,任务可以是例如添加文件、修改文件、提交和推送到远程

我会避免使用Executor线程池,并使用以下方法:

  • 创建一个具有BlockingXXXQueue的类(选择实现(和一个从该队列中持续消耗的线程。它显然会消耗Tasks。当它获取一个任务时,它会执行它。这可以保证队列中的任务按顺序执行,并且最多执行一个队列
  • 现在创建一些将任务映射到队列的逻辑。例如,如果您不希望同时执行相同类型的任务,请映射它们所有这些都进入同一队列。例如,A将始终转到队列1,B转到队列2,等等。它遵循前面提到的属性,以这种方式,类型A的2个任务将不会并行进行

一种常见的方法是将所有队列放在一个数组中,然后根据某个值对队列数量进行模分配,这就是调用分区,例如:

queues = new Queue[N];
typeOfTask = task.getType() (to int somehow, for example you can hash or use the char if it is a single letter)
queueIndex = typeTask % queues.length
queues[queueIndex].push(task)
  • 在上面的类中,在将任务传递给线程之前,您可以在获取任务时执行任何逻辑,就像您提到的那样

最后要注意的是,在使用文件系统时,我会避免使用许多线程(如果有的话(,因为磁盘是…一个线程。因此,并行处理它没有什么大意义,除非你必须写很多你不想缓冲的数据。相反,线程处理更多的是能够使用更多的处理器并行工作,以提高速度,或者在执行阻塞操作时。

我试图实现一部分消费者用例,似乎问题多于解决方案。请阅读代码中的内联注释。

public class Consumer implements Runnable {
private volatile boolean isStopped;
private final Map<String, Queue<Task>> tasks;
private final Map<String, AtomicBoolean> tokens;
//Preferably FixedThreadService
private final ExecutorService service;

public Consumer(Map<String, Queue<Task>> tasks, Map<String, AtomicBoolean> tokens, ExecutorService service) {
this.tasks = tasks;
this.tokens = tokens;
this.service = service;
isStopped = false;
}
public void stopConsumer(){
isStopped = true;
}

@Override
public void run() {
while(!isStopped){
// tasks map should have latest values because producer is going to update this map
// frequently. Should have to use ConcurrentHashMap.
Set<String> keys = tasks.keySet();
for ( String key:tasks.keySet() ) {
if(!tokens.get(key).get()){
continue;
}
try {
//This does not mean the task is in progress, just submitted for execution.
service.execute(tasks.get(key).poll());
//Now that we submitted the task for exicution, we can mark the token to false.
tokens.get(key).compareAndSet(true, false);
// But at what point are we going to mark token to True. If this token is updated
// From any other thread than consumer, it will create race condition.
}catch(RejectedExecutionException e){
//What will happen to the polled task. Task is removed from the queue.

//Token we can mark to true, but task has already polled from queue.
// Shall we put it back to the head of the queue and how ?
tokens.get(key).compareAndSet(false, true);
//What if the task is not completed smoothly. T1 is not completed successfully,
// what is backup plan, because next task will start any way and corrupt the file.
}
}
}
}
}

如果我们可以隔离像1这样的任务。添加文件,2,修改文件,3。承诺和4。推到远程,也许是我们可以想出更好的办法。但在这种情况下也会受到限制。

最好采用单线程方法。

最新更新