如何在apachecamel中并行处理gcp-pubsub消息



下面的代码从pubsub-source主题中获取消息->根据模板进行转换->然后将转换后的消息发布到目标主题。

但为了提高性能,我需要同时完成这项任务。也就是说,我需要轮询500条消息,然后并行转换,然后将它们发布到目标主题。

从camel-gcp组件文档中,我相信maxMessagesPerPoll和concurrentConsumers参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。

我的意思是:a(如果我轮询500条消息,它会创建500条并行路由来处理消息并将其发布到目标主题吗?b(消息的排序如何?c(我是否应该将并行处理EIP作为替代

等等。

的概念我不清楚

// my route
private void addRouteToContext(final PubSub pubSub) throws Exception {
this.camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("google-pubsub:{{gcp_project_id}}:{{pubsub.dead.letter.topic}}")
.useOriginalMessage().onPrepareFailure(new FailureProcessor()));


/*
* from topic
*/
from("google-pubsub:{{gcp_project_id}}:" + pubSub.getFromSubscription() + "?"
+ "maxMessagesPerPoll={{consumer.maxMessagesPerPoll}}&"
+ "concurrentConsumers={{consumer.concurrentConsumers}}").
/*
* transform using the velocity
*/
to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
/*
* attach header to the transform message
*/
setHeader("Header ", simple("${date:now:yyyyMMdd}")).routeId(pubSub.getRouteId()).
/*
* log the transformed event
*/
log("${body}").
/*
* publish the transformed event to the target topic
*/
to("google-pubsub:{{gcp_project_id}}:" + pubSub.getToTopic());
}
});
}

a(如果我轮询500条消息,它会创建500条并行路由来处理消息并将其发布到目标主题吗

不,Camel在这种情况下不会创建500个并行线程。正如您所怀疑的,并发使用者线程的数量是用concurrentConsumers设置的。因此,如果定义maxMessagesPerPoll为500的5个concurrentConsumers,每个消费者将获取多达500条消息,并在单个线程中一个接一个地处理它们。也就是说,您有5条消息被并行处理。

消息的排序如何

一旦并行处理消息,消息的顺序就会混乱。但是,当您遇到处理错误时,1 Consumer已经发生了这种情况,这些错误会绕过deadLetterChannel,稍后再进行处理。

我是否应该将并行处理EIP作为一种替代

仅当concurrentConsumers选项不足时。

当您提到concurrentConsumers选项(比如concurrentConsumers=10(时,您要求Camel创建一个由10个线程组成的线程池,这10个线程中的每一个都将从pub子队列中获取不同的消息并进行处理。

这里需要注意的是,当您指定concurrentConsumers选项时,线程池使用固定的大小,这意味着固定数量的活动线程一直在等待处理传入消息。因此,10个线程(因为我指定了concurrentConsumers=10(将等待处理我的消息,即使没有10条消息同时传入。

显然,这并不能保证传入消息将以相同的顺序进行处理。如果您希望以相同的顺序排列消息,您可以查看Resequencer EIP以对消息进行排序。

至于你的第三个问题,我认为谷歌pubsub组件不允许并行处理选项。您可以使用Threads EIP制作自己的。这肯定会让您能够更好地控制并发。

使用Threads,您的代码看起来像这样:

from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
// the 2 parameters are 'pool size' and 'max pool size'
.threads(5, 20)
.to("direct:out");

最新更新