我对春天的云流有一些要求:
- 它需要从一个集群上的单个Kafka主题获取KStream,并向另一个集群上的多个主题发送消息。
- 在某些情况下,它需要根据已接收的一条消息发送多条消息。
- 这些消息都需要至少被接收一次。
我已经研究过使用一个函数,但我还没有能够解决如何发送多个消息给定一个主题,我也研究过使用消费者和供应商,但我不能看到这个工作得很好。我目前发送消息的方式是使用Consumer,然后使用StreamBridge通过副作用发送。
@Bean
@SuppressWarnings("unchecked")
public Consumer<KStream<String, String>> generateMessage() {
return messages -> {
final Map<String, KStream<String, String>> splitMessages =
branchOutput(filterMessages(messages));
KStream<String, MessageData>[] ksArray = splitMessages
.values()
.stream()
.map(message ->
message.mapValues((key, jsonMessage) -> {
try {
return new MessageData(dataTransformService
.transformMessage(key, jsonMessage, extractTopic(jsonMessage)),
removeTopic(jsonMessage), "");
} catch (ClassNotFoundException e) {
return new MessageData(Collections.singletonList(CLASS_NOT_FOUND_EXCEPTION),
removeTopic(jsonMessage), e.getMessage());
}
}))
.toArray(KStream[]::new);
ksArray[0].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_1, value.getOriginalMessage(), value.getError()));
ksArray[1].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_2, value.getOriginalMessage(), value.getError()));
ksArray[2].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_3, value.getOriginalMessage(), value.getError()));
ksArray[3].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_4, value.getOriginalMessage(), value.getError()));
};
}
// send message(s) to topic or forward to dlq if there is a message handling exception
private void sendMessage(String key, List<String> transformedMessages, String binding, String originalMessage, String error) {
try {
for (String transformedMessage : transformedMessages) {
if (!transformedMessage.equals(CLASS_NOT_FOUND_EXCEPTION)) {
boolean sendTest = streamBridge.send(binding,
new GenericMessage<>(transformedMessage, Collections.singletonMap(
KafkaHeaders.KEY, (extractMessageId(transformedMessage)).getBytes())));
log.debug(String.format("message sent = %s", sendTest));
} else {
log.warn(String.format("message transform error: %s", error));
streamBridge.send(DLQ_OUTPUT_BINDING,
new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
key.getBytes())));
}
}
} catch (MessageHandlingException e) {
log.warn(String.format("message send error: %s", e));
streamBridge.send(DLQ_OUTPUT_BINDING,
new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
key.getBytes())));
}
}
我真正需要知道的是,是否有更好的方法来执行这些要求?如果没有,是否有一种方法可以检查我们发送到的外部kafka集群(我不管理它)的确认,以便如果未收到消息可以重新发送?
Kafka Streams不允许你从一个集群接收记录并在处理后将它们发布到另一个集群。单一拓扑中的所有处理都必须在同一集群上完成。请参阅相关的堆栈溢出线程。根据用例的需要,绕过此限制的方法是使用StreamBridge,
KafkaTemplate
等方式手动将记录发送到第二个集群。虽然这不是完美的,但在这种情况下这是一个可以接受的解决方案。然而,使用这种方法,您将失去Kafka Streams提供的任何端到端保证。例如,当你在同一个集群上运行整个拓扑时,Kafka Streams会给你一定的处理保证,比如恰好一次,至少一次等。如果你想保留那些Kafka流提供的保证,如果你愿意在第一个集群上使用另一个额外的主题,你可以使用一个策略。以下是基本思路。
public Function<KStream<String, String>, KStream<...>> generateMessage()
所以上面是一个端到端的Kafka流处理器,在同一个集群上运行。您将结果生成为集群上的出站主题。然后,使用常规的基于消息通道的Kafka绑定器-spring-cloud-stream-binder-kafka
将消息发送到第二个集群。
Function<String, String> passThroughToSecondCluster() {
}
您可以利用Spring Cloud Stream的多绑定功能,在入站使用第一个集群,在出站使用第二个集群。这里有一个例子。查看配置以了解更多细节。
这样,你得到Kafka流的端到端保证,然后通过一个单独的处理器,你把记录发送到第二个集群。很明显,缺点是在第一个集群上需要一个额外的主题。