// Package pq implements a concurrent priority queue. // // [Q] is similar to a buffered channel, except that senders attach to each // item a priority, and receivers always get the highest-priority item. // // For example: // // import "gogs.humancabbage.net/sam/priorityq/pq" // q := pq.Make[int, string](8) // q.Send(1, "world") // q.Send(2, "hello") // _, word1, _ := pq.Recv() // _, word2, _ := pq.Recv() // fmt.Println(word1, word2) // q.Close() // // Output: hello world // // # Implementation // // Each queue has a [binary max-heap]. Sending and receiving items require // heap-up and heap-down operations, respectively. // // [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap package pq import ( "iter" "sync" "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/binheap" "golang.org/x/exp/constraints" ) // Q is a generic, concurrent priority queue. type Q[P constraints.Ordered, T any] struct { *state[P, T] } // Make a new queue. func Make[P constraints.Ordered, T any](cap int) Q[P, T] { heap := binheap.Make[P, T](cap) s := &state[P, T]{ heap: heap, } s.canRecv = sync.Cond{L: &s.mu} s.canSend = sync.Cond{L: &s.mu} return Q[P, T]{s} } type state[P constraints.Ordered, T any] struct { mu sync.Mutex heap binheap.H[P, T] canSend sync.Cond canRecv sync.Cond closed bool } // Close marks the queue as closed. // // Attempting to close an already-closed queue results in a panic. func (s *state[P, T]) Close() { s.mu.Lock() if s.closed { s.mu.Unlock() panic("close of closed queue") } s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } // Recv gets an item, blocking when empty until one is available. // // This returns both the item itself and the its assigned priority. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. func (s *state[P, T]) Recv() (P, T, bool) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.heap.CanExtract() { s.canRecv.Wait() } if s.closed && !s.heap.CanExtract() { var emptyP P var emptyT T return emptyP, emptyT, false } if s.heap.CanExtract() { priority, value := s.heap.Extract() s.canSend.Broadcast() return priority, value, true } } } // Send adds an item with some priority, blocking if full. func (s *state[P, T]) Send(priority P, value T) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.heap.CanInsert() { s.canSend.Wait() } if s.closed { panic("send on closed queue") } if s.heap.CanInsert() { s.heap.Insert(priority, value) s.canRecv.Broadcast() return } } } // TryRecv attempts to get an item without blocking. // // This returns both the item itself and the its assigned priority. // // The error indicates whether the attempt succeeded, the queue is empty, or // the queue is closed. func (s *state[P, T]) TryRecv() (priority P, value T, err error) { s.mu.Lock() defer s.mu.Unlock() if s.heap.CanExtract() { priority, value = s.heap.Extract() s.canSend.Broadcast() return } if s.closed { err = priorityq.ErrClosed } else { err = priorityq.ErrEmpty } return } // TrySend attempts to add an item with some priority, without blocking. // // This method does not block. If there is space in the buffer, it returns // true. If the buffer is full, it returns false. func (s *state[P, T]) TrySend(priority P, value T) error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return priorityq.ErrClosed } if !s.heap.CanInsert() { return priorityq.ErrFull } s.heap.Insert(priority, value) s.canRecv.Broadcast() return nil } // Iter returns an iterator that consumes values until the queue is closed. func (s *state[P, T]) Iter() iter.Seq[T] { return func(yield func(T) bool) { for { _, t, ok := s.Recv() if !ok { return } if !yield(t) { return } } } }