按顺序处理线程池执行器中的文件



我们使用JDK 7监视服务来监视可以包含XML或csv文件的目录。这些文件被放入线程池中,稍后进行处理并推送到数据库中。此应用程序运行始终监视目录,并在可用时继续处理文件。 XML文件很小,不需要时间,但是每个csv文件可以包含超过8万条记录,因此处理需要时间才能放入数据库。Java 应用程序在从线程池处理 15 个 csv 文件时会给我们内存不足错误。当csv文件进入线程池时,有什么方法可以串行处理,即一次只能处理一个。

Java 应用程序在从线程池处理 15 个 csv 文件时会给我们内存不足错误。当csv文件进入线程池时,有什么方法可以串行处理,即一次只能处理一个。

如果我理解,如果您超过某个阈值,您希望停止添加到池中。 有一种简单的方法可以做到这一点,即使用阻塞队列和被拒绝的执行处理程序。

请参阅以下答案:

在 Java 中处理 HTTP 调用的大文件

总结一下,您可以执行以下操作:

// only allow 100 jobs to queue
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
ThreadPoolExecutor threadPool =
    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue);
// we need our RejectedExecutionHandler to block if the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
       @Override
       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
           try {
                // this will block the producer until there's room in the queue
                executor.getQueue().put(r);
           } catch (InterruptedException e) {
                throw new RejectedExecutionException(
                   "Unexpected InterruptedException", e);
           }
    }
});

这意味着它将阻止添加到队列,并且不应耗尽内存。

我会采取不同的途径来解决您的问题,我想您一切都正确,除非您开始将太多数据读取到内存中。

不确定您如何读取csv文件,建议使用LineReader并读取例如500行处理它们,然后读取接下来的500行,所有大文件都应仅以这种方式处理,因为无论您增加多少内存参数,只要您将有更大的文件要处理,您就会耗尽内存, 因此,请使用可以批量处理记录的实现。这将需要一些额外的编码工作,但无论您必须处理多大的文件,都不会失败。

干杯!!

你可以试试:

  1. 使用 -Xmx JVM 选项增加 JVM 的内存
  2. 使用不同的执行程序来减少一次处理的文件数。一个激烈的解决方案是使用SingleThreadExecutor

    public class FileProcessor implements Runnable {
        public FileProcessor(String name) { }
        public void run() {
            // process file
        }
    }
    // ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // ...
    public void onNewFile(String fileName) {
        executor.submit(new FileProcessor(fileName));
    }
    

相关内容

  • 没有找到相关文章

最新更新