Apache Camel Kafka: aggregate before producing messages, but headers are lost



I use Apache Camel Kafka with Spring Boot, Camel version 3.14.2.
I use the default configuration of the Camel Kafka component.

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-kafka-starter</artifactId>
    <version>${camel.version}</version>
</dependency>

My Camel route (fileConsume reads a file with 6000 lines):

from(fileConsume)
    .split(body().tokenize())
        .setHeader("testHeader", constant("valueHeader"))
    .aggregate(new GroupedMessageAggregationStrategy())
        .constant(true)
        .completionTimeout(100L)
    .to("kafka:topicTest");

All the messages are produced to Kafka very quickly (under 2 seconds), but the header is missing.

If I remove the aggregate:

from(fileConsume)
    .split(body().tokenize())
        .setHeader("testHeader", constant("valueHeader"))
    .to("kafka:topicTest");

Producing all the messages from the file to Kafka is very slow (over 10 minutes), but the header is present.

I need some help producing messages quickly with the Apache Camel Kafka component while keeping the headers.

To keep the header when aggregating, you must restore it after the aggregation. GroupedMessageAggregationStrategy replaces the body of the aggregated exchange with a List of the original Messages, so a header set inside the split is only available on those list elements, not on the aggregated exchange itself:
from("sftp://xxxxxxx@localhost:"
+ "2222/data/in"
+ "?password="
+ "&preferredAuthentications=publickey"
+ "&knownHostsFile=~/.ssh/known_hosts"
+ "&privateKeyFile=xxxxxxx"
+ "&privateKeyPassphrase="
+ "&passiveMode=true"
+ "&fastExistsCheck=true"
+ "&download=true"
+ "&delete=true"
+ "&stepwise=false"
+ "&antInclude=*"
+ "&antExclude=**reject**"
+ "&recursive=false"
+ "&maxMessagesPerPoll=10"
+ "&initialDelay=0"
+ "&delay=0"
+ "&connectTimeout=10000"
+ "&soTimeout=300000"
+ "&timeout=30000"
+ "&shuffle=true"
+ "&eagerMaxMessagesPerPoll=false"
+ "&moveFailed=reject"
+ "&binary=true"
+ "&localWorkDirectory=/opt/camel_data/kafka/"
+ "&readLock=none"
+ "&readLockCheckInterval=1000"
+ "&readLockMinLength=1"
+ "&readLockLoggingLevel=INFO"
+ "&readLockIdempotentReleaseDelay=10000"
+ "&readLockRemoveOnCommit=false"
+ "&readLockRemoveOnRollback=true"
+ "&bulkRequests=1000"
+ "&charset=utf-8")
.routeId("Consume SFTP")
.id("Consume SFTP")
.setProperty("yoda_core_technical_id").header(Exchange.BREADCRUMB_ID)
.setProperty("x_filename_source").header(Exchange.FILE_NAME_ONLY)
.setProperty("x_filepath_source").header("CamelFileAbsolutePath")
.setProperty("x_correlation_id").header("CamelFileName")
.split(body().tokenize())
.setHeader("test",constant("test"))
.aggregate(new GroupedMessageAggregationStrategy())
.constant(true)
.completionTimeout(100L)
.to("direct:aggregate");
from("direct:aggregate")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(exchange);
GenericFileMessage<String> message =(GenericFileMessage<String>) exchange.getMessage().getBody(List.class).get(0);
exchange.getMessage().setHeader("test",
message.getHeader("test"));
}
})
.to("mock:result");
