如何使用Akka求和字符串



我有一个txt文件,里面有100000个字符串,比如ID;数量在这个文件中,我有1000个唯一的ID。我想为每个ID求和金额,并将这些字符串写入一个文件。因此,结果文件应该包含1000个具有唯一id的字符串。

这里是我的代码:主类

public class Main {
private static ActorSystem system;
public static void main(String[] args) throws Exception{
    system = ActorSystem.create("ClientSystem");
    system.actorOf(Props.create(ClientActor.class));
}
}

ClientActor

public class ClientActor extends UntypedActor{

    ActorRef worker = getContext().actorOf(Props.create(WorkerActor.class));
    @Override
    public void preStart() throws Exception{
        FileInputStream fis = new FileInputStream(new File("100000.txt"));
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        String line = null;
        while ((line = br.readLine()) != null) {
            worker.tell(line, getSelf());
        }
        br.close();
    }
    @Override
    public void onReceive(Object o) throws Exception {
    }
}

作业人员

public class WorkerActor extends UntypedActor {
    Map sum = new HashMap();
    private String getId(String s){
       return s.substring(0, s.indexOf(";"));
    }
    private String getAmount(String s){
        return s.substring(s.lastIndexOf(";") + 1);
    }
    @Override
    public void onReceive(Object o) throws Exception {
        sum.put(getId((String)o), sum.get(getId((String)o) + getAmount(getAmount((String)o))));
//        clientActor.tell("", clientActor);
    }
}

在ClientActor中,我用amount解析我的文件;id字符串,并将这些字符串发送到WorkerAction,在那里我对结果求和并将其放入求和映射中。现在我想把这个和映射写到一个文件中,但不明白如何做到这一点(如何知道所有的行都被处理了?)。

使用Akka Streams会更好地实现这样的作业,特别是因为您不应该像上面的简单示例那样在Actor内部执行任何阻塞操作;这些应该使用单独的调度器进行隔离,这样即使IO被阻止,系统也可以保持响应。Akka Streams为您处理此问题,因此做正确的事情更简单。

你可以这样写代码:

val futureBytesWritten = 
  Source.file(fIn)
  .via(Framing.delimiter(ByteString(System.lineSeparator), Int.MaxValue, true).map(_.utf8String.split(";")))
  .fold(Map[String, Long]().withDefaultValue(0l))({
    (m, v) => m.updated(v(0), m(v(0)) + Integer.parseInt(v(1)))
  })
  .mapConcat(_ map { case (k, v) => k+";"+v+System.lineSeparator })
  .runWith(Sink.file(fOut))

请阅读有关流式处理IO的这部分文档,并阅读有关Akka Streams的快速入门。

您不需要任何特殊的库(例如akka)来解决此问题;您只需要两行java:

try (PrintWriter out = new PrintWriter(new File("1000.txt"))) {
    Files.lines(Paths.get("100000.txt"))
      .map(s -> s.split(";"))
      .collect(Collectors.groupingBy(a -> a[0],
         Collectors.summingInt(a -> Integer.parseInt(a[1]))))
      .forEach((k,v) -> out.println(k + ";" + v));
}

最新更新