我对并行性和并发性很陌生,我正试图在Java中使用Fork-Join实现中值过滤算法。基本上,我将一个输入文件读取到一个ArrayList中,并使用该列表生成一个过滤过的中位数的新ArrayList(包括原始ArrayList的第一个和最后一个元素)。
现在我设法使该算法的串行/顺序版本,它工作得很好。然而,当我尝试制作Fork-Join版本时,它似乎不适用于大型数组列表(100000+)。我尝试了一个非常小的大小为5的数组列表,它工作得很好。我似乎无法找到我的错误(我确信是逻辑错误和/或实现错误)。任何帮助都会很感激。
顺序算法代码片段:
//Add first boundary element to output ArrayList
outputElements.add(this.elements.get(0));
//Start Filter Algorithm
while(elements.size()-counter >= filterSize){
for(int i = 0; i<filterSize; i++){
tempElements.add(this.elements.get(i+counter));
if(i==filterSize){
break;
}
}
Collections.sort(tempElements);
outputElements.add(tempElements.get((filterSize-1)/2));
counter++;
tempElements.clear();
}
//Add last boundary element to output ArrayList.
if (elements != null && !elements.isEmpty()) {
outputElements.add(elements.get(elements.size()-1));
}//End Filter Algorithm
这是我创建的并行类。这是不工作的部分:
public class Parallel extends RecursiveTask<List<Float>>{
int lo;
int hi;
int filterSize;
String outFile; //Output file name.
int arraySize;
List<Float> elements = new ArrayList<Float>();
List<Float> tempElements = new ArrayList<Float>();
List<Float> outputElements = new ArrayList<Float>();
int counter = 0;
static final int SEQUENTIAL_CUTOFF=1000;
public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
this.lo = lo;
this.hi = hi;
this.elements = elements;
this.outFile = outFile;
this.filterSize = filterSize;
if(lo == 0){
outputElements.add(this.elements.get(0));
}
}
@Override
protected List<Float> compute() {
long startTime = System.nanoTime(); //Algorithm starts here
if((hi-lo) < SEQUENTIAL_CUTOFF) {
while(hi-counter >= filterSize){
for(int i = lo; i<filterSize; i++){
tempElements.add(this.elements.get(i+counter));
if(i==filterSize){
break;
}
}
Collections.sort(tempElements);
outputElements.add(tempElements.get((filterSize-1)/2));
counter++;
tempElements.clear();
return outputElements;
}
}else{
Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
left.fork();
List<Float> leftArr = new ArrayList<Float>();
List<Float> rightArr = new ArrayList<Float>();
rightArr = right.compute();
leftArr = left.join();
List<Float> newList = new ArrayList<Float>();
newList.addAll(leftArr);
newList.addAll(rightArr);
}
long endTime = System.nanoTime();//Algorithm ends here.
//Write elements to output file
PrintWriter writeOutput = null;
try {
writeOutput = new PrintWriter(this.outFile, "UTF-8");
} catch (FileNotFoundException | UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
writeOutput.println(outputElements.size());//Number of lines
for(int i=0; i<outputElements.size();i++){
writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
}
writeOutput.close(); //Close when output finished writing.
System.out.println("Parallel complete");
return null;
}
}
任何帮助都是非常感激的。我花了几个小时,在S.O和谷歌上做了很多研究,还是搞不懂。
编辑:musical_coder建议张贴我所面临的错误,这里是。这里有很多错误:
Exception in thread "main" java.lang.IndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at Parallel.compute(Parallel.java:44)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:1)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
一般来说,您应该避免在多线程代码中使用ArrayList
s,因为它不是线程安全的:
注意,这个实现是不同步的。如果多个线程同时访问一个
ArrayList
实例,并且至少有一个线程修改了列表结构,那么它必须在外部同步。
我在你发布的同时修改列表的片段中没有看到任何东西,但我做看到你把this.elements
传递给Parallel
子实例,这意味着至少你正在做一些有风险的事情(在线程之间共享指针到可变的,非线程安全的对象)。
作为第一步,将Parallel
构造函数中的this.elements = elements;
替换为以下内容:
this.elements = Collections.unmodifiableList(elements);
通过使列表不可修改,您将确保如果您的Parallel
代码试图改变列表,您将在故障点得到一个明确的错误。这不会阻止Parallel
之外的其他东西修改原始elements
列表,但这是验证Parallel
行为正确的快速简便方法。如果你得到一个UnsupportedOperationException
,你的Parallel
类将需要重新设计——你不能同时修改一个ArrayList
。
如果你没有得到一个UnsupportedOperationException
,其他东西正在修改你的列表。你需要找到并删除它。
一旦找出了导致列表并发变异的原因,就可以尝试确定最佳的前进方法。我不希望在这个回答中涵盖所有在线程之间共享数据的"正确"方法,但这里有一些一般的经验法则:
- 避免可变数据结构 -将
Parallel
类设计为只处理来自不可变数据结构的数据,如Guava的ImmutableList
。默认情况下,不可变数据结构是线程安全的。 - 使用线程安全的数据结构—例如,
ConcurrentLinkedQueue
是一种线程安全的方式,允许多个进程读取和写入相同的数据结构。ConcurrentHashMap
是另一个常用的类。你需要什么很大程度上取决于你想做什么,但这些都是很好的起点。 - 最小化并发操作的范围—即使使用并发数据结构,您的总体目标应该是使每个任务隔离运行,除了从共享源读取和向共享接收器写入之外。在只有一个线程可见的对象上做尽可能多的工作。
- Synchronize -我注意到
Parallel
写入outFile
没有任何显式同步。这是危险的,并且可能会引入问题(崩溃或更严重的数据损坏)。一次只能有一个线程写文件。要做到这一点,可以使用专用的文件写入线程,或者显式地同步文件写入操作。