在多个线程中调用相同的 API



我想从公开的API获取数据并将数据插入数据库。
API 返回包含两个对象的 JSON:页面对象和活动数组对象。
在 Page 对象中,可以看到有 400 个页面,因此我需要调用 API 400 次,并将每次调用的活动存储到 DB 中。
由于调用 API 400 次非常耗时,我想使用多线程来提高速度,但是,代码行为奇怪,传递的页码看起来没有按顺序进行,在控制台中我几乎立即看到索引达到 numberOfPages 400。

我尝试了以下故障,但我不确定可能是什么问题:

private static Map<Integer, ActivityResult> activities = new ConcurrentHashMap<Integer, ActivityResult>();
public static void fetchActivities(){
    ExecutorService es = Executors.newFixedThreadPool(20);
    String activityResponse = runRequest("/activity/", first_page);
    ActivityResult ar = (ActivityResult) gson.fromJson(activityResponse, ActivityResult.class);
    activities.put(first_page, ar);
    int numberOfPages = ar.getPaging().getPages();
    AtomicInteger index = new AtomicInteger(first_page +1);
    for(index.get(); index.get() < numberOfPages; index.incrementAndGet()){
    es.submit(new Runnable() {
            @Override
            public void run() {
                 System.out.println(index.get());
                 String tmpResponse = runRequest("/activity/", index.get());
                 activities.put(index.get(), gson.fromJson(tmpResponse, ActivityResult.class));          
            }
        });
    }
    es.shutdown();
    System.out.println(activities.size());
}

runRequest 方法使用 okhttp3 调用 API,它应该是线程安全的。

你的问题在这里:

AtomicInteger index = new AtomicInteger(first_page +1);
for(index.get(); index.get() < numberOfPages; index.incrementAndGet()){
    es.submit(new Runnable() {
        @Override
        public void run() {
             System.out.println(index.get());
             String tmpResponse = runRequest("/activity/", index.get());
             activities.put(index.get(), gson.fromJson(tmpResponse, ActivityResult.class));          
        }
    });
}

您没有使用AtomicInteger的原子性。for 循环在父线程上执行速度非常快,并提交所有 400 个任务。20 个将很快开始(但不一定立即开始(,其他 380 个将排队。 index已一路增加到 400。任务已提交,但可能要到将来某个不确定的时间点才能开始。

然后启动任务并使用最新的值 index 。队列中的任何任务(在您的情况下是任务 20-400(可能在 for 循环完成后全部启动,index.get将为所有这些任务返回 400。前 20 个任务可能会在 for 循环的中途开始,并将获得一组随机值。

正确的代码应该看起来像这样(我也Runnable转换为lambda(:

AtomicInteger index = new AtomicInteger(first_page + 1);
for(int i = first_page; i < numberOfPages; ++i){
    es.submit(() -> {
       int page = index.incrementAndGet();
       System.out.println(page);
       String tmpResponse = runRequest("/activity/", page);
       activities.put(page , gson.fromJson(tmpResponse, ActivityResult.class));          
   });
}

最新更新