允许多次写的Java读写锁



在分布式计算机系统(与硬盘无关)中,我有一个特殊的场景:

  1. A类操作——可以并行发生
  2. B类操作——可以并行发生
  3. A类操作不能与B类操作并行发生
  4. 类型B操作非常慢——因此作为优化,我希望它们具有更高的优先级,,但这不是严格必要的

作为一个快速的解决方案,我使用了读/写锁。但是,既然"写"了;每当有另一个正在进行的"写"时,就表示它不够好。

我考虑过滚动我自己的读/写锁。也许我错过了什么图书馆?还有其他建议吗?

import static java.util.concurrent.Executors.*;
import java.util.concurrent.*;
import lombok.*;
import lombok.extern.slf4j.*;
/**
* Based on
* http://tutorials.jenkov.com/java-concurrency/read-write-locks.html#simple
* 
* <p>
* Allows for multiple writers.
* </p>
*/
@Slf4j
public class ReadMultiWriteLock {
private static final ReadMultiWriteLock lock    = new ReadMultiWriteLock();
private int                             readers = 0;
private int                             writers = 0;
/**
* Guards specified critical section for reading purposes.
*
* @param criticalSection
*            the critical section
* @throws Throwable
*             if this thread was interrupted or any other exception thrown
*             from critical section
* @return value returned by critical section
*/
@SneakyThrows
public static <T> T readLocking(final Callable<T> criticalSection) {
lock.readingAquire();
try {
return criticalSection.call();
} finally {
lock.readingRelease();
}
}
/**
* Guards specified critical section for reading purposes.
*
* @param criticalSection
*            the critical section
* @throws Throwable
*             if this thread was interrupted or any other exception thrown
*             from critical section
* @return always {@code null}
*/
public static void readLocking(final Runnable criticalSection) {
readLocking(callable(criticalSection));
}
/**
* Guards specified critical section for writing purposes.
*
* @param criticalSection
*            the critical section
* @throws Throwable
*             if this thread was interrupted or any other exception thrown
*             from critical section
* @return value returned by critical section
*/
@SneakyThrows
public static <T> T writeLocking(final Callable<T> criticalSection) {
lock.writingAcquire();
try {
return criticalSection.call();
} finally {
lock.writingRelease();
}
}
/**
* Guards specified critical section for writing purposes.
*
* @param criticalSection
*            the critical section
* @throws Throwable
*             if this thread was interrupted or any other exception thrown
*             from critical section
* @return always {@code null}
*/
public static void writeLocking(final Runnable criticalSection) {
writeLocking(callable(criticalSection));
}
/**
* Waits for writers to finish and accounts for another reader lock.
* 
* @throws InterruptedException
*             if this thread was interrupted
*/
public synchronized void readingAquire() throws InterruptedException {
while (writers > 0) {
log.trace("blocking read -- {} writers running", writers);
wait();
}
readers++;
log.trace("aquired {} reading locks", readers);
}
/**
* Accounts for one less reader lock.
*/
public synchronized void readingRelease() {
readers--;
notifyAll();
}
/**
* Waits for readers to finish and accounts for another writer lock.
* 
* @throws InterruptedException
*             if this thread was interrupted
*/
public synchronized void writingAcquire() throws InterruptedException {
while (readers > 0) {
log.trace("blocking write -- {} readers running", readers);
wait();
}
writers++;
log.trace("aquired {} writing locks", writers);
}
/**
* Accounts for one less writer lock.
*/
public synchronized void writingRelease() throws InterruptedException {
writers--;
notifyAll();
}
}

和测试:

import java.util.concurrent.*;
import org.testng.annotations.*;
import lombok.*;
import lombok.extern.slf4j.*;
@Slf4j
public class ReadMultiWriteLockTest {
private static final class State {
private enum Status {
STARTED, ENDED
}
private static final int MAX_DELAY_MS = 10;
private Status           readStatus, writeStatus;
static int randomDelay(final int maxMs) {
return ThreadLocalRandom.current().nextInt(0, maxMs);
}
static boolean randomReadWrite() {
return ThreadLocalRandom.current().nextBoolean();
}
int read(final int id) {
if (Status.STARTED == writeStatus)
throw new IllegalStateException("other thread is writing");
readStatus = Status.STARTED;
log.trace(">>> start reading {}", id);
sleep(randomDelay(MAX_DELAY_MS));
log.trace("<<< end reading {}", id);
readStatus = Status.ENDED;
return id;
}
int write(final int id) {
if (Status.STARTED == readStatus)
throw new IllegalStateException("other thread is reading");
writeStatus = Status.STARTED;
log.trace(">>> start writing {}", id);
sleep(randomDelay(MAX_DELAY_MS));
log.trace("<<< end writing {}", id);
writeStatus = Status.ENDED;
return id;
}
}
private static final ParallelLoop PARALLEL_LOOP = ParallelLoop.INSTANCE
.withParallelism(100)
// NOTE: when running with trace may take long time
.withRepetitions(200_000);
@Test
public void shouldExclusivlyLockForReadsAndWrites() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? writeLocking(() -> sharedState.write(id))
: readLocking(() -> sharedState.read(id)));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNeitherLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? sharedState.write(id)
: sharedState.read(id));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNotReadLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? writeLocking(() -> sharedState.write(id))
: sharedState.read(id));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNotWriteLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? sharedState.write(id)
: readLocking(() -> sharedState.read(id)));
}
@Test(expectedExceptions = RuntimeException.class)
public void shouldExecuteReadingCriticalSectionWithoutValue() {
readLocking((Runnable) () -> {
throw new RuntimeException("reading critical section executed");
});
}
@Test(expectedExceptions = RuntimeException.class)
public void shouldExecuteWritingCriticalSectionWithoutValue() {
writeLocking((Runnable) () -> {
throw new RuntimeException("writing critical section executed");
});
}
}

和实用程序并行循环:

import java.util.function.*;
import java.util.stream.*;
import lombok.*;
import lombok.extern.slf4j.*;
/**
* Parallel looping with specified threads, repetitions and block of code.
*/
@Slf4j
@AllArgsConstructor
public final class ParallelLoop {
/**
* The default parallel loop; chain with {@link #parallelism} and
* {@link #repetitions} to configure.
*/
public static final ParallelLoop INSTANCE = new ParallelLoop();
@With
private final int                parallelism;
@With
private final int                repetitions;
/**
* Constructs a default parallel loop with one thread and one repetition.
*/
public ParallelLoop() {
parallelism = 1;
repetitions = 1;
}
/**
* Runs specified function in configured loop.
* 
* @param function
*            the function to run; is called with the run identifier and
*            expected to return it
*/
public void run(final Function<Integer, Integer> function) {
System.setProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism",
String.valueOf(parallelism));
IntStream.range(0, repetitions)
.parallel()
.forEach(id -> log.trace("run id {}", function.apply(id)));
}
/**
* Runs specified consumer in configure loop.
* 
* @param consumer
*            the consumer to run; is called with the run identifier.
*/
public void run(final IntConsumer consumer) {
run(id -> {
consumer.accept(id);
return id;
});
}
}

最新更新