我有一段代码,可以针对给定的问题运行一组算法,然后只要一个算法找到了问题的答案,程序就会继续。投资组合中的其他算法得到自愿终止的信号,执行的主线程继续进行
这段代码的一个用户向我发送了一个堆栈跟踪,行中有一个NullPointerException"resultReference.set(solverResult);"从下面的代码中可以看到,resultReference是一个最终变量,并且会立即初始化。我不知道它怎么会变成空的。我花了很长时间试图在我这边重现这个问题,但没有成功。用户堆栈中的行号与我的代码中的行号相匹配。用户报告在3种不同的情况下看到了错误,但很少(并不是每次解决问题都会出现这种情况),所以这可能是某种竞争条件。这是jdk 1.8_25。
我假设这个错误是不可能的,因为变量是最终的,这是对的吗?我不知道该如何处理这个堆栈跟踪,我想让大家放心,这应该是不可能的。
public class ParallelSolver {
private final ListeningExecutorService executorService;
private final AtomicReference<Throwable> error;
private final List<Solver> solvers;
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(ParallelSolver.class);
public ParallelSolver(int threadPoolSize, List<Solvers> solvers) {
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
error = new AtomicReference<>();
this.solvers = solvers;
}
public SolverResult solve(Problem p) {
final AtomicReference<SolverResult> resultReference = new AtomicReference<>();
final List<Future> futures = new ArrayList<>();
final Semaphore workDone = new Semaphore(0);
try {
// Submit one job per each solver in the portfolio
solvers.forEach(solver -> {
final ListenableFuture<Void> future = executorService.submit(() -> {
SolverResult solverResult = solver.solve(p);
if (solverResult.isConclusive()) {
log.debug("Signalling the blocked thread to wake up!");
// NPE HERE ON THIS LINE
resultReference.set(solverResult);
workDone.release(solvers.size());
}
log.debug("Releasing a single permit as the work for this thread is done.");
workDone.release(1);
log.debug("Job ending...");
return null;
});
futures.add(future);
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
}
@Override
public void onFailure(Throwable t) {
if (t instanceof CancellationException) {
return;
}
error.compareAndSet(null, t);
// Wake up the main thread (if it's still sleeping)
workDone.release(solvers.size());
}
});
});
// Wait for a thread to complete solving and signal you, or all threads to timeout
log.debug("Main thread going to sleep");
workDone.acquire(solvers.size());
log.debug("Main thread waking up, checking for errors then cancelling futures");
checkForErrors();
// cancel any still to be launched futures
futures.forEach(future -> future.cancel(false));
log.debug("Returning now");
return resultReference.get() == null ? SolverResult.createTimeoutResult() : resultReference.get();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while running parallel job", e);
}
}
/**
* We want a fail-fast policy, but java executors aren't going to throw the exception on the main thread.
* We can't call Future.get() and check for errors, because that might block.
* So we set a variable when an error occurs, and check it here.
*/
private void checkForErrors() {
if (error.get() != null) {
log.error("Error occured while executing a task", error.get());
throw new RuntimeException("Error occurred while executing a task", error.get());
}
}
以下是使用akka:所需的内容
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.util.Timeout;
public class AlgorithmTester extends UntypedActor
{
public AlgorithmTester(){}
public static class RegisterResultListener
{
}
public static class Result
{
final double result;
public Result(double result)
{
this.result = result;
}
}
public static interface Algorithmable
{
public Result solve();
}
@SuppressWarnings("serial")
public static class AlgorithmsToTest extends ArrayList<Algorithmable> {
}
public static class AlgorithmRunner extends UntypedActor
{
public AlgorithmRunner(){}
@Override
public void onReceive(Object msg) throws Exception
{
if (msg instanceof Algorithmable)
{
Algorithmable alg = (Algorithmable) msg;
getSender().tell(alg.solve(), getSelf());
}
}
}
List<ActorRef> runners = new ArrayList<ActorRef>();
List<ActorRef> resultListeners = new ArrayList<ActorRef>();
@Override
public void onReceive(Object msg) throws Exception
{
if (msg instanceof RegisterResultListener)
{
resultListeners.add(getSender());
}
else if (msg instanceof AlgorithmsToTest)
{
AlgorithmsToTest algorithms = (AlgorithmsToTest) msg;
for (Algorithmable algorithm : algorithms)
{
ActorRef runner = getContext().actorOf(Props.create(AlgorithmRunner.class));
runners.add(runner);
runner.tell(algorithm, getSelf());
}
getSelf().tell(new RegisterResultListener(), getSender());
}
else if (msg instanceof Result)
{
for (ActorRef runner : runners)
{
getContext().stop(runner);
}
runners.clear();
for (ActorRef l : resultListeners)
{
l.tell(msg, getSelf());
}
}
}
public static void main(String[] args)
{
ActorSystem system = ActorSystem.create("AlogrithmTest");
ActorRef tester = system.actorOf(Props.create(AlgorithmTester.class), "algorithmTest");
Algorithmable a1 = new Algorithmable()
{
public Result solve() {
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new Result(1100.0);
}
};
Algorithmable a2 = new Algorithmable()
{
public Result solve() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new Result(330.0);
}
};
Algorithmable a3 = new Algorithmable()
{
public Result solve() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new Result(1000);
}
};
AlgorithmsToTest algorithmsToTest = new AlgorithmsToTest();
algorithmsToTest.add(a1);
algorithmsToTest.add(a2);
algorithmsToTest.add(a3);
Timeout t = new Timeout(5, TimeUnit.SECONDS);
Future<Object> future = Patterns.ask(tester, algorithmsToTest, 100000);
try {
Result response = (Result)Await.result(future, t.duration());
System.out.println(response.result);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Continuing on");
system.terminate();
System.out.println("Terminated");
}
}
然而,在akka中没有办法在参与者处理消息时杀死他们,你会注意到,当参与者处理其他算法时,这个程序仍在继续执行,即使已经找到了第一个答案。杀死线程从来都不是一件好事,所以你的问题没有一个好的解决方案。我想,你可以在主方法末尾的System.exit(0)上进行标记,或者在算法中的某个地方有一个可怕的原子变量,如果它们正在迭代并抛出异常,或者将它们作为线程并杀死它们,所有这些都不是很好):就我个人而言,如果你能逃脱惩罚,我会使用System.exit(0)。
编辑:好的,感谢你毫无理由地投了反对票看,这是你想要做的替代代码,没有atmoic变量、原子变量和锁以及所有其他非常危险和容易出错的东西,这是一个更干净的答案,向下投票这绝对是垃圾,只需要更改此代码以匹配你想要的,只需要Result或Algorithmable接口,并提供您想要的所有实现—这正是您所要求的。事实上,你只是在没有评论的情况下否决了这一点,这意味着你不知道如何使用stackerflow。如果这种否决推断出你认为原子变量比基于行动者的模型更受欢迎,那么我建议你阅读一下。Asker甚至没有为空指针异常提供堆栈竞争,因此不可能直接解决这个问题,下次我写这个答案的时间有0.01%是在投否决票之前写评论的。