我的系统上有一个更改请求,该请求目前倾听多个频道并将消息发送到多个频道,但是现在目标名称将在数据库中并随时更改。我很难相信自己是第一个遇到这个问题的人,但是我看到了有限的信息。
我发现的只是这2 ...
动态水槽目的地:https://github.com/spring-cloud-stream-app-starters/router/tree/master/master/spring-cloud-starter-starter-stream-sink-sink-router,-但是,但是,这种工作将如何工作以按照其方式积极地聆听这些频道由@StreamListener完成?
动态源目的地:https://github.com/spring-cloud/spring-cloud-stream-smamples/blob/master/source-samples/dynamic-destination-source/,它可以做此
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
但是那是什么"有效载荷"?那里指定的目的地在哪里?
随意提高我的答案,我希望这会帮助他人。
现在代码(它在我的调试器中起作用)。这是一个例子,未准备好生产!
这是如何将消息发送到动态目标
的方式import org.springframework.messaging.MessageChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
@Service
@EnableBinding
public class MessageSenderService {
@Autowired
private BinderAwareChannelResolver resolver;
@Transactional
public void sendMessage(final String topicName, final String payload) {
final MessageChannel messageChannel = resolver.resolveDestination(topicName);
messageChannel.send(new GenericMessage<String>(payload));
}
}
和弹簧云流的配置。
spring:
cloud:
stream:
dynamicDestinations: output.topic.1,output.topic2,output.topic.3
我在这里找到https://docs.spring.io/spring-cloud-stream/docs/elmhurst.release/reference/htmlsingle/index.html#dynamicdestination它将在Spring Cloud Stream版本2 中使用。我使用2.1.2
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
这是从动态目标
消耗消息的方法https://stackoverflow.com/a/56148190/4587961
配置
spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8
java消费者。
@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
logger.info("Received a message: nmessage:n{}", message.getPayload());
final String topic = message.getHeaders().get("kafka_receivedTopic");
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
}
}