使用固定数量的多线程处理大量数据,并允许异常



我正在主线程中逐行浏览大型文本文件(5GB)。同时创建一些其他线程来格式化这些行。

我已经写了一个解决方案,使用Runnable类和Semaphore,它控制线程运行的数量。不幸的是,Runnable不提供返回值或抛出异常。如果在任何线程中抛出异常,我希望我的整个应用程序停止。

我正在尝试使用CallableFuture现在,但我得到内存错误。

public class ProcessLine implements Callable<Boolean> {
  private final String inputLine;
  public ProcessLine(String inputLine) {
    this.inputLine = inputLine;
  }
  @Override
  public Boolean call() throws Exception {
    formatLine(inputLine); // huge method which can throw exceptions
    return true;
  }
}

在打开文本文件之前:

ExecutorService executor = Executors.newFixedThreadPool(threads, new DaemonThreadFactory());
List<Future> futures = new ArrayList<Future>();

然后在遍历所有行的循环中:

ProcessLine processLine = new ProcessLine(inputLine);
Future f = executor.submit(processLine);
futures.add(f);

这里的第一个问题是所有Future对象都收集在futures列表中。当我每行只有一个条目时,内存不足是不足为奇的。

第二个问题是:我会在处理文本文件的最后用get()方法检查所有Future项。我甚至不会注意到第一行是否抛出了异常。

请帮助我找出如何解决这个问题

您可以通过使用此构造函数创建自定义ThreadPoolExecutor来限制挂起任务的数量,如下所示:

ExecutorService executor = new ThreadPoolExecutor(
        threads,
        threads,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(WORK_QUEUE_SIZE));

其中WORK_QUEUE_SIZE决定挂起的最大行数。


这是我想到的另一种方法。我不确定如何以一种优雅的方式合并ExecutorService

import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class Scratch {
    static Object lock = new Object();
    static AtomicBoolean keepRunning = new AtomicBoolean(true);
    static BlockingQueue<String> buf = new LinkedBlockingDeque<>(100);
    static List<Consumer> consumers  = Arrays.asList(new Consumer(),
                                                     new Consumer(),
                                                     new Consumer(),
                                                     new Consumer());
    public static void main(String [] args) {    
        // Start a producer
        new Producer().start();
        // Start consumers
        for (Consumer c : consumers)
            c.start();
    }
    static void stopConsumers() {
        System.out.println("Stopping consumers");
        keepRunning.set(false);
        for (Consumer c : consumers)
            c.interrupt();
    }
    static class Producer extends Thread {
        public void run() {
            try (BufferedReader br =
                    new BufferedReader(new FileReader("lines.txt"))) {
                String line;
                while (null != (line = br.readLine())) {
                    System.out.println(line);
                    buf.put(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
                // Producer exception
            }
            // Wait for the consumers to finish off the last lines in the queue
            synchronized (lock) {
                while (!buf.isEmpty()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        // TODO: Deal with interruption
                    }
                }
            }
            // The consumers are now hanging on buf.take. Interrupt them!
            stopConsumers();
        }
    }

    static class Consumer extends Thread {
        // Dummy process
        private boolean process(String str) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
            }
            return true;
        }
        public void run() {
            System.out.println("Starting");
            while (keepRunning.get()) {
                try {
                    process(buf.take());
                } catch (InterruptedException e) {
                    // TODO: Handle interrupt
                    e.printStackTrace();
                } catch (Exception e) {
                    stopConsumers();  // Processing exception: Graceful shutdown
                }
                // Notify the producer that the queue might be empty.
                synchronized (lock) {
                    lock.notify();
                }
            }
            System.out.println("Stopping");
        }
    }
}

因此,存储任务处理的所有结果(为每个任务使用一个Future)会占用太多内存,但是您可以单独对这些结果进行进一步处理,而不需要一个完整的集合(对吗?)

您可以考虑让每个任务将其结果传递给另一个工作队列,由另一个线程池进行处理。如果第二个工作队列具有固定大小,则保证内存使用是有限的。这是管道和过滤器设计模式的一个变体。它有一个很好的特性,如果第二阶段的处理太慢,最终第二个工作队列将被填满,导致第一个线程池的线程阻塞。然后,第二个线程池的线程可以使用更多的CPU时间。也就是说,它会自动在线程池之间共享CPU时间,以实现吞吐量最大化。

如果开始处理(处理的行数等于第二个队列的大小),则保证在有限时间内检查文件第一行的处理结果,可用于满足您对问题快速处理的要求。

我在一个程序中使用了这种设计,该程序下载数据并将其写入文件,以防止程序持有太多等待处理的数据。

我尝试了一些其他的解决方案,但我想我已经找到了最适合我的。

public static final ThreadStatus threadStatus = new ThreadStatus();
public static class ThreadStatus {
 private Exception exception = null;
 public void setException(Exception exception) {
   if(exception == null) {
     return;
   }
   this.exception = exception;
 }
 public Exception getException() {
   return exception;
 }
 public boolean exceptionThrown() {
   return exception != null;
 }

}

然后在线程的run()方法中:

catch(Exception e) {
  Main.threadStatus.setException(e);
}

在遍历所有行的循环中:

if(Main.threadStatus.exceptionThrown()) {
  throw Main.threadStatus.getException();
}

感谢所有帮助过我的人。

相关内容

最新更新