如何在上传所有文件时获得回拨



我需要从远程位置读取所有文件,并将其发送到另一个服务,如果成功发送,则删除所有文件。我的代码对一个文件运行良好,但如果我想在循环中读取所有文件,那么代码就不会被执行。

请查找以下代码。在RemoteFileReadImpl类中,我试图在循环中读取不起作用的文件。在WebClientUtil类中,我正在将文件发送到另一个服务。返回成功响应后,我想重命名已读取的文件。

package com.remotefileread.serviceImpl;
import java.io.IOException;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class WebClientUtil{
WebClient webClient = WebClient.create("http://localhost:9091");
public Mono<HttpStatus> ftpFileSend(MultipartFile fileData) {
MultiValueMap<String,Object> body=new LinkedMultiValueMap<String,Object>();
try {
body.add("file", fileData.getBytes());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return webClient
.post()
.uri("/storeFileData")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(body))
.exchange()
.map(response -> {

return response.statusCode();
});
}

}

package com.remotefileread.serviceImpl;

import java.io.File;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.util.Base64;

import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.remotefileread.model.SendFileData;
import com.remotefileread.service.RemoteFileRead;

import reactor.core.publisher.Mono;

@Service
public class RemoteFileReadImpl implements RemoteFileRead{

@Autowired
WebClientUtil webClientUtil;

public Mono<ServerResponse> ftpFileRead() { 
File directoryPath = new File("\\localhost\SharedFolder\csv_container");
File files[] = directoryPath.listFiles();
try {
for(File csvFile : files) {
SendFileData fileData=new SendFileData();   
byte[] content = Files.readAllBytes(csvFile.toPath());

fileData.setFilename(csvFile.getName());
fileData.setFileContent(Base64.getEncoder().encodeToString(content));
fileData.setCustomerName("Cust");
FileInputStream input = new FileInputStream(csvFile);
MultipartFile multipartFile = new MockMultipartFile("file",
csvFile.getName(),"text/plain",IOUtils.toByteArray(input));
input.close();

Mono<HttpStatus> monoStatus = webClientUtil.ftpFileSend(multipartFile);

monoStatus
.doOnSuccess( httpStatus ->
{
System.out.println("Http Status:" + httpStatus);

})
.doOnError(error -> 
{
System.out.println("Http Status:" + error);

});
}
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).bodyValue("OK");
}
catch(Exception ex) {
return ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).bodyValue("Error Message: " + ex.getMessage());
}
}
}

关于如何让问题变得更好的一些提示:

  • 您提到了当每个文件成功时,您希望发生什么,但如果一个文件失败,则不希望发生。它应该继续到下一个文件,还是停止并返回服务器错误
  • 解释它目前的表现,以及它与你的期望/愿望有何不同
  • 突出显示代码中不起作用的地方

如果您使用的是反应式,那么您通常不会像现在这样循环。您可能希望创建一个文件流进行处理,作为Flux:

Flux.fromArray(directoryPath.listFiles())

接下来,您将操作添加到该流的末尾,慢慢构建一个";计划;流必须如何处理才能产生特定的结果。上面的Flux准备在流下一个接一个地生成文件给订阅者。请注意,在您的代码中,没有任何内容订阅Mono,因此不会启动任何内容。

CCD_ 4也返回一个新的流。CCD_ 5函数允许将流中的元素映射到其他流。在您的情况下,web请求只是返回一个Mono

因此,请记住,将创建MultipartFile的代码重构为另一个方法createMultipart,并使用平面图通过util类发出web请求,我们将得到以下流:

Flux<HttpStatus> statusStream = 
Flux.fromArray(directoryPath.listFiles())
.map(this::createMultipart)
.flatMap(webClientUtil::ftpFileSend)

现在我们来谈谈WebClient的用法。

请注意,使用exchange()(现已弃用(意味着您需要确保使用响应数据,否则可能会导致内存泄漏。因此exchangeToMono()retrieve()通常更好

如果使用retrieve(),不成功的响应将自动引发异常,这将导致流中出现"错误"信号,从而停止处理任何其他文件。

所以总的来说,你可以有这样的实现:

public Mono<ResponseEntity<Void>> ftpFileSend(MultipartFile fileData) {
MultiValueMap<String,Object> body=new LinkedMultiValueMap<String,Object>();
try {
body.add("file", fileData.getBytes());
} catch (IOException e) {
return Mono.error(e);   // <-- note how to create an error signal
}

return webClient
.post()
.uri("/storeFileData")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(body))
.retrieve()
.toBodilessEntity()
}
...
public Mono<ServerResponse> ftpFileRead() { 

return Flux.fromArray(directoryPath.listFiles())
.flatMap(this::sendAndRename)
.onErrorContinue((ex, file) -> log("failed to process: " + file)) // <-- skips the erroring item and continues
.then(Mono.just(
ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue("OK"))
.onErrorResume(ex -> Mono.just(
ServerResponse.badRequest()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue("Error Message:" + ex.getMessage())); // <-- not really a need if we just skip.
}
public Mono<Void> sendAndRename(final File file) {
MultipartFile multipart = createMultipart(file);
return webClientUtil.ftpFileSend(multipart)
.then(() -> renameDoneFile(file));
}

所有文件都发送到这里。如果在发送或重命名时发生错误,则会记录该错误,跳过该文件,并继续处理下一个文件。

最新更新