我需要从远程位置读取所有文件,并将其发送到另一个服务,如果成功发送,则删除所有文件。我的代码对一个文件运行良好,但如果我想在循环中读取所有文件,那么代码就不会被执行。
请查找以下代码。在RemoteFileReadImpl类中,我试图在循环中读取不起作用的文件。在WebClientUtil类中,我正在将文件发送到另一个服务。返回成功响应后,我想重命名已读取的文件。
package com.remotefileread.serviceImpl;
import java.io.IOException;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class WebClientUtil{
WebClient webClient = WebClient.create("http://localhost:9091");
public Mono<HttpStatus> ftpFileSend(MultipartFile fileData) {
MultiValueMap<String,Object> body=new LinkedMultiValueMap<String,Object>();
try {
body.add("file", fileData.getBytes());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return webClient
.post()
.uri("/storeFileData")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(body))
.exchange()
.map(response -> {
return response.statusCode();
});
}
}
package com.remotefileread.serviceImpl;
import java.io.File;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.util.Base64;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.remotefileread.model.SendFileData;
import com.remotefileread.service.RemoteFileRead;
import reactor.core.publisher.Mono;
@Service
public class RemoteFileReadImpl implements RemoteFileRead{
@Autowired
WebClientUtil webClientUtil;
public Mono<ServerResponse> ftpFileRead() {
File directoryPath = new File("\\localhost\SharedFolder\csv_container");
File files[] = directoryPath.listFiles();
try {
for(File csvFile : files) {
SendFileData fileData=new SendFileData();
byte[] content = Files.readAllBytes(csvFile.toPath());
fileData.setFilename(csvFile.getName());
fileData.setFileContent(Base64.getEncoder().encodeToString(content));
fileData.setCustomerName("Cust");
FileInputStream input = new FileInputStream(csvFile);
MultipartFile multipartFile = new MockMultipartFile("file",
csvFile.getName(),"text/plain",IOUtils.toByteArray(input));
input.close();
Mono<HttpStatus> monoStatus = webClientUtil.ftpFileSend(multipartFile);
monoStatus
.doOnSuccess( httpStatus ->
{
System.out.println("Http Status:" + httpStatus);
})
.doOnError(error ->
{
System.out.println("Http Status:" + error);
});
}
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).bodyValue("OK");
}
catch(Exception ex) {
return ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).bodyValue("Error Message: " + ex.getMessage());
}
}
}
关于如何让问题变得更好的一些提示:
- 您提到了当每个文件成功时,您希望发生什么,但如果一个文件失败,则不希望发生。它应该继续到下一个文件,还是停止并返回服务器错误
- 解释它目前的表现,以及它与你的期望/愿望有何不同
- 突出显示代码中不起作用的地方
如果您使用的是反应式,那么您通常不会像现在这样循环。您可能希望创建一个文件流进行处理,作为Flux
:
Flux.fromArray(directoryPath.listFiles())
接下来,您将操作添加到该流的末尾,慢慢构建一个";计划;流必须如何处理才能产生特定的结果。上面的Flux
准备在流下一个接一个地生成文件给订阅者。请注意,在您的代码中,没有任何内容订阅Mono
,因此不会启动任何内容。
CCD_ 4也返回一个新的流。CCD_ 5函数允许将流中的元素映射到其他流。在您的情况下,web请求只是返回一个Mono
。
因此,请记住,将创建MultipartFile
的代码重构为另一个方法createMultipart
,并使用平面图通过util类发出web请求,我们将得到以下流:
Flux<HttpStatus> statusStream =
Flux.fromArray(directoryPath.listFiles())
.map(this::createMultipart)
.flatMap(webClientUtil::ftpFileSend)
现在我们来谈谈WebClient
的用法。
请注意,使用exchange()
(现已弃用(意味着您需要确保使用响应数据,否则可能会导致内存泄漏。因此exchangeToMono()
或retrieve()
通常更好
如果使用retrieve()
,不成功的响应将自动引发异常,这将导致流中出现"错误"信号,从而停止处理任何其他文件。
所以总的来说,你可以有这样的实现:
public Mono<ResponseEntity<Void>> ftpFileSend(MultipartFile fileData) {
MultiValueMap<String,Object> body=new LinkedMultiValueMap<String,Object>();
try {
body.add("file", fileData.getBytes());
} catch (IOException e) {
return Mono.error(e); // <-- note how to create an error signal
}
return webClient
.post()
.uri("/storeFileData")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(body))
.retrieve()
.toBodilessEntity()
}
...
public Mono<ServerResponse> ftpFileRead() {
return Flux.fromArray(directoryPath.listFiles())
.flatMap(this::sendAndRename)
.onErrorContinue((ex, file) -> log("failed to process: " + file)) // <-- skips the erroring item and continues
.then(Mono.just(
ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue("OK"))
.onErrorResume(ex -> Mono.just(
ServerResponse.badRequest()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue("Error Message:" + ex.getMessage())); // <-- not really a need if we just skip.
}
public Mono<Void> sendAndRename(final File file) {
MultipartFile multipart = createMultipart(file);
return webClientUtil.ftpFileSend(multipart)
.then(() -> renameDoneFile(file));
}
所有文件都发送到这里。如果在发送或重命名时发生错误,则会记录该错误,跳过该文件,并继续处理下一个文件。