TL;DR:在Java中,我有N个线程,每个线程都使用共享集合。ConcurrentHashMap 允许我锁定写入,但不能锁定读取。我需要的是锁定集合的特定项,读取以前的数据,进行一些计算并更新值。如果两个线程从同一个发件人收到两条消息,则第二个线程必须等待第一个线程完成,然后再执行其操作。
长版本:
这些线程正在接收按时间顺序排列的消息,并且它们必须根据messageSenderID
更新集合。
我的简化代码如下:
public class Parent {
private Map<String, MyObject> myObjects;
ExecutorService executor;
List<Future<?>> runnables = new ArrayList<Future<?>>();
public Parent(){
myObjects= new ConcurrentHashMap<String, MyObject>();
executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
WorkerThread worker = new WorkerThread("worker_" + i);
Future<?> future = executor.submit(worker);
runnables.add(future);
}
}
private synchronized String getMessageFromSender(){
// Get a message from the common source
}
private synchronized MyObject getMyObject(String id){
MyObject myObject = myObjects.get(id);
if (myObject == null) {
myObject = new MyObject(id);
myObjects.put(id, myObject);
}
return myObject;
}
private class WorkerThread implements Runnable {
private String name;
public WorkerThread(String name) {
this.name = name;
}
@Override
public void run() {
while(!isStopped()) {
JSONObject message = getMessageFromSender();
String id = message.getString("id");
MyObject myObject = getMyObject(id);
synchronized (myObject) {
doLotOfStuff(myObject);
}
}
}
}
}
所以基本上我有一个生产者和N个消费者,以加快处理速度,但N个消费者必须处理一个共同的数据基础,并且必须尊重时间顺序。
我目前正在使用ConcurrentHashMap
,但如果需要,我愿意更改它。
如果具有相同 ID 的消息到达时相距足够远(> 1 秒),代码似乎有效,但是如果我在微秒的距离内收到两条具有相同 ID 的消息,我会得到两个线程处理集合中的同一项目。
我想我想要的行为是:
Thread 1 Thread 2
--------------------------------------------------------------
read message 1
find ID
lock that ID in collection
do computation and update
read message 2
find ID
lock that ID in collection
do computation and update
虽然我认为这就是发生的事情:
Thread 1 Thread 2
--------------------------------------------------------------
read message 1
read message 2
find ID
lock that ID in collection
do computation and update
find ID
lock that ID in collection
do computation and update
我想过做一些类似的事情
JSONObject message = getMessageFromSender();
synchronized(message){
String id = message.getString("id");
MyObject myObject = getMyObject(id);
synchronized (myObject) {
doLotOfStuff(myObject);
} // well maybe this inner synchronized is superfluous, at this point
}
但我认为这会扼杀拥有多线程结构的全部目的,因为我一次读取一条消息,而工人没有做任何其他事情;这就像我使用 SyncdHashMap 而不是 ConcurrentHashMap。
作为记录,我在这里报告我最终实施的解决方案。我不确定它是否最佳,我仍然需要测试性能,但至少输入正确。
public class Parent implements Runnable {
private final static int NUM_WORKERS = 10;
ExecutorService executor;
List<Future<?>> futures = new ArrayList<Future<?>>();
List<WorkerThread> workers = new ArrayList<WorkerThread>();
@Override
public void run() {
executor = Executors.newFixedThreadPool(NUM_WORKERS);
for (int i = 0; i < NUM_WORKERS; i++) {
WorkerThread worker = new WorkerThread("worker_" + i);
Future<?> future = executor.submit(worker);
futures.add(future);
workers.add(worker);
}
while(!isStopped()) {
byte[] message = getMessageFromSender();
byte[] id = getId(message);
int n = Integer.valueOf(Byte.toString(id[id.length-1])) % NUM_WORKERS;
if(n >= 0 && n <= (NUM_WORKERS-1)){
workers.get(n).addToQueue(line);
}
}
}
private class WorkerThread implements Runnable {
private String name;
private Map<String, MyObject> myObjects;
private LinkedBlockingQueue<byte[]> queue;
public WorkerThread(String name) {
this.name = name;
}
public void addToQueue(byte[] line) {
queue.add(line);
}
@Override
public void run() {
while(!isStopped()) {
byte[] message= queue.poll();
if(line != null) {
String id = getId(message);
MyObject myObject = getMyObject(id);
doLotOfStuff(myObject);
}
}
}
}
}
从概念上讲,这是一种路由问题。您需要的是:
让您的主线程(单线程)读取队列的消息,并将数据推送到每个 id 的 FIFO 队列。获取单个线程以使用来自每个队列的消息。
锁定示例(可能)不起作用,因为即使fair=true
,也无法保证第二条消息的顺序。
来自Javadoc: Even when this lock has been set to use a fair ordering policy, a call to tryLock() will immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock.
您需要决定的一件事是,是要为每个队列创建一个线程(一旦队列为空,该线程将退出)还是保留固定大小的线程池并管理获取额外的位以将线程分配给队列。
因此,您可以从原始队列中读取单个线程并写入每个 id 队列,并且您还会从各个队列的每个 id 读取一个线程。这将确保任务序列化。
在性能方面,只要传入消息具有良好的分布(id-wise),您应该会看到显着的加速。如果您主要收到相同 id 的消息,则任务将被序列化,并且还包括控制对象创建和同步的开销。
您可以为锁使用单独的Map
。还有一个WeakHashMap
,当密钥不再存在时,它会自动丢弃条目。
static final Map<String, Lock> locks = Collections.synchronizedMap(new WeakHashMap<>());
public void lock(String id) throws InterruptedException {
// Grab a Lock out of the map.
Lock l = locks.computeIfAbsent(id, k -> new ReentrantLock());
// Lock it.
l.lockInterruptibly();
}
public void unlock(String id) throws InterruptedException {
// Is it locked?
Lock l = locks.get(id);
if ( l != null ) {
l.unlock();
}
}
我认为您对synchronized
块的想法是正确的,除了您分析错误并且在任何情况下都走得太远。外部synchronized
块不应该强迫您一次只处理一条消息,它只是防止多个线程一次访问同一消息。但你不需要它。实际上,您只需要MyObject
实例上的内部synchronized
块。这将确保一次只有一个线程可以访问任何给定的MyObject
实例,同时使其他线程能够根据需要访问消息、Map
和其他MyObject
实例。
JSONObject message = getMessageFromSender();
String id = message.getString("id");
MyObject myObject = getMyObject(id);
synchronized (myObject) {
doLotOfStuff(myObject);
}
如果您不喜欢这样,并且对MyObject
实例的更新都涉及单方法调用,那么您可以synchronize
所有这些方法。您仍然在 Map
中保留并发性,但您正在保护MyObject
本身免受并发更新的影响。
class MyObject {
public synchronize void updateFoo() {
// ...
}
public synchronize void updateBar() {
// ...
}
}
当任何Thread
访问任何updateX()
方法时,它将自动锁定任何其他Thread
访问该方法或任何其他synchronized
方法。如果您的更新与该模式匹配,那将是最简单的。
如果没有,那么您需要通过使用某种锁定协议来使所有工作Threads
合作。OldCurmudgeon建议的ReentrantLock
是一个不错的选择,但我会把它放在MyObject
本身。要使事情正确有序,您应该使用公平性参数(请参阅 http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html#ReentrantLock-boolean-)。"设置为 true 时,在争用下,锁定有利于授予对等待时间最长的线程的访问权限。"
class MyObject {
private final ReentrantLock lock = new ReentrantLock(true);
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public void updateFoo() {
// ...
}
public void updateBar() {
// ...
}
}
然后你可以更新这样的东西:
JSONObject message = getMessageFromSender();
String id = message.getString("id");
MyObject myObject = getMyObject(id);
myObject.lock();
try {
doLotOfStuff(myObject);
}
finally {
myObject.unlock();
}
重要的一点是,您不需要控制对消息的访问,也不需要控制Map
。您需要做的就是确保任何给定MyObject
一次最多由一个线程更新。
实际上这里有一个设计思想:当消费者请求处理你的对象时,它实际上应该从你的对象列表中删除具有该ID的对象,然后在处理完成后重新插入它。然后,任何其他使用者收到请求处理具有相同 id 的对象都应处于阻止模式,等待具有该 ID 的对象重新出现在列表中。您将需要添加一个管理来记录所有现有对象,以便您可以区分已经存在但当前不在列表中的对象(即由其他消费者处理)和尚不存在的对象。
JSON 解析与 doLotsOfStuff()
分开,您可以获得一些加速。 一个线程侦听消息,解析它们,然后将解析的消息放在队列中以保持时间顺序。 第二个线程从该队列中读取并执行LotsOfStuff,而无需锁定。
但是,由于您显然需要超过 2 倍的加速,这可能还不够。
添加
另一种可能性是多个HashMap。 例如,如果所有 ID 都是整数,则为以 0,1,2 结尾的 ID 创建 10 个哈希映射... 传入的消息被定向到 10 个线程之一,这些线程解析 JSON 并更新其相关的 Map。 每个地图中的顺序都保持不变,并且没有锁定或争用问题。 假设消息 ID 是随机分布的,这会产生高达 10 倍的加速,尽管您的地图需要额外的一层开销。 例如
Thread JSON Threads 0-9
--------------------------------------------------------------
while (notInterrupted) {
read / parse next JSON message
mapToUse = ID % 10
pass JSON to that Thread's queue
}
while (notInterrupted) {
take JSON off queue
// I'm the only one with writing to Map#N
do computation and update ID
}