使用执行人员服务并行处理任务



我正在编写一个Java程序,该程序需要并行监视许多机器。此数字不是固定的,在程序执行期间的任何时间都可以变化(增加/减少)。

我正在考虑这样做:

public static void main (String args[]) {
    ExecutorService EXEC1 = Executors.newScheduledThreadPool(1);
    EXEC1.scheduleWithFixedDelay(new Runnable() {
        ExecutorService EXEC2 = Executors.new...
        Map<Integer, Future<Void>> monitoringTasks = new HashMap<Integer, Future<Void>>();
        @Override
        public void run() {
            List<Machine> monitorizedMachines = MachineDao.getMonitorizedMachines();
            for (Machine machine: monitorizedMachines) {
                Future<Void> monitoringTask = monitoringTasks.get(machine.getId());
                if(monitoringTask == null || monitoringTask.isDone()) {
                    monitoringTask = EXEC2.submit(new Runnable() {
                        @Override
                        public void run() throws Exception {
                            // monitor machine....
                        }
                    });
                    monitoringTasks.put(machine.getId(), monitoringTask);
                }
            }
        }
    }, 1, 1, TimeUnit.SECONDS);
}

但是,我在选择最合适的执行程序(exec2)时遇到了困难:fixeThreadPool,cachedthreadpool,自定义实现,...

需要说每个监视任务约为2/3秒。

谁能给我任何建议?

大多数时候,当您开发大型生产应用程序时,您需要使用ExecutorService EXEC2 = Executors.newFixedThreadPool(THREAD_COUNT);,并且在进行性能测试之后,需要正确配置THREAD_COUNT w ITH请求/卷的预期数

您可以在此处查找有关newCachedThreadPool()为何不理想的更多详细信息。

这是一个简单的示例。首先,在您的类机器中添加示例公共变量布尔值为工作。在运行()代码之间,在变量之间添加您的代码,例如下一个示例:

public static class Machine implements Runnable {
        public boolean ISWORKING = true;
        @Override
        public void run() {
            this.ISWORKING = true;
            //YOUR CODE HERE..................
            this.ISWORKING = false;
        }
    }

第二个示例代码:

    Timer timer = null;
    TimerTask task = null;
    boolean isLocked = false;
    public void main() {
        task = new TimerTask() {
            @Override
            public void run() {
                if (isLocked) {
                    return;
                }
                isLocked = true;
                List<Machine> monitorizedMachines = MachineDao.getMonitorizedMachines();
                //Count the pending job.
                int poolsize = 0;
                for (Machine machine : monitorizedMachines) {
                    if (!machine.ISWORKING) {
                        poolsize++;
                    }
                }
                if (poolsize == 0) {
                    isLocked = false;
                    return;
                }
                //Prevent a lot of poolsize.
                poolsize = Math.min(100, poolsize);
                ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolsize);
                for (Machine machine : monitorizedMachines) {
                    if (!machine.ISWORKING) {
                        pool.execute(machine);
                    }
                }
                pool.shutdown();

                isLocked = false;
            }
        };
        timer = new Timer();
        timer.schedule(task, 1000, 2000);
    }

最新更新