How to create multiple PubSubTemplate beans



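I want to create two PubSubTemplate beans so that each one can use a different message converter. I have two subscribers: one receives JSON payloads and the other receives plain String payloads. To handle both cases I defined two PubSubTemplate beans. Here is my PubSubTemplateConfig.java: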

    @Configuration
    public class PubSubTemplateConfig {

        @Bean
        public PubSubTemplate pubSubTemplateForUserCreation(PubSubPublisherTemplate pubSubPublisherTemplate,
                PubSubSubscriberTemplate pubSubSubscriberTemplate) {
            PubSubTemplate template = new PubSubTemplate(pubSubPublisherTemplate, pubSubSubscriberTemplate);
            template.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
            return template;
        }

        private ObjectMapper getObjectMapper() {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.registerModule(new JavaTimeModule());
            return objectMapper;
        }

        @Bean
        public PubSubTemplate pubSubTemplateForAuditTracker(PubSubPublisherTemplate pubSubPublisherTemplate,
                PubSubSubscriberTemplate pubSubSubscriberTemplate) {
            PubSubTemplate template = new PubSubTemplate(pubSubPublisherTemplate, pubSubSubscriberTemplate);
            template.setMessageConverter(new SimplePubSubMessageConverter());
            return template;
        }
    }

Below are the two subscriber configurations. AuditsubscriptioncriptionConfiguration.java:

    @Configuration
    public class AuditsubscriptioncriptionConfiguration {

        @Value("${subscriptioncription.auditsubscriptioncription}")
        private String subscription;

        @Bean("pubsubAuditInputChannel")
        public MessageChannel pubsubAuditInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public PubSubInboundChannelAdapter auditMessageChannelAdapter(
                @Qualifier("pubsubAuditInputChannel") MessageChannel pubsubAuditInputChannel,
                @Qualifier("pubSubTemplateForAuditTracker") PubSubTemplate pubSubTemplateForAuditTracker) {

            PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplateForAuditTracker, subscription);
            adapter.setOutputChannel(pubsubAuditInputChannel);
            adapter.setPayloadType(String.class); // need changes
            adapter.setAckMode(AckMode.MANUAL);
            return adapter;
        }
    }

UserSubscriptionConfiguration.java

    @Configuration
    public class UserSubscriptionConfiguration {

        @Value("${subscription.userSubscriber}")
        private String subscriber;

        @Bean("pubsubInputChannel")
        public MessageChannel pubsubInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public PubSubInboundChannelAdapter userMessageChannelAdapter(
                @Qualifier("pubsubInputChannel") MessageChannel pubsubInputChannel,
                @Qualifier("pubSubTemplateForUserCreation") PubSubTemplate pubSubTemplateForUserCreation) {

            PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplateForUserCreation, subscriber);
            adapter.setOutputChannel(pubsubInputChannel);
            adapter.setPayloadType(UserChangeEvent.class);
            adapter.setAckMode(AckMode.MANUAL);
            return adapter;
        }
    }

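Steps observed during container startup:

Step 1. The pubSubTemplateForAuditTracker bean is created first, with SimplePubSubMessageConverter, and then the auditMessageChannelAdapter bean is configured.

Step 2. The pubSubTemplateForUserCreation bean is created with JacksonPubSubMessageConverter, and then userMessageChannelAdapter is configured.

At this point I expected two beans with two different message converters. While debugging, however, I found that only one PubSubTemplate instance exists, and the converter attached to it is JacksonPubSubMessageConverter. The pubSubTemplateForAuditTracker bean is being overridden by the pubSubTemplateForUserCreation bean, even though I defined both of them with @Bean. This causes an error when auditMessageChannelAdapter receives a String message.

What I want is two independent PubSubTemplate beans with two different message converters, that is, two beans of type PubSubTemplate with different behavior. Can anyone help? This is my first time exploring GCP Pub/Sub. Thank you.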

You can create the subscriptions manually, like this:

    // Assumes projectId, eventTopic, eventSubscription, eventSubscription2, client
    // (e.g. a SubscriptionAdminClient), credentialsProvider, subscriber1, subscriber2
    // and LOG are fields of the enclosing class.
    @PostConstruct
    private void subscribeWithConcurrencyControl() {
        // Create the subscriptions.
        TopicName topic = TopicName.ofProjectTopicName(projectId, this.eventTopic);
        Subscription subscription = Subscription.newBuilder()
                .setName("projects/XYZ/subscriptions/" + eventSubscription)
                .setTopic(topic.toString())
                .setPushConfig(PushConfig.getDefaultInstance())
                .setAckDeadlineSeconds(100)
                .build();
        Subscription subscription2 = Subscription.newBuilder()
                .setName("projects/XYZ/subscriptions/" + eventSubscription2)
                .setTopic(topic.toString())
                .setPushConfig(PushConfig.getDefaultInstance())
                .setAckDeadlineSeconds(100)
                .build();
        try {
            client.createSubscription(subscription);
        } catch (AlreadyExistsException e) {
            // The subscription already exists; nothing to do.
        }
        try {
            client.createSubscription(subscription2);
        } catch (AlreadyExistsException e) {
            // The subscription already exists; nothing to do.
        }
        ProjectSubscriptionName subscriptionName1 = ProjectSubscriptionName.of(projectId, eventSubscription);
        ProjectSubscriptionName subscriptionName2 = ProjectSubscriptionName.of(projectId, eventSubscription2);
        // Instantiate an asynchronous message receiver (shared by both subscribers).
        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            // Handle the incoming message, then ack it; nack on failure so it is redelivered.
            try {
                process(message);
                consumer.ack();
            } catch (Exception e) {
                LOG.error("Failed to process message", e);
                consumer.nack();
            }
        };
        // Provides an executor service for processing messages. The default `executorProvider` used
        // by the subscriber has a default thread count of 5.
        ExecutorProvider executorProvider =
                InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build();
        FlowControlSettings flowControlSettings =
                FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(100L)
                        .build();
        // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
        // to receive messages. It defaults to 1. `setExecutorProvider` configures an executor for the
        // subscriber to process messages. Here, each subscriber is configured to open 20 streams for
        // receiving messages, and each stream creates a new executor with 2 threads to help process
        // the message callbacks. In total 20x2=40 threads per subscriber are used for message processing.
        subscriber1 = Subscriber.newBuilder(subscriptionName1, receiver)
                .setParallelPullCount(20)
                .setExecutorProvider(executorProvider)
                .setCredentialsProvider(credentialsProvider)
                .setFlowControlSettings(flowControlSettings)
                .build();
        subscriber2 = Subscriber.newBuilder(subscriptionName2, receiver)
                .setParallelPullCount(20)
                .setExecutorProvider(executorProvider)
                .setCredentialsProvider(credentialsProvider)
                .setFlowControlSettings(flowControlSettings)
                .build();
        // Start the subscribers.
        subscriber1.startAsync().awaitRunning();
        subscriber2.startAsync().awaitRunning();
    }

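In the process() method you can use the usual ObjectMapper and/or instanceof checks to determine the type of the message (alternatively, set a different receiver for each subscription, or even pass the message type in a Pub/Sub header).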

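    private void process(PubsubMessage message) {
        try {
            ModificationEvent modificationEvent = objectMapper.readValue(message.getData().toStringUtf8(), ModificationEvent.class);
            // Handle modificationEvent here.
        } catch (IOException e) {
            // The payload is not a ModificationEvent; treat it as a plain String or rethrow.
        }
    }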
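
The last option mentioned above, carrying the message type in a header, could look roughly like the sketch below. It is only an illustration under stated assumptions: the attribute key `payloadType`, the class `TypedDispatcher` and the type names `text` and `json` are made up for this example, and the message is modeled with a plain `Map<String, String>` and a `byte[]` so that the snippet runs without the Pub/Sub client on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Routes a raw payload to a decoder chosen by a message attribute (hypothetical helper). */
final class TypedDispatcher {

    // Hypothetical attribute key; the publisher would have to set it on every message.
    static final String TYPE_ATTRIBUTE = "payloadType";

    private final Map<String, Function<byte[], Object>> decoders = new HashMap<>();

    /** Registers the decoder to use for messages whose type attribute equals the given type. */
    void register(String type, Function<byte[], Object> decoder) {
        decoders.put(type, decoder);
    }

    /** Decodes the payload, or throws if the attribute is missing or has no registered decoder. */
    Object dispatch(Map<String, String> attributes, byte[] data) {
        String type = attributes.get(TYPE_ATTRIBUTE);
        Function<byte[], Object> decoder = decoders.get(type);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder for payload type: " + type);
        }
        return decoder.apply(data);
    }

    public static void main(String[] args) {
        TypedDispatcher dispatcher = new TypedDispatcher();
        // Plain-text payloads are decoded as UTF-8 strings.
        dispatcher.register("text", bytes -> new String(bytes, StandardCharsets.UTF_8));
        // Stand-in for a JSON decoder; real code would call objectMapper.readValue here.
        dispatcher.register("json", bytes -> "parsed:" + new String(bytes, StandardCharsets.UTF_8));

        Object result = dispatcher.dispatch(
                Map.of(TYPE_ATTRIBUTE, "text"),
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(result); // prints "hello"
    }
}
```

Inside process(), the attributes and payload would presumably come from `message.getAttributesMap()` and `message.getData().toByteArray()`. Failing loudly on an unknown type lets the receiver's existing catch block nack the message, so a bad payload is not silently acknowledged.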
