我有一个应用程序,它从多个序列化的对象日志中读取对象,并将它们交给另一个类进行处理。我的问题集中在如何高效、干净地读取对象并发送它们。
代码是从旧版本的应用程序中提取的,但我们最终保留了它。它直到上周才真正被使用,但我最近开始更仔细地研究代码,试图改进它。
它打开N个ObjectInputStream
s,并从每个流中读取一个对象以将它们存储在一个数组中(假设下面的inputStreams
只是对应于每个日志文件的ObjectInputStream
对象的数组):
for (int i = 0; i < logObjects.length; i++) {
if (inputStreams[i] == null) {
continue;
}
try {
if (logObjects[i] == null) {
logObjects[i] = (LogObject) inputStreams[i].readObject();
}
} catch (final InvalidClassException e) {
LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
} catch (final EOFException e) {
inputStreams[i] = null;
}
}
序列化到file的对象是LogObject
对象。下面是LogObject
类:
public class LogObject implements Serializable {
private static final long serialVersionUID = -5686286252863178498L;
private Object logObject;
private long logTime;
public LogObject(Object logObject) {
this.logObject = logObject;
this.logTime = System.currentTimeMillis();
}
public Object getLogObject() {
return logObject;
}
public long getLogTime() {
return logTime;
}
}
一旦对象在数组中,它就比较日志时间并发送最早时间的对象:
// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
logObject = logObjects[i];
if (logObject == null) {
continue;
}
if (logObject.getLogTime() < minTime) {
index = i;
minTime = logObject.getLogTime();
}
}
handler.handleOutput(logObjects[index].getLogObject());
我的第一个想法是为每个文件创建一个线程,读取并将对象放入PriorityBlockingQueue
(使用使用LogObject
日志时间进行比较的自定义比较器)。然后,另一个线程可能会取出这些值并将它们发送出去。
这里唯一的问题是,一个线程可以将一个对象放入队列,并在另一个线程可以将一个对象放入队列之前将其取出,而另一个线程可能有更早的时间。这就是为什么在检查日志时间之前,对象被读入并存储在一个数组中。
这个约束是否禁止我实现多线程设计?或者我是否可以调整我的解决方案使其更有效?
据我所知,您需要严格按顺序处理LogObjects
。在这种情况下,代码的初始部分是完全正确的。这段代码的作用是合并几个输入流。你需要为每个流读取一个对象(这就是为什么需要临时数组),然后取适当的(最小/最大)LogObject
并处理处理器。
根据您的上下文,您可能能够在多个线程中进行处理。您唯一需要更改的是将LogObjects
放在ArrayBlockingQueue
中,处理器可能在几个独立的线程上运行。另一种选择是将LogObjects
发送到ThreadPoolExecutor
中进行处理。最后一个选项更简单直接。
但要注意路上的几个陷阱:
- 要使该算法正确工作,必须对单个流进行排序。否则你的程序就坏了;
- 在并行处理消息时严格来说是没有定义处理顺序的。因此所提出的算法只保证消息处理的开始顺序(调度顺序)。这可能不是你想要的。
那么现在你应该面对几个问题:
- 真的需要加工订单吗?
- 如果是,需要全局顺序(对所有消息)还是本地顺序(对独立的消息组)?
对这些问题的回答将对你进行并行处理的能力有很大的影响。
如果第一个问题的答案是是,很遗憾,并行处理不是一个选项。
我同意。把这个扔掉,使用PriorityBlockingQueue.
这里唯一的问题是,如果线程1从文件1中读取对象并将其放入队列中(并且文件2将要读取的对象具有更早的日志时间),则读取线程可以将其取出并发送它,导致首先发送时间较晚的日志对象
这与平衡合并(Knuth ACP vol 3)的合并阶段完全相同。您必须从获得前一个最低元素的同一文件中读取下一个输入。
这个约束是否禁止我实现多线程设计?
这不是约束。这是虚构的。
或者我可以调整我的解决方案使其更有效?
优先级队列已经相当高效了。在任何情况下,您都应该首先考虑正确性。然后添加缓冲;-)将ObjectInputStreams
包裹在BufferedInputStreams
周围,并确保在输出堆栈中有BufferedOutputStream
。