我正在尝试使用 Akka 流将消费者与生产者联系起来。
while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
final Source<JsonNode, NotUsed> source = Source.from(getJSONMessage(msg));
final Sink<JsonNode, CompletionStage<Done>> sink =
Sink.foreach(receivedMsg -> consumer.sendJson((ObjectNode) receivedMsg));
final RunnableGraph<CompletionStage<Done>> runnable = source.toMat(sink, Keep.right());
final CompletionStage<Done> producerConsumer = runnable.run(system);
Thread.sleep(1);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}
在getJSONMessage之后产生的json是这样的。
{
a: {
},
b: {
}
}
When this JSON goes to consumer it is processing it as
答:{ }
先,然后
b: {
}
我将如何在 akka-stream 中一次处理完整的 JSON 有效负载,而不是遍历 JSON 有效负载。
Source.from
方法从Iterable
对象创建 Akka 流Source
。由于ObjectNode
实现了Iterable<JsonNode>
,通过迭代其子级,您的流将在对象中的每个值都有一个元素。
您可以使用Source.single
创建一个具有一个元素的Source
,但在这种情况下,在此方法中使用 Akka Streams 实际上没有任何好处。将节点直接传递给consumer.sendJson
会简单得多,如以下示例所示:
while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
ObjectNode receivedMsg = getJSONMessage(msg);
consumer.sendJson(receivedMsg);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}
另一方面,如果您确实想利用 Akka 流,将producer
变成Source<JsonNode>
、consumer
变成Sink
或Flow
,并在流中执行所有处理会更有意义:
producer
.map(dataEnvelope -> getJSONMessage(dataEnvelope))
.runWith(consumer, system);