diff --git a/README.md b/README.md index d011622..453f019 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,10 @@ queue that guarantees receipt of a high-priority items before low-priority ones. This is primarily a fun exercise, I cannot recommend that anyone actually use this in a real project. -Additionally, the root `priorityq` package implements a concurrent priority -queue, using a binary max-heap. This is more general than `mq`, because it -allows multiple levels of priority, instead of just "high" and "low". This, of -course, also makes operations slower. +Additionally, the `pq` package implements a concurrent priority queue, using a +binary max-heap. This is more general than `mq`, because it allows multiple +levels of priority, instead of just "high" and "low". This, of course, also +makes operations slower. [reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/ [sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/ diff --git a/binheap/lib.go b/binheap/lib.go index 0ae83d8..1c93e23 100644 --- a/binheap/lib.go +++ b/binheap/lib.go @@ -1,8 +1,9 @@ +// Package binheap implements a binary max-heap. package binheap import "golang.org/x/exp/constraints" -// H is a generic, non-concurrent binary max-heap. +// H is a binary max-heap. // // `I` is the type of the priority IDs, and `E` the type of the elements. type H[I constraints.Ordered, E any] struct { diff --git a/circ/lib.go b/circ/lib.go index b14ccaf..0172089 100644 --- a/circ/lib.go +++ b/circ/lib.go @@ -1,6 +1,7 @@ +// Package circ implements a circular FIFO buffer. package circ -// B is a generic, non-concurrent circular FIFO buffer. +// B is a circular FIFO buffer. type B[T any] struct { buf []T len int diff --git a/lib.go b/lib.go index 5d7340b..1a998c2 100644 --- a/lib.go +++ b/lib.go @@ -1,117 +1,22 @@ +// Package priorityq provides generic implementations of various concurrent, +// prioritized queues. +// +// # Behavior +// +// All types of queues in this module act similarly to buffered Go channels. +// +// - They are bounded to a fixed capacity, set at construction. +// - Closing and sending to an already-closed queue causes a panic. +// - Receivers can continue getting items after closure, and can use a final +// bool to determine when there are none remaining. +// - They are safe for multiple concurrent senders and receivers. +// +// # Implementation +// +// All data structures in this module use [generics], introduced in Go 1.18. +// +// All of the concurrent data structures in this package use a [sync.Mutex] +// and a few [sync.Cond] variables. +// +// [generics]: https://go.dev/blog/intro-generics package priorityq - -import ( - "sync" - - "gogs.humancabbage.net/sam/priorityq/binheap" - "golang.org/x/exp/constraints" -) - -// Q is a generic, concurrent priority queue. -type Q[P constraints.Ordered, T any] struct { - *state[P, T] -} - -// Make a new queue. -func Make[P constraints.Ordered, T any](cap int) Q[P, T] { - heap := binheap.Make[P, T](cap) - s := &state[P, T]{ - heap: heap, - } - s.canRecv = sync.NewCond(&s.mu) - s.canSend = sync.NewCond(&s.mu) - return Q[P, T]{s} -} - -type state[P constraints.Ordered, T any] struct { - mu sync.Mutex - heap binheap.H[P, T] - canSend *sync.Cond - canRecv *sync.Cond - closed bool -} - -// Close marks the queue as closed. -// -// Subsequent attempts to send will panic. Subsequent calls to Recv will -// continue to return the remaining items in the queue. -func (s *state[P, T]) Close() { - s.mu.Lock() - s.closed = true - s.mu.Unlock() - s.canRecv.Broadcast() -} - -// Recv returns an item from the prioritized buffers, blocking if empty. -// -// The returned bool will be true if the queue still has items or is open. -// It will be false if the queue is empty and closed. -func (s *state[P, T]) Recv() (P, T, bool) { - s.mu.Lock() - defer s.mu.Unlock() - for { - for !s.closed && !s.heap.CanExtract() { - s.canRecv.Wait() - } - if s.closed && !s.heap.CanExtract() { - var emptyP P - var emptyT T - return emptyP, emptyT, false - } - if s.heap.CanExtract() { - priority, value := s.heap.Extract() - s.canSend.Broadcast() - return priority, value, true - } - } -} - -// Send adds an item to the queue, blocking if full. -func (s *state[P, T]) Send(priority P, value T) { - s.mu.Lock() - defer s.mu.Unlock() - for { - for !s.closed && !s.heap.CanInsert() { - s.canSend.Wait() - } - if s.closed { - panic("send on closed queue") - } - if s.heap.CanInsert() { - s.heap.Insert(priority, value) - s.canRecv.Broadcast() - return - } - } -} - -// TryRecv attempts to return an item from the queue. -// -// This method does not block. If there is an item in the queue, it returns -// true. If the queue is empty, it returns false. -func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) { - s.mu.Lock() - defer s.mu.Unlock() - if s.heap.CanExtract() { - priority, value = s.heap.Extract() - ok = true - s.canSend.Broadcast() - return - } - return -} - -// TrySend attempts to add an item to the high priority buffer. -// -// This method does not block. If there is space in the buffer, it returns -// true. If the buffer is full, it returns false. -func (s *state[P, T]) TrySend(priority P, value T) bool { - s.mu.Lock() - defer s.mu.Unlock() - if !s.heap.CanInsert() { - return false - } - s.heap.Insert(priority, value) - s.canRecv.Broadcast() - return true -} diff --git a/mq/lib.go b/mq/lib.go index 9bcfead..a843167 100644 --- a/mq/lib.go +++ b/mq/lib.go @@ -1,3 +1,30 @@ +// Package mq implements a concurrent, dual-priority message queue. +// +// [Q] is similar to a buffered channel, except that senders can assign one of +// two priority levels to each item, "high" or "low." Receivers will always +// get a high-priority item ahead of any low-priority ones. +// +// For example: +// +// q := mq.Make[string](8) +// mq.SendLow("world") +// mq.SendHigh("hello") +// word1, _ := mq.Recv() +// word2, _ := mq.Recv() +// fmt.Println(word1, word2) +// pq.Close() +// // Output: hello world +// +// # Implementation +// +// Each queue has two circular buffers, one for each priority level. +// Currently, the capacities for these are fixed and equal. If one buffer is +// full, attempts to send further items with its priority level will block +// ([Q.Send]) or fail ([Q.TrySend]). +// +// Compared the pq package, the limitation on priority levels increases +// performance, as its circular buffers are much less expensive than the heap +// operations of a traditional priority queue. package mq import ( @@ -6,14 +33,7 @@ import ( "gogs.humancabbage.net/sam/priorityq/circ" ) -// Q is a precise, concurrent, prioritized message queue. -// -// Each queue has two internal buffers, high and low. This implementation -// guarantees that when there are items in both buffers, consumers receive -// ones from the high priority buffer first. -// -// Each buffer has the same capacity, set on initial construction. Sending to -// a buffer will block if it is full, even if the other buffer has space. +// Q is a concurrent, dual-priority message queue. type Q[T any] struct { *state[T] } @@ -44,16 +64,18 @@ type state[T any] struct { // Close marks the queue as closed. // -// Subsequent attempts to send will panic. Subsequent calls to Recv will -// continue to return the remaining items in the queue. +// Attempting to close an already-closed queue results in a panic. func (s *state[T]) Close() { s.mu.Lock() + if s.closed { + panic("close of closed queue") + } s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } -// Recv returns an item from the prioritized buffers, blocking if empty. +// Recv gets an item, blocking when empty until one is available. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. @@ -86,20 +108,19 @@ func (s *state[T]) Send(value T) { s.SendLow(value) } -// SendHigh adds an item to the high priority buffer, blocking if full. +// SendHigh adds an item with high priority, blocking if full. func (s *state[T]) SendHigh(value T) { s.send(value, &s.high, s.canSendHigh) } -// SendLow adds an item to the low priority buffer, blocking if full. +// SendLow adds an item with low buffer, blocking if full. func (s *state[T]) SendLow(value T) { s.send(value, &s.low, s.canSendLow) } -// TryRecv attempts to return an item from the prioritized buffers. +// TryRecv attempts to get an item from the queue, without blocking. // -// This method does not block. If there is an item in a buffer, it returns -// true. If the buffer is empty, it returns false. +// If the attempt succeeds, the returned bool is true. Otherwise, it is false. func (s *state[T]) TryRecv() (value T, ok bool) { s.mu.Lock() defer s.mu.Unlock() @@ -118,18 +139,21 @@ func (s *state[T]) TryRecv() (value T, ok bool) { return } -// TrySendHigh attempts to add an item to the high priority buffer. +// TrySend is an alias for TrySendLow. +func (s *state[T]) TrySend(value T) bool { + return s.trySend(value, &s.low) +} + +// TrySendHigh attempts to add an item with high priority, without blocking. // -// This method does not block. If there is space in the buffer, it returns -// true. If the buffer is full, it returns false. +// If the attempt succeeds, the returned bool is true. Otherwise, it is false. func (s *state[T]) TrySendHigh(value T) bool { return s.trySend(value, &s.high) } -// TrySendLow attempts to add an item to the low priority buffer. +// TrySendLow attempts to add an item with low priority, without blocking. // -// This method does not block. If there is space in the buffer, it returns -// true. If the buffer is full, it returns false. +// If the attempt succeeds, the returned bool is true. Otherwise, it is false. func (s *state[T]) TrySendLow(value T) bool { return s.trySend(value, &s.low) } diff --git a/mq/lib_test.go b/mq/lib_test.go index 54d0457..80d4ce5 100644 --- a/mq/lib_test.go +++ b/mq/lib_test.go @@ -62,6 +62,18 @@ func TestRecvClosed(t *testing.T) { } } +func TestDoubleClose(t *testing.T) { + t.Parallel() + q := mq.Make[int](4) + defer func() { + if r := recover(); r == nil { + t.Errorf("closing a closed queue did not panic") + } + }() + q.Close() + q.Close() +} + func TestTrySendRecv(t *testing.T) { t.Parallel() q := mq.Make[int](4) diff --git a/pq/lib.go b/pq/lib.go new file mode 100644 index 0000000..50922fe --- /dev/null +++ b/pq/lib.go @@ -0,0 +1,145 @@ +// Package pq implements a concurrent priority queue. +// +// [Q] is similar to a buffered channel, except that senders attach to each +// item a priority, and receivers always get the highest-priority item. +// +// For example: +// +// import "gogs.humancabbage.net/sam/priorityq/pq" +// q := pq.Make[int, string](8) +// q.Send(1, "world") +// q.Send(2, "hello") +// _, word1, _ := pq.Recv() +// _, word2, _ := pq.Recv() +// fmt.Println(word1, word2) +// pq.Close() +// // Output: hello world +// +// # Implementation +// +// Each queue has a [binary max-heap]. Sending and receiving items require +// heap-up and heap-down operations, respectively. +// +// [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap +package pq + +import ( + "sync" + + "gogs.humancabbage.net/sam/priorityq/binheap" + "golang.org/x/exp/constraints" +) + +// Q is a generic, concurrent priority queue. +type Q[P constraints.Ordered, T any] struct { + *state[P, T] +} + +// Make a new queue. +func Make[P constraints.Ordered, T any](cap int) Q[P, T] { + heap := binheap.Make[P, T](cap) + s := &state[P, T]{ + heap: heap, + } + s.canRecv = sync.NewCond(&s.mu) + s.canSend = sync.NewCond(&s.mu) + return Q[P, T]{s} +} + +type state[P constraints.Ordered, T any] struct { + mu sync.Mutex + heap binheap.H[P, T] + canSend *sync.Cond + canRecv *sync.Cond + closed bool +} + +// Close marks the queue as closed. +// +// Attempting to close an already-closed queue results in a panic. +func (s *state[P, T]) Close() { + s.mu.Lock() + if s.closed { + panic("close of closed queue") + } + s.closed = true + s.mu.Unlock() + s.canRecv.Broadcast() +} + +// Recv gets an item, blocking when empty until one is available. +// +// This returns both the item itself and the its assigned priority. +// +// The returned bool will be true if the queue still has items or is open. +// It will be false if the queue is empty and closed. +func (s *state[P, T]) Recv() (P, T, bool) { + s.mu.Lock() + defer s.mu.Unlock() + for { + for !s.closed && !s.heap.CanExtract() { + s.canRecv.Wait() + } + if s.closed && !s.heap.CanExtract() { + var emptyP P + var emptyT T + return emptyP, emptyT, false + } + if s.heap.CanExtract() { + priority, value := s.heap.Extract() + s.canSend.Broadcast() + return priority, value, true + } + } +} + +// Send adds an item with some priority, blocking if full. +func (s *state[P, T]) Send(priority P, value T) { + s.mu.Lock() + defer s.mu.Unlock() + for { + for !s.closed && !s.heap.CanInsert() { + s.canSend.Wait() + } + if s.closed { + panic("send on closed queue") + } + if s.heap.CanInsert() { + s.heap.Insert(priority, value) + s.canRecv.Broadcast() + return + } + } +} + +// TryRecv attempts to get an item without blocking. +// +// This returns both the item itself and the its assigned priority. +// +// If the attempt succeeds, the returned bool is true. Otherwise, it is false. +func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.heap.CanExtract() { + priority, value = s.heap.Extract() + ok = true + s.canSend.Broadcast() + return + } + return +} + +// TrySend attempts to add an item with some priority, without blocking. +// +// This method does not block. If there is space in the buffer, it returns +// true. If the buffer is full, it returns false. +func (s *state[P, T]) TrySend(priority P, value T) bool { + s.mu.Lock() + defer s.mu.Unlock() + if !s.heap.CanInsert() { + return false + } + s.heap.Insert(priority, value) + s.canRecv.Broadcast() + return true +} diff --git a/lib_test.go b/pq/lib_test.go similarity index 86% rename from lib_test.go rename to pq/lib_test.go index 5dd7c46..e91323d 100644 --- a/lib_test.go +++ b/pq/lib_test.go @@ -1,4 +1,4 @@ -package priorityq_test +package pq_test import ( "math/rand" @@ -6,12 +6,12 @@ import ( "sync" "testing" - "gogs.humancabbage.net/sam/priorityq" + "gogs.humancabbage.net/sam/priorityq/pq" ) func TestRecvHighestFirst(t *testing.T) { t.Parallel() - q := priorityq.Make[int, int](8) + q := pq.Make[int, int](8) q.Send(4, 4) q.Send(2, 2) q.Send(1, 1) @@ -42,14 +42,14 @@ func TestSendClosedPanic(t *testing.T) { t.Errorf("sending to closed queue did not panic") } }() - q := priorityq.Make[int, int](4) + q := pq.Make[int, int](4) q.Close() q.Send(1, 1) } func TestRecvClosed(t *testing.T) { t.Parallel() - q := priorityq.Make[int, int](4) + q := pq.Make[int, int](4) q.Send(1, 1) q.Close() _, _, ok := q.Recv() @@ -62,9 +62,21 @@ func TestRecvClosed(t *testing.T) { } } +func TestDoubleClose(t *testing.T) { + t.Parallel() + q := pq.Make[int, int](4) + defer func() { + if r := recover(); r == nil { + t.Errorf("closing a closed queue did not panic") + } + }() + q.Close() + q.Close() +} + func TestTrySendRecv(t *testing.T) { t.Parallel() - q := priorityq.Make[int, int](4) + q := pq.Make[int, int](4) assumeSendOk := func(n int) { ok := q.TrySend(n, n) if !ok { @@ -101,7 +113,7 @@ func TestTrySendRecv(t *testing.T) { func TestConcProducerConsumer(t *testing.T) { t.Parallel() - q := priorityq.Make[int, int](4) + q := pq.Make[int, int](4) var wg sync.WaitGroup produceDone := make(chan struct{}) wg.Add(2) @@ -126,7 +138,7 @@ func TestConcProducerConsumer(t *testing.T) { } func BenchmarkSend(b *testing.B) { - q := priorityq.Make[int, int](b.N) + q := pq.Make[int, int](b.N) // randomize priorities to get amortized cost per op ps := make([]int, b.N) for i := 0; i < b.N; i++ { @@ -139,7 +151,7 @@ func BenchmarkSend(b *testing.B) { } func BenchmarkRecv(b *testing.B) { - q := priorityq.Make[int, int](b.N) + q := pq.Make[int, int](b.N) // randomize priorities to get amortized cost per op for i := 0; i < b.N; i++ { q.Send(rand.Int(), i) @@ -151,7 +163,7 @@ func BenchmarkRecv(b *testing.B) { } func BenchmarkConcSendRecv(b *testing.B) { - q := priorityq.Make[int, int](b.N) + q := pq.Make[int, int](b.N) // randomize priorities to get amortized cost per op ps := make([]int, b.N) for i := 0; i < b.N; i++ { @@ -180,7 +192,7 @@ func BenchmarkConcSendRecv(b *testing.B) { } func BenchmarkHighContention(b *testing.B) { - q := priorityq.Make[int, int](b.N) + q := pq.Make[int, int](b.N) var wg sync.WaitGroup start := make(chan struct{}) done := make(chan struct{})