执行程序服务线程安全



假设我有一个来自Executors静态工厂方法之一的ExecutorService实例。

如果我提交一个 Callable,其中 RetVal 不是来自某个线程的线程安全的本地实例化对象,当我从同一线程获取 RetVals 时,我是否需要担心 RetVals 的完整性?人们说局部变量是线程安全的,但我不确定当您返回本地实例化的对象并从其他线程接收它时它是否适用。

下面是一个与我的情况类似的示例:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
List<String> ret = new ArrayList<>();
ret.add("aasdf");
ret.add("dfls");
return ret;
});
List<String> myList = fut.get();

在上面的示例中,我检索了一个在不同线程中创建的 ArrayList,该线程由执行器创建。我不认为上面的代码是线程安全的,但我无法找到有关我的具体情况的太多信息。

现在我在计算机上尝试了上面的代码,它实际上在我尝试的 100% 时间内返回了预期结果,我什至尝试使用我自己的 ExecutorService 实现,到目前为止我只得到了预期的结果。因此,除非我非常幸运,否则我很确定它会起作用,但我不确定如何。 我在另一个线程中创建了一个非线程安全对象,并在另一个线程中接收了它;我不应该有机会收到一个部分构造的对象 - 在我的例子中是一个不包含 2 个字符串的列表?

下面是我只是为了测试而做的自定义实现。你可以忽略EType枚举的东西。

class MyExecutor {
enum EType {
NoHolder, Holder1, Holder2
}
private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
private final Thread thread;
private final EType eType;
public MyExecutor(EType eType) {
eType = Objects.requireNonNull(eType);
tasksQ = new ConcurrentLinkedQueue<>();
thread = new Thread(new MyRunnable());
thread.start();
}
public <T> Future<T> submit(Callable<T> c) {
MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
tasksQ.add(task);
return task;
}
class MyRunnable implements Runnable {
@Override
public void run() {
while (true) {
if (tasksQ.isEmpty()) {
try {
Thread.sleep(1);
continue;
} catch (InterruptedException ite) {
Thread.interrupted();
break;
}
}
MyFutureTask<?> task = tasksQ.poll();
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class MyFutureTask<T> implements RunnableFuture<T> {
final Callable<?> cb;
volatile Object outcome;
static final int STATE_PENDING = 1;
static final int STATE_EXECUTING = 2;
static final int STATE_DONE = 3;
final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);
final EType eType;
public MyFutureTask(Callable<?> cb, EType eType) {
cb = Objects.requireNonNull(cb);
eType = Objects.requireNonNull(eType);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new NotImplementedException();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return atomicState.get() == STATE_DONE;
}
@SuppressWarnings("unchecked")
@Override
public T get() throws InterruptedException, ExecutionException {
while (true) {
switch (atomicState.get()) {
case STATE_PENDING:
case STATE_EXECUTING:
//                      Thread.sleep(1);
break;
case STATE_DONE:
return (T)outcome;
default:
throw new IllegalStateException();
}
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new NotImplementedException();
}
void set(T t) {
outcome = t;
}
@Override
public void run() {
if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
Object result;
try {
switch (eType) {
case NoHolder:
result = cb.call();
break;
case Holder1:
throw new NotImplementedException();
case Holder2:
throw new NotImplementedException();
default:
throw new IllegalStateException();
}
} catch (Exception e) {
e.printStackTrace();
result = null;
}
outcome = result;
atomicState.set(STATE_DONE);
}
}
}
}
class MyTask implements Callable<List<Integer>> {
@Override
public List<Integer> call() throws Exception {
List<Integer> ret = new ArrayList<>(100);
IntStream.range(0, 100).boxed().forEach(ret::add);
return ret;
}
}

重要的是发生之前的关系。从ExecutorServiceAPI 文档中:

内存一致性影响:线程中的操作在 向ExecutorService提交RunnableCallable任务发生在该任务采取的任何操作之前,这反过来又会 在通过Future.get()检索结果之前发生

因此,您可以安全地传输这样的可变对象。ExecutorService实现通过某种形式的安全发布传输对象。

显然,返回后不要更新原始线程中的对象。

如果要通过存储在共享的非volatile字段中来在线程之间进行通信,那么这将是不安全的。

当多个线程尝试同时访问和修改同一状态时,线程安全性成为一个问题。

请注意,在任务完成之前,您将无法从Future获得实际结果(即Future#get在任务完成之前不会返回(。

在第一个示例中,线程安全不是问题,因为新对象(虽然可变(由一个线程(由执行程序创建的线程(创建,并在该线程完成任务处理后从Future对象检索。一旦调用线程掌握了该对象,任何其他线程都无法对其进行修改,因为创建线程不再有权访问 List。

最新更新