如何在谷歌我的业务异常后将所有java ExecutorService的线程置于睡眠状态:RATE_LIMIT_EXCEEDED



为了读取我在MyBusinessBusinessInformation.locations.GetGoogleUpdated中的整个位置的所有谷歌更新,我使用的是ExecutorService,假设mybusinessUpdateLocations的类型是MyBusinessBusinessInformation,它也负责身份验证,变量列表的类型为java.util.List,其中包含从谷歌请求所需的位置信息。问题是,有时我的列表包括3000多个位置,谷歌会给我返回Exception。我的抽象代码是:

ExecutorService ex =Executors.newFixedThreadPool(20);
for(int i=0;i<list.size();i++) {
ex.execute(new Runnable() {
@Override
public void run() {
try {
String locationName ="The location name";
String readMask="storeCode,regularHours,name,languageCode,title,phoneNumbers,categories,storefrontAddress,websiteUri,regularHours,specialHours,serviceArea,labels,adWordsLocationExtensions,latlng,openInfo,metadata,profile,relationshipData,moreHours";

MyBusinessBusinessInformation.Locations.GetGoogleUpdated 
updateList=mybusinessUpdateLocations.locations()
.getGoogleUpdated(locationName).setReadMask(readMask);
GoogleUpdatedLocation response = updateList.execute();
} catch (Exception e) {
System.out.println( e);
}

为了让程序运行得更快,我使用了ExecutorService,但有时代码会出现异常,我会丢失特定的位置信息:

{
"code" : 429,
"details" : [ {
"@type" : "type.googleapis.com/google.rpc.ErrorInfo",
"reason" : "RATE_LIMIT_EXCEEDED"
} ],
"errors" : [ {
"domain" : "global",
"message" : "Quota exceeded for quota metric 'Requests' and limit 'Requests per minute' of service 'mybusinessbusinessinformation.googleapis.com' for consumer 'project_number:XXXXXXXXX'.",
"reason" : "rateLimitExceeded"
} ],
"message" : "Quota exceeded for quota metric 'Requests' and limit 'Requests per minute' of service 'mybusinessbusinessinformation.googleapis.com' for consumer 'project_number:XXXXXXXXXXX'.",
"status" : "RESOURCE_EXHAUSTED"
}

问题是,在向Google API发出一定数量的请求后,我如何将所有线程置于睡眠状态?例如,在每100个请求之后,我希望执行器睡眠几秒钟。

Thread.sleep(30000);

只是把一根线放在睡眠上,而不是全部。如有任何帮助,我们将不胜感激。

您可以使用不同的解决方案,我有将CyclicBarrier和Thread.sleep的使用组合在for循环中的想法,您可以从这个例子中得到启发,运行它并查看行为:

public class RequestSender {
private void sendRequest() {
System.out.println("Send request and wait 0,5 second");
}
public void performTask(CyclicBarrier c1) {
try {
sendRequest();
c1.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(4);
RequestSender sender = new RequestSender();
CyclicBarrier c1 = new CyclicBarrier(2, () -> System.out.println("***  2 Threads worked !"));
for (int i = 0; i < 16; i++) {
service.submit(() -> sender.performTask(c1));
try {
// after 8 request , wait for 5 seconds
if(i==8)
Thread.sleep(5000);
else // normal case , wait just for 1 second to send a new request 
Thread.sleep(1000);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} finally {
if (service != null)
service.shutdown();
}
}
}

CyclicBarrier

一种同步辅助工具,允许一组线程全部等待以达到共同的障碍点。CyclicBarriers很有用在涉及固定大小的线程组的程序中偶尔相互等待。屏障被称为循环屏障,因为它可以在等待线程释放后重新使用。A.CyclicBarrier支持运行一次的可选Runnable命令每个障碍点,在聚会中的最后一个线程到达之后,但是在释放任何线程之前。此屏障操作对在任何一方继续之前更新共享状态。

最新更新