Spring Cloud Stream动态通道



我正在使用Spring Cloud Stream,并希望以编程方式创建和绑定通道。我的用例是,在应用程序启动期间,我收到了要订阅的Kafka主题的动态列表。然后我如何为每个主题创建一个通道?

我最近遇到了类似的场景,下面是我动态创建SubscriberChannels的示例。

ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1); 
BindingProperties bindingProperties = new BindingProperties();
bindingProperties.setConsumer(consumerProperties);
bindingProperties.setDestination(retryTopic);
bindingProperties.setGroup(consumerGroup);
bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
beanFactory.registerSingleton(consumerName, channel);
channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
bindingService.bindConsumer(channel, consumerName);
channel.subscribe(consumerMessageHandler);

我不得不为Camel Spring Cloud Stream组件做一些类似的事情。也许绑定目的地的Consumer代码"实际上只是一个指示频道名称的String"对您有用?

在我的情况下,我只绑定一个目的地,但我认为多个目的地在概念上没有太大区别。

其要点如下:

@Override
protected void doStart() throws Exception {
SubscribableChannel bindingTarget = createInputBindingTarget();
bindingTarget.subscribe(message -> {
// have your way with the received incoming message
});
endpoint.getBindingService().bindConsumer(bindingTarget,
endpoint.getDestination());
// at this point the binding is done
}
/**
* Create a {@link SubscribableChannel} and register in the
* {@link org.springframework.context.ApplicationContext}
*/
private SubscribableChannel createInputBindingTarget() {
SubscribableChannel channel = endpoint.getBindingTargetFactory()
.createInputChannel(endpoint.getDestination());
endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
endpoint.getDestination());
return channel;
}

请参阅此处获取更多上下文的完整来源。

我有一项任务,我事先不知道主题。我通过一个输入通道来解决这个问题,该通道可以监听我需要的所有主题。

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html

目标

绑定中间件上通道的目标目的地(例如RabbitMQ交换或Kafka主题)。如果通道绑定为使用者,则可以将其绑定到多个目的地,并且可以将目的地名称指定为逗号分隔的字符串值。如果未设置,则使用通道名称。

所以我的配置

spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8

然后,我定义了一个使用者来处理来自所有这些主题的消息。

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
logger.info("Received a message: nmessage:n{}", message.getPayload());
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
}
}

请注意,在您的情况下,这可能不是一个解决方案。我需要将消息转发到webhook,这样我就可以进行配置映射。

我还考虑了其他想法。1) 你是没有Spring Cloud的kafka客户消费者。

2) 创建预定义数量的输入,例如50。

input-1
intput-2
...
intput-50

然后对其中一些输入进行配置。

相关讨论

  • Spring云流支持动态路由消息
  • https://github.com/spring-cloud/spring-cloud-stream/issues/690
  • https://github.com/spring-cloud/spring-cloud-stream/issues/1089

我们使用Spring Cloud 2.1.1 RELEASE

MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));
public MessageChannel createMessageChannel(String channelName) {
return (MessageChannel) applicationContext.getBean(channelName);}
public Function<Object, Message<Object>> getMessageBuilder() {
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();}

基于Sash的回答,我编写了一个可以动态注册消费者的演示。

@Autowired
void dynamicBinding(BindingService bindingService,
BindingServiceProperties bindingServiceProperties,
RabbitExtendedBindingProperties rabbitExtendedBindingProperties,
BindingTargetFactory channelFactory) {
final var CONSUMER_NAME = "onRefresh";               // SCS Consumer Name
final var RABBIT_EXCHANGE = "ouroboros.app.control"; // RabbitMQ Exchange Name
// RabbitMQ Properties
var rabbitConsumerProperties = new RabbitConsumerProperties();
var consumerProperties = new ExtendedConsumerProperties<>(rabbitConsumerProperties);
var rabbitBindingProperties = new RabbitBindingProperties();
var bindingProperties = new BindingProperties();
rabbitConsumerProperties.setExchangeType("fanout");
rabbitConsumerProperties.setAcknowledgeMode(AcknowledgeMode.MANUAL);
rabbitConsumerProperties.setRequeueRejected(false);
rabbitConsumerProperties.setRepublishToDlq(true);
rabbitConsumerProperties.setAutoBindDlq(true);
rabbitConsumerProperties.setDeclareDlx(true);
rabbitConsumerProperties.setDeadLetterExchange("ouroboros.app.control.dlx");
rabbitConsumerProperties.setDeadLetterExchangeType("topic");
consumerProperties.populateBindingName(CONSUMER_NAME);
bindingProperties.setDestination(RABBIT_EXCHANGE);
bindingProperties.setConsumer(consumerProperties);
rabbitBindingProperties.setConsumer(rabbitConsumerProperties);
bindingServiceProperties.getBindings().put(CONSUMER_NAME, bindingProperties);
rabbitExtendedBindingProperties.setBindings(Collections.singletonMap(CONSUMER_NAME, rabbitBindingProperties));
// Channel Name same as SCS Consumer name
var channel = (SubscribableChannel)channelFactory.createInput(CONSUMER_NAME);
// bind consumer
bindingService.bindConsumer(channel, CONSUMER_NAME);
// subscribe channel
channel.subscribe((message) -> {
logger.info("onRefresh: {}", message.getPayload());
throw new MessagingException("reject"); // How to gracefully reject consuming messages?
});
}

对于传入消息,可以显式使用BinderAwareChannelResolver来动态解析目的地。您可以查看此示例,其中router接收器使用绑定器感知通道解析器。

最新更新