提高逐行读取文件和处理的性能



我有一段java代码可以执行以下操作-

  1. 打开包含格式为 {A,B,C} 的数据的文件,每个文件大约有 5000000 行。
  2. 对于文件中的每一行,调用一个提供列 D 的服务,并将其作为 {A,B,C,D} 追加到 {A,B,C}。
  3. 将此条目写入分块编写器,该写片最终将 10000 行组合在一起,将分块写回远程位置

现在代码需要 32 小时才能执行。此过程将再次在另一个文件中重复,假设还需要 32 小时,但我们需要这些进程每天运行。

步骤 2由于有时服务没有 D,但设计为从其超级数据存储中获取 D,因此它会抛出暂时性异常要求您等待,因此步骤 2 变得更加复杂。我们有重试来处理此问题,因此从技术上讲,一个条目可以重试 5 次,最大延迟为 60000 毫秒。因此,在最坏的情况下,我们可能会查看 5000000 * 5。

{A,B,C} 的组合是唯一的,因此结果 D 无法缓存和重用,并且每次都必须发出新的请求才能获取 D。

我尝试添加这样的线程:

temporaryFile = File.createTempFile(key, ".tmp");
Files.copy(stream, temporaryFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
reader = new BufferedReader(new InputStreamReader(new 
FileInputStream(temporaryFile), StandardCharsets.UTF_8));
String entry;
while ((entry = reader.readLine()) != null) {
final String finalEntry = entry;
service.execute(() -> {
try {
processEntry(finalEntry);
} catch (Exception e) {
log.error("something");
});
count++;
}

这里 processEntry 方法抽象了上面解释的实现细节,线程定义为

ExecutorService service = Executors.newFixedThreadPool(10);

我遇到的问题是第一组线程启动,但该过程不会等到所有线程完成其工作并且所有 5000000 行都完成。因此,过去等待完成 32 小时的任务现在在 <1 分钟内结束,这扰乱了我们系统的状态。有没有其他方法可以做到这一点?如何让进程等待所有线程完成?

如果你想
  • 在任务完成时接受任务,请考虑使用ExecutorCompletionService,你需要一个ExecutorCompletionService。这充当一个阻塞队列,允许您在任务完成时轮询任务。
  • 另一种解决方案是等待执行程序终止,然后使用以下方法将其关闭: ExecutorService service = Executors.newFixedThreadPool(10); service .shutdown(); while (!service .isTerminated()) {}

一种替代方法是使用闩锁等待所有任务完成,然后再关闭主线程上的执行器。

使用 1 初始化倒计时锁存器。
退出提交任务的循环后,调用 latch.await((;

在你开始的任务中,你必须对起始类有一个回调,让它知道任务何时完成。

请注意,在起始类中,必须同步回调函数。

在起始类中,使用此回调来获取已完成任务的计数。

同样在回调中,当所有任务都完成后,您调用 latch.countdown(( 以使主线程继续,例如关闭执行器并退出。

这显示了主要概念,如有必要,可以对已完成的任务进行更详细的实施和更多的控制。

它会是这样的:

public class StartingClass {

CountDownLatch latch = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(10);
BufferedReader reader;
Path stream;
int count = 0;
int completed = 0;
public void runTheProcess() {
File temporaryFile = File.createTempFile(key, ".tmp");
Files.copy(stream, temporaryFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
reader = new BufferedReader(new InputStreamReader(new 
FileInputStream(temporaryFile), StandardCharsets.UTF_8));
String entry;
while ((entry = reader.readLine()) != null) {
final String finalEntry = entry;
service.execute(new Task(this,finalEntry));
count++;
}
latch.await();
service.shutdown();
}
public synchronized void processEntry(String entry) {
}
public synchronized void taskCompleted() {
completed++;
if(completed == count) {
latch.countDown();;
}
}
//This can be put in a different file.
public static class Task implements Runnable {
StartingClass startingClass;
String finalEntry;
public Task(StartingClass startingClass, String finalEntry) {
this.startingClass = startingClass;
this.finalEntry = finalEntry;
}
@Override
public void run() {
try {
startingClass.processEntry(finalEntry);
startingClass.taskCompleted();
} catch (Exception e) {
//log.error("something");
}; 
}
}
}

请注意,您需要关闭该文件。此外,可以编写执行程序的关闭以在强制关闭之前等待几秒钟。

我遇到的问题是第一组线程启动,但该过程不会等到所有线程完成其工作并且所有 5000000 行都已完成。

当您使用ExecutorService运行作业时,这些作业将添加到服务中并在后台运行。 要等待它们完成,您需要等待服务终止:

ExecutorService service = Executors.newFixedThreadPool(10);
// submit jobs to the service here
// after the last job has been submitted, we immediately shutdown the service
service.shutdown();
// then we can wait for it to terminate as the jobs run in the background
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

此外,如果这些文件中有大量的行,我建议您为作业使用有界队列,这样您就不会有效地缓存文件中的所有行时耗尽内存。 这仅在文件保留并且不会消失时才有效。

// this is the same as a newFixedThreadPool(10) but with a queue of 100
ExecutorService service = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100));
// set a rejected execution handler so we block the caller once the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});

将此条目写入分块编写器,该写片最终将 10000 行组合在一起,将分块写回远程位置

随着每个 A,B,C 作业的完成,如果需要在第二步中处理它,那么我还建议您研究一个允许您将各种不同的线程池链接在一起的ExecutorCompletionService,以便随着行的完成,它们将立即开始处理的第二阶段。

相反,如果这个chunkedWriter只是一个线程,那么我建议共享一个BlockingQueue<Result>,并在行完成后将执行器线程放入队列,chunkedWriter从队列中取出并进行结果的分块和写入。 在这种情况下,需要小心处理向编写器线程指示它已完成 - 也许由主线程将某种END_RESULT常量放入队列,等待服务终止。

相关内容

  • 没有找到相关文章

最新更新