我有一个我想并行化的循环。在下面的代码中,我迭代最外面的循环,并将条目放入各种数据结构中,并且效果很好。所有这些数据架构在同一类中都有一个getter,我以后使用这些数据结构,一旦在此循环中完成所有详细信息,请从其他类中获取所有详细信息。我正在填充info
,itemToNumberMapping
,catToValueHolder
,tasksByCategory
,catHolder
,itemIds
数据结构,它们也有Getters。
// want to parallelize this for loop
for (Task task : tasks) {
if (task.getCategories().isEmpty() || task.getEventList() == null
|| task.getMetaInfo() == null) {
continue;
}
String itemId = task.getEventList().getId();
String categoryId = task.getCategories().get(0).getId();
Processor fp = new Processor(siteId, itemId, categoryId, poolType);
Map<String, Integer> holder = fp.getDataHolder();
if (!holder.isEmpty()) {
for (Map.Entry<String, Integer> entry : holder.entrySet()) {
info.putIfAbsent(entry.getKey(), entry.getValue());
}
List<Integer> values = new ArrayList<>();
for (String key : holder.keySet()) {
values.add(info.get(key));
}
itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
catToValueHolder.put(categoryId, StringUtils.join(values, ","));
}
Category cat = getCategory(task, holder.isEmpty());
tasksByCategory.add(cat);
LinkedList<String> ids = getCategoryIds(task);
catHolder.put(categoryId, ids.getLast());
itemIds.add(itemId);
}
现在,我知道如何像下面的示例中平行一个循环,但混乱是 - 在我的情况下,我没有一个像 output
这样的对象。就我而言,我有多个通过迭代循环来填充的数据结构,因此我会混淆如何并行使最外面的循环并仍然填充所有这些数据结构?
private final ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
Callable<Output> callable = new Callable<Output>() {
public Output call() throws Exception {
Output output = new Output();
// process your input here and compute the output
return output;
}
};
futures.add(service.submit(callable));
}
service.shutdown();
List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
outputs.add(future.get());
}
更新: -
我正在平行一个循环的循环,该循环内部的循环,我的循环运行时,直到 number
小于或等于 pages
。所以也许我没有正确地做。因为我的操作循环将运行直到所有页面完成,对于每个页面,我都有一个for循环,我试图并行化和设置它的方式,它给出了rejectedexecutionexception
。
private void check() {
String endpoint = "some_url";
int number = 1;
int pages = 0;
do {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 1; i <= retryCount; i++) {
try {
HttpEntity<String> requestEntity =
new HttpEntity<String>(getBody(number), getHeader());
ResponseEntity<String> responseEntity =
HttpClient.getInstance().getClient()
.exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
String jsonInput = responseEntity.getBody();
Process response = objectMapper.readValue(jsonInput, Process.class);
pages = (int) response.getPaginationResponse().getTotalPages();
List<Task> tasks = response.getTasks();
if (pages <= 0 || tasks.isEmpty()) {
continue;
}
// want to parallelize this for loop
for (Task task : tasks) {
Callable<Void> c = new Callable<>() {
public void call() {
if (!task.getCategories().isEmpty() && task.getEventList() != null
&& task.getMetaInfo() != null) {
// my code here
}
}
};
executorService.submit(c);
}
// is this at right place? because I am getting rejectedexecutionexception
executorService.shutdown();
number++;
break;
} catch (Exception ex) {
// log exception
}
}
} while (number <= pages);
}
您不必从并行代码中输出一些内容。您只需拿外圈的主体并为每个项目创建一个任务,例如:
for (Task task : tasks) {
Callable<Void> c = new Callable<>() {
public void call() {
if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
// ... rest of code here
}
}
};
executorService.submit(c);
}
// wait for executor service, check for exceptions or whatever else you want to do here