如何避免Java ExecutorService中的上下文切换



我使用软件(AnyLogic(导出可运行的jar文件,这些文件本身重复使用不同的参数重新运行一组模拟(所谓的参数变化实验(。我正在运行的模拟非常占用RAM,所以我必须限制jar文件可用的内核数量。在AnyLogic中,可用内核的数量很容易设置,但从服务器上的Linux命令行,我知道如何做到这一点的唯一方法是使用taskset命令手动指定要使用的可用内核(使用CPU亲和性"掩码"(。到目前为止,这种方法效果很好,但由于您必须指定要使用的单个内核,我了解到,根据您选择的内核,性能可能会有相当大的差异。例如,您希望最大限度地使用CPU缓存级别,因此如果您选择共享过多缓存的内核,则性能会慢得多。

由于AnyLogic是用Java编写的,所以我可以使用Java代码来指定模拟的运行。我正在考虑使用Java ExecutorService来构建一个单独运行的池,这样我就可以将池的大小指定为与我使用的机器的RAM匹配的任何数量的内核。我认为这会带来很多好处,最重要的是,也许计算机的scehduler可以更好地选择内核,以最大限度地减少运行时间。

在我的测试中,我构建了一个小的AnyLogic模型,运行大约需要10秒(它只是在两个状态图状态之间重复切换(。然后我用这个简单的代码创建了一个自定义实验。

ExecutorService service = Executors.newFixedThreadPool(2);
for (int i=0; i<10; i++)
{
Simulation experiment = new Simulation();
experiment.variable = i;
service.execute( () -> experiment.run() );
}

我希望看到的是,一次只启动2个Simulation对象,因为这就是线程池的大小。但我看到所有10个线程都在2个线程上并行启动和运行。这让我觉得上下文切换正在发生,我认为这是非常低效的。

当我不调用AnyLogicSimulation,而只是在service.execute函数中调用一个自定义Java类(如下(时,它似乎工作得很好,一次只显示2个Tasks在运行。

public class Task implements Runnable, Serializable {
public void run() {
traceln("Starting task on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
traceln("Ending task on thread " + Thread.currentThread().getName());
}
}

有人知道为什么AnyLogic函数似乎同时设置所有模拟吗?

我猜Simulation是从ExperimentalParamVariation扩展而来的。实现你想要的目标的关键是确定实验何时结束。

文档显示了一些有趣的方法,如getProgress((和getState((,但您必须轮询这些方法,直到进度为1或状态为FINISHEDERROR。还有一些方法onAfterExperiment((和onError((应该由引擎调用,以指示实验已经结束或出现错误。我认为你可以使用信号量的最后两种方法来控制一次运行多少个实验:

import java.util.concurrent.Semaphore;
import com.anylogic.engine.ExperimentParamVariation;
public class Simulation extends ExperimentParamVariation</* Agent */> {
private final Semaphore semaphore;
public Simulation(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void onAfterExperiment() {
this.semaphore.release();
super.onAfterExperiment();
}
public void onError(Throwable error) {
this.semaphore.release();
super.onError(error);
}
// run() cannot be overriden because it is final
// You could create another run method or acquire a permit from the semaphore elsewhere
public void runWithSemaphore() throws InterruptedException {
// This acquire() will block until a permit is available or the thread is interrupted
this.semaphore.acquire();
this.run();
}
}

然后,您必须配置一个具有所需数量许可证的信号量,并将其传递给Simulation实例:

import java.util.concurrent.Semaphore;
// ...
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++)
{
Simulation experiment = new Simulation(semaphore);
// ...
// Handle the InterruptedException thrown here
experiment.runWithSemaphore();
/* Alternative to runWithSemaphore(): acquire the permit and call run().
semaphore.acquire();
experiment.run();
*/
}

首先,我认为这是对AnyLogic功能的一个相对较新的添加,从而消除了整个问题。您可以指定一个ini文件,该文件的指定数目为"0";平行工人";。

https://help.anylogic.com/index.jsp?topic=%2Fcom.anylogic.help%2Fhtml%2Frunning%2Fexport-html&cp=0_3_9&anchor=自定义设置

但就在找到这个(更好的(选择之前,我设法找到了一个可行的解决方案。埃尔南的回答几乎足够了。我认为它受到了AnyLogic引擎的一些不确定性的阻碍(正如我在评论中详细描述的那样(。

我能收集到的最好的版本是使用ExecuterService。在一个自定义实验中,我输入了以下代码:

ExecutorService service = Executors.newFixedThreadPool(2);
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i=0; i<10; i++)
{
int t = i;
tasks.add( () -> simulate(t) );
}
try{
traceln("starting setting up service");
List<Future<Integer>> futureResults = service.invokeAll(tasks);

traceln("finished setting up service");

List<Integer> res = futureResults.stream().parallel().map(
f -> {
try {
return f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
System.out.println("----- Future Results are ready -------");

System.out.println("----- Finished -------");

} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();

这里的关键是使用JavaFuture。此外,为了使用invokeAll函数,我在Additional类代码块中创建了一个函数:

public int simulate(int variable){
// Create Engine, initialize random number generator:
Engine engine = createEngine();
// Set stop time
engine.setStopTime( 100000 );
// Create new root object:
Main root = new Main( engine, null, null );
root.parameter = variable;
// Prepare Engine for simulation:
engine.start( root );
// Start simulation in fast mode:
//traceln("attempting to acquire 1 permit on run "+variable);
//s.acquireUninterruptibly(1);
traceln("starting run "+variable);
engine.runFast();
traceln("ending run "+variable);
//s.release();
// Destroy the model:
engine.stop();

traceln( "Finished, run "+variable);
return 1;
}

我能看到的这种方法的唯一限制是,我没有每隔几分钟输出进度的等待循环。但是,与其找到解决方案,我必须放弃这项工作,在链接顶部找到更好的设置文件解决方案。

最新更新