我正在编写一个读写同步类,并想了解下一步要做的事情。由于某种原因,有时允许Read
发生在Write
的中间,我找不到原因。
这是我想要的。
- 读取不允许与写作同时读取。
- 倍数可以同时发生。
- 一次只能发生一个写作。
- 当需要写作时,所有已经执行的读取仍在继续,不允许新的读取,当所有读取完成写作时。
我知道.NET框架有一个可以做到这一点的课程...但是我想要理解和复制类似的东西。我不是在重新发明轮子,我试图通过制作自己的车轮来理解它...碰巧我的车轮有点平方。
我目前拥有的是:
public class ReadWriteSync
{
private ManualResetEvent read = new ManualResetEvent(true);
private volatile int readingBlocks = 0;
private AutoResetEvent write = new AutoResetEvent(true);
private object locker = new object();
public IDisposable ReadLock()
{
lock (this.locker)
{
this.write.Reset();
Interlocked.Increment(ref this.readingBlocks);
this.read.WaitOne();
}
return new Disposer(() =>
{
if (Interlocked.Decrement(ref this.readingBlocks) == 0)
this.write.Set();
});
}
public IDisposable WriteLock()
{
lock (this.locker)
{
this.read.Reset();
this.write.WaitOne();
}
return new Disposer(() =>
{
this.read.Set();
if (this.readingBlocks == 0)
this.write.Set();
});
}
class Disposer : IDisposable
{
Action disposer;
public Disposer(Action disposer) { this.disposer = disposer; }
public void Dispose() { this.disposer(); }
}
}
这是我的测试程序...当出现问题时,它会以红色打印线。
class Program
{
static ReadWriteSync sync = new ReadWriteSync();
static void Main(string[] args)
{
Console.BackgroundColor = ConsoleColor.DarkGray;
Console.ForegroundColor = ConsoleColor.Gray;
Console.Clear();
Task readTask1 = new Task(() => DoReads("A", 20));
Task readTask2 = new Task(() => DoReads("B", 30));
Task readTask3 = new Task(() => DoReads("C", 40));
Task readTask4 = new Task(() => DoReads("D", 50));
Task writeTask1 = new Task(() => DoWrites("E", 500));
Task writeTask2 = new Task(() => DoWrites("F", 200));
readTask1.Start();
readTask2.Start();
readTask3.Start();
readTask4.Start();
writeTask1.Start();
writeTask2.Start();
Task.WaitAll(
readTask1, readTask2, readTask3, readTask4,
writeTask1, writeTask2);
}
static volatile bool reading;
static volatile bool writing;
static void DoWrites(string name, int interval)
{
for (int i = 1; i < int.MaxValue; i += 2)
{
using (sync.WriteLock())
{
Console.ForegroundColor = (writing || reading) ? ConsoleColor.Red : ConsoleColor.Gray;
writing = true;
Console.WriteLine("WRITE {1}-{0} BEGIN", i, name);
Thread.Sleep(interval);
Console.WriteLine("WRITE {1}-{0} END", i, name);
writing = false;
}
Thread.Sleep(interval);
}
}
static void DoReads(string name, int interval)
{
for (int i = 0; i < int.MaxValue; i += 2)
{
using (sync.ReadLock())
{
Console.ForegroundColor = (writing) ? ConsoleColor.Red : ConsoleColor.Gray;
reading = true;
Console.WriteLine("READ {1}-{0} BEGIN", i, name);
Thread.Sleep(interval * 3);
Console.WriteLine("READ {1}-{0} END", i, name);
reading = false;
}
Thread.Sleep(interval);
}
}
}
所有这些问题是什么...关于如何正确执行的建议?
我看到的主要问题是,您正在尝试使重置事件涵盖读/写入的含义和当前状态的处理,而不会以一致的方式同步。
这是一个示例,说明不一致的同步可能会在特定的代码中咬您。
-
write
正在处置,read
进入。 -
read
获取锁 -
write
设置read
MANUALRESETEVENT(MRE) -
write
检查当前读数计数,查找0 -
read
重置write
Autoresetevent(是) -
read
递增读取计数 -
read
发现其MRE已设置并开始阅读
到目前为止一切都很好,但是write
尚未完成...
- 第二个
write
进来并获取锁 - 第二个
write
重置read
MRE -
通过设置
write
是
,第一个 - 第二个
write
发现其已设置并开始写
write
完成在考虑多个线程时,除非您在某种锁定之内,否则您必须认为所有其他数据都非常波动,无法信任。
天真的实现可能会将排队逻辑与状态逻辑分开并适当同步。
public class ReadWrite
{
private static int readerCount = 0;
private static int writerCount = 0;
private int pendingReaderCount = 0;
private int pendingWriterCount = 0;
private readonly object decision = new object();
private class WakeLock:IDisposable
{
private readonly object wakeLock;
public WakeLock(object wakeLock) { this.wakeLock = wakeLock; }
public virtual void Dispose() { lock(this.wakeLock) Monitor.PulseAll(this.wakeLock); }
}
private class ReadLock:WakeLock
{
public ReadLock(object wakeLock) : base(wakeLock) { Interlocked.Increment(ref readerCount); }
public override void Dispose()
{
Interlocked.Decrement(ref readerCount);
base.Dispose();
}
}
private class WriteLock:WakeLock
{
public WriteLock(object wakeLock) : base(wakeLock) { Interlocked.Increment(ref writerCount); }
public override void Dispose()
{
Interlocked.Decrement(ref writerCount);
base.Dispose();
}
}
public IDisposable TakeReadLock()
{
lock(decision)
{
pendingReaderCount++;
while (pendingWriterCount > 0 || Thread.VolatileRead(ref writerCount) > 0)
Monitor.Wait(decision);
pendingReaderCount--;
return new ReadLock(this.decision);
}
}
public IDisposable TakeWriteLock()
{
lock(decision)
{
pendingWriterCount++;
while (Thread.VolatileRead(ref readerCount) > 0 || Thread.VolatileRead(ref writerCount) > 0)
Monitor.Wait(decision);
pendingWriterCount--;
return new WriteLock(this.decision);
}
}
}