如何同步多线程映射更新



我想知道如何同步以下多线程,以便我仍然可以并行运行线程而不会导致保存多个对象。

public void setupRender() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    final Map<Integer, String> map = Collections.synchronizedMap(new HashMap<Integer, String>());
    for (int i = 0; i < 10; i++) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                parse(map); 
            }
        });
        System.out.println("map size " + map.size() + " loop " + i);
    }
}
public void parse(Map<Integer, String> map) {
    for (int j = 0; j < 100; j++) {
        if (map.containsKey(j)) {
            System.out.println("Update");
            //session.update(map.getKey(j));
        } else {
            System.out.println("add to map " + j);
            String obj = "test";
            map.put(j, obj);
            System.out.println("save");
            //session.save(obj);
        }
        if (j % 50 == 0) {
            System.out.println("commit");
            //tx.commit();
        }
    }
}

结果,请注意如何将相同键的多个对象添加到映射中,但更糟糕的是保存到数据库中,从而导致重复的数据库输入。

map size 0 loop 0
map size 0 loop 1
add to map 0
commit
add to map 1
add to map 0
commit
add to map 2
add to map 3
add to map 4
add to map 2
add to map 5
add to map 6
map size 1 loop 2
add to map 7
add to map 8
add to map 9
commit
map size 10 loop 3
commit
add to map 5
您可以使用

ConcurrentHashMapputIfAbsent来保证一致性和性能。在这种情况下,您的会话更新可以同时运行。

putIfAbsent:
    - returns the previous value associated with the specified key, 
      or null if there was no mapping for the key
public void parse(ConcurrentHashMap<Integer, String> map) {
    for (int j = 0; j < 100; j++) {    
        String obj = "test";
        Object returnedValue = map.putIfAbsent(j, obj);
        boolean wasInMap = returnedValue != null;
        if (wasInMap) {
            System.out.println("Update");
            //session.update(map.getKey(j));
        } else {
            System.out.println("add to map " + j);
            map.put(j, obj);
            System.out.println("save");
            //session.save(obj);
        }
    }
}

您还可以在那里放置一个回调,该回调将有条件地创建新实例:

class Callback{
    abstract void saveOrUpdate(boolean wasInMap);    
}
public void parse(Map<Integer, String> map) {
    for (int j = 0; j < 100; j++) {    
        String obj = "test";
        Callback callback = (wasInMap) -> { //Java 8 syntax to be short
            if (wasInMap) {
                System.out.println("Update");
                //session.update(map.getKey(j));
            } else {
                System.out.println("add to map " + j);
                map.put(j, obj);
                System.out.println("save");
                //session.save(obj);
            }
        }
        Object returnedValue = map.putIfAbsent(j, callback);
        callback.saveOrUpdate(returnedValue != null);
    }
}

使用 syncronized 关键字来确保检查 j 是否在映射中,如果不是,则添加它,是以原子方式完成的。但是,它违背了多线程的目的,因为当一个线程循环时,大多数线程将被阻塞。

public void setupRender() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
final Map<Integer, String> map = Collections.synchronizedMap(new HashMap<Integer, String>());
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            parse(map); 
        }
    });
    System.out.println("map size " + map.size() + " loop " + i);
}
}
public void parse(Map<Integer, String> map) {
    synchronized(map) {
    for (int j = 0; j < 100; j++) {
        if (map.containsKey(j)) {
            System.out.println("Update");
            //session.update(map.getKey(j));
        } else {
            System.out.println("add to map " + j);
            String obj = "test";
            map.put(j, obj);
            System.out.println("save");
            //session.save(obj);
        }
        }
        if (j % 50 == 0) {
            System.out.println("commit");
            //tx.commit();
        }
    }
}

最新更新