使用生产者 - 消费者模式处理巨大的CSV文件



我正在尝试处理一个任意的CSV文件,该文件的范围可以从10条记录到1000万条记录。CSV 文件有 4 个固定列(例如 a、b、c、d(和 2 个来自外部 REST API 的附加列(e,f(。

我的目标是从 CSV 读取所有记录,对于每条记录,调用外部 REST API 以引入另外 2 列并将生成的 CSV 输出为合并的 CSV。输出应该是包含列 (a、b、c、d、e、f( 的相同 csv 文件。

我使用 Spring 集成使用 EIP 中的内容丰富器模式实现了此场景,并且能够实现预期的输出,但是当我按顺序读取 CSV 文件时,此解决方案适用于少量记录,但是一旦记录数增加,执行程序的时间也会以O(n(方式增加。

我进一步开始实现生产者 - 消费者设计模式,并尝试以这样一种方式实现代码:从CSV读取的每条记录然后使用put((放入队列中,然后使用BlockingQueue的take((方法从同一个共享队列中读取多个消费者。Main 程序使用Executors.newFixedSizedThreadPool(3(实例化具有 1 个生产者和多个使用者的 ExecutorService,但是我面临着几个挑战:

  1. take(( 方法永远不会退出。我尝试通过添加一个终结者对象来实现毒丸,然后在消费者循环中检查相同的毒丸以突破,但它仍然从未爆发(我在循环中添加了一个系统以查看它是否曾经到达毒丸并且它确实打印了我的语句(,那么为什么它不退出?

  2. CSV文件仅保留从上次执行的消费者线程读取的数据,并覆盖从其他消费者写入的所有内容 - 我正在使用OpenCSV读取/写入CSV数据。

这是我现在的代码。有人可以指导我错误的地方以及此代码中需要改进的地方吗?

主程序

**

BlockingQueue<Account> queue = new ArrayBlockingQueue<>(100);
AccountProducer readingThread = new AccountProducer(inputFileName, queue);
//new Thread(readingThread).start();
ExecutorService producerExecutor = Executors.newFixedThreadPool(1);
producerExecutor.submit(readingThread);
AccountConsumer normalizers = new AccountConsumer(outputFileName, queue, accountService );
ExecutorService consumerExecutor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
consumerExecutor.submit(normalizers);
}
producerExecutor.shutdown();
consumerExecutor.shutdown();

帐户制作者

public class AccountProducer implements Runnable {
private String inputFileName;
private BlockingQueue<Account> blockingQueue;
private static final String TERMINATOR = "TERMINATOR";
public AccountProducer (String inputFileName, BlockingQueue<Account> blockingQueue) {
this.inputFileName = inputFileName;
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
try (Reader reader = Files.newBufferedReader(Paths.get(inputFileName));) {
PropertyEditorManager.registerEditor(java.util.Date.class, DateEditor.class);
ColumnPositionMappingStrategy<Account> strategy = new ColumnPositionMappingStrategy<>();
strategy.setType(Account.class);
String[] memberFieldsToBindTo = { "accountId", "accountName", "firstName", "createdOn" };
strategy.setColumnMapping(memberFieldsToBindTo);
CsvToBean<Account> csvToBean = new CsvToBeanBuilder<Account>(reader).withMappingStrategy(strategy)
.withSkipLines(1).withIgnoreLeadingWhiteSpace(true).build();
Iterator<Account> csvAccountIterator = csvToBean.iterator();
while (csvAccountIterator.hasNext()) {
Account account = csvAccountIterator.next();    
// Checking if the Account Id in the csv is blank / null - If so, we skip the
// row for processing and hence avoiding API call..
if (null == account.getAccountId() || account.getAccountId().isEmpty()) {
continue;
} else {
// This call will send the extracted Account Object down the Enricher to get
// additional details from API
blockingQueue.put(account);
}
}
} catch (InterruptedException | IOException ex) {
System.out.println(ex);
} finally {
while (true) {
try {
Account account = new Account();
account.setAccountId(TERMINATOR);
blockingQueue.put(account);
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

帐户消费者

public class AccountConsumer implements Runnable {
private String outputFileLocation;
private BlockingQueue<Account> blockingQueue;
private AccountService accountService;
public AccountConsumer(String outputFileLocation, BlockingQueue<Account> blockingQueue, AccountService accountService) {
this.blockingQueue = blockingQueue;
this.outputFileLocation = outputFileLocation;
this.accountService = accountService;
}
@Override
public void run() {
List<Account> accounts = new ArrayList<>();
try {
while (true) {
Account account = blockingQueue.poll();
account = accountService.populateAccount(account);
accounts.add(account);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
System.out.println(ex);
}
processOutput(accounts, outputFileLocation);
}
/**
* The method processOutput simply takes the list of Accounts and writes them to
* CSV.
* 
* @param outputFileName
* @param accounts
* @throws Exception
*/
private void processOutput(List<Account> accounts, String outputFileName) {
System.out.println("List Size is : " + accounts.size());
// Using try with Resources block to make sure resources are released properly
try (Writer writer = new FileWriter(outputFileName, true);) {
StatefulBeanToCsv<Account> beanToCsv = new StatefulBeanToCsvBuilder(writer).build();
beanToCsv.write(accounts);
} catch (CsvDataTypeMismatchException | CsvRequiredFieldEmptyException ex) {
System.out.println(ex);
//logger.error("Unable to write the output CSV File : " + ex);
//throw ex;
} catch (IOException e) {
e.printStackTrace();
}
}

}

这是我正在使用的Spring Integration XML:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task 
http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration 
http://www.springframework.org/schema/integration/spring-integration.xsd"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:beans="http://www.springframework.org/schema/beans" 
xmlns:task="http://www.springframework.org/schema/task">
<channel id="accountChannel" /> 
<!-- accountOutputChannel is used for putting the Account object on the 
Channel which will then be consumed by accountAPIChannel as Input to the API 
-->
<channel id="accountOutputChannel" />
<!-- accountAPIChannel will take 1 accountId at a time and invoke REST API 
Service to get additional details required to fill the Content Enricher -->
<channel id="accountAPIChannel" />
<!-- accountGateway is the entry point to the utility -->
<gateway id="accountGateway" default-request-timeout="5000"
default-reply-timeout="5000"
service-interface="com.epe.service.AccountService"
default-request-channel="accountChannel">
</gateway>
<!--Content  enricher is used here for enriching an existing message with 
additional data from External API
This is based on EIP Pattern - Content Enricher -->
<enricher id="enricher" input-channel="accountOutputChannel"
request-channel="accountAPIChannel">
<property name="status" expression="payload.status" />
<property name="statusSetOn" expression="payload.createdOn" />
</enricher>

<beans:bean id="accountService"
class="com.epe.service.impl.AccountServiceImpl" />
<!-- Below service-activator is used to actually invoke the external REST 
API which will provide the additional fields for enrichment -->
<service-activator id="fetchAdditionalAccountInfoServiceActivator"
ref="accountInfoService" method="getAdditionalAccountInfoService" 
input-channel="accountAPIChannel"
/>
<!-- accountInfoService is a bean which will be used for fetching 
additional information from REST API Service -->
<beans:bean id="accountInfoService"
class="com.epe.service.impl.AccountInfoServiceImpl" />
</beans:beans>

您在代码中使用poll(),而不是take()

您应该使用带有超时的poll(),例如poll(10, TimeUnit.SECONDS),以便您可以优雅地终止每个线程。

但是,您不需要所有这些逻辑;您可以使用 Spring 集成组件实现所有这些 -ExecutorChannel和附加模式下的文件出站通道适配器。

编辑

我没有时间编写您的整个应用程序,但本质上您需要...

<file:inbound-channel-adapter />
<file:splitter output-channel="execChannel"/>
<int:channel id="execChannel">
<int:dispatcher task-executor="exec" />
</int:channel>
<int:transformer /> <!-- OpenCSV -->
<int:enricher ... />
<int:transformer /> <!-- OpenCSV -->
<int:resequencer /> <!== restore order -->
<file:outbound-channel-adapter />

您可以在参考手册中阅读所有这些组件。

您可能还需要考虑使用Java DSL而不是xml;例如...

@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(File.inboundChannelAdapter(...))
.split(Files.splitter())
.channel(MessageChannels.executor(exec())
.transform(...)
.enrich(...)
.transform(...)
.resequence()
.handle(File.outboundCHannelAdapter(...))
.get();

In AccountProducer

catch (InterruptedException | IOException ex) {
System.out.println(ex);
} 

这不是处理中断异常的正确方法。ExecutorService 使用中断进行强制关闭 (sshutdownDownNow(((,但由于您吃了中断,ExecutorService 将不会响应强制击落。

在帐户中消费者

catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

这确保了线程将重新引发中断异常,该异常可以重新设计为

try {
while (true) {
Account account = blockingQueue.poll();
account = accountService.populateAccount(account);
accounts.add(account);
if(Thread.currentThread().isInterrupted()) {
System.out.println("Thread interrupted and hence exiting...");
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
System.out.println(ex);
}

编辑ExecutorService 调用 shutdown(( 不会导致立即破坏

使用 awaitTermination(( 方法关闭 ExecutorService 的一种好方法

最新更新