Parallelize a do-while loop depending on how many pages we need to call?



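I have to make an HTTP POST request by passing a header and a body. In the body I need to provide a pageNumber before posting the data, so I start with "1". I then post the data and get back a JSON response like the one below.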

{
    "response": {
        "pageNumber": 1,
        "entries": 200,
        "numberOfPages": 3
    },
    "list": [
        {
            // some stuff here
        }
    ],
    "total": 1000
}

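Based on the response for pageNumber 1, I decide how many more calls I need to make. In the response above numberOfPages is 3, so I need to make three calls in total to the same URL. Since I have already made one call, I will make two more, with pageNumber "2" and "3" in the body.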
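The page arithmetic can be sketched as follows, assuming the first response has already been parsed into `numberOfPages`. `remainingPages` is a hypothetical helper for illustration, not part of the original code.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RemainingPages {

    // Hypothetical helper: page 1 has already been fetched, so only pages
    // 2..numberOfPages still need a request. Empty when numberOfPages <= 1.
    static List<Integer> remainingPages(int numberOfPages) {
        return IntStream.rangeClosed(2, numberOfPages)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // numberOfPages = 3 in the sample response
        System.out.println(remainingPages(3)); // prints [2, 3]
    }
}
```

`rangeClosed` includes the upper bound, which is what "pages 2 and 3" requires; `range(2, 3)` would silently skip the last page.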

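Below is my working code. I call the same URL repeatedly, changing only the body, until numberOfPages is reached. Each call uses the corresponding pageNumber, so if numberOfPages is 3, I make 3 calls in total. After collecting the data from each page, I populate two maps.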

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.jayway.jsonpath.JsonPath;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AppParser {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String lastParentIdJsonPath = "......";
    private final Map<String, String> processToTaskIdHolder = new HashMap<>();
    private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
    private final int entries;
    private final String siteId;

    public AppParser(int entries, String id) throws IOException {
        this.entries = entries;
        this.siteId = id;
        collect();
    }

    // this is only called from above constructor
    private void collect() throws IOException { // readValue throws an IOException subtype
        String endpoint = "url_endpoint";
        int number = 1;
        int expectedNumber;
        do {
            HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(number), getHeader());
            ResponseEntity<String> responseEntity =
                HttpClient.getInstance().getClient()
                    .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
            String jsonInput = responseEntity.getBody();
            Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
            expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
            if (expectedNumber <= 0) {
                break;
            }
            List<Postings> postings = response.getPostings();
            for (Postings posting : postings) {
                if (posting.getClientIds().isEmpty()) {
                    continue;
                }
                List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
                String clientId = posting.getClientIds().get(0).getId();
                Category category = getCategory(posting);
                // populate two maps now
                itemsByCategory.put(clientId, category);
                processToTaskIdHolder.put(clientId, lastParent.get(0));
            }
            number++;
        } while (number <= expectedNumber);
    }

    private String getBody(final int number) {
        Input input = new Input(entries, number, 0);
        Body body = new Body("Stuff", input);
        return gson.toJson(body);
    }

    // getters for those two above maps
}

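My code above collects the data sequentially, one page at a time, so if numberOfPages is large it takes a long time to collect the data for all pages. If numberOfPages is 500, for example, my code makes 500 requests one after another. Is there a way to parallelize this code so that it collects data for 5 pages at the same time? Is that possible? I assume I would then also need to make sure my code is thread-safe.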

Note: HttpClient is a thread-safe singleton class.
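For reference, the pattern being asked about (fetch page 1, then at most 5 pages at a time, with a thread-safe result map) can be sketched as below. This is a minimal sketch under assumptions: `Page` and `fetchPage` are hypothetical stand-ins for the question's POST request and Jackson parsing, and a `ConcurrentHashMap` replaces the question's `HashMap` and `LinkedListMultimap`, which are not safe to write from several threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelPages {

    // Hypothetical stand-in for one parsed response page.
    static final class Page {
        final int numberOfPages;
        final List<String> clientIds;

        Page(int numberOfPages, List<String> clientIds) {
            this.numberOfPages = numberOfPages;
            this.clientIds = clientIds;
        }
    }

    // Hypothetical stand-in for the HTTP POST plus JSON parsing; pretends there are 5 pages.
    static Page fetchPage(int pageNumber) {
        return new Page(5, Arrays.asList("client-" + pageNumber));
    }

    // Written from several pool threads, so it must be a thread-safe map.
    final Map<String, Integer> pageByClient = new ConcurrentHashMap<>();

    void handle(int pageNumber, Page page) {
        for (String clientId : page.clientIds) {
            pageByClient.put(clientId, pageNumber);
        }
    }

    void collect() {
        Page first = fetchPage(1);                              // page 1 tells us how many pages exist
        handle(1, first);
        ExecutorService pool = Executors.newFixedThreadPool(5); // at most 5 requests in flight
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int n = 2; n <= first.numberOfPages; n++) {
                final int pageNumber = n;
                futures.add(pool.submit(() -> handle(pageNumber, fetchPage(pageNumber))));
            }
            for (Future<?> f : futures) {
                f.get();                                        // waits and rethrows any task failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting pages", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a page request failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ParallelPages parser = new ParallelPages();
        parser.collect();
        System.out.println(parser.pageByClient.size()); // prints 5
    }
}
```

Waiting on each `Future` rather than only on `awaitTermination` means a failed page surfaces as an exception instead of being silently dropped.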

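I tried to rework your code to use multiple threads, but it wasn't easy because you didn't provide the full source for all the imported classes. Your code also isn't as clean as it could be.

Your task is a typical case of asynchronous requests. I wrapped your collect code in a java.util.concurrent.Callable, which lets me run it asynchronously as tasks through an executor service and, when needed, get the result back as a ParseResult object.

In the code below I first make one request to fill the expectedNumber variable; then, in a loop, the application creates tasks and submits them to a dedicated thread pool, executorService, that runs them. Code: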

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// plus the Jackson, Guava, JsonPath and Spring imports for the other types

public class AppParser {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String URL_ENDPOINT = "url_endpoint";
    private static final String lastParentIdJsonPath = "......";
    private final Map<String, String> processToTaskIdHolder = new HashMap<>();
    private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();

    class ParseResult {
        private String clientId;
        private Category category;
        private String lastParent;
        private int expectedNumber;
    }

    class ParseTask implements Callable<ParseResult> {
        private final int pageNumber;

        public ParseTask(int pageNumber) {
            this.pageNumber = pageNumber;
        }

        @Override
        public ParseResult call() throws Exception {
            HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(pageNumber), getHeader());
            ResponseEntity<String> responseEntity =
                HttpClient.getInstance().getClient()
                    .exchange(URI.create(URL_ENDPOINT), HttpMethod.POST, requestEntity, String.class);
            String jsonInput = responseEntity.getBody();
            Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
            int expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
            if (expectedNumber <= 0) {
                return null; // or throw exception
            }
            // always carries the page count, even if no posting on this page qualifies
            ParseResult parseResult = new ParseResult();
            parseResult.expectedNumber = expectedNumber;
            List<Postings> postings = response.getPostings();
            for (Postings posting : postings) {
                if (posting.getClientIds().isEmpty()) {
                    continue;
                }
                List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
                // collecting the result
                parseResult = new ParseResult();
                parseResult.clientId = posting.getClientIds().get(0).getId();
                parseResult.category = getCategory(posting);
                parseResult.expectedNumber = expectedNumber;
                parseResult.lastParent = lastParent.get(0);
                writeResult(parseResult); // write every posting, not only the first one
            }
            return parseResult;
        }
    }

    public AppParser(int entries, String id) {
        // .....
        collect();
    }

    // this is only called from above constructor
    private void collect() {
        int expectedNumber = 0;
        try {
            // page 1 runs on the calling thread and tells us how many pages exist
            ParseResult firstResult = new ParseTask(1).call();
            if (firstResult != null) {
                expectedNumber = firstResult.expectedNumber; // fill the pages amount
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        ExecutorService executorService = Executors.newFixedThreadPool(5); // at most 5 pages at once
        for (int number = 2; number <= expectedNumber; number++) {         // page 1 is already done
            executorService.submit(new ParseTask(number)); // a task's exception stays in the returned Future
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private String getBody(final int number) {
        Input input = new Input(entries, number, 0);
        Body body = new Body("Stuff", input);
        return gson.toJson(body);
    }

    // synchronized: called concurrently from pool threads, and neither map is thread-safe
    private synchronized void writeResult(ParseResult result) {
        // populate two maps now
        itemsByCategory.put(result.clientId, result.category);
        processToTaskIdHolder.put(result.clientId, result.lastParent);
    }
}

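We could spend a lot of time improving your code, but this is a rough version with multithreading. I'm not sure it works because, as I said, you didn't provide the full version. It may need some syntax fixes.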
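A variation on the Callable approach is to let each task return its page's results and merge them on the calling thread, so the non-thread-safe maps are only ever touched by one thread and no `synchronized` writer is needed. This is a minimal sketch, assuming a hypothetical `fetchClientIds` stub in place of the real request and a single plain `HashMap` in place of the two maps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllPages {

    // Hypothetical stand-in for one page request; returns the client ids on that page.
    static List<String> fetchClientIds(int pageNumber) {
        return Arrays.asList("client-" + pageNumber + "a", "client-" + pageNumber + "b");
    }

    // Fetches pages 2..numberOfPages on at most 5 threads, then merges the results
    // into a plain HashMap on the calling thread only.
    static Map<String, Integer> collect(int numberOfPages) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (int n = 2; n <= numberOfPages; n++) {
                final int pageNumber = n;
                tasks.add(() -> fetchClientIds(pageNumber));
            }
            List<Future<List<String>>> futures = pool.invokeAll(tasks); // blocks until every task is done
            Map<String, Integer> pageByClient = new HashMap<>();
            for (int i = 0; i < futures.size(); i++) {
                for (String clientId : futures.get(i).get()) {
                    pageByClient.put(clientId, i + 2); // futures are in the same order as tasks
                }
            }
            return pageByClient;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a page request failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(4).size()); // prints 6
    }
}
```

`invokeAll` returns the futures in the same order as the submitted task list, which is what lets the merge loop recover each page number from the index.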

Java 8 solution (ordered execution):

Parallel streams can be your friend here:

IntStream.rangeClosed(1, numberOfPages) // rangeClosed so the last page is included
        .parallel()
        .forEachOrdered(page -> {
            // ...
            postings.parallelStream()
                    .forEachOrdered(posting -> {
                        // ...
                    });
        });

Any local variable used inside a lambda must be final or effectively final.

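If output order doesn't matter, forEachOrdered can be replaced with forEach. Note that forEachOrdered processes elements one at a time in encounter order, so with it the page requests do not actually overlap; forEach is what lets them run concurrently, and in that case the shared maps must be thread-safe.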
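The difference between the two terminal operations can be observed by counting how many simulated requests are in flight at once. In this minimal sketch, `Thread.sleep` is a hypothetical stand-in for the HTTP call; on a parallel stream, `forEachOrdered` never runs two actions at the same time, while `forEach` may.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class OrderedVsUnordered {

    // Runs a simulated 20 ms "request" per page and returns the highest
    // number of requests that were in flight at the same moment.
    static int maxConcurrent(boolean ordered, int numberOfPages) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        IntConsumer slowCall = page -> {
            int now = inFlight.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // hypothetical stand-in for the HTTP POST
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
        };
        IntStream pages = IntStream.rangeClosed(1, numberOfPages).parallel();
        if (ordered) {
            pages.forEachOrdered(slowCall);
        } else {
            pages.forEach(slowCall);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("forEachOrdered: " + maxConcurrent(true, 8));  // always 1
        System.out.println("forEach:        " + maxConcurrent(false, 8)); // usually > 1 on a multi-core machine
    }
}
```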

To control how many threads run concurrently, see this question: How many threads are spawned in parallelStream in Java 8?
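One commonly used way to cap a parallel stream at, say, 5 threads is to start it from inside a dedicated `ForkJoinPool`. This is a sketch under a stated caveat: that the stream's subtasks stay in the submitting pool instead of the common pool is observed JDK behavior, not something the Stream API documents. `fetchClientId` is a hypothetical stand-in for the real request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CappedParallelStream {

    // Hypothetical stand-in for fetching one page and extracting a client id.
    static String fetchClientId(int pageNumber) {
        return "client-" + pageNumber;
    }

    // Runs the parallel stream inside a dedicated 5-thread ForkJoinPool instead of
    // the common pool (relies on undocumented, but long-standing, JDK behavior).
    static Map<String, Integer> collect(int numberOfPages) {
        Map<String, Integer> pageByClient = new ConcurrentHashMap<>(); // written from several threads
        ForkJoinPool pool = new ForkJoinPool(5);
        try {
            pool.submit(() ->
                    IntStream.rangeClosed(1, numberOfPages)
                            .parallel()
                            .forEach(page -> pageByClient.put(fetchClientId(page), page))
            ).get(); // wait for the whole stream to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a page request failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return pageByClient;
    }

    public static void main(String[] args) {
        System.out.println(collect(10).size()); // prints 10
    }
}
```

If relying on that behavior is not acceptable, a fixed-size `ExecutorService` gives the same cap with documented semantics.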

Java 7 solution (unordered execution):

Inspired by: Wait until all threads finish their work in java

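ExecutorService es = Executors.newFixedThreadPool(4);
for (int page = 1; page <= numberOfPages; ++page) { // <= so the last page is included
    final int currentPage = page; // Java 7 anonymous classes can only capture final locals
    es.execute(new Runnable() {
        @Override
        public void run() {
            /* your task, using currentPage */
        }
    });
}
es.shutdown();
boolean finished = false;
try {
    finished = es.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // don't swallow the interrupt
}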

Replace the 4 with whatever maximum number of threads you want, and give awaitTermination a suitable timeout.
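What to do when the timeout expires is left open above. A minimal sketch, keeping to Java 7 APIs and following the two-phase shutdown pattern shown in the `ExecutorService` Javadoc: if `awaitTermination` times out, `shutdownNow` interrupts the remaining tasks. Recording the page number in a set is a hypothetical stand-in for the real request.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedWait {

    // Submits one task per page to a fixed pool of `threads` workers, then waits up to
    // `timeoutSeconds`. Returns the pages whose tasks ran; on timeout, running tasks are
    // interrupted and queued ones are dropped.
    static Set<Integer> collect(int numberOfPages, int threads, long timeoutSeconds) {
        final Set<Integer> done = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (int page = 1; page <= numberOfPages; ++page) {
            final int currentPage = page; // Java 7 anonymous classes can only capture final locals
            es.execute(new Runnable() {
                @Override
                public void run() {
                    done.add(currentPage); // hypothetical stand-in for requesting and parsing the page
                }
            });
        }
        es.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            if (!es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                es.shutdownNow(); // timed out: interrupt running tasks, drop queued ones
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt(); // keep the interrupt flag instead of swallowing it
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(collect(10, 4, 60).size()); // prints 10
    }
}
```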
