在 Akka 流中将 JSON 作为单个实体进行处理,而不是对其进行迭代



我正在尝试使用 Akka 流将消费者与生产者联系起来。

while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
final Source<JsonNode, NotUsed> source = Source.from(getJSONMessage(msg));
final Sink<JsonNode, CompletionStage<Done>> sink =
Sink.foreach(receivedMsg -> consumer.sendJson((ObjectNode) receivedMsg));
final RunnableGraph<CompletionStage<Done>> runnable = source.toMat(sink, Keep.right());
final CompletionStage<Done> producerConsumer = runnable.run(system);
Thread.sleep(1);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}

在getJSONMessage之后产生的json是这样的。

{
a: {
},
b: {
}
}
When this JSON goes to consumer it is processing it as 

答:{ }

先,然后

b: {
}

我将如何在 akka-stream 中一次处理完整的 JSON 有效负载,而不是遍历 JSON 有效负载。

Source.from方法从Iterable对象创建 Akka 流Source。由于ObjectNode实现了Iterable<JsonNode>,通过迭代其子级,您的流将在对象中的每个值都有一个元素。

您可以使用Source.single创建一个具有一个元素的Source,但在这种情况下,在此方法中使用 Akka Streams 实际上没有任何好处。将节点直接传递给consumer.sendJson会简单得多,如以下示例所示:

while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
ObjectNode receivedMsg = getJSONMessage(msg);
consumer.sendJson(receivedMsg);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}

另一方面,如果您确实想利用 Akka 流,将producer变成Source<JsonNode>consumer变成SinkFlow,并在流中执行所有处理会更有意义:

producer
.map(dataEnvelope -> getJSONMessage(dataEnvelope))
.runWith(consumer, system);

最新更新