弹簧集成入站通道适配器,用于逐行读取大文件



我目前正在使用Spring Integration 4.1.0和Spring 4.1.2。我要求能够逐行读取文件,并将每行读取用作消息。 基本上,我想允许"重播"我们的消息源之一,但消息不是保存在单个文件中,而是保存在单个文件中。 我对此用例没有事务要求。我的要求与此帖子类似,除了与运行 JVM 的文件位于同一服务器上的文件: 弹簧集成 - 逐行读取远程文件

在我看来,我有以下选择:

1.使用int-file:inbound-channel-adapter读取文件,然后"拆分"该文件,以便1条消息现在变成多条消息。示例配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

问题是文件非常大,当使用上述技术时,整个文件首先被读入内存,然后被拆分,JVM耗尽堆空间。 实际上需要的步骤是:读取一行并将行转换为消息,发送消息,从内存中删除消息,重复。

  1. int-file:tail-inbound-channel-adapterend="false"一起使用(这基本上表示从文件的开头读取)。 根据需要为每个文件启动和停止此适配器(在每次启动之前更改文件名)。示例配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
    
  2. 将 Spring Integration 调用到 Spring Batch 并使用ItemReader来处理文件。 当然允许对整个过程进行更细粒度的控制,但使用作业存储库等设置内容需要相当多的工作(而且我不关心作业历史记录,所以我要么告诉作业不记录状态和/或使用内存中MapJobRepository)。

4.通过扩展MessageProducerSupport创建自己的FileLineByLineInboundChannelAdapter。大部分代码可以从ApacheCommonsFileTailingMessageProducer借用(另请参阅 http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)。 下面是一个示例,但需要一些工作才能将读数放入它自己的Thread中,以便在逐行阅读时遵守stop()命令。

    package com.xxx.exchgateway.common.util.springintegration;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;
    /**
     * A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
     * See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;
        /**
         * The name of the file you wish to tail.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull("'file' cannot be null");
            this.file = file;
        }
        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }
        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }
        private void readFile() {
            FileInputStream fstream;
            try {
                fstream = new FileInputStream(getFile());
                BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
                String strLine;
                // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
                while ((strLine = br.readLine()) != null && isRunning()) {
                    send(strLine);
                }
                //Close the input stream
                IOUtils.closeQuietly(br);
                IOUtils.closeQuietly(fstream);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        @Override
        protected void doStart() {
            super.doStart();
            // TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line-by-line
            readFile();
        }
        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }
        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }

在我看来,我的用例并不超出人们可能喜欢做的典型事情的范围,所以我很惊讶我找不到开箱即用的解决方案。 然而,我已经搜索了很多,查看了很多例子,不幸的是还没有找到适合我需求的东西。

假设也许我错过了框架已经提供的一些明显的东西(尽管这可能属于Spring Integraton和Spring Batch之间的模糊界限)。 有人可以让我知道我的想法是否完全偏离基础,或者是否有一个简单的解决方案我错过了,或者提供替代建议?

Spring Integration 4.x有一个很好的新功能,使用Iterator作为消息:

弹簧集成参考

从版本 4.1 开始,AbstractMessageSplitter 支持要拆分的值的迭代器类型。

这允许将迭代器作为消息发送,而不是将整个文件读取到内存中。

下面是一个 Spring 上下文将 CSV 文件拆分为每行一条消息的简单示例:

<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>
<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>
<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${aueueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>

这是拆分器实现:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
    private String commentPrefix = "#";
    public Object splitMessage(Message<?> message) {
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {
            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 
            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }
    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }
    public class BufferedReaderFileIterator implements Iterator<String> {
        private File file;
        private BufferedReader bufferedReader;
        private String line;
        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }
        @Override
        public boolean hasNext() {
            return line != null;
        }
        @Override
        public String next() {
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }
        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));
            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }
            if(line == null) {
                close();
            }
        }
        void close() throws IOException {
            bufferedReader.close();
            file.delete();
        }
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

请注意从 splitMessage() 处理程序方法返回的迭代器对象。

我也有这个,我也将文件复制到另一个文件夹并从文件中读取数据

文件复制应用程序上下文.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder />
    <file:inbound-channel-adapter id="filesIn"
        directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
        auto-startup="true">
        <int:poller id="poller" fixed-delay="500" />
    </file:inbound-channel-adapter>

    <int:service-activator input-channel="filesIn"
        output-channel="filesOut" ref="handler" />
    <file:outbound-channel-adapter id="filesOut"
        directory="E:/usmandata/logs/output/" />


    <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
    <bean id="onlyPropertyFiles"
        class="org.springframework.integration.file.config.FileListFilterFactoryBean"
        p:filenamePattern="*.log" />
</beans>

文件处理程序.java

package com.javarticles.spring.integration.file;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileHandler {
    public File handleFile(File input) throws IOException {
       // System.out.println("Copying file: " + input.getAbsolutePath());

        RandomAccessFile file = new RandomAccessFile(input,"r");
        FileChannel channel = file.getChannel();
        //System.out.println("File size is: " + channel.size());
        ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
        channel.read(buffer);
        buffer.flip();//Restore buffer to position 0 to read it
        System.out.println("Reading content and printing ... ");
        for (int i = 0; i < channel.size(); i++) {
            System.out.print((char) buffer.get());
        }
        channel.close();
        file.close();
        return input;
    }
}

SpringIntegrationFileCopyExample.java

package com.javarticles.spring.integration.file;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringIntegrationFileCopyExample {
    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "fileCopyApplicationContext.xml");
    }
}

最新更新