我正在主线程中逐行浏览大型文本文件(5GB)。同时创建一些其他线程来格式化这些行。
我已经写了一个解决方案,使用Runnable
类和Semaphore
,它控制线程运行的数量。不幸的是,Runnable
不提供返回值或抛出异常。如果在任何线程中抛出异常,我希望我的整个应用程序停止。
我正在尝试使用Callable
和Future
现在,但我得到内存错误。
public class ProcessLine implements Callable<Boolean> {
private final String inputLine;
public ProcessLine(String inputLine) {
this.inputLine = inputLine;
}
@Override
public Boolean call() throws Exception {
formatLine(inputLine); // huge method which can throw exceptions
return true;
}
}
在打开文本文件之前:
ExecutorService executor = Executors.newFixedThreadPool(threads, new DaemonThreadFactory());
List<Future> futures = new ArrayList<Future>();
然后在遍历所有行的循环中:
ProcessLine processLine = new ProcessLine(inputLine);
Future f = executor.submit(processLine);
futures.add(f);
这里的第一个问题是所有Future
对象都收集在futures
列表中。当我每行只有一个条目时,内存不足是不足为奇的。
第二个问题是:我会在处理文本文件的最后用get()
方法检查所有Future
项。我甚至不会注意到第一行是否抛出了异常。
请帮助我找出如何解决这个问题
您可以通过使用此构造函数创建自定义ThreadPoolExecutor
来限制挂起任务的数量,如下所示:
ExecutorService executor = new ThreadPoolExecutor(
threads,
threads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(WORK_QUEUE_SIZE));
其中WORK_QUEUE_SIZE
决定挂起的最大行数。
这是我想到的另一种方法。我不确定如何以一种优雅的方式合并ExecutorService
。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class Scratch {
static Object lock = new Object();
static AtomicBoolean keepRunning = new AtomicBoolean(true);
static BlockingQueue<String> buf = new LinkedBlockingDeque<>(100);
static List<Consumer> consumers = Arrays.asList(new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
public static void main(String [] args) {
// Start a producer
new Producer().start();
// Start consumers
for (Consumer c : consumers)
c.start();
}
static void stopConsumers() {
System.out.println("Stopping consumers");
keepRunning.set(false);
for (Consumer c : consumers)
c.interrupt();
}
static class Producer extends Thread {
public void run() {
try (BufferedReader br =
new BufferedReader(new FileReader("lines.txt"))) {
String line;
while (null != (line = br.readLine())) {
System.out.println(line);
buf.put(line);
}
} catch (Exception e) {
e.printStackTrace();
// Producer exception
}
// Wait for the consumers to finish off the last lines in the queue
synchronized (lock) {
while (!buf.isEmpty()) {
try {
lock.wait();
} catch (InterruptedException e) {
// TODO: Deal with interruption
}
}
}
// The consumers are now hanging on buf.take. Interrupt them!
stopConsumers();
}
}
static class Consumer extends Thread {
// Dummy process
private boolean process(String str) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
return true;
}
public void run() {
System.out.println("Starting");
while (keepRunning.get()) {
try {
process(buf.take());
} catch (InterruptedException e) {
// TODO: Handle interrupt
e.printStackTrace();
} catch (Exception e) {
stopConsumers(); // Processing exception: Graceful shutdown
}
// Notify the producer that the queue might be empty.
synchronized (lock) {
lock.notify();
}
}
System.out.println("Stopping");
}
}
}
因此,存储任务处理的所有结果(为每个任务使用一个Future
)会占用太多内存,但是您可以单独对这些结果进行进一步处理,而不需要一个完整的集合(对吗?)
如果开始处理(处理的行数等于第二个队列的大小),则保证在有限时间内检查文件第一行的处理结果,可用于满足您对问题快速处理的要求。
我在一个程序中使用了这种设计,该程序下载数据并将其写入文件,以防止程序持有太多等待处理的数据。我尝试了一些其他的解决方案,但我想我已经找到了最适合我的。
public static final ThreadStatus threadStatus = new ThreadStatus();
public static class ThreadStatus {
private Exception exception = null;
public void setException(Exception exception) {
if(exception == null) {
return;
}
this.exception = exception;
}
public Exception getException() {
return exception;
}
public boolean exceptionThrown() {
return exception != null;
}
}
然后在线程的run()
方法中:
catch(Exception e) {
Main.threadStatus.setException(e);
}
在遍历所有行的循环中:
if(Main.threadStatus.exceptionThrown()) {
throw Main.threadStatus.getException();
}
感谢所有帮助过我的人。