我目前正在使用Spring Integration 4.1.0和Spring 4.1.2。我需要能够逐行读取文件,并将读取到的每一行作为一条消息。 基本上,我想允许"重播"我们的某个消息源,但这些消息不是分别保存在各自的文件中,而是全部保存在同一个文件中。 我对此用例没有事务要求。我的要求与这篇帖子类似,只不过文件位于运行 JVM 的同一台服务器上: Spring Integration - 逐行读取远程文件
在我看来,我有以下选择:
1.使用int-file:inbound-channel-adapter
读取文件,然后"拆分"该文件,以便1条消息现在变成多条消息。示例配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Option 1: read the file as a whole, then split it into several messages. -->
<!-- Picks up /tmp/myfile.txt and publishes it on channel1. -->
<!-- NOTE(review): no poller is declared here; presumably a default poller exists elsewhere - confirm. -->
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<!-- Turns the file from channel1 into a String payload on channel2; this is the step that holds the entire file content in memory. -->
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<!-- Splits the String payload from channel2; the resulting messages are discarded via nullChannel (demo only). -->
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
问题是文件非常大,当使用上述技术时,整个文件首先被读入内存,然后被拆分,JVM耗尽堆空间。 实际上需要的步骤是:读取一行并将行转换为消息,发送消息,从内存中删除消息,重复。
2.将
int-file:tail-inbound-channel-adapter
与end="false"
一起使用(这基本上表示从文件的开头读取)。 根据需要为每个文件启动和停止此适配器(在每次启动之前更改文件名)。示例配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" />
<int:channel id="exchangeSpringQueueChannel" />
<task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>
3.从 Spring Integration 调用 Spring Batch 并使用
ItemReader
来处理文件。 当然允许对整个过程进行更细粒度的控制,但使用作业存储库等设置内容需要相当多的工作(而且我不关心作业历史记录,所以我要么告诉作业不记录状态和/或使用内存中MapJobRepository
)。
4.通过扩展MessageProducerSupport
创建自己的FileLineByLineInboundChannelAdapter
。大部分代码可以从ApacheCommonsFileTailingMessageProducer
借用(另请参阅 http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)。 下面是一个示例,但还需要做一些工作,把读取操作放到它自己的Thread
中,以便在逐行阅读时遵守stop()
命令。
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
在我看来,我的用例并不超出人们可能喜欢做的典型事情的范围,所以我很惊讶我找不到开箱即用的解决方案。 然而,我已经搜索了很多,查看了很多例子,不幸的是还没有找到适合我需求的东西。
我猜想也许是我错过了框架已经提供的某个显而易见的功能(尽管这可能正处于Spring Integration和Spring Batch之间的模糊地带)。 有人可以告诉我,我的思路是否完全跑偏了,或者是否存在一个被我遗漏的简单解决方案,或者提供其他替代建议?
Spring Integration 4.x有一个很好的新功能,使用Iterator作为消息:
Spring Integration 参考文档:
从版本 4.1 开始,AbstractMessageSplitter 支持要拆分的值的迭代器类型。
这允许将迭代器作为消息发送,而不是将整个文件读取到内存中。
下面是一个 Spring 上下文将 CSV 文件拆分为每行一条消息的简单示例:
<!-- Picks up files from the input directory (default /tmp) and publishes them on "inputFiles". -->
<int-file:inbound-channel-adapter
directory="${inputFileDirectory:/tmp}"
channel="inputFiles"/>
<!-- Messages on this channel are dispatched through the "executor" thread pool. -->
<int:channel id="inputFiles">
<int:dispatcher task-executor="executor"/>
</int:channel>
<!-- FileSplitter returns an Iterator over the file's lines, so the file is never loaded as a whole. -->
<int:splitter
input-channel="inputFiles"
output-channel="output">
<bean
class="FileSplitter"
p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<!-- Placeholder name fixed from "aueueCapacity" so the queue capacity can be overridden via "queueCapacity". -->
<task:executor
id="executor"
pool-size="${poolSize:8}"
queue-capacity="${queueCapacity:0}"
rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>
这是拆分器实现:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
private String commentPrefix = "#";
public Object splitMessage(Message<?> message) {
if(log.isDebugEnabled()) {
log.debug(message.toString());
}
try {
Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
return new BufferedReaderFileIterator((File) payload);
}
catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}
public void setCommentPrefix(String commentPrefix) {
this.commentPrefix = commentPrefix;
}
public class BufferedReaderFileIterator implements Iterator<String> {
private File file;
private BufferedReader bufferedReader;
private String line;
public BufferedReaderFileIterator(File file) throws IOException {
this.file = file;
this.bufferedReader = new BufferedReader(new FileReader(file));
readNextLine();
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
try {
String res = this.line;
readNextLine();
return res;
}
catch (IOException e) {
log.error("Error reading file", e);
throw new RuntimeException(e);
}
}
void readNextLine() throws IOException {
do {
line = bufferedReader.readLine();
}
while(line != null && line.trim().startsWith(commentPrefix));
if(log.isTraceEnabled()) {
log.trace("Read next line: {}", line);
}
if(line == null) {
close();
}
}
void close() throws IOException {
bufferedReader.close();
file.delete();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
请注意从 splitMessage() 处理程序方法返回的迭代器对象。
我也遇到了同样的需求,我的做法是把文件复制到另一个文件夹,同时从文件中读取数据
fileCopyApplicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder />
<!-- Polls the input directory with a 500 ms fixed delay, keeping only files accepted by the "onlyPropertyFiles" filter. -->
<file:inbound-channel-adapter id="filesIn"
directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
auto-startup="true">
<int:poller id="poller" fixed-delay="500" />
</file:inbound-channel-adapter>
<!-- Passes each message from "filesIn" to the "handler" bean and forwards its return value to "filesOut". -->
<int:service-activator input-channel="filesIn"
output-channel="filesOut" ref="handler" />
<!-- Writes what arrives on "filesOut" into the output directory. -->
<file:outbound-channel-adapter id="filesOut"
directory="E:/usmandata/logs/output/" />
<bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
<!-- NOTE(review): despite its id, this filter is configured with the pattern *.log. -->
<bean id="onlyPropertyFiles"
class="org.springframework.integration.file.config.FileListFilterFactoryBean"
p:filenamePattern="*.log" />
</beans>
FileHandler.java
package com.javarticles.spring.integration.file;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
 * Prints the content of a file to standard output and hands the same file on unchanged.
 */
public class FileHandler {

    /** Size of the chunk read per iteration; keeps memory use independent of the file size. */
    private static final int BUFFER_SIZE = 8192;

    /**
     * Copies the raw bytes of {@code input} to {@code System.out}, preceded by a header line.
     * <p>
     * The file is streamed in fixed-size chunks, so files of any size can be handled, and the
     * bytes are written as they are, so multi-byte encoded text is not corrupted.
     *
     * @param input the file to print.
     * @return the {@code input} file itself.
     * @throws IOException if the file cannot be opened or read.
     */
    public File handleFile(File input) throws IOException {
        RandomAccessFile file = new RandomAccessFile(input, "r");
        try {
            FileChannel channel = file.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            System.out.println("Reading content and printing ... ");
            // A read may fill the buffer only partially, so loop until end of file (-1).
            while (channel.read(buffer) != -1) {
                buffer.flip(); // switch the buffer from being filled to being drained
                System.out.write(buffer.array(), 0, buffer.limit());
                buffer.clear();
            }
            System.out.flush();
        }
        finally {
            // Closing the file also closes the channel obtained from it.
            file.close();
        }
        return input;
    }
}
SpringIntegrationFileCopyExample.java
package com.javarticles.spring.integration.file;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point of the file copy example: it loads {@code fileCopyApplicationContext.xml} and
 * leaves the context running.
 */
public class SpringIntegrationFileCopyExample {

    /**
     * Starts the application context. The context is deliberately not closed here, because
     * closing it would stop the endpoints it defines; a JVM shutdown hook closes it instead so
     * the beans are released cleanly when the process ends.
     *
     * @param args unused.
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "fileCopyApplicationContext.xml");
        context.registerShutdownHook();
    }
}