Spring Cloud Stream - multiple dynamic destinations for sources and sinks



I have a change request for my system, which currently listens to multiple channels and sends messages to multiple channels. The destination names, however, will now live in a database and may change at any time. I find it hard to believe I am the first person to run into this, but I have found very little information.

All I have found are these two:
Dynamic sink destinations: https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router - but how would that work to actively listen on those channels the way @StreamListener does?

Dynamic source destinations: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, which does it like this:

@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
    router.setDefaultOutputChannelName("default-output");
    // "resolver" is injected elsewhere in the sample (a BinderAwareChannelResolver)
    router.setChannelResolver(resolver);
    return router;
}

But what is that "payload"? And where is the destination actually specified here?
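From what I can tell reading the sample (this is my own reading, so treat the names below as assumptions): ExpressionEvaluatingRouter evaluates the SpEL expression payload.id against each incoming message, and the resulting string is used as the destination name that the configured channel resolver turns into an actual channel. A minimal sketch:

// Assumed payload type (not part of the sample): the SpEL expression "payload.id"
// reads getId(), and the returned string is used as the destination channel name.
public class RoutedEvent {
    private final String id;

    public RoutedEvent(final String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

// Illustrative send: a message whose payload.id is "output.topic.1" ends up on the
// channel the resolver creates for "output.topic.1"; other ids fall back to "default-output".
// sourceChannel.send(new GenericMessage<>(new RoutedEvent("output.topic.1")));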

Feel free to improve my answer; I hope it helps others.

Now the code (it works, I checked it in my debugger). This is an example and is not production-ready!

Here is how to send messages to a dynamic destination:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@EnableBinding
public class MessageSenderService {
    @Autowired
    private BinderAwareChannelResolver resolver;

    @Transactional
    public void sendMessage(final String topicName, final String payload) {
        // Resolve (and, if needed, create and bind) the destination by name at runtime.
        final MessageChannel messageChannel = resolver.resolveDestination(topicName);
        messageChannel.send(new GenericMessage<String>(payload));
    }
}
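
A minimal sketch of calling this service (the class and topic name below are illustrative, not part of my real code; in my case the topic name comes from the database):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageSenderExample {
    @Autowired
    private MessageSenderService messageSenderService;

    public void sendExample() {
        // "output.topic.1" is one of the destinations listed under dynamicDestinations below.
        messageSenderService.sendMessage("output.topic.1", "{\"id\":\"42\"}");
    }
}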

And the Spring Cloud Stream configuration:

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

I found here, https://docs.spring.io/spring-cloud-stream/docs/elmhurst.release/reference/htmlsingle/index.html#dynamicdestination, that this works starting with Spring Cloud Stream version 2. I use 2.1.2:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

Here is how to consume messages from dynamic destinations.

https://stackoverflow.com/a/56148190/4587961

Configuration:

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

The Java consumer:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        final String topic = (String) message.getHeaders().get("kafka_receivedTopic");
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}
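
As a sketch of that last comment (the dispatcher class and the hard-coded mapping are hypothetical; in my application the mapping comes from configuration/the database):

import java.util.HashMap;
import java.util.Map;

import org.springframework.stereotype.Component;

@Component
public class WebhookDispatcher {
    // Hypothetical mapping topic name -> webhook URI; in reality this is configurable.
    private final Map<String, String> topicToWebhook = new HashMap<>();

    public WebhookDispatcher() {
        topicToWebhook.put("topic-1", "https://example.org/hooks/one");
        topicToWebhook.put("topic-2", "https://example.org/hooks/two");
    }

    public void dispatch(final String topic, final Object payload) {
        final String webhookUri = topicToWebhook.get(topic);
        if (webhookUri == null) {
            return; // no webhook configured for this topic
        }
        // POST the payload to webhookUri here, e.g. with RestTemplate or WebClient.
    }
}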
