我有一个txt文件,里面有100000个字符串,比如ID;数量在这个文件中,我有1000个唯一的ID。我想为每个ID求和金额,并将这些字符串写入一个文件。因此,结果文件应该包含1000个具有唯一id的字符串。
这里是我的代码:主类
public class Main {
private static ActorSystem system;
public static void main(String[] args) throws Exception{
system = ActorSystem.create("ClientSystem");
system.actorOf(Props.create(ClientActor.class));
}
}
ClientActor
public class ClientActor extends UntypedActor{
ActorRef worker = getContext().actorOf(Props.create(WorkerActor.class));
@Override
public void preStart() throws Exception{
FileInputStream fis = new FileInputStream(new File("100000.txt"));
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = br.readLine()) != null) {
worker.tell(line, getSelf());
}
br.close();
}
@Override
public void onReceive(Object o) throws Exception {
}
}
作业人员
public class WorkerActor extends UntypedActor {
Map sum = new HashMap();
private String getId(String s){
return s.substring(0, s.indexOf(";"));
}
private String getAmount(String s){
return s.substring(s.lastIndexOf(";") + 1);
}
@Override
public void onReceive(Object o) throws Exception {
sum.put(getId((String)o), sum.get(getId((String)o) + getAmount(getAmount((String)o))));
// clientActor.tell("", clientActor);
}
}
在ClientActor中,我用amount解析我的文件;id字符串,并将这些字符串发送到WorkerAction,在那里我对结果求和并将其放入求和映射中。现在我想把这个和映射写到一个文件中,但不明白如何做到这一点(如何知道所有的行都被处理了?)。
使用Akka Streams会更好地实现这样的作业,特别是因为您不应该像上面的简单示例那样在Actor内部执行任何阻塞操作;这些应该使用单独的调度器进行隔离,这样即使IO被阻止,系统也可以保持响应。Akka Streams为您处理此问题,因此做正确的事情更简单。
你可以这样写代码:
val futureBytesWritten =
Source.file(fIn)
.via(Framing.delimiter(ByteString(System.lineSeparator), Int.MaxValue, true).map(_.utf8String.split(";")))
.fold(Map[String, Long]().withDefaultValue(0l))({
(m, v) => m.updated(v(0), m(v(0)) + Integer.parseInt(v(1)))
})
.mapConcat(_ map { case (k, v) => k+";"+v+System.lineSeparator })
.runWith(Sink.file(fOut))
请阅读有关流式处理IO的这部分文档,并阅读有关Akka Streams的快速入门。
您不需要任何特殊的库(例如akka)来解决此问题;您只需要两行java:
try (PrintWriter out = new PrintWriter(new File("1000.txt"))) {
Files.lines(Paths.get("100000.txt"))
.map(s -> s.split(";"))
.collect(Collectors.groupingBy(a -> a[0],
Collectors.summingInt(a -> Integer.parseInt(a[1]))))
.forEach((k,v) -> out.println(k + ";" + v));
}