下面的代码从pubsub-source主题中获取消息->根据模板进行转换->然后将转换后的消息发布到目标主题。
但为了提高性能,我需要同时完成这项任务。也就是说,我需要轮询500条消息,然后并行转换,然后将它们发布到目标主题。
从camel-gcp组件文档中,我相信maxMessagesPerPoll和concurrentConsumers参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。
我的意思是:a(如果我轮询500条消息,它会创建500条并行路由来处理消息并将其发布到目标主题吗?b(消息的排序如何?c(我是否应该将并行处理EIP作为替代
等等。
的概念我不清楚
是
// my route
private void addRouteToContext(final PubSub pubSub) throws Exception {
this.camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("google-pubsub:{{gcp_project_id}}:{{pubsub.dead.letter.topic}}")
.useOriginalMessage().onPrepareFailure(new FailureProcessor()));
/*
* from topic
*/
from("google-pubsub:{{gcp_project_id}}:" + pubSub.getFromSubscription() + "?"
+ "maxMessagesPerPoll={{consumer.maxMessagesPerPoll}}&"
+ "concurrentConsumers={{consumer.concurrentConsumers}}").
/*
* transform using the velocity
*/
to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
/*
* attach header to the transform message
*/
setHeader("Header ", simple("${date:now:yyyyMMdd}")).routeId(pubSub.getRouteId()).
/*
* log the transformed event
*/
log("${body}").
/*
* publish the transformed event to the target topic
*/
to("google-pubsub:{{gcp_project_id}}:" + pubSub.getToTopic());
}
});
}
a(如果我轮询500条消息,它会创建500条并行路由来处理消息并将其发布到目标主题吗
不,Camel在这种情况下不会创建500个并行线程。正如您所怀疑的,并发使用者线程的数量是用concurrentConsumers
设置的。因此,如果定义maxMessagesPerPoll
为500的5个concurrentConsumers
,每个消费者将获取多达500条消息,并在单个线程中一个接一个地处理它们。也就是说,您有5条消息被并行处理。
消息的排序如何
一旦并行处理消息,消息的顺序就会混乱。但是,当您遇到处理错误时,1 Consumer已经发生了这种情况,这些错误会绕过deadLetterChannel
,稍后再进行处理。
我是否应该将并行处理EIP作为一种替代
仅当concurrentConsumers
选项不足时。
当您提到concurrentConsumers
选项(比如concurrentConsumers=10
(时,您要求Camel创建一个由10个线程组成的线程池,这10个线程中的每一个都将从pub子队列中获取不同的消息并进行处理。
这里需要注意的是,当您指定concurrentConsumers选项时,线程池使用固定的大小,这意味着固定数量的活动线程一直在等待处理传入消息。因此,10个线程(因为我指定了concurrentConsumers=10(将等待处理我的消息,即使没有10条消息同时传入。
显然,这并不能保证传入消息将以相同的顺序进行处理。如果您希望以相同的顺序排列消息,您可以查看Resequencer EIP以对消息进行排序。
至于你的第三个问题,我认为谷歌pubsub组件不允许并行处理选项。您可以使用Threads EIP制作自己的。这肯定会让您能够更好地控制并发。
使用Threads,您的代码看起来像这样:
from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
// the 2 parameters are 'pool size' and 'max pool size'
.threads(5, 20)
.to("direct:out");