Move priority queue to pq package; improve docs.

This commit is contained in:
Sam Fredrickson 2023-03-02 01:53:12 -08:00
parent 5e23a92314
commit b3b491d9a9
8 changed files with 255 additions and 155 deletions

View File

@ -31,10 +31,10 @@ queue that guarantees receipt of a high-priority items before low-priority
ones. This is primarily a fun exercise, I cannot recommend that anyone ones. This is primarily a fun exercise, I cannot recommend that anyone
actually use this in a real project. actually use this in a real project.
Additionally, the root `priorityq` package implements a concurrent priority Additionally, the `pq` package implements a concurrent priority queue, using a
queue, using a binary max-heap. This is more general than `mq`, because it binary max-heap. This is more general than `mq`, because it allows multiple
allows multiple levels of priority, instead of just "high" and "low". This, of levels of priority, instead of just "high" and "low". This, of course, also
course, also makes operations slower. makes operations slower.
[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/ [reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/
[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/ [sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/

View File

@ -1,8 +1,9 @@
// Package binheap implements a binary max-heap.
package binheap package binheap
import "golang.org/x/exp/constraints" import "golang.org/x/exp/constraints"
// H is a generic, non-concurrent binary max-heap. // H is a binary max-heap.
// //
// `I` is the type of the priority IDs, and `E` the type of the elements. // `I` is the type of the priority IDs, and `E` the type of the elements.
type H[I constraints.Ordered, E any] struct { type H[I constraints.Ordered, E any] struct {

View File

@ -1,6 +1,7 @@
// Package circ implements a circular FIFO buffer.
package circ package circ
// B is a generic, non-concurrent circular FIFO buffer. // B is a circular FIFO buffer.
type B[T any] struct { type B[T any] struct {
buf []T buf []T
len int len int

137
lib.go
View File

@ -1,117 +1,22 @@
// Package priorityq provides generic implementations of various concurrent,
// prioritized queues.
//
// # Behavior
//
// All types of queues in this module act similarly to buffered Go channels.
//
// - They are bounded to a fixed capacity, set at construction.
// - Closing and sending to an already-closed queue causes a panic.
// - Receivers can continue getting items after closure, and can use a final
// bool to determine when there are none remaining.
// - They are safe for multiple concurrent senders and receivers.
//
// # Implementation
//
// All data structures in this module use [generics], introduced in Go 1.18.
//
// All of the concurrent data structures in this package use a [sync.Mutex]
// and a few [sync.Cond] variables.
//
// [generics]: https://go.dev/blog/intro-generics
package priorityq package priorityq
import (
"sync"
"gogs.humancabbage.net/sam/priorityq/binheap"
"golang.org/x/exp/constraints"
)
// Q is a generic, concurrent priority queue.
type Q[P constraints.Ordered, T any] struct {
*state[P, T]
}
// Make a new queue.
func Make[P constraints.Ordered, T any](cap int) Q[P, T] {
heap := binheap.Make[P, T](cap)
s := &state[P, T]{
heap: heap,
}
s.canRecv = sync.NewCond(&s.mu)
s.canSend = sync.NewCond(&s.mu)
return Q[P, T]{s}
}
type state[P constraints.Ordered, T any] struct {
mu sync.Mutex
heap binheap.H[P, T]
canSend *sync.Cond
canRecv *sync.Cond
closed bool
}
// Close marks the queue as closed.
//
// Subsequent attempts to send will panic. Subsequent calls to Recv will
// continue to return the remaining items in the queue.
func (s *state[P, T]) Close() {
s.mu.Lock()
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv returns an item from the prioritized buffers, blocking if empty.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[P, T]) Recv() (P, T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanExtract() {
s.canRecv.Wait()
}
if s.closed && !s.heap.CanExtract() {
var emptyP P
var emptyT T
return emptyP, emptyT, false
}
if s.heap.CanExtract() {
priority, value := s.heap.Extract()
s.canSend.Broadcast()
return priority, value, true
}
}
}
// Send adds an item to the queue, blocking if full.
func (s *state[P, T]) Send(priority P, value T) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanInsert() {
s.canSend.Wait()
}
if s.closed {
panic("send on closed queue")
}
if s.heap.CanInsert() {
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return
}
}
}
// TryRecv attempts to return an item from the queue.
//
// This method does not block. If there is an item in the queue, it returns
// true. If the queue is empty, it returns false.
func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.heap.CanExtract() {
priority, value = s.heap.Extract()
ok = true
s.canSend.Broadcast()
return
}
return
}
// TrySend attempts to add an item to the high priority buffer.
//
// This method does not block. If there is space in the buffer, it returns
// true. If the buffer is full, it returns false.
func (s *state[P, T]) TrySend(priority P, value T) bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.heap.CanInsert() {
return false
}
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return true
}

View File

@ -1,3 +1,30 @@
// Package mq implements a concurrent, dual-priority message queue.
//
// [Q] is similar to a buffered channel, except that senders can assign one of
// two priority levels to each item, "high" or "low." Receivers will always
// get a high-priority item ahead of any low-priority ones.
//
// For example:
//
// q := mq.Make[string](8)
// mq.SendLow("world")
// mq.SendHigh("hello")
// word1, _ := mq.Recv()
// word2, _ := mq.Recv()
// fmt.Println(word1, word2)
// pq.Close()
// // Output: hello world
//
// # Implementation
//
// Each queue has two circular buffers, one for each priority level.
// Currently, the capacities for these are fixed and equal. If one buffer is
// full, attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]).
//
// Compared the pq package, the limitation on priority levels increases
// performance, as its circular buffers are much less expensive than the heap
// operations of a traditional priority queue.
package mq package mq
import ( import (
@ -6,14 +33,7 @@ import (
"gogs.humancabbage.net/sam/priorityq/circ" "gogs.humancabbage.net/sam/priorityq/circ"
) )
// Q is a precise, concurrent, prioritized message queue. // Q is a concurrent, dual-priority message queue.
//
// Each queue has two internal buffers, high and low. This implementation
// guarantees that when there are items in both buffers, consumers receive
// ones from the high priority buffer first.
//
// Each buffer has the same capacity, set on initial construction. Sending to
// a buffer will block if it is full, even if the other buffer has space.
type Q[T any] struct { type Q[T any] struct {
*state[T] *state[T]
} }
@ -44,16 +64,18 @@ type state[T any] struct {
// Close marks the queue as closed. // Close marks the queue as closed.
// //
// Subsequent attempts to send will panic. Subsequent calls to Recv will // Attempting to close an already-closed queue results in a panic.
// continue to return the remaining items in the queue.
func (s *state[T]) Close() { func (s *state[T]) Close() {
s.mu.Lock() s.mu.Lock()
if s.closed {
panic("close of closed queue")
}
s.closed = true s.closed = true
s.mu.Unlock() s.mu.Unlock()
s.canRecv.Broadcast() s.canRecv.Broadcast()
} }
// Recv returns an item from the prioritized buffers, blocking if empty. // Recv gets an item, blocking when empty until one is available.
// //
// The returned bool will be true if the queue still has items or is open. // The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed. // It will be false if the queue is empty and closed.
@ -86,20 +108,19 @@ func (s *state[T]) Send(value T) {
s.SendLow(value) s.SendLow(value)
} }
// SendHigh adds an item to the high priority buffer, blocking if full. // SendHigh adds an item with high priority, blocking if full.
func (s *state[T]) SendHigh(value T) { func (s *state[T]) SendHigh(value T) {
s.send(value, &s.high, s.canSendHigh) s.send(value, &s.high, s.canSendHigh)
} }
// SendLow adds an item to the low priority buffer, blocking if full. // SendLow adds an item with low buffer, blocking if full.
func (s *state[T]) SendLow(value T) { func (s *state[T]) SendLow(value T) {
s.send(value, &s.low, s.canSendLow) s.send(value, &s.low, s.canSendLow)
} }
// TryRecv attempts to return an item from the prioritized buffers. // TryRecv attempts to get an item from the queue, without blocking.
// //
// This method does not block. If there is an item in a buffer, it returns // If the attempt succeeds, the returned bool is true. Otherwise, it is false.
// true. If the buffer is empty, it returns false.
func (s *state[T]) TryRecv() (value T, ok bool) { func (s *state[T]) TryRecv() (value T, ok bool) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -118,18 +139,21 @@ func (s *state[T]) TryRecv() (value T, ok bool) {
return return
} }
// TrySendHigh attempts to add an item to the high priority buffer. // TrySend is an alias for TrySendLow.
func (s *state[T]) TrySend(value T) bool {
return s.trySend(value, &s.low)
}
// TrySendHigh attempts to add an item with high priority, without blocking.
// //
// This method does not block. If there is space in the buffer, it returns // If the attempt succeeds, the returned bool is true. Otherwise, it is false.
// true. If the buffer is full, it returns false.
func (s *state[T]) TrySendHigh(value T) bool { func (s *state[T]) TrySendHigh(value T) bool {
return s.trySend(value, &s.high) return s.trySend(value, &s.high)
} }
// TrySendLow attempts to add an item to the low priority buffer. // TrySendLow attempts to add an item with low priority, without blocking.
// //
// This method does not block. If there is space in the buffer, it returns // If the attempt succeeds, the returned bool is true. Otherwise, it is false.
// true. If the buffer is full, it returns false.
func (s *state[T]) TrySendLow(value T) bool { func (s *state[T]) TrySendLow(value T) bool {
return s.trySend(value, &s.low) return s.trySend(value, &s.low)
} }

View File

@ -62,6 +62,18 @@ func TestRecvClosed(t *testing.T) {
} }
} }
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := mq.Make[int](4)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) { func TestTrySendRecv(t *testing.T) {
t.Parallel() t.Parallel()
q := mq.Make[int](4) q := mq.Make[int](4)

145
pq/lib.go Normal file
View File

@ -0,0 +1,145 @@
// Package pq implements a concurrent priority queue.
//
// [Q] is similar to a buffered channel, except that senders attach to each
// item a priority, and receivers always get the highest-priority item.
//
// For example:
//
// import "gogs.humancabbage.net/sam/priorityq/pq"
// q := pq.Make[int, string](8)
// q.Send(1, "world")
// q.Send(2, "hello")
// _, word1, _ := pq.Recv()
// _, word2, _ := pq.Recv()
// fmt.Println(word1, word2)
// pq.Close()
// // Output: hello world
//
// # Implementation
//
// Each queue has a [binary max-heap]. Sending and receiving items require
// heap-up and heap-down operations, respectively.
//
// [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap
package pq
import (
"sync"
"gogs.humancabbage.net/sam/priorityq/binheap"
"golang.org/x/exp/constraints"
)
// Q is a generic, concurrent priority queue.
type Q[P constraints.Ordered, T any] struct {
*state[P, T]
}
// Make a new queue.
func Make[P constraints.Ordered, T any](cap int) Q[P, T] {
heap := binheap.Make[P, T](cap)
s := &state[P, T]{
heap: heap,
}
s.canRecv = sync.NewCond(&s.mu)
s.canSend = sync.NewCond(&s.mu)
return Q[P, T]{s}
}
type state[P constraints.Ordered, T any] struct {
mu sync.Mutex
heap binheap.H[P, T]
canSend *sync.Cond
canRecv *sync.Cond
closed bool
}
// Close marks the queue as closed.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[P, T]) Close() {
s.mu.Lock()
if s.closed {
panic("close of closed queue")
}
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv gets an item, blocking when empty until one is available.
//
// This returns both the item itself and the its assigned priority.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[P, T]) Recv() (P, T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanExtract() {
s.canRecv.Wait()
}
if s.closed && !s.heap.CanExtract() {
var emptyP P
var emptyT T
return emptyP, emptyT, false
}
if s.heap.CanExtract() {
priority, value := s.heap.Extract()
s.canSend.Broadcast()
return priority, value, true
}
}
}
// Send adds an item with some priority, blocking if full.
func (s *state[P, T]) Send(priority P, value T) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanInsert() {
s.canSend.Wait()
}
if s.closed {
panic("send on closed queue")
}
if s.heap.CanInsert() {
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return
}
}
}
// TryRecv attempts to get an item without blocking.
//
// This returns both the item itself and the its assigned priority.
//
// If the attempt succeeds, the returned bool is true. Otherwise, it is false.
func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.heap.CanExtract() {
priority, value = s.heap.Extract()
ok = true
s.canSend.Broadcast()
return
}
return
}
// TrySend attempts to add an item with some priority, without blocking.
//
// This method does not block. If there is space in the buffer, it returns
// true. If the buffer is full, it returns false.
func (s *state[P, T]) TrySend(priority P, value T) bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.heap.CanInsert() {
return false
}
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return true
}

View File

@ -1,4 +1,4 @@
package priorityq_test package pq_test
import ( import (
"math/rand" "math/rand"
@ -6,12 +6,12 @@ import (
"sync" "sync"
"testing" "testing"
"gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/pq"
) )
func TestRecvHighestFirst(t *testing.T) { func TestRecvHighestFirst(t *testing.T) {
t.Parallel() t.Parallel()
q := priorityq.Make[int, int](8) q := pq.Make[int, int](8)
q.Send(4, 4) q.Send(4, 4)
q.Send(2, 2) q.Send(2, 2)
q.Send(1, 1) q.Send(1, 1)
@ -42,14 +42,14 @@ func TestSendClosedPanic(t *testing.T) {
t.Errorf("sending to closed queue did not panic") t.Errorf("sending to closed queue did not panic")
} }
}() }()
q := priorityq.Make[int, int](4) q := pq.Make[int, int](4)
q.Close() q.Close()
q.Send(1, 1) q.Send(1, 1)
} }
func TestRecvClosed(t *testing.T) { func TestRecvClosed(t *testing.T) {
t.Parallel() t.Parallel()
q := priorityq.Make[int, int](4) q := pq.Make[int, int](4)
q.Send(1, 1) q.Send(1, 1)
q.Close() q.Close()
_, _, ok := q.Recv() _, _, ok := q.Recv()
@ -62,9 +62,21 @@ func TestRecvClosed(t *testing.T) {
} }
} }
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](4)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) { func TestTrySendRecv(t *testing.T) {
t.Parallel() t.Parallel()
q := priorityq.Make[int, int](4) q := pq.Make[int, int](4)
assumeSendOk := func(n int) { assumeSendOk := func(n int) {
ok := q.TrySend(n, n) ok := q.TrySend(n, n)
if !ok { if !ok {
@ -101,7 +113,7 @@ func TestTrySendRecv(t *testing.T) {
func TestConcProducerConsumer(t *testing.T) { func TestConcProducerConsumer(t *testing.T) {
t.Parallel() t.Parallel()
q := priorityq.Make[int, int](4) q := pq.Make[int, int](4)
var wg sync.WaitGroup var wg sync.WaitGroup
produceDone := make(chan struct{}) produceDone := make(chan struct{})
wg.Add(2) wg.Add(2)
@ -126,7 +138,7 @@ func TestConcProducerConsumer(t *testing.T) {
} }
func BenchmarkSend(b *testing.B) { func BenchmarkSend(b *testing.B) {
q := priorityq.Make[int, int](b.N) q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op // randomize priorities to get amortized cost per op
ps := make([]int, b.N) ps := make([]int, b.N)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -139,7 +151,7 @@ func BenchmarkSend(b *testing.B) {
} }
func BenchmarkRecv(b *testing.B) { func BenchmarkRecv(b *testing.B) {
q := priorityq.Make[int, int](b.N) q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op // randomize priorities to get amortized cost per op
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
q.Send(rand.Int(), i) q.Send(rand.Int(), i)
@ -151,7 +163,7 @@ func BenchmarkRecv(b *testing.B) {
} }
func BenchmarkConcSendRecv(b *testing.B) { func BenchmarkConcSendRecv(b *testing.B) {
q := priorityq.Make[int, int](b.N) q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op // randomize priorities to get amortized cost per op
ps := make([]int, b.N) ps := make([]int, b.N)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -180,7 +192,7 @@ func BenchmarkConcSendRecv(b *testing.B) {
} }
func BenchmarkHighContention(b *testing.B) { func BenchmarkHighContention(b *testing.B) {
q := priorityq.Make[int, int](b.N) q := pq.Make[int, int](b.N)
var wg sync.WaitGroup var wg sync.WaitGroup
start := make(chan struct{}) start := make(chan struct{})
done := make(chan struct{}) done := make(chan struct{})