我有一个文件,它有n列和许多行
Col 1 col2 col3 .......col n
我想读一次,然后写多个(比如m)输出,用几个键列将行分组。假设必须产生3个输出:
对于输出1:
groupingKeys[0]={1,2) //group the records on col 1 and 2
对于输出2:
groupingKeys[1]={1,4,5} //group the records on col 1 4 5
对于输出3
groupingKeys[2]={2,3} //group on col 2,3
在主线程中,我逐行读取输入文件。对于每个读取行,我想在m个不同的线程中处理读取行。所以基本上我想让
map[0].process(data,groupingKeys[0]);
map[1].process(data,groupingKeys[1]);
map[2].process(data,groupingKeys[2]);
应该在3个不同的线程中运行,并且这3个线程中的每一个都应该仅在主线程读取该行之后才继续。
我可以用具有的第I个线程的运行方法创建m个不同的线程
map[i].process(data,groupingKeys[i]);
但这3个线程应该只在主线程读取该行时进行,以便它们看到data[]
的正确值。我怎样才能做到这一点?
主线程线程-线程-1线程-2运行等待等待等待运行运行运行等待等待
每一步都会读取并处理一行所谓processed,我的意思是对每个分组键执行类似于sql groupby的操作下面是上面提到的示例代码。
public void writeMultipleGroupedOutputs(String inputfile,int groupingKeys[][])
{
Mymap<key,value>[] mapArr= new Mymap<key,value>[k]; //k maps to group records in k ways as per k grouping keys
String line;
while((line = br.readLine()) != null) {
String[] data=line.split(regex); **//one line is read in main thread**
for(int i=0;i<m;i++)
map[i].process(data,groupingKeys[i]); **//process in m different ways.How to make this happen in m independent threads?**
}
class Mymap extends HashMap<key,value> {
void process(String[] data,int[] keyIndexes)
{
//extract key from key indexes
//extract value from value indexes
put(key,value);
}
@Override
public Value put(Key k, Value v) {
if (containsKey(k)) {
oldval=get(k);
put(k,oldval.aggregate(v)); //put sum of old and new
return oldval;
}else{
put(k,v);
return null;
}
}
}
}
很抱歉我没有把我的观点说清楚。简单地说,i want map[i].produce(data,groupingKeys[i]);在单独的(第i个线程)中发生
a b 5
a b 10
a c 15
so if i want to group by {1} and {1,2}
read line map1 map2
a b 5 [a--> b,5] [a,b ->5]
a b 10 [a-> b 15] [a,b->15]
a c 15 [a->b 30] [a,b->15 a,c->15]
编辑:这个问题与我如何处理或分组逻辑无关,但它是:在读取每一行之后,我想在不同的线程中对读取行做一些事情
如果我理解正确,您希望等待处理,直到读取所有文件。如果是,根据具体情况,您可能需要查看CyclicBarrier
或CountDownLatch
- http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CyclicBarrier.html
- http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html