从套接字出现数据,Dequeue将返回其他线程中的null



我已经尝试过调试此问题,但是我到了一个我不知道为什么会发生这种情况的地步(我也是一个线程新手)。大约有2/3的排水数据以零为空,而其余的数据则正确。任何洞察力都将不胜感激。

using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace UDPNetwork
{
    public class NetworkManager : MonoBehaviour
    {
        struct DataPacket
        {
            public IPEndPoint destination;
            public byte[] data;
            public int size;
            public DataPacket(IPEndPoint destination, byte[] data)
            {
                this.destination = destination;
                this.data = data;
                size = 0;
            }
        }
        [SerializeField]
        string SERVER_IP = "127.0.0.1";
        [SerializeField]
        ushort SERVER_PORT = 55566;
        AsyncPriorityQueue<DataPacket> queuedReceiveData = new AsyncPriorityQueue<DataPacket>(2000, false);
        Socket sck;
        IPEndPoint ipEndPoint;
        bool listening = true;
        bool processing = true;
        void Start()
        {
            sck = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
#if SERVER
            ipEndPoint = new IPEndPoint(IPAddress.Any, SERVER_PORT);
            sck.Bind(ipEndPoint);
#endif
            new Thread(() => ListenForData()).Start();
            new Thread(() => ProcessData()).Start();
        }
        void OnDestroy()
        {
            listening = false;
            processing = false;
            sck.Close();
        }
        void ListenForData()
        {
            EndPoint endPoint = ipEndPoint;
            while (listening)
            {
                byte[] buffer = new byte[512];
                try
                {
                    int rec = sck.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endPoint);
                    Array.Resize(ref buffer, rec);
                    queuedReceiveData.Enqueue(new DataPacket((IPEndPoint)endPoint, buffer) { size = rec }, 0);
                }
                catch (Exception e)
                {
                    Debug.LogError(e.Message);
                }
            }
        }
        void ProcessData()
        {
            DataPacket rcv;
            byte[] data;
            IPEndPoint ep;
            while (processing)
            {
                rcv = queuedReceiveData.Dequeue(); // blocks until queue has >1 item
                data = rcv.data;
                ep = rcv.destination;
                if (data == null)
                {
                    Debug.LogError(data); // null
                    Debug.LogError(rcv.size); // 0
                    Debug.LogError(ep); // null
                    Debug.LogError(rcv);
                    continue;
                }
                //process...
            }
        }
    }
}

队列:

using System;
/// <summary>
/// Priority queue removes added items highest priority items first, ties broken by First-In-First-Out.
/// </summary>
/// <typeparam name="T"></typeparam>
public class PriorityQueue<T>
{
    struct Node
    {
        public T item;
        public int priority;
        public CircularInt32 insertionIndex;
    }
    Node[] items;
    bool _resizeable;
    CircularInt32 _numItemsEverEnqueued = 0;
    /// <summary>
    /// How many items are currently in the queue
    /// </summary>
    public int Count
    {
        get;
        private set;
    }
    /// <summary>
    /// How many items the queue can hold. 0 == infinite.
    /// </summary>
    public int Capacity
    {
        get
        {
            return _resizeable ? 0 : items.Length;
        }
    }
    /// <summary>
    /// Create a new resizeable priority queue with default capacity (8)
    /// </summary>
    public PriorityQueue() : this(8) { }
    /// <summary>
    /// Create a new priority queue
    /// </summary>
    /// <param name="capacity"></param>
    /// <param name="resizeable"></param>
    public PriorityQueue(int capacity, bool resizeable = true)
    {
        if (capacity < 2)
        {
            throw new ArgumentException("New queue size cannot be smaller than 2", "capacity");
        }
        items = new Node[capacity];
        Count = 0;
        _resizeable = resizeable;
    }
    /// <summary>
    /// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
    /// </summary>
    /// <param name="item">object to add to queue</param>
    /// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
    /// <returns>true if added successfully, false otherwise (queue is full)</returns>
    public bool Enqueue(T item, int priority)
    {
        if (Count == items.Length)
        {
            if (_resizeable)
            {
                Array.Resize(ref items, Capacity * 3 / 2 + 1);
            }
            else
            {
                return false;
            }
        }
        items[Count] = new Node() { item = item, priority = priority, insertionIndex = _numItemsEverEnqueued++ };
        percolateUp(Count);
        Count++;
        return true;
    }
    void percolateUp(int index)
    {
        while (true)
        {
            if (index == 0)
            {
                break;
            }
            int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;
            if (HasHigherPriority(items[parent], items[index]))
            {
                var temp = items[index];
                items[index] = items[parent];
                items[parent] = temp;
                index = parent;
            }
            else
            {
                break;
            }
        }
    }
    /// <summary>
    /// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
    /// Returns an object's default value if the queue is empty.
    /// </summary>
    /// <returns></returns>
    public T Dequeue()
    {
        if (Count == 0)
        {
            return default(T);
        }
        var item = items[0].item;
        items[0] = new Node();
        percolateDown(0);
        Count--;
        return item;
    }
    void percolateDown(int index)
    {
        while (true)
        {
            int left = index * 2 + 1;
            if (left + 1 < Count && HasHigherPriority(items[left + 1], items[left]))
            {
                var temp = items[index];
                items[index] = items[left + 1];
                items[left + 1] = temp;
                index = left + 1;
            }
            else if (left < Count)
            {
                var temp = items[index];
                items[index] = items[left];
                items[left] = temp;
                index = left;
            }
            else
            {
                break;
            }
        }
    }
    bool HasHigherPriority(Node higher, Node lower)
    {
        return (higher.priority < lower.priority || (higher.priority == lower.priority && higher.insertionIndex < lower.insertionIndex));
    }
}

async:

using System.Threading;
/// <summary>
/// A thread-safe priority queue.
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncPriorityQueue<T>
{
    PriorityQueue<T> pq;
    /// <summary>
    /// How many items are currently in the queue
    /// </summary>
    public int Count
    {
        get { return pq.Count; }
    }
    /// <summary>
    /// How many items the queue can hold. 0 == infinite.
    /// </summary>
    public int Capacity
    {
        get { return pq.Capacity; }
    }
    /// <summary>
    /// Create a new resizeable async priority queue with default capacity (8)
    /// </summary>
    public AsyncPriorityQueue()
    {
        pq = new PriorityQueue<T>();
    }
    /// <summary>
    /// Create a new priority queue
    /// </summary>
    /// <param name="capacity"></param>
    /// <param name="resizeable"></param>
    public AsyncPriorityQueue(int capacity, bool resizeable = true)
    {
        pq = new PriorityQueue<T>(capacity, resizeable);
    }
    /// <summary>
    /// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
    /// </summary>
    /// <param name="item">object to add to queue</param>
    /// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
    /// <returns>true if added successfully, false otherwise (queue is full)</returns>
    public bool Enqueue(T item, int priority)
    {
        lock (pq)
        {
            bool added = pq.Enqueue(item, priority);
            if (pq.Count == 1)
            {
                Monitor.Pulse(pq);
            }
            return added;
        }
    }
    /// <summary>
    /// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
    /// WARNING: if the queue is empty when this is called, the thread WILL BLOCK until a new item is added to the queue in another thread. If this behaviour is not wanted, be sure to check Count > 0.
    /// </summary>
    /// <returns></returns>
    public T Dequeue()
    {
        lock (pq)
        {
            while (pq.Count == 0)
            {
                Monitor.Wait(pq);
            }
            return pq.Dequeue();
        }
    }
}

首先,目前尚不清楚为什么在所有消息都以优先级排队时使用优先级队列。但是,我假设您的目标是最终更改优先级一些消息。无论如何,由于您以优先级为0的所有内容,因此您在优先级队列实现中揭示了一个关键的错误。

我怀疑,如果您以优先级为1,您将永远不会看到此错误。但是你不应该那样做。

问题是,当您脱离商品时,您正在通过渗透一个空节点,重新调整了堆的优先级为0。更重要的是,它的insertionIndex永远不会设置,所以它是0将新的空节点放在 之前已经在队列中的好节点,然后将您添加到队列中的新节点将在该空节点之后添加。而且由于队列中的所有内容都处于优先级0,所以新的空节点在根部左右。

您需要更改当您脱离商品时重新调整堆的方式。您不应该在顶部输入一个空节点并将其渗透,而是将最后一个节点插入堆中,将其插入根部,然后将其渗入。但是您必须更改percolateDown方法。

这是我建议的:

public T Dequeue()
{
    if (Count == 0)
    {
        return default(T);
    }
    var item = items[0].item;
    items[0] = items[Count-1];
    items[Count-1] = null;
    Count--;
    percolateDown(0);
    return item;
}
void percolateDown(int index)
{
    while (true)
    {
        // The rules for adjusting on percolate down are to swap the
        // node with the highest-priority child. So first we have to
        // find the highest-priority child.
        int hpChild = index*2+1;
        if (hpChild >= Count)
        {
            break;
        }
        if (hpChild+1 < Count && HasHigherPriority(items[hpChild+1], items[hpChild]))
        {
            ++hpChild;
        }
        if (HasHigherPriority(items[hpChild, items[index]))
        {
            var temp = items[index];
            items[index] = items[hpChild];
            items[hpChild] = temp;
        }
        else
        {
            break;
        }
        index = hpChild;
    }
}

有关正确实施二进制堆的更多详细信息,请参见http://blog.mischel.com/2013/09/29/a-better-way-way-to-to-do-do-it-it-it-the-heap/和条目接下来。

其他几个注释:

而不是自己调整数组的大小,而应该将items数组变成List<Node>。它将处理所有调整大小的调整。

在您的percolateUp中,您有:

int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;

您可以将其简化为:

int parent = (index + 1)/2;

相关内容

  • 没有找到相关文章

最新更新