Go-如果为空,则等待优先级队列中的下一个项目



我正在尝试实现一个优先级队列,以便根据优先级通过网络套接字发送json对象。我正在使用container/heap包来实现队列。我想出了这样的东西:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

有没有比轮询优先级队列更好的方法来等待新项目?

我可能会使用几个排队goroutine。从PriorityQueue示例中的数据结构开始,我将构建这样一个函数:

http://play.golang.org/p/hcNFX8ehBW

func queue(in <-chan *Item, out chan<- *Item) {
    // Make us a queue!
    pq := make(PriorityQueue, 0)
    heap.Init(&pq)
    var currentItem *Item       // Our item "in hand"
    var currentIn = in          // Current input channel (may be nil sometimes)
    var currentOut chan<- *Item // Current output channel (starts nil until we have something)
    defer close(out)
    for {
        select {
        // Read from the input
        case item, ok := <-currentIn:
            if !ok {
                // The input has been closed. Don't keep trying to read it
                currentIn = nil
                // If there's nothing pending to write, we're done
                if currentItem == nil {
                    return
                }
                continue
            }
            // Were we holding something to write? Put it back.
            if currentItem != nil {
                heap.Push(&pq, currentItem)
            }
            // Put our new thing on the queue
            heap.Push(&pq, item)
            // Turn on the output queue if it's not turned on
            currentOut = out
            // Grab our best item. We know there's at least one. We just put it there.
            currentItem = heap.Pop(&pq).(*Item)
            // Write to the output
        case currentOut <- currentItem:
            // OK, we wrote. Is there anything else?
            if len(pq) > 0 {
                // Hold onto it for next time
                currentItem = heap.Pop(&pq).(*Item)
            } else {
                // Oh well, nothing to write. Is the input stream done?
                if currentIn == nil {
                    // Then we're done
                    return
                }
                // Otherwise, turn off the output stream for now.
                currentItem = nil
                currentOut = nil
            }
        }
    }
}

下面是一个使用它的例子:

func main() {
    // Some items and their priorities.
    items := map[string]int{
        "banana": 3, "apple": 2, "pear": 4,
    }
    in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
    out := make(chan *Item)    // But the system will "work" for any particular values
    // Start the queuing engine!
    go queue(in, out)
    // Stick some stuff on in another goroutine
    go func() {
        i := 0
        for value, priority := range items {
            in <- &Item{
                value:    value,
                priority: priority,
                index:    i,
            }
            i++
        }
        close(in)
    }()
    // Read the results
    for item := range out {
        fmt.Printf("%.2d:%s ", item.priority, item.value)
    }
    fmt.Println()
}

请注意,如果运行此示例,则每次的顺序都会有所不同。这当然是意料之中的事。这取决于输入和输出通道的运行速度。

一种方法是使用sync.Cond:

Cond实现了一个条件变量,即等待或宣布事件发生的goroutine的集合点。

包装中的一个例子可以修改如下(针对消费者):

c.L.Lock()
for heap.Len() == 0 {
    c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item

生产商可以简单地做:

c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

(将这些封装在函数中并使用defer可能是个好主意。)

下面是一个线程安全(幼稚)堆的例子,pop方法会等待直到项目可用:

package main
import (
    "fmt"
    "sort"
    "sync"
    "time"
    "math/rand"
)
type Heap struct {
    b []int
    c *sync.Cond
}
func NewHeap() *Heap {
    return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
    h.c.L.Lock()
    defer h.c.L.Unlock()
    for len(h.b) == 0 {
        h.c.Wait()
    }
    // There is definitely something in there
    x := h.b[len(h.b)-1]
    h.b = h.b[:len(h.b)-1]
    return x
}
func (h *Heap) Push(x int) {
    defer h.c.Signal() // will wake up a popper
    h.c.L.Lock()
    defer h.c.L.Unlock()
    // Add and sort to maintain priority (not really how the heap works)
    h.b = append(h.b, x)
    sort.Ints(h.b)
}
func main() {
    heap := NewHeap()
    go func() {
        for range time.Tick(time.Second) {
            for n := 0; n < 3; n++ {
                x := rand.Intn(100)
                fmt.Println("push:", x)
                heap.Push(x)
            }
        }
    }()
    for {
        item := heap.Pop()
        fmt.Println("pop: ", item)
    }
}

(请注意,由于for range time.Tick循环,这在操场上不起作用。请在本地运行。)

相关内容

最新更新