使用PriorityBlockingQueue输入日志对象以供处理



我有一个应用程序,它从多个序列化的对象日志中读取对象,并将它们交给另一个类进行处理。我的问题集中在如何高效、干净地读取对象并发送它们。

代码是从旧版本的应用程序中提取的,但我们最终保留了它。它直到上周才真正被使用,但我最近开始更仔细地研究代码,试图改进它。

它打开N个ObjectInputStream s,并从每个流中读取一个对象以将它们存储在一个数组中(假设下面的inputStreams只是对应于每个日志文件的ObjectInputStream对象的数组):

for (int i = 0; i < logObjects.length; i++) {
    if (inputStreams[i] == null) {
        continue;
    }
    try {
        if (logObjects[i] == null) {
            logObjects[i] = (LogObject) inputStreams[i].readObject();
        }
    } catch (final InvalidClassException e) {
        LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
    } catch (final EOFException e) {
        inputStreams[i] = null;
    }
}

序列化到file的对象是LogObject对象。下面是LogObject类:

public class LogObject implements Serializable {
    private static final long serialVersionUID = -5686286252863178498L;
    private Object logObject;
    private long logTime;
    public LogObject(Object logObject) {
        this.logObject = logObject;
        this.logTime = System.currentTimeMillis();
    }
    public Object getLogObject() {
        return logObject;
    }
    public long getLogTime() {
        return logTime;
    }
}

一旦对象在数组中,它就比较日志时间并发送最早时间的对象:

// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
    logObject = logObjects[i];
    if (logObject == null) {
        continue;
    }
    if (logObject.getLogTime() < minTime) {
        index = i;
        minTime = logObject.getLogTime();
    }
}
handler.handleOutput(logObjects[index].getLogObject());

我的第一个想法是为每个文件创建一个线程,读取并将对象放入PriorityBlockingQueue(使用使用LogObject日志时间进行比较的自定义比较器)。然后,另一个线程可能会取出这些值并将它们发送出去。

这里唯一的问题是,一个线程可以将一个对象放入队列,并在另一个线程可以将一个对象放入队列之前将其取出,而另一个线程可能有更早的时间。这就是为什么在检查日志时间之前,对象被读入并存储在一个数组中。

这个约束是否禁止我实现多线程设计?或者我是否可以调整我的解决方案使其更有效?

据我所知,您需要严格按顺序处理LogObjects。在这种情况下,代码的初始部分是完全正确的。这段代码的作用是合并几个输入流。你需要为每个流读取一个对象(这就是为什么需要临时数组),然后取适当的(最小/最大)LogObject并处理处理器。

根据您的上下文,您可能能够在多个线程中进行处理。您唯一需要更改的是将LogObjects放在ArrayBlockingQueue中,处理器可能在几个独立的线程上运行。另一种选择是将LogObjects发送到ThreadPoolExecutor中进行处理。最后一个选项更简单直接。

但要注意路上的几个陷阱:

  • 要使该算法正确工作,必须对单个流进行排序。否则你的程序就坏了;
  • 在并行处理消息时严格来说是没有定义处理顺序的。因此所提出的算法只保证消息处理的开始顺序(调度顺序)。这可能不是你想要的。

那么现在你应该面对几个问题:

  1. 真的需要加工订单吗?
  2. 如果是,需要全局顺序(对所有消息)还是本地顺序(对独立的消息组)?

对这些问题的回答将对你进行并行处理的能力有很大的影响。

如果第一个问题的答案是,很遗憾,并行处理不是一个选项。

我同意。把这个扔掉,使用PriorityBlockingQueue.

这里唯一的问题是,如果线程1从文件1中读取对象并将其放入队列中(并且文件2将要读取的对象具有更早的日志时间),则读取线程可以将其取出并发送它,导致首先发送时间较晚的日志对象

这与平衡合并(Knuth ACP vol 3)的合并阶段完全相同。您必须从获得前一个最低元素的同一文件中读取下一个输入。

这个约束是否禁止我实现多线程设计?

这不是约束。这是虚构的。

或者我可以调整我的解决方案使其更有效?

优先级队列已经相当高效了。在任何情况下,您都应该首先考虑正确性。然后添加缓冲;-)将ObjectInputStreams包裹在BufferedInputStreams周围,并确保在输出堆栈中有BufferedOutputStream

最新更新