如何根据标头值"分组依据"消息



我正在尝试根据遵循此标准的文件扩展名创建文件zip:文件名。NUMBER},我正在做的是读取一个文件夹,按.{number},然后在末尾使用该 .num 创建一个唯一的文件.zip,例如:

文件夹/

文件.01

文件2.01

文件.02

文件2.02

文件夹 ->/已处理

file.01.zip包含 -> file.01, file2.01

file02.zip包含 -> 文件.02、文件2.02

我所做的是使用 outboundGateway,拆分文件,丰富读取文件扩展名的标头,然后聚合读取该标头,但似乎无法正常工作。

public IntegrationFlow integrationFlow() {
return flow
.handle(Ftp.outboundGateway(FTPServers.PC_LOCAL.getFactory(), AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.fileExistsMode(FileExistsMode.REPLACE)
.filterFunction(ftpFile -> {
int extensionIndex = ftpFile.getName().indexOf(".");
return extensionIndex != -1 && ftpFile.getName().substring(extensionIndex).matches("\.([0-9]*)");
})
.localDirectory(new File("/tmp")))
.split() //receiving an iterator, creates a message for each file
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression("warehouseId", "payload.getName().substring(payload.getName().indexOf('.') +1)"))
.aggregate(aggregatorSpec -> aggregatorSpec.correlationExpression("headers['warehouseId']"))
.transform(new ZipTransformer())
.log(message -> {
log.info(message.getHeaders().toString());
return message;
});
}

它给了我一条包含所有文件的消息,我应该期待 2 条消息。

由于这个 dsl 的性质,我有一个动态数量的文件,所以我无法计算以相同数字结尾的消息(文件),而且我认为超时可能不是一个很好的发布策略,我只是自己编写代码而不写入磁盘:


.<List<File>, List<Message<ByteArrayOutputStream>>>transform(files -> {
HashMap<String, ZipOutputStream> zipOutputStreamHashMap = new HashMap<>();
HashMap<String, ByteArrayOutputStream> zipByteArrayMap = new HashMap<>();
ArrayList<Message<ByteArrayOutputStream>> messageList = new ArrayList<>();
files.forEach(file -> {
String warehouseId = file.getName().substring(file.getName().indexOf('.') + 1);
ZipOutputStream warehouseStream = zipOutputStreamHashMap.computeIfAbsent(warehouseId, s -> new ZipOutputStream(zipByteArrayMap.computeIfAbsent(s, s1 -> new ByteArrayOutputStream())));
try {
warehouseStream.putNextEntry(new ZipEntry(file.getName()));
FileInputStream inputStream = new FileInputStream(file);
byte[] bytes = new byte[4096];
int length;
while ((length = inputStream.read(bytes)) >= 0) {
warehouseStream.write(bytes, 0, length);
}
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipOutputStreamHashMap.forEach((s, zipOutputStream) -> {
try {
zipOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipByteArrayMap.forEach((key, byteArrayOutputStream) -> {
messageList.add(MessageBuilder.withPayload(byteArrayOutputStream).setHeader("warehouseId", key).build());
});
return messageList;
})
.split()
.transform(ByteArrayOutputStream::toByteArray)
.handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
......

相关内容

  • 没有找到相关文章

最新更新