通过并发删除为名称维护唯一对象



我使用以下编程习惯用法。我保持同步具有名称与对象关联的HashMap。查找对象的名称我使用以下代码:

MyObject getObject(String name) {
synchronized(map) {
MyObject obj = map.get(name);
if (obj == null) {
obj = new MyObjec();
map.put(name, obj);
}
}
}

当我想专门研究这样一个物体时,我将在这样的对象上使用同步:

synchronized(obj) {
/* do something exclusively on obj (work type I) */
}

到目前为止,这一直运行良好,直到最近。新的要求有I型和II型专用作品类型I将保留对象,类型II应删除完成工作后的对象。如果我做了什么沿着以下方向:

synchronized(obj) {
/* do something exclusively on obj (work type II) */
}
synchronized(map) { /* not good! */
map.remove(obj);
}

我可能会授予某些对象某种类型的工作,尽管对象已从地图中删除。所以基本上应该替换类型I工作的synchronized(obj)通过一些新的信号量将对象重新连接到映射以防之前授予第二类作品。分别地只有在没有同步的情况下,对象才应该离开地图正在等待。

最好是看不到这些物体。我会去的只使用名称的API。对象仅用于维护名称的某些状态。但是HashMap应在第二类工作完成后从名称中删除已完成。但在第一类或第二类工作中,HashMap不应锁定。

有什么办法吗?这是已知的模式吗?

再见

需求似乎是这样的:

  • 有一个Map<String, Object>是缓存
  • 池中有许多工作线程访问缓存
  • 某些类型的工作需要在完成时使缓存中的对象无效

首先,您需要一个ConcurrentHashMap<String, Lock> keys。这个Map将存储String密钥和我们将使用的锁定密钥的Lock对象之间的关系。这允许我们在不锁定整个数据Map的情况下替换key -> value映射。

接下来,您将需要ConcurrentHashMap<String, Object> data。此Map将存储实际映射。

使用ConcurrentHashMap而不是普通CCD_10的原因是它是线程安全的。这意味着不需要手动同步。该实现实际上将Map划分为多个扇区,并且只锁定执行操作所需的扇区——这使其更加高效。

现在,逻辑将是

  1. CCD_ 12将新的CCD_。这将以线程安全的方式检查key是否已经存在锁。如果没有,将添加新的,否则将检索现有的。这意味着每把钥匙只有一把锁
  2. 获取锁。这意味着您可以独占访问映射
  3. 做好工作。在TypeII的情况下,在完成之后从data移除映射
  4. 解锁锁

代码看起来像这样:

private final ConcurrentHashMap<String, Object> data = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Lock> keys = new ConcurrentHashMap<>();
private final ExecutorService executorService = null; //obviously make one of these
@RequiredArgsConstructor
private class TypeI implements Runnable {
private final String key;
private final Work work;
@Override
public void run() {
final Lock lock = keys.putIfAbsent(key, new ReentrantLock());
lock.lock();
try {
final Object value = data.get(key);
work.doWork(value);
} finally {
lock.unlock();
}
}
}
@RequiredArgsConstructor
private class TypeII implements Runnable {
private final String key;
private final Work work;
@Override
public void run() {
final Lock lock = keys.putIfAbsent(key, new ReentrantLock());
lock.lock();
try {
final Object value = data.get(key);
work.doWork(value);
data.remove(key);
} finally {
lock.unlock();
}
}
}
public static interface Work {
void doWork(Object value);
}
public void doTypeIWork(final String key, final Work work) {
executorService.submit(new TypeI(key, work));
}
public void doTypeIIWork(final String key, final Work work) {
executorService.submit(new TypeII(key, work));
}

我使用了Lombok注释来减少混乱的数量。

这个想法是最小化或几乎消除公共资源锁定的数量,同时仍然允许Thread在需要时获得对特定映射的独占访问。

要清洁钥匙Map,您需要保证当前没有任何工作正在进行,并且在清洁期间没有Thread尝试获取任何锁。您可以通过尝试获取相关的锁,然后从密钥映射中删除映射来实现这一点——这将确保当时没有其他线程使用该锁。

你可以运行一个计划任务,比如说,每X分钟从地图上清除20个键。如果您将其实现为LRU缓存,那么它应该相当干净。Google Guava提供了一个您可以使用的实现。

您可以使用AtomicInteger来跟踪对象上正在进行的任务数量。然后,对于类型II的任务,只有在没有正在进行的任务的情况下才删除对象:

class MyObject {
private AtomicInteger worksInProgress = new AtomicInteger(0);
public int incWIP() {
return worksInProgress.incrementAndGet();
}
public int decWIP() {
return worksInProgress.decrementAndGet();
}
public int getWIP() {
return worksInProgress.get();
}
...
}
MyObject getObject(String name) {
synchronized(map) {
MyObject obj = map.get(name);
if (obj == null) {
obj = new MyObject();
map.put(name, obj);
}
obj.incWIP(); // assume you're doing work on this starting now
}
}

我喜欢的工作类型:

MyObject obj = getObject(name);
synchronized(obj) {
obj.workI();
}
obj.decWIP(); // finished doing work type I

第二类看起来像:

MyObject obj = getObject(name);
synchronized(obj) {
obj.workII();
}
if (obj.decWIP() == 0) { // finished with this work and all others
synchronized(map) {
// double-check the value because we checked previously without the map lock
if (obj.getWIP() == 0) {
map.remove(obj);
}
}
}

1)第一种方法是使用哈希来存储数据对象objHash。

2) 您需要一个额外的锁来确保objHash上type1和type2操作的原子执行。type2是写操作,type1是读操作。您可以使用读写锁并将锁存储在哈希表lockHash中。

3) 为了确保数据对象上type1、type2的原子操作,您必须将在同步语句中键入1/2操作,以获取对此数据对象的锁定。

public class ConDeleteHash {
ConcurrentHashMap <String, Object> objHash = new ConcurrentHashMap <String, Object> ();
ConcurrentHashMap <String, ReentrantReadWriteLock> lockHash = new ConcurrentHashMap <String, ReentrantReadWriteLock> ();
void Type1Op(String name) {
ReadWriteLock rwl = lockHash.get(name);
if(rwl==null) return;
Lock lock = rwl.readLock();
lock.lock();
Object obj = objHash.get(name);
if(obj==null) return;
synchronized(obj) {
System.out.println("TYPE1 to :"+ obj.toString());
}
lock.unlock();      
}
void Type2Op(String name) {
ReadWriteLock rwl = lockHash.get(name);
if(rwl==null) return;
Lock lock = rwl.writeLock();
Object obj = objHash.get(name);
synchronized(obj) {
System.out.println("TYPE2 to :"+ obj.toString());
}
lockHash.remove(name);
objHash.remove(name);
lock.unlock();
}
void add(String name, Object obj) {
if(lockHash.get(name)!=null) return;
objHash.put(name, obj);
lockHash.put(name, new ReentrantReadWriteLock());
}
}

延迟从地图中删除

... class MyObject{
boolean active = true;
...
}

synchronized(obj) {
if(obj.active){
/* do something exclusively on obj */
obj.active = false; //or not
}
}

MyObject getObject(String name) {
synchronized(map) {
MyObject = map.get(name);
if (obj == null) {
obj = new MyObjec();
map.put(name, obj);
}else{
synchronized(obj){
if(!obj.active){
//any remove action here
obj = new MyObjec();
map.put(name, obj); // no previous obj in map
}
}
}
}

这个怎么样。。一个稍微修改过的版本的鲍里斯蜘蛛。

带有ConcurrentHashMap的主类到工作人员

public class Concurrent {
// Hash map to hold workers
final ConcurrentHashMap<String, Work> jobs = new ConcurrentHashMap<>();

工作接口

interface Work{
void doWork(Object value);
}

砌块工程的基类。意味着这个实例只能完成一项工作

abstract class BaseWork implements Work {
String name;
Lock lock = new ReentrantLock();
BaseWork(String name){
this.name = name;
}
@Override
public void doWork(Object value) {
lock.lock();   // lock the following block
try{
if (jobs.get(name) != null) {    // Check in case there are waiting threads to perform work on this instance which is removed by completed Type11 Work
performTask(value);
System.out.println("Job Completed");
}else{
jobs.putIfAbsent(name, new Type2Work(name)).doWork(value); // if new job has to be trigger. Note this section only possible when Type2Work, so created Type2Work
System.out.println("Removed.. Job terminated");
}
}finally{
lock.unlock(); // unlock this block , so other threads can start working
}
}
abstract void performTask(Object value);    // Actual Job 
}

在这里,name将与concurrentHashMap中的key相同。doWork一调用,就会锁定执行实际工作的块。

类型1和类型2实现

class Type1Work extends BaseWork{
Type1Work(String name) {
super(name);
}
@Override
void performTask(Object value) {
// Do type 1 Work
}
}

class Type2Work extends BaseWork{
Type2Work(String name) {
super(name);
}
@Override
void performTask(Object value) {
// Do Type 2 work.
jobs.remove(name);
}
}

非阻塞工作-类型111的(doWork可以在线程之间不共享任何信息的情况下执行工作)

class NonLockingWork implements Work {
@Override
public void doWork(Object value) {
// Do thread safe non blocking Work ( Type 111)
}
}

最后阻止将作品加载到Map

String key = "type1-name1";
Work work = jobs.putIfAbsent(key, new Type1Work(key));
work.doWork(new Object());

}

这个问题的逻辑有一个问题:如果允许用户在类型2操作之后请求类型1操作,为什么要从映射中删除对象?如果允许用户在类型2之后提交类型1,则总是存在在删除对象之后请求类型1的情况。在这种情况下,你为什么要删除它?

是否存在这样的问题:客户端只能在类型二操作之前提交类型一操作,但不能保证执行器服务会在另一操作之前执行一个操作?在这种情况下,使用优先级执行器服务并提交优先级高于类型2的类型1,因为这将确保类型1总是在类型2之前启动,如果两者都是挂起的。由于类型2在类型1完成之前无法启动,这意味着删除总是在类型1之后执行,提供的类型1在类型2之前提交。

这感觉就像是其他地方糟糕的程序设计导致了无法解决的困境。如果你能解释这些奇怪的规范是如何产生的,那么我们可能能够制定一个更持久的解决方案。

此外,在这种类型的并发中,不要从映射中删除对象,用作为对象子类的singleton对象替换它们,并对该singleton进行doWork方法检查,这比null检查更可靠,因为可以出于多种原因创建null,但您的singleton object是出于特定原因而被传递来执行工作的,这意味着错误跟踪在以后的开发中更容易。

最新更新