C#监视器/信号机并发为缓冲区生成使用者



我正在解决一个典型的生产者-消费者问题。我有多个生产者和一个消费者。有n生产者线程,每个线程调用SetOne(OrderObject order),消费者调用GetOne()uffer类用作包含<n个单元格,是指Buffer类可用时消耗的每个单元格。出于某种原因,下面的设置有时可以工作,但并不总是占用所有单元格。我已经包含了所有涉及客户端、服务器和缓冲区的类。此外,我可以展示一个运行这个原型的简单原型。仅供参考-使用的方法是首先将信号量初始化为与所用缓冲区大小相同的大小,然后在缓冲区打开后,找到一个打开的单元格,然后对该单元格执行操作。

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }
    public static void SetOne(OrderObject order)
    {
        _pool.WaitOne();
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }
    public static OrderObject GetOne()
    {
        _pool.WaitOne();
        OrderObject value = null;
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }

public class Client
{
    public int Id {get;set;}
    public void Run()
    {
         /*Produce*/
         Buffer.SetOne(Id);
    }
}
public class Server
{
    public void Run()
    {
        while(true)
        {
             Buffer.GetOne();
        }
    }
}
public class Program
{
    public static void Main(string[] args)
    {
        /*Initialize 2 Open Semaphores*/
        int numCells = 2;
        Semaphore pool = new Semaphore(numCells, numCells);
        /*Initialize BufferCells with Empty OrderObjects*/
        List<OrderObject> OrderObjects = new List<OrderObject>();
        for (var i = 0; i < numCells; i++)
        {
            OrderObjects.Add(new OrderObject() { Id = "-1" });
        }
        Buffer.BufferCells = OrderObjects.ToArray();
        /*Initialize Consumer Thread*/
        Server server = new Server(pool);
        Thread serverThread = new Thread(new ThreadStart(server.Run));

        /*Initialize Producer Objects*/
        List<Client> clients = new List<Client>();
        for (int i = 0; i <= 20; i++)
        {
            /*Create 5000 Clients*/
            Client client = new Client(i.ToString(), pool, new OrderObject() { Id = i.ToString() });
            clients.Add(client);
        }
        /*Start Each Client Thread*/
        List<Thread> clientThreads = new List<Thread>();
        foreach (var client in clients)
        {
            Thread t = new Thread(new ThreadStart(client.Run));
            clientThreads.Add(t);
        }
        /*Start Server Thread*/
        serverThread.Start();
        /*Start Each Producer Thread*/
        clientThreads.ForEach(p => p.Start());
        /*Start Consumer Thread*/
        Console.ReadLine();
    }
}

我猜我遇到了以下问题之一:死锁、Livelock或饥饿。由于某些原因,Server无法使用生成并添加到单元格缓冲区的所有订单对象。不确定修复是什么。

好的,让我们分解一下这个代码试图做的事情。

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }

//设置一个

    public static void SetOne(OrderObject order)
    {

//为什么你这里需要一个信号灯?

        _pool.WaitOne();
        try
        {

//Semaphore是多余的,因为Monitor。Enter是一个限制性更强的锁。

            Monitor.Enter(BufferCells);

//嗯?我以为这应该是SetOne?不是SetEverything?我只能假设你的意图是设置其中一个单元格,而让其他单元格可以设置或获取。如果您正试图实现这一点,那么队列似乎是一种更合适的数据结构。更好的是BlockingCollection也启用了锁定/阻塞机制。

            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }
    public static OrderObject GetOne()
    {

//同样,这个信号量在这里似乎没有太大帮助

        _pool.WaitOne();
        OrderObject value = null;
        try
        {

//因为监视器是一个限制性更强的锁定

            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }

综上所述:

  • 此代码使用冗余锁定,这是不必要的
  • 这段代码将缓冲区中的所有单元格一次性设置为独占锁,这似乎首先破坏了缓冲区的目的
  • 这段代码试图实现的目标似乎已经在BlockingCollection中实现了。无需重新发明轮子!:)

祝你好运!

我假设监视器是为了保护对数组的访问,信号量应该进行计数,是吗?

如果是这样,则不应在getOne()的finally部分调用"_pool.Release()",也不应在setOne()顶部调用"_pool.WWaitOne()"。生产者的工作是在信号量推送对象后发出信号,消费者的工作是等待信号量再弹出对象。

Aarrgghh!

说:"使用的方法是首先将信号量初始化为与所用缓冲区大小相同的大小。"

如果您想要一个无边界队列,请将信号量初始化为0。

如果你想要一个有界队列,正如上面的文本所暗示的那样,你需要两个信号量(以及监视器),一个是"I",用于计算队列中的项目,一个"S",用于统计队列中剩余的空间。将I初始化为0,将S初始化为队列大小。在生产者中,等待S,锁定,按下,解锁,信号I。在消费者中,等待I,锁定,弹出,解锁,S。

最新更新