Spring Integration. Missing messages. Poll resulted in Message: GenericMessage



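While checking the output, I noticed that some messages are missing. I enabled debug logging for org.springframework.integration, but I don't understand what it shows. Why do I send 4 messages (4 files) to the queue but receive only 2 instead of 4? What does "Poll resulted in Message: GenericMessage" mean? How can I receive all the messages?

integration.xml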

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
       ">
    <context:annotation-config/>
    <context:component-scan base-package="*"/>
    <int:poller id="defaultPoller"
            fixed-delay="1000" default="true" max-messages-per-poll="-1"/>
    <int:channel id="list">
        <int:queue capacity="100"/>
    </int:channel>
    <task:scheduled-tasks>
        <task:scheduled ref="get"
                        method="getList"
                        cron="5 * * * * *"/>
    </task:scheduled-tasks>
    <int:service-activator method="receive" ref="read" input-channel="list"/>
    <bean id="get" class="*.GetListFiles"/>
    <bean id="read" class="*.ReadOneChannel"/>
</beans>

GetListFiles.class:

    private static final Logger logger = LoggerFactory.getLogger(GetListFiles.class);
    private MessageChannel channel;
    public GetListFiles(@Qualifier("list") MessageChannel messageChannel) {
        this.channel = messageChannel;
    }
    @Value("${dir}")
    private String dir;
    public void getList() {
        try (Stream<Path> stream = Files.walk(Paths.get(dir))) {
            stream.filter(path -> Files.isRegularFile(path) && path.getFileName().toString().endsWith(".pdf"))
                    .forEach(path -> {
                        Message<Path> message = MessageBuilder.withPayload(path).build();
                        logger.info("Message send: {}", message);
                        boolean send = channel.send(message);
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

ReadOneChannel.class:

    private static final Logger LOGGER = LoggerFactory.getLogger(ReadOneChannel.class);
    @Autowired
    @Qualifier("list")
    private QueueChannel queueChannel;
    public void receive() {
        Message message = queueChannel.receive();
        LOGGER.info("PAYLOAD: {}", message.getPayload());
    }

Logs:

2019-01-15 19:21:05 - Message send: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - preSend on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - postSend (sent=true) on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - Message send: GenericMessage [payload=/home/user/data/pdf/second/2.pdf, headers={id=8cf90469-b677-9eb7-d944-388962d96fd7, timestamp=1547569265031}]
2019-01-15 19:21:05 - preSend on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/second/2.pdf, headers={id=8cf90469-b677-9eb7-d944-388962d96fd7, timestamp=1547569265031}]
2019-01-15 19:21:05 - postSend (sent=true) on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/second/2.pdf, headers={id=8cf90469-b677-9eb7-d944-388962d96fd7, timestamp=1547569265031}]
2019-01-15 19:21:05 - postReceive on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - Poll resulted in Message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@6a01928b] (org.springframework.integration.config.ServiceActivatorFactoryBean#0) received message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - Message send: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - preSend on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - postSend (sent=true) on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - Message send: GenericMessage [payload=/home/user/data/pdf/fourth/4.pdf, headers={id=7c5b1c90-1a9c-bdd3-5e33-1110f2d5933b, timestamp=1547569265033}]
2019-01-15 19:21:05 - preSend on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/fourth/4.pdf, headers={id=7c5b1c90-1a9c-bdd3-5e33-1110f2d5933b, timestamp=1547569265033}]
2019-01-15 19:21:05 - postSend (sent=true) on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/fourth/4.pdf, headers={id=7c5b1c90-1a9c-bdd3-5e33-1110f2d5933b, timestamp=1547569265033}]
2019-01-15 19:21:05 - postReceive on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/second/2.pdf, headers={id=8cf90469-b677-9eb7-d944-388962d96fd7, timestamp=1547569265031}]
2019-01-15 19:21:05 - PAYLOAD: /home/user/data/pdf/second/2.pdf
2019-01-15 19:21:05 - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@6a01928b] (org.springframework.integration.config.ServiceActivatorFactoryBean#0)' produced no reply for request Message: GenericMessage [payload=/home/user/data/pdf/first/1.pdf, headers={id=af03c8be-5ace-c0f2-1a5f-7255202e76ba, timestamp=1547569265028}]
2019-01-15 19:21:05 - postReceive on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - Poll resulted in Message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@6a01928b] (org.springframework.integration.config.ServiceActivatorFactoryBean#0) received message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]
2019-01-15 19:21:05 - postReceive on channel 'list', message: GenericMessage [payload=/home/user/data/pdf/fourth/4.pdf, headers={id=7c5b1c90-1a9c-bdd3-5e33-1110f2d5933b, timestamp=1547569265033}]
2019-01-15 19:21:05 - PAYLOAD: /home/user/data/pdf/fourth/4.pdf
2019-01-15 19:21:05 - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@6a01928b] (org.springframework.integration.config.ServiceActivatorFactoryBean#0)' produced no reply for request Message: GenericMessage [payload=/home/user/data/pdf/third/3.pdf, headers={id=a797d7c0-16b8-8b06-07ed-3bcb4f16ff9b, timestamp=1547569265032}]

Thanks for your support.

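You have 2 consumers on the list channel: the service activator and your own call

Message message = queueChannel.receive();

Each message is delivered to only one consumer. The poller hands messages 1 and 3 to the service activator ("Poll resulted in Message" in the log), but your method ignores them and pulls the next message (2 and 4) from the queue itself, so only those two are logged. The method that the service activator invokes (receive in your configuration) should simply be

public void receive(Message<?> message)

The framework will call it once for each message.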
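
To make the effect easier to see, here is a minimal sketch in plain Java that imitates the situation without Spring. Everything in it is an assumption made for illustration: the class name CompetingConsumersDemo, the helper methods, and the use of a BlockingQueue as a stand-in for the queue channel. It is not how the framework is implemented. It only shows why a handler that reads from the same queue again ends up logging every second message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the setup in the question; no Spring classes are used.
class CompetingConsumersDemo {

    // Imitates the poller: takes messages off the queue one by one
    // and passes each of them to the handler.
    static void pollAll(BlockingQueue<String> queue, Consumer<String> handler) {
        String message;
        while ((message = queue.poll()) != null) {
            handler.accept(message);
        }
    }

    // Four "files" waiting in the queue, as in the question.
    static BlockingQueue<String> fourFiles() {
        return new LinkedBlockingQueue<>(List.of("1.pdf", "2.pdf", "3.pdf", "4.pdf"));
    }

    // Handler as written in the question: it ignores the message it was given
    // and takes another one from the same queue.
    static List<String> runBuggy() {
        BlockingQueue<String> queue = fourFiles();
        List<String> logged = new ArrayList<>();
        pollAll(queue, polled -> {
            // The question calls the blocking receive(); poll() is used here
            // so that the demo cannot hang on an empty queue.
            String taken = queue.poll();
            if (taken != null) {
                logged.add(taken);
            }
        });
        return logged;
    }

    // Handler as suggested in the answer: it works with the message
    // passed in as a parameter and never touches the queue.
    static List<String> runFixed() {
        BlockingQueue<String> queue = fourFiles();
        List<String> logged = new ArrayList<>();
        pollAll(queue, logged::add);
        return logged;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + runBuggy()); // buggy: [2.pdf, 4.pdf]
        System.out.println("fixed: " + runFixed()); // fixed: [1.pdf, 2.pdf, 3.pdf, 4.pdf]
    }
}
```

The buggy run logs 2.pdf and 4.pdf, the same pair that shows up as PAYLOAD lines in the question's log. In the real ReadOneChannel the equivalent change would be to drop the injected QueueChannel field and let the method take the Message as its parameter.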
