我有一个包含数十亿条记录的ArrayList,我遍历每条记录并将其发布到服务器。在每次迭代中调用的方法如下:
public void sendNotification(String url, String accountId, String accountPwd, String jsonPayLoad,
int maxConnections) {
notificationService = Executors.newFixedThreadPool(maxConnections);
notificationService.submit(new SendPushNotification(url, accountId, accountPwd, jsonPayLoad));
notificationService.shutdown();
}
我的 SendPushNotification 类如下所示:
public class SendPushNotification implements Runnable {
String url;
String accountId;
String accountPwd;
String jsonPayLoad;
public SendPushNotification(String url, String accountId, String accountPwd, String jsonPayLoad) {
this.url = url;
this.accountId = accountId;
this.accountPwd = accountPwd;
this.jsonPayLoad = jsonPayLoad;
}
public void run() {
HttpsURLConnection conn = null;
try {
StringBuffer response;
URL url1 = new URL(url);
conn = (HttpsURLConnection) url1.openConnection();
// conn.setReadTimeout(20000);
// conn.setConnectTimeout(30000);
conn.setRequestProperty("X-Account-Id", accountId);
conn.setRequestProperty("X-Passcode", accountPwd);
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.setRequestMethod("POST");
OutputStream out = new BufferedOutputStream(conn.getOutputStream());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));
writer.write(jsonPayLoad);
writer.close();
out.close();
int responseCode = conn.getResponseCode();
System.out.println(String.valueOf(responseCode));
switch (responseCode) {
case 200:
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String inputLine;
response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
System.out.println(response.toString());
}
} catch (IOException ez) {
ez.printStackTrace();
} finally {
if (conn != null) {
try {
conn.disconnect();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
所以这里出了什么问题,我怀疑我必须在一个体面的系统配置上运行它。基本上我想了解我的代码是否有任何问题?
错误的方法!循环:
notificationService = Executors.newFixedThreadPool(maxConnections);
是个坏主意!为什么你打算创建数十亿个线程池;提交一个任务然后关闭它?!这就像每次烟灰缸装满时都买一辆新的法拉利......
请理解:你的简单代码创建了相当多的对象;所有这些对象在一次循环迭代后都消失了。意思是:他们有资格进行垃圾回收。换句话说:你不断地以非常高的速度制造垃圾。你真的惊讶这样做会把你推入"记忆之外"吗?
相反,使用一个ThreadPool 并将您的数十亿个请求提交到其中!
除此之外,即使这也不是一个好主意。每个条目打开一个到服务器的网络连接根本不会扩展到数十亿个条目:真正的解决方案需要您退后一步,想出一些以"合理方式"端到端工作的东西。例如,您应该考虑在服务器上创建某种"批量"或"流"界面。在客户端上迭代数十亿个条目的文件,并与服务器建立数十亿个连接,这是对不起,疯了!
因此,与其这样做:
loop:
open connection / push ONE item / close connection
你最好去:
open connection / push all items / close connections
或者除此之外,您甚至可以考虑传输压缩的二进制数据。含义:在客户端压缩文件,将其作为blob发送;并在服务器端提取/处理它。
这里有很多选项空间;但请放心:您当前的"内存不足"异常只是由"不合适"设计引起的一种症状。
编辑:鉴于您的评论,我的(个人)建议:
- 显然:使用一个线程池(可能建立在 3 个线程上)并将您的 Runnable 推送到该共享池中
- 然后开始仔细分析。鉴于您打算处理这数十亿个条目,因此每一毫秒都可能很重要。换句话说:进行合理的测试,找出该解决方案的"好"程度。如果它不成功;进行剖析以找到需要改进的地方。
- 要理解的关键是:你可能可以微调事情,在这里获得1%,在那里获得5%;但很可能所有这些都不够好。当你做十亿次的事情时,它们应该非常快;否则那个因素会杀死你...
ExecutorService 给出内存不足错误
正如@GhostCat指出的那样,您应该在程序开始时在通知类的顶部创建一个ExecutorService
。 当您收到新请求时,您将所有通知请求提交到同一ExecutorService
该将根据需要创建线程。 每次通过创建新池是一种糟糕的模式。
// at top of class not in the sending loop
private final ExecutorService notificationThreadPool =
Executors.newFixedThreadPool(MAX_CONNECTIONS);
但是,重要的是要意识到,由于线程池使用无限队列,因此可能仍会耗尽内存。 幕后的newFixedThreadPool(...)
方法使用new LinkedBlockingQueue<Runnable>()
因此,如果您的发件人没有跟上需求,您仍然会耗尽内存。
然后,如果无法增加线程数,则必须以某种方式减慢生产者的速度,或者将请求写入磁盘或其他短期存储。
如果您需要使用有界队列来检测何时填满,那么您应该执行以下操作:
private final ExecutorService notificationThreadPool =
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(MAX_QUEUED_REQUESTS);
默认情况下,如果所有线程都繁忙并且队列已满,当您向池提交SendPushNotification
时,它将抛出RejectedExecutionException
如果它已满,则让您有机会以某种方式处理它。 您还可以使用notificationThreadPool.setRejectedExecutionHandler(...)
设置一个RejectedExecutionHandler
,将被拒绝的通知写入临时存储,直到线程赶上。