以线程安全的方式填充映射,并将该映射从后台线程传递给另一个方法



我有一个下面的类,其中add方法将由多个线程调用,以线程安全的方式填充channelMessageHolderCHM。

在同一个类中,我有一个每30秒运行一次的backgrond线程,它通过传递channelMessageHolder中的数据来调用send方法。

public class Processor {
private final ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());
private Processor() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
send();
}
}, 0, 30, TimeUnit.SECONDS);
}
// this will be called by only single background thread
private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
Channel channel = entry.getKey();
ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
....
// process this and send to database
}      
}
}
// called by multiple threads
public void add(final Channel channel, final Message message) {
// populate channelMessageHolder in a thread safe way
}
}

问题

正如你所看到的,channelMessageHolder已经存在于我的Processor类中,所以我需要每30秒明确地传递一次来自该映射的数据来发送方法吗?或者我可以直接在我的发送方法中使用它?

令人困惑的是,如果我在发送方法中直接使用它,那么它将同时由多个线程填充,所以这就是为什么我使用AtomicReference的getAndSet方法将它传递给send方法。

如果我做的是错误的,请告诉我,有什么更好的方法可以做到这一点?

正如你所看到的,channelMessageHolder已经存在于我的Processor类中,所以我需要每30秒从这个映射中显式传递数据来发送方法吗?或者我可以直接在我的发送方法中使用它?

您当然可以直接在send()方法中使用它,并且您不需要AtomicReference包装器,因为ConcurrentHashMap已经同步。您需要担心的是映射中的键和值对象是否正确同步。我假设Channel是不可变的,而ConcurrentLinkedQueue是并发的,所以你应该很好。

// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();

ConcurrentHashMap为您负责同步,因此生产者线程可以在发送方线程发送项目的同时向其中添加项目,而不会发生冲突。只有当您试图在多个线程之间共享一个未同步的类时,才需要AtomicReference

困惑的是,如果我在发送方法中直接使用它,那么它将同时由多个线程填充,所以这就是为什么我使用AtomicReference的getAndSet方法将它传递给发送方法。

没错,但这没关系。多个线程将向ConcurrentLinkedQueue添加消息。每30秒,您的后台线程就会启动一次,获取Channel,退出队列,然后发送此时队列中的消息。ConcurrentLinkedQueue保护生产者和消费者免受种族条件的影响。

代码中的问题是,这是不可重入的,因为它依赖于对队列的多次调用:

while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();

它适用于您的情况,因为看起来只有一个线程出列,但以下代码更好:

while (true) {
// only one call to the concurrent queue
Message message = messageHolder.poll();
if (message == null) {
break;
}
...
}

或者我可以直接在我的发送方法中使用它,而不传递任何

通过在send方法的开头说channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>()),您应该能够在send方法中直接使用它,而不会出现任何问题。

也就是说,Java8ConcurrentHashMap类中添加了一个名为computeIfAbsent的新方法,这意味着您并不真正需要您正在使用的AtomicReference

最新更新