The process cannot access the file because it is being used by another process when moving a file



I want to create a Quartz job that reads .csv files and moves them once they have been processed. I tried this:

@Override
public void execute(JobExecutionContext context) {
File directoryPath = new File("C:\\csv\\nov");
// Create a new subfolder called "processed" into source directory
try {
Files.createDirectory(Path.of(directoryPath.getAbsolutePath() + "/processed"));
} catch (IOException e) {
throw new RuntimeException(e);
}
FilenameFilter textFileFilter = (dir, name) -> {
String lowercaseName = name.toLowerCase();
if (lowercaseName.endsWith(".csv")) {
return true;
} else {
return false;
}
};
// List of all the csv files
File filesList[] = directoryPath.listFiles(textFileFilter);
System.out.println("List of the text files in the specified directory:");
Optional<File> csvFile = Arrays.stream(filesList).findFirst();
File file = csvFile.get();

for(File file : filesList) {
try {
List<CsvLine> beans = new CsvToBeanBuilder(new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16))
.....
.build()
.parse();
for(CsvLine item: beans){
....... sql queries
Optional<ProcessedWords> isFound = processedWordsService.findByKeyword(item.getKeyword());
......................................
}
} catch (Exception e){
e.printStackTrace();
}
// Move here file into new subdirectory when file processing is finished
Path copied = Paths.get(file.getAbsolutePath() + "/processed");
Path originalPath = file.toPath();
try {
Files.move(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

The processed folder is created when the job starts, but I get this exception:

2022-11-17 23:12:51.470 ERROR 16512 --- [cessor_Worker-4] org.quartz.core.JobRunShell              : Job DEFAULT.keywordPostJobDetail threw an unhandled Exception: 
java.lang.RuntimeException: java.nio.file.FileSystemException: C:\csv\nov\11_42_33.csv -> C:\csv\nov\processed\11_42_33.csv: The process cannot access the file because it is being used by another process
at com.wordscore.engine.processor.ImportCsvFilePostJob.execute(ImportCsvFilePostJob.java:127) ~[main/:na]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ~[quartz-2.3.2.jar:na]
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) ~[quartz-2.3.2.jar:na]
Caused by: java.nio.file.FileSystemException: C:\csv\nov\11_42_33.csv -> C:\csv\nov\processed\11_42_33.csv: The process cannot access the file because it is being used by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) ~[na:na]
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403) ~[na:na]
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293) ~[na:na]
at java.base/java.nio.file.Files.move(Files.java:1432) ~[na:na]
at com.wordscore.engine.processor.ImportCsvFilePostJob.execute(ImportCsvFilePostJob.java:125) ~[main/:na]
... 2 common frames omitted

Do you know how I can release the file and move it into the subdirectory?

EDIT: Updated the code with try-catch

@Override
public void execute(JobExecutionContext context) {
File directoryPath = new File("C:\\csv\\nov");
// Create a new subfolder called "processed" into source directory
try {
Path path = Path.of(directoryPath.getAbsolutePath() + "/processed");
if (!Files.exists(path) || !Files.isDirectory(path)) {
Files.createDirectory(path);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
FilenameFilter textFileFilter = (dir, name) -> {
String lowercaseName = name.toLowerCase();
if (lowercaseName.endsWith(".csv")) {
return true;
} else {
return false;
}
};
// List of all the csv files
File filesList[] = directoryPath.listFiles(textFileFilter);
System.out.println("List of the text files in the specified directory:");

Optional<File> csvFile = Arrays.stream(filesList).findFirst();
File file = csvFile.get();

for(File file : filesList) {
try {
try (var br = new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16)){
List<CsvLine> beans = new CsvToBeanBuilder(br)
......
.build()
.parse();
for (CsvLine item : beans) {
.....
if (isFound.isPresent()) {
.........
}}
} catch (Exception e){
e.printStackTrace();
}
// Move here file into new subdirectory when file processing is finished
Path copied = Paths.get(file.getAbsolutePath() + "/processed");
Path originalPath = file.toPath();
try {
Files.move(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}

}

Quartz config:

@Configuration
public class SchedulerConfig {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfig.class);
private ApplicationContext applicationContext;
@Autowired
public SchedulerConfig(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
public JobFactory jobFactory() {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(Trigger simpleJobTrigger) throws IOException {
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setQuartzProperties(quartzProperties());
schedulerFactory.setWaitForJobsToCompleteOnShutdown(true);
schedulerFactory.setAutoStartup(true);
schedulerFactory.setTriggers(simpleJobTrigger);
schedulerFactory.setJobFactory(jobFactory());
return schedulerFactory;
}
@Bean
public SimpleTriggerFactoryBean simpleJobTrigger(@Qualifier("keywordPostJobDetail") JobDetail jobDetail,
@Value("${simplejob.frequency}") long frequency) {
LOG.info("simpleJobTrigger");
SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
factoryBean.setJobDetail(jobDetail);
factoryBean.setStartDelay(1000);
factoryBean.setRepeatInterval(frequency);
factoryBean.setRepeatCount(4); //         factoryBean.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
return factoryBean;
}
@Bean
public JobDetailFactoryBean keywordPostJobDetail() {
JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
factoryBean.setJobClass(ImportCsvFilePostJob.class);
factoryBean.setDurability(true);
return factoryBean;
}
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
}

Quartz config (quartz.properties):

org.quartz.scheduler.instanceName=wordscore-processor
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore

As you can see, I want to have 5 threads running 5 jobs in parallel. Do you know how I can process the files without this exception?

> new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16)

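This part creates a resource. A resource is an object that represents an underlying heavyweight thing, something you can only have a few of. In this case, it represents an underlying OS file handle.

You must always close resources safely. There are really only two ways to do that correctly:

  • Use try-with-resources
  • Save it to a field, and make your own class AutoCloseable so that code which uses instances of that class can use try-with-resources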
try (var br = new FileReader(file, StandardCharsets.UTF_16)) {
List<CsvLine> beans = new CsvToBeanBuilder(br)
.....
.build()
.parse();
}

is the answer.
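
For the second option (a field plus AutoCloseable), a minimal sketch might look like the following. CsvSource and readLines are hypothetical names invented for illustration, and UTF-8 is assumed here only to keep the example small; the question uses UTF-16 and OpenCSV.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper: it keeps the reader in a field and releases it in close().
final class CsvSource implements AutoCloseable {
    private final BufferedReader reader;

    CsvSource(Path file) throws IOException {
        // Assumption: UTF-8 input; swap in the charset your files really use.
        this.reader = Files.newBufferedReader(file, StandardCharsets.UTF_8);
    }

    // Reads all remaining lines from the underlying reader.
    List<String> readLines() throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    @Override
    public void close() throws IOException {
        reader.close(); // releases the OS file handle
    }
}

final class CsvSourceDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("csvsource-demo");
        Path file = dir.resolve("in.csv");
        Files.write(file, Arrays.asList("a,b", "c,d"));

        // Callers get deterministic cleanup through try-with-resources.
        try (CsvSource source = new CsvSource(file)) {
            System.out.println(source.readLines().size() + " lines read");
        }
        // The handle is closed by now, so the file can be moved.
        Files.move(file, dir.resolve("moved.csv"));
    }
}
```

The point of the design is that whoever owns the wrapper decides when the handle is released, instead of leaving it to the garbage collector.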

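Although I completely agree with @rzwitserloot's answer and comments, note the following in your error stack trace:

java.nio.file.FileSystemException: C:\csv\nov\07_06_26.csv -> C:\csv\nov\07_06_26.csv\processed: The process cannot access the file because it is being used by another process

You are trying to move the file to a backup directory, but note that in your example you are moving the file to the wrong path, C:\csv\nov\07_06_26.csv\processed

Please try the following: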

@Override
public void execute(JobExecutionContext context) {
    File directoryPath = new File("C:\\csv\\nov");
    // Create a new subfolder called "processed" in the source directory.
    // Hold a reference to the processed files directory path, we will
    // use it later
    Path processedDirectoryPath;
    try {
        processedDirectoryPath = Path.of(directoryPath.getAbsolutePath() + "/processed");
        if (!Files.exists(processedDirectoryPath) || !Files.isDirectory(processedDirectoryPath)) {
            Files.createDirectory(processedDirectoryPath);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    FilenameFilter textFileFilter = (dir, name) -> {
        String lowercaseName = name.toLowerCase();
        return lowercaseName.endsWith(".csv");
    };
    // List of all the csv files
    File filesList[] = directoryPath.listFiles(textFileFilter);
    System.out.println("List of the text files in the specified directory:");
    for (File file : filesList) {
        // The reader is closed when this try block exits, before the move below
        try (var br = new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16)) {
            List<CsvLine> beans = new CsvToBeanBuilder(br)
                ......
                .build()
                .parse();
            for (CsvLine item : beans) {
                .....
                if (isFound.isPresent()) {
                    .........
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Move here file into new subdirectory when file processing is finished
        // In my opinion, here is the error:
        // Path copied = Paths.get(file.getAbsolutePath() + "/processed");
        Path originalPath = file.toPath();
        try {
            // Note the use of the path we defined before
            Files.move(originalPath, processedDirectoryPath.resolve(originalPath.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

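If you need to increase the throughput of file processing, you could try splitting the files into batches, for example by some pattern in their names such as a month name or a job number. A simple solution could be to use the JobExecutionContext provided to each job to carry a splitting criterion. That criterion would then be used in your FilenameFilter, so that each job processes only a specific portion of the total set of files that need to be processed. I think this solution is preferable to any kind of locking or similar mechanism.

For example, consider the following: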

@Override
public void execute(JobExecutionContext context) {
File directoryPath = new File("C:\\csv\\nov");
// Create a new subfolder called "processed" into source directory
// Hold a reference to the processed files directory path, we will
// use it later
Path processedDirectoryPath;
try {
processedDirectoryPath = Path.of(directoryPath.getAbsolutePath() + "/processed");
if (!Files.exists(processedDirectoryPath) || !Files.isDirectory(processedDirectoryPath)) {
Files.createDirectory(processedDirectoryPath);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
// We obtain the file processing criteria using a job parameter
JobDataMap data = context.getJobDetail().getJobDataMap();
String filenameProcessingCriteria = data.getString("FILENAME_PROCESSING_CRITERIA");
// Use the provided criteria to restrict the files that this job
// will process 
FilenameFilter textFileFilter = (dir, name) -> {
String lowercaseName = name.toLowerCase();
if (lowercaseName.endsWith(".csv") && lowercaseName.contains(filenameProcessingCriteria)) {
return true;
} else {
return false;
}
};
// List of all the csv files
File filesList[] = directoryPath.listFiles(textFileFilter);
System.out.println("List of the text files in the specified directory:");
for(File file : filesList) {
try {
try (var br = new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16)){
List<CsvLine> beans = new CsvToBeanBuilder(br)
......
.build()
.parse();
for (CsvLine item : beans) {
.....
if (isFound.isPresent()) {
.........
}}
}
} catch (Exception e){
e.printStackTrace();
}
// Move here file into new subdirectory when file processing is finished
// In my opinion, here is the error:
// Path copied = Paths.get(file.getAbsolutePath() + "/processed");
Path originalPath = file.toPath();
try {
// Note the use of the path we defined before
Files.move(originalPath, processedDirectoryPath.resolve(originalPath.getFileName()),
StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

You need to pass the required parameter to the job:

JobDetail job1 = ...;
job1.getJobDataMap().put("FILENAME_PROCESSING_CRITERIA", "job1pattern");

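Based on the same idea, a simpler approach could be to split the files into different folders and pass the name of the folder to process as a job parameter: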

@Override
public void execute(JobExecutionContext context) {
// We obtain the directory path as a job parameter
JobDataMap data = context.getJobDetail().getJobDataMap();
String directoryPathName = data.getString("DIRECTORY_PATH_NAME");
File directoryPath = new File(directoryPathName);
// Create a new subfolder called "processed" into source directory
// Hold a reference to the processed files directory path, we will
// use it later
Path processedDirectoryPath;
try {
processedDirectoryPath = Path.of(directoryPath.getAbsolutePath() + "/processed");
if (!Files.exists(processedDirectoryPath) || !Files.isDirectory(processedDirectoryPath)) {
Files.createDirectory(processedDirectoryPath);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
FilenameFilter textFileFilter = (dir, name) -> {
String lowercaseName = name.toLowerCase();
if (lowercaseName.endsWith(".csv")) {
return true;
} else {
return false;
}
};
// List of all the csv files
File filesList[] = directoryPath.listFiles(textFileFilter);
System.out.println("List of the text files in the specified directory:");
for(File file : filesList) {
try {
try (var br = new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16)){
List<CsvLine> beans = new CsvToBeanBuilder(br)
......
.build()
.parse();
for (CsvLine item : beans) {
.....
if (isFound.isPresent()) {
.........
}}
}
} catch (Exception e){
e.printStackTrace();
}
// Move here file into new subdirectory when file processing is finished
// In my opinion, here is the error:
// Path copied = Paths.get(file.getAbsolutePath() + "/processed");
Path originalPath = file.toPath();
try {
// Note the use of the path we defined before
Files.move(originalPath, processedDirectoryPath.resolve(originalPath.getFileName()),
StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

And pass a different folder to each job:

JobDetail job1 = ...;
job1.getJobDataMap().put("DIRECTORY_PATH_NAME", "C:\\csv\\nov");

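Please consider refactoring your code and defining separate methods for file processing, file backup, and so on. That will make your code easier to understand and to work with.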
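
As a rough illustration of that refactoring, the job body could be split into small steps like the ones below. This is only a sketch under assumptions: all method names are hypothetical, processFile merely counts lines as a stand-in for the OpenCSV parsing and SQL work, and UTF-8 is assumed instead of the UTF-16 used in the question.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class: one small method per step of the job.
final class CsvImportSteps {

    // Creates the "processed" subfolder if it is missing and returns its path.
    static Path ensureProcessedDir(Path sourceDir) {
        try {
            return Files.createDirectories(sourceDir.resolve("processed"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Lists the regular files directly inside sourceDir whose name ends in .csv.
    static List<Path> listCsvFiles(Path sourceDir) {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourceDir)) {
            for (Path p : stream) {
                String name = p.getFileName().toString().toLowerCase();
                if (Files.isRegularFile(p) && name.endsWith(".csv")) {
                    result.add(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Placeholder for the real processing: counts lines and closes the reader.
    static int processFile(Path file) {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            int count = 0;
            while (reader.readLine() != null) {
                count++;
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Moves a processed file into the backup folder, keeping its file name.
    static Path moveToProcessed(Path file, Path processedDir) {
        try {
            return Files.move(file, processedDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs all steps and returns how many files were moved.
    static int importAll(Path sourceDir) {
        Path processedDir = ensureProcessedDir(sourceDir);
        int moved = 0;
        for (Path file : listCsvFiles(sourceDir)) {
            processFile(file);
            moveToProcessed(file, processedDir);
            moved++;
        }
        return moved;
    }
}
```

With this shape, execute would only need to call something like CsvImportSteps.importAll(Path.of(directoryPathName)), and each step can be tested on its own.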

Suppose we have File file = new File("c:/test.txt"), and we print the following paths:

Path copied = Paths.get(file.getAbsolutePath() + "/processed");
Path originalPath = file.toPath();

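We will get this result:

copied: C:\test.txt\processed
originalPath: C:\test.txt

So this is incorrect. You should build the target from the parent path, plus the processed folder, plus the file name.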

Path copied = Paths.get(file.getParentFile().getAbsolutePath() + "/processed/" + file.getName());
Path originalPath = file.toPath();
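
The same target can also be built with the java.nio.file.Path API instead of string concatenation. The sketch below uses a hypothetical helper named processedTarget and a relative example path, so it does not depend on a Windows drive letter.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class ProcessedPathDemo {

    // Hypothetical helper: a sibling "processed" folder plus the original file name.
    static Path processedTarget(Path source) {
        return source.resolveSibling("processed").resolve(source.getFileName());
    }

    public static void main(String[] args) {
        Path source = Paths.get("csv", "nov", "07_06_26.csv");
        // Resolving against the file itself nests "processed" under the file name,
        // which is the mistake discussed above.
        System.out.println("wrong: " + source.resolve("processed"));
        // Resolving against the parent directory gives the intended target.
        System.out.println("right: " + processedTarget(source));
    }
}
```

resolveSibling resolves against the parent directory, so no separator characters have to be written by hand.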

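The line in the error message

Caused by: java.lang.RuntimeException: java.nio.file.FileSystemException: C:\csv\nov\07_06_26.csv -> C:\csv\nov\07_06_26.csv\processed: The process cannot access the file because it is being used by another process

I think you want to move the file from C:\csv\nov to C:\csv\nov\processed, so you have to change the following line: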

Path copied = Paths.get(file.getAbsolutePath() + "/processed");

to:

Path copied = Paths.get(file.getParent() + "/processed/" + file.getName());

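Because file.getAbsolutePath() returns the full path, it includes the file name.

I'm pretty sure the file is locked by the file reader that you create but never close on the following line: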

List<CsvLine> beans = new CsvToBeanBuilder(new FileReader(file.getAbsolutePath(), StandardCharsets.UTF_16))

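Refactor your code so that you either use try-with-resources for that reader, close it in a finally block, or otherwise close it explicitly.

The unintuitive behavior you will probably see is that the files are released at seemingly random times. That is because the files are only released when the garbage collector reclaims those readers. Clean them up explicitly instead.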
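
As an illustration of the finally-block variant, here is a minimal sketch. The method name readFirstLineThenMove is hypothetical, reading a single line stands in for the real parsing, and UTF-8 is assumed for brevity.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class ExplicitCloseDemo {

    // Reads the first line, closes the reader in finally, and only then moves the file.
    static String readFirstLineThenMove(Path source, Path target) {
        try {
            BufferedReader reader = Files.newBufferedReader(source, StandardCharsets.UTF_8);
            String firstLine;
            try {
                firstLine = reader.readLine();
            } finally {
                // Runs whether or not readLine threw, so the handle is always released.
                reader.close();
            }
            // The reader is closed at this point, so nothing in this JVM holds the file.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            return firstLine;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

try-with-resources does the same thing with less code; the explicit form is shown only to make the order of operations visible.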
