Compare commits
10 Commits
c4e92faaf7
...
a20c8908de
Author | SHA1 | Date | |
---|---|---|---|
a20c8908de | |||
9d1228988d | |||
0840eb9272 | |||
9f8d0760f1 | |||
f7474fb673 | |||
b00fe25128 | |||
b3b491d9a9 | |||
5e23a92314 | |||
ab364c31bb | |||
0759aaa2cd |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
benchresults.txt
|
||||
coverage
|
11
Makefile
Normal file
11
Makefile
Normal file
@ -0,0 +1,11 @@
|
||||
.PHONY: test
|
||||
test:
|
||||
go test -count=1 -coverprofile=coverage ./...
|
||||
|
||||
.PHONY: bench
|
||||
bench:
|
||||
go test -bench=. ./...
|
||||
|
||||
.PHONY: longbench
|
||||
longbench:
|
||||
go test -count=30 -bench=. ./... | tee benchresults.txt
|
72
README.md
72
README.md
@ -1,12 +1,69 @@
|
||||
# priorityq - generic priority queue in Go
|
||||
# priorityq - generic prioritized queues in Go
|
||||
|
||||
In Go, the builtin buffered channels provide a concurrent FIFO queue for
|
||||
passing messages between goroutines. Sometimes, however, it's convenient to be
|
||||
able to assign priority levels to messages, so that they get delivered to
|
||||
consumers more promptly.
|
||||
|
||||
The `mq` package in this module implements a concurrent, dual-priority message
|
||||
queue that guarantees receipt of a high-priority items before low-priority
|
||||
ones. There is a pattern using two channels and select statements to achieve
|
||||
similar functionality, but it's not exactly equivalent. (See the
|
||||
[Background](#background) section for more details.)
|
||||
|
||||
Additionally, the `pq` package implements a concurrent, traditional priority
|
||||
queue, using a binary max-heap. This is more general than `mq`, because it
|
||||
allows multiple levels of priority. This, of course, also makes operations
|
||||
slower.
|
||||
|
||||
Lastly, the `npq` package implements a concurrent, n-priority message queue.
|
||||
It's similar to `mq`, except that it an arbitrary fixed number of priority
|
||||
levels. It can have better performance than `pq` for several hundred levels.
|
||||
|
||||
## Benchmarks
|
||||
|
||||
Here are some benchmark results from running on a Mac Studio/M1 Ultra.
|
||||
|
||||
pkg: gogs.humancabbage.net/sam/priorityq/mq
|
||||
Send-20 13.93n ± 0%
|
||||
SendChan-20 13.19n ± 0%
|
||||
Recv-20 13.64n ± 1%
|
||||
RecvChan-20 13.29n ± 1%
|
||||
ConcSendRecv-20 97.60n ± 1%
|
||||
ConcSendRecvChan-20 171.8n ± 5%
|
||||
HighContention-20 128.2n ± 0%
|
||||
HighContentionChan-20 47.27n ± 0%
|
||||
|
||||
pkg: gogs.humancabbage.net/sam/priorityq/npq
|
||||
Send-20 13.56n ± 0%
|
||||
Recv-20 13.51n ± 0%
|
||||
ConcSendRecv-20 176.3n ± 8%
|
||||
HighContention-20 121.0n ± 0%
|
||||
|
||||
pkg: gogs.humancabbage.net/sam/priorityq/pq
|
||||
Send-20 18.79n ± 1%
|
||||
Recv-20 268.1n ± 3%
|
||||
ConcSendRecv-20 199.2n ± 2%
|
||||
HighContention-20 440.0n ± 1%
|
||||
|
||||
pkg: gogs.humancabbage.net/sam/priorityq/binheap
|
||||
Insert-20 11.92n ± 0%
|
||||
Extract-20 261.6n ± 2%
|
||||
RepeatedInsertExtract-20 25.68n ± 1%
|
||||
|
||||
pkg: gogs.humancabbage.net/sam/priorityq/circ
|
||||
Push-20 2.196n ± 1%
|
||||
Pop-20 2.187n ± 0%
|
||||
|
||||
## Background
|
||||
|
||||
This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked
|
||||
how to implement a priority queue in Go. A fantastic solution was [provided by
|
||||
/u/Ploobers][sol]. That's probably right for 99 out of 100 use cases, but it's
|
||||
not completely precise.
|
||||
how to implement a prioritized message queue in Go. A fantastic solution was
|
||||
[provided by /u/Ploobers][sol]. That's probably right for 99 out of 100 use
|
||||
cases, but it's not completely precise.
|
||||
|
||||
Particularly, the second select block does not guarantee that an item from the
|
||||
priority queue will be taken if there is also an item in the regular queue.
|
||||
prioritized queue will be taken if there is also an item in the regular queue.
|
||||
|
||||
```go
|
||||
select {
|
||||
@ -26,11 +83,6 @@ From the [Go Language Specification][go_select]:
|
||||
Thus, it is possible for the second case to be chosen even if the first case is
|
||||
also ready.
|
||||
|
||||
The `precise` package in this module implements a concurrent priority queue that
|
||||
guarantees receipt of a high-priority items before low-priority ones. This is
|
||||
primarily a fun exercise, I cannot recommend that anyone actually use this in a
|
||||
real project.
|
||||
|
||||
[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/
|
||||
[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/
|
||||
[go_select]: https://go.dev/ref/spec#Select_statements
|
||||
|
115
binheap/lib.go
Normal file
115
binheap/lib.go
Normal file
@ -0,0 +1,115 @@
|
||||
// Package binheap implements a non-concurrent binary max-heap.
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// [H] is parameterized over two types, one for the priority levels, one for
|
||||
// the elements. Internally, there are two equally-sized buffers for these
|
||||
// types. Re-heaping operations swap corresponding entries in these buffers
|
||||
// in lock-step.
|
||||
package binheap
|
||||
|
||||
import "golang.org/x/exp/constraints"
|
||||
|
||||
// H is a binary max-heap.
|
||||
//
|
||||
// `P` is the type of the priority levels, and `E` the type of the elements.
|
||||
type H[P constraints.Ordered, E any] struct {
|
||||
prs []P
|
||||
els []E
|
||||
len int
|
||||
}
|
||||
|
||||
// Make creates a new heap.
|
||||
func Make[P constraints.Ordered, E any](cap int) H[P, E] {
|
||||
priorities := make([]P, cap)
|
||||
elements := make([]E, cap)
|
||||
h := H[P, E]{prs: priorities, els: elements}
|
||||
return h
|
||||
}
|
||||
|
||||
// Capacity returns the total capacity of the heap.
|
||||
func (h *H[P, E]) Capacity() int {
|
||||
return cap(h.prs)
|
||||
}
|
||||
|
||||
// Len returns the number of items in the heap.
|
||||
func (h *H[P, E]) Len() int {
|
||||
return h.len
|
||||
}
|
||||
|
||||
// CanExtract returns true if the heap has any item, otherwise false.
|
||||
func (h *H[P, E]) CanExtract() bool {
|
||||
return h.len != 0
|
||||
}
|
||||
|
||||
// CanInsert returns true if the heap has unused capacity, otherwise false.
|
||||
func (h *H[P, E]) CanInsert() bool {
|
||||
return cap(h.prs)-h.len != 0
|
||||
}
|
||||
|
||||
// Extract returns the current heap root, then performs a heap-down pass.
|
||||
//
|
||||
// If the heap is empty, it panics.
|
||||
func (h *H[P, E]) Extract() (P, E) {
|
||||
if !h.CanExtract() {
|
||||
panic("heap is empty")
|
||||
}
|
||||
// extract root
|
||||
priority := h.prs[0]
|
||||
element := h.els[0]
|
||||
// move last entry to root position
|
||||
h.prs[0] = h.prs[h.len-1]
|
||||
h.els[0] = h.els[h.len-1]
|
||||
// clear the former last entry position,
|
||||
// so as not to hold onto garbage
|
||||
var emptyPriority P
|
||||
var emptyElem E
|
||||
h.prs[h.len-1] = emptyPriority
|
||||
h.els[h.len-1] = emptyElem
|
||||
// heap-down
|
||||
h.len--
|
||||
idx := 0
|
||||
for {
|
||||
left := idx<<1 + 1
|
||||
right := idx<<1 + 2
|
||||
largest := idx
|
||||
if left < h.len && h.prs[left] > h.prs[largest] {
|
||||
largest = left
|
||||
}
|
||||
if right < h.len && h.prs[right] > h.prs[largest] {
|
||||
largest = right
|
||||
}
|
||||
if largest == idx {
|
||||
break
|
||||
}
|
||||
h.prs[idx], h.prs[largest] = h.prs[largest], h.prs[idx]
|
||||
h.els[idx], h.els[largest] = h.els[largest], h.els[idx]
|
||||
idx = largest
|
||||
}
|
||||
|
||||
return priority, element
|
||||
}
|
||||
|
||||
// Insert adds an item to the heap, then performs a heap-up pass.
|
||||
//
|
||||
// If the heap is full, it panics.
|
||||
func (h *H[P, E]) Insert(priority P, elem E) {
|
||||
if !h.CanInsert() {
|
||||
panic("heap is full")
|
||||
}
|
||||
// insert new item into last position
|
||||
idx := h.len
|
||||
h.prs[idx] = priority
|
||||
h.els[idx] = elem
|
||||
// heap-up
|
||||
h.len++
|
||||
for {
|
||||
parent := (idx - 1) >> 1
|
||||
if parent < 0 || h.prs[parent] >= h.prs[idx] {
|
||||
break
|
||||
}
|
||||
h.prs[parent], h.prs[idx] = h.prs[idx], h.prs[parent]
|
||||
h.els[parent], h.els[idx] = h.els[idx], h.els[parent]
|
||||
idx = parent
|
||||
}
|
||||
}
|
131
binheap/lib_test.go
Normal file
131
binheap/lib_test.go
Normal file
@ -0,0 +1,131 @@
|
||||
package binheap_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq/binheap"
|
||||
)
|
||||
|
||||
func TestSmoke(t *testing.T) {
|
||||
h := binheap.Make[int, int](10)
|
||||
if h.Capacity() != 10 {
|
||||
t.Errorf("expected heap capacity to be 10")
|
||||
}
|
||||
h.Insert(1, 1)
|
||||
h.Insert(2, 2)
|
||||
h.Insert(3, 3)
|
||||
h.Insert(4, 4)
|
||||
if h.Len() != 4 {
|
||||
t.Errorf("expected heap length to be 4")
|
||||
}
|
||||
checkExtract := func(n int) {
|
||||
_, extracted := h.Extract()
|
||||
if extracted != n {
|
||||
t.Errorf("expected to extract %d, got %d", n, extracted)
|
||||
}
|
||||
}
|
||||
checkExtract(4)
|
||||
checkExtract(3)
|
||||
checkExtract(2)
|
||||
checkExtract(1)
|
||||
}
|
||||
|
||||
func TestInsertFullPanic(t *testing.T) {
|
||||
h := binheap.Make[int, int](4)
|
||||
h.Insert(1, 1)
|
||||
h.Insert(2, 2)
|
||||
h.Insert(3, 3)
|
||||
h.Insert(4, 4)
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("expected final insert to panic")
|
||||
}
|
||||
}()
|
||||
h.Insert(5, 5)
|
||||
}
|
||||
|
||||
func TestExtractEmptyPanic(t *testing.T) {
|
||||
h := binheap.Make[int, int](4)
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("expected extract to panic")
|
||||
}
|
||||
}()
|
||||
h.Extract()
|
||||
}
|
||||
|
||||
func TestRandomized(t *testing.T) {
|
||||
h := binheap.Make[int, int](8192)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
// insert a bunch of random integers
|
||||
for i := 0; i < h.Capacity(); i++ {
|
||||
n := r.Int()
|
||||
h.Insert(n, n)
|
||||
}
|
||||
// ensure that each extracted integer is <= the last extracted integer
|
||||
var extracted []int
|
||||
for h.CanExtract() {
|
||||
id, item := h.Extract()
|
||||
if id != item {
|
||||
t.Errorf("id / item mismatch: %d %d", id, item)
|
||||
}
|
||||
lastIdx := len(extracted) - 1
|
||||
extracted = append(extracted, item)
|
||||
if lastIdx < 0 {
|
||||
continue
|
||||
}
|
||||
if item > extracted[lastIdx] {
|
||||
t.Errorf("newly extracted %d is greater than %d",
|
||||
item, extracted[lastIdx])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkInsert(b *testing.B) {
|
||||
h := binheap.Make[int, int](b.N)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
items := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
items[i] = r.Int()
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.Insert(items[i], items[i])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExtract(b *testing.B) {
|
||||
h := binheap.Make[int, int](b.N)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
for i := 0; i < b.N; i++ {
|
||||
n := r.Int()
|
||||
h.Insert(n, n)
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.Extract()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRepeatedInsertExtract(b *testing.B) {
|
||||
h := binheap.Make[int, int](128)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
items := make([]int, b.N)
|
||||
for i := 0; i < h.Capacity()-1; i++ {
|
||||
n := r.Int()
|
||||
h.Insert(n, n)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
items[i] = r.Int()
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.Insert(items[i], items[i])
|
||||
h.Extract()
|
||||
}
|
||||
}
|
59
circ/lib.go
59
circ/lib.go
@ -1,59 +0,0 @@
|
||||
package circ
|
||||
|
||||
// B is a generic, non-concurrent circular FIFO buffer.
|
||||
type B[T any] struct {
|
||||
buf []T
|
||||
len int
|
||||
head int
|
||||
tail int
|
||||
}
|
||||
|
||||
// Make creates a new circular buffer.
|
||||
func Make[T any](cap int) B[T] {
|
||||
buf := make([]T, cap)
|
||||
return B[T]{buf: buf}
|
||||
}
|
||||
|
||||
// CanPush returns true if the buffer has space for new items.
|
||||
func (b *B[T]) CanPush() bool {
|
||||
return cap(b.buf)-b.len != 0
|
||||
}
|
||||
|
||||
// CanPop returns true if the buffer has one or more items.
|
||||
func (b *B[T]) CanPop() bool {
|
||||
return b.len != 0
|
||||
}
|
||||
|
||||
// PopFront returns the front-most item from the buffer.
|
||||
//
|
||||
// If the buffer is empty, it panics.
|
||||
func (b *B[T]) PopFront() T {
|
||||
if !b.CanPop() {
|
||||
panic("cannot pop from empty buffer")
|
||||
}
|
||||
var empty T
|
||||
item := b.buf[b.head]
|
||||
// clear buffer slot so that we don't hold on to garbage
|
||||
b.buf[b.head] = empty
|
||||
b.len--
|
||||
b.head++
|
||||
if b.head == cap(b.buf) {
|
||||
b.head = 0
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// PushBack adds an item to the end of the buffer.
|
||||
//
|
||||
// If the buffer is full, it panics.
|
||||
func (b *B[T]) PushBack(value T) {
|
||||
if !b.CanPush() {
|
||||
panic("cannot push back to full buffer")
|
||||
}
|
||||
b.buf[b.tail] = value
|
||||
b.len++
|
||||
b.tail++
|
||||
if b.tail == cap(b.buf) {
|
||||
b.tail = 0
|
||||
}
|
||||
}
|
@ -1,67 +0,0 @@
|
||||
package circ_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq/circ"
|
||||
)
|
||||
|
||||
func TestRepeatPushPop(t *testing.T) {
|
||||
t.Parallel()
|
||||
cb := circ.Make[int](4)
|
||||
for i := 0; i < 50; i++ {
|
||||
cb.PushBack(1)
|
||||
cb.PushBack(2)
|
||||
cb.PushBack(3)
|
||||
cb.PushBack(4)
|
||||
checkPop := func(n int) {
|
||||
if v := cb.PopFront(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
checkPop(1)
|
||||
checkPop(2)
|
||||
checkPop(3)
|
||||
checkPop(4)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterleavedPushPop(t *testing.T) {
|
||||
t.Parallel()
|
||||
cb := circ.Make[int](4)
|
||||
checkPop := func(n int) {
|
||||
if v := cb.PopFront(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
cb.PushBack(1)
|
||||
cb.PushBack(2)
|
||||
cb.PushBack(3)
|
||||
checkPop(1)
|
||||
cb.PushBack(4)
|
||||
cb.PushBack(5)
|
||||
checkPop(2)
|
||||
}
|
||||
|
||||
func TestEmptyPopPanic(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("popping empty buffer did not panic")
|
||||
}
|
||||
}()
|
||||
t.Parallel()
|
||||
cb := circ.Make[int](4)
|
||||
cb.PopFront()
|
||||
}
|
||||
|
||||
func TestFullPushPanic(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("pushing full buffer did not panic")
|
||||
}
|
||||
}()
|
||||
t.Parallel()
|
||||
cb := circ.Make[int](1)
|
||||
cb.PushBack(1)
|
||||
cb.PushBack(2)
|
||||
}
|
2
go.mod
2
go.mod
@ -1,3 +1,5 @@
|
||||
module gogs.humancabbage.net/sam/priorityq
|
||||
|
||||
go 1.20
|
||||
|
||||
require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
|
||||
|
2
go.sum
2
go.sum
@ -0,0 +1,2 @@
|
||||
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
|
||||
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
41
lib.go
Normal file
41
lib.go
Normal file
@ -0,0 +1,41 @@
|
||||
// Package priorityq provides generic implementations of various concurrent,
|
||||
// prioritized queues.
|
||||
//
|
||||
// # Behavior
|
||||
//
|
||||
// All types of queues in this module act similarly to buffered Go channels.
|
||||
//
|
||||
// - They are bounded to a fixed capacity, set at construction.
|
||||
// - Closing and sending to an already-closed queue causes a panic.
|
||||
// - Receivers can continue getting items after closure, and can use a final
|
||||
// bool to determine when there are none remaining.
|
||||
// - They are safe for multiple concurrent senders and receivers.
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// All data structures in this module use [generics], introduced in Go 1.18.
|
||||
//
|
||||
// All of the concurrent data structures in this package use a [sync.Mutex]
|
||||
// and a few [sync.Cond] variables.
|
||||
//
|
||||
// [generics]: https://go.dev/blog/intro-generics
|
||||
package priorityq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// ErrEmpty means that an operation failed because the queue was empty.
|
||||
var ErrEmpty error
|
||||
|
||||
// ErrFull means that an operation failed because the queue was full.
|
||||
var ErrFull error
|
||||
|
||||
// ErrClosed means that an operation failed because the queue was closed.
|
||||
var ErrClosed error
|
||||
|
||||
func init() {
|
||||
ErrEmpty = fmt.Errorf("queue is empty")
|
||||
ErrFull = fmt.Errorf("queue is full")
|
||||
ErrClosed = fmt.Errorf("queue is closed")
|
||||
}
|
201
mq/lib.go
Normal file
201
mq/lib.go
Normal file
@ -0,0 +1,201 @@
|
||||
// Package mq implements a concurrent, dual-priority message queue.
|
||||
//
|
||||
// [Q] is similar to a buffered channel, except that senders can assign one of
|
||||
// two priority levels to each item, "high" or "low." Receivers will always
|
||||
// get a high-priority item ahead of any low-priority ones.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// q := mq.Make[string](8)
|
||||
// q.SendLow("world")
|
||||
// q.SendHigh("hello")
|
||||
// word1, _ := mq.Recv()
|
||||
// word2, _ := mq.Recv()
|
||||
// fmt.Println(word1, word2)
|
||||
// q.Close()
|
||||
// // Output: hello world
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// Each [Q] has two circular buffers, one for each priority level. Currently,
|
||||
// the capacities for these are fixed and equal. If one buffer is full,
|
||||
// attempts to send further items with its priority level will block
|
||||
// ([Q.Send]) or fail ([Q.TrySend]).
|
||||
//
|
||||
// Compared [pq.Q], the limitation on priority levels increases performance,
|
||||
// as its circular buffers are much less expensive than the heap operations of
|
||||
// a traditional priority queue.
|
||||
package mq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||
)
|
||||
|
||||
// so that godoc (kinda) works
|
||||
var _ *pq.Q[int, int]
|
||||
|
||||
// Q is a concurrent, dual-priority message queue.
|
||||
type Q[T any] struct {
|
||||
*state[T]
|
||||
}
|
||||
|
||||
// Make a new queue.
|
||||
func Make[T any](cap int) Q[T] {
|
||||
high := queue.Make[T](cap)
|
||||
low := queue.Make[T](cap)
|
||||
s := &state[T]{
|
||||
high: high,
|
||||
low: low,
|
||||
}
|
||||
s.canRecv = sync.Cond{L: &s.mu}
|
||||
s.canSendHigh = sync.Cond{L: &s.mu}
|
||||
s.canSendLow = sync.Cond{L: &s.mu}
|
||||
return Q[T]{s}
|
||||
}
|
||||
|
||||
type state[T any] struct {
|
||||
mu sync.Mutex
|
||||
high queue.Q[T]
|
||||
low queue.Q[T]
|
||||
canSendHigh sync.Cond
|
||||
canSendLow sync.Cond
|
||||
canRecv sync.Cond
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close marks the queue as closed.
|
||||
//
|
||||
// Attempting to close an already-closed queue results in a panic.
|
||||
func (s *state[T]) Close() {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
panic("close of closed queue")
|
||||
}
|
||||
s.closed = true
|
||||
s.mu.Unlock()
|
||||
s.canRecv.Broadcast()
|
||||
}
|
||||
|
||||
// Recv gets an item, blocking when empty until one is available.
|
||||
//
|
||||
// The returned bool will be true if the queue still has items or is open.
|
||||
// It will be false if the queue is empty and closed.
|
||||
func (s *state[T]) Recv() (T, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||
s.canRecv.Wait()
|
||||
}
|
||||
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||
var empty T
|
||||
return empty, false
|
||||
}
|
||||
if s.high.CanPop() {
|
||||
value := s.high.PopFront()
|
||||
s.canSendHigh.Broadcast()
|
||||
return value, true
|
||||
}
|
||||
if s.low.CanPop() {
|
||||
value := s.low.PopFront()
|
||||
s.canSendLow.Broadcast()
|
||||
return value, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send is an alias for SendLow.
|
||||
func (s *state[T]) Send(value T) {
|
||||
s.SendLow(value)
|
||||
}
|
||||
|
||||
// SendHigh adds an item with high priority, blocking if full.
|
||||
func (s *state[T]) SendHigh(value T) {
|
||||
s.send(value, &s.high, &s.canSendHigh)
|
||||
}
|
||||
|
||||
// SendLow adds an item with low buffer, blocking if full.
|
||||
func (s *state[T]) SendLow(value T) {
|
||||
s.send(value, &s.low, &s.canSendLow)
|
||||
}
|
||||
|
||||
// TryRecv attempts to get an item from the queue, without blocking.
|
||||
//
|
||||
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||
// the queue is closed.
|
||||
func (s *state[T]) TryRecv() (value T, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.high.CanPop() {
|
||||
value = s.high.PopFront()
|
||||
s.canSendHigh.Broadcast()
|
||||
return
|
||||
}
|
||||
if s.low.CanPop() {
|
||||
value = s.low.PopFront()
|
||||
s.canSendLow.Broadcast()
|
||||
return
|
||||
}
|
||||
if s.closed {
|
||||
err = priorityq.ErrClosed
|
||||
} else {
|
||||
err = priorityq.ErrEmpty
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// TrySend is an alias for TrySendLow.
|
||||
func (s *state[T]) TrySend(value T) error {
|
||||
return s.trySend(value, &s.low)
|
||||
}
|
||||
|
||||
// TrySendHigh attempts to add an item with high priority, without blocking.
|
||||
//
|
||||
// Returns an error from the root priorityq package, or nil if successful.
|
||||
func (s *state[T]) TrySendHigh(value T) error {
|
||||
return s.trySend(value, &s.high)
|
||||
}
|
||||
|
||||
// TrySendLow attempts to add an item with low priority, without blocking.
|
||||
//
|
||||
// Returns an error from the root priorityq package, or nil if successful.
|
||||
func (s *state[T]) TrySendLow(value T) error {
|
||||
return s.trySend(value, &s.low)
|
||||
}
|
||||
|
||||
func (s *state[T]) send(value T, buf *queue.Q[T], cond *sync.Cond) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !buf.CanPush() {
|
||||
cond.Wait()
|
||||
}
|
||||
if s.closed {
|
||||
panic("send on closed queue")
|
||||
}
|
||||
if buf.CanPush() {
|
||||
buf.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *state[T]) trySend(value T, buf *queue.Q[T]) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.closed {
|
||||
return priorityq.ErrClosed
|
||||
}
|
||||
if !buf.CanPush() {
|
||||
return priorityq.ErrFull
|
||||
}
|
||||
buf.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
return nil
|
||||
}
|
@ -1,16 +1,18 @@
|
||||
package precise_test
|
||||
package mq_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq/precise"
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/mq"
|
||||
)
|
||||
|
||||
func TestRecvHighFirst(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := precise.Make[int](4)
|
||||
q := mq.Make[int](4)
|
||||
q.Send(1)
|
||||
q.Send(2)
|
||||
q.Send(3)
|
||||
@ -41,14 +43,14 @@ func TestSendClosedPanic(t *testing.T) {
|
||||
t.Errorf("sending to closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q := precise.Make[int](4)
|
||||
q := mq.Make[int](4)
|
||||
q.Close()
|
||||
q.Send(1)
|
||||
}
|
||||
|
||||
func TestRecvClosed(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := precise.Make[int](4)
|
||||
q := mq.Make[int](4)
|
||||
q.Send(1)
|
||||
q.Close()
|
||||
_, ok := q.Recv()
|
||||
@ -61,18 +63,30 @@ func TestRecvClosed(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := mq.Make[int](4)
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("closing a closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q.Close()
|
||||
q.Close()
|
||||
}
|
||||
|
||||
func TestTrySendRecv(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := precise.Make[int](4)
|
||||
assumeSendOk := func(n int, f func(int) bool) {
|
||||
ok := f(n)
|
||||
if !ok {
|
||||
q := mq.Make[int](4)
|
||||
assumeSendOk := func(n int, f func(int) error) {
|
||||
err := f(n)
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to send")
|
||||
}
|
||||
}
|
||||
assumeRecvOk := func(expected int) {
|
||||
actual, ok := q.TryRecv()
|
||||
if !ok {
|
||||
actual, err := q.TryRecv()
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to receive")
|
||||
}
|
||||
if actual != expected {
|
||||
@ -81,10 +95,10 @@ func TestTrySendRecv(t *testing.T) {
|
||||
}
|
||||
assumeSendOk(1, q.TrySendLow)
|
||||
assumeSendOk(2, q.TrySendLow)
|
||||
assumeSendOk(3, q.TrySendLow)
|
||||
assumeSendOk(3, q.TrySend)
|
||||
assumeSendOk(4, q.TrySendLow)
|
||||
ok := q.TrySendLow(5)
|
||||
if ok {
|
||||
err := q.TrySendLow(5)
|
||||
if err == nil {
|
||||
t.Errorf("expected low buffer to be full")
|
||||
}
|
||||
assumeRecvOk(1)
|
||||
@ -96,8 +110,8 @@ func TestTrySendRecv(t *testing.T) {
|
||||
assumeSendOk(6, q.TrySendHigh)
|
||||
assumeSendOk(7, q.TrySendHigh)
|
||||
assumeSendOk(8, q.TrySendHigh)
|
||||
ok = q.TrySendHigh(5)
|
||||
if ok {
|
||||
err = q.TrySendHigh(5)
|
||||
if err == nil {
|
||||
t.Errorf("expected high buffer to be full")
|
||||
}
|
||||
assumeRecvOk(5)
|
||||
@ -105,15 +119,24 @@ func TestTrySendRecv(t *testing.T) {
|
||||
assumeRecvOk(7)
|
||||
assumeRecvOk(8)
|
||||
|
||||
_, ok = q.TryRecv()
|
||||
if ok {
|
||||
_, err = q.TryRecv()
|
||||
if err != priorityq.ErrEmpty {
|
||||
t.Errorf("expected queue to be empty")
|
||||
}
|
||||
q.Close()
|
||||
_, err = q.TryRecv()
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
err = q.TrySend(5)
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcProducerConsumer(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := precise.Make[int](4)
|
||||
q := mq.Make[int](4)
|
||||
var wg sync.WaitGroup
|
||||
produceDone := make(chan struct{})
|
||||
wg.Add(2)
|
||||
@ -142,7 +165,7 @@ func TestConcProducerConsumer(t *testing.T) {
|
||||
}
|
||||
|
||||
func BenchmarkSend(b *testing.B) {
|
||||
q := precise.Make[int](b.N)
|
||||
q := mq.Make[int](b.N)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(i)
|
||||
@ -158,7 +181,7 @@ func BenchmarkSendChan(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkRecv(b *testing.B) {
|
||||
q := precise.Make[int](b.N)
|
||||
q := mq.Make[int](b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(i)
|
||||
}
|
||||
@ -180,7 +203,7 @@ func BenchmarkRecvChan(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkConcSendRecv(b *testing.B) {
|
||||
q := precise.Make[int](b.N)
|
||||
q := mq.Make[int](b.N)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
start := make(chan struct{})
|
||||
@ -226,3 +249,64 @@ func BenchmarkConcSendRecvChan(b *testing.B) {
|
||||
close(start)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkHighContention(b *testing.B) {
|
||||
q := mq.Make[int](b.N)
|
||||
var wg sync.WaitGroup
|
||||
start := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
numProducers := runtime.NumCPU()
|
||||
sendsPerProducer := b.N / numProducers
|
||||
wg.Add(numProducers)
|
||||
for i := 0; i < numProducers; i++ {
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < sendsPerProducer; i++ {
|
||||
q.Send(1)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, ok = q.Recv()
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
q.Close()
|
||||
<-done
|
||||
}
|
||||
|
||||
func BenchmarkHighContentionChan(b *testing.B) {
|
||||
c := make(chan int, b.N)
|
||||
var wg sync.WaitGroup
|
||||
start := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
numProducers := runtime.NumCPU()
|
||||
sendsPerProducer := b.N / numProducers
|
||||
wg.Add(numProducers)
|
||||
for i := 0; i < numProducers; i++ {
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < sendsPerProducer; i++ {
|
||||
c <- 1
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
for n := range c {
|
||||
_ = n
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
close(c)
|
||||
<-done
|
||||
}
|
214
npq/lib.go
Normal file
214
npq/lib.go
Normal file
@ -0,0 +1,214 @@
|
||||
// Package npq implements a concurrent, n-priority message queue.
|
||||
//
|
||||
// [Q] is similar to a buffered channel, except that senders can assign one of
|
||||
// some fixed number priority levels to each item. Receivers will always get
|
||||
// the item with the highest priority.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// q := npq.Make[int, string](8, 1, 2)
|
||||
// q.Send(1, "world")
|
||||
// q.Send(2, "hello")
|
||||
// word1, _ := q.Recv()
|
||||
// word2, _ := q.Recv()
|
||||
// fmt.Println(word1, word2)
|
||||
// q.Close()
|
||||
// // Output: hello world
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// Each [Q] has an array of circular buffers, one for each priority level.
|
||||
// Currently, the capacities for these are fixed and equal. If one buffer is
|
||||
// full, attempts to send further items with its priority level will block
|
||||
// ([Q.Send]) or fail ([Q.TrySend]).
|
||||
//
|
||||
// Compared to [mq.Q] , which has only two priority levels, [Q] performs
|
||||
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
|
||||
// though, this package is around 75% slower. Even more peculiar, however, in
|
||||
// HighContention, it is 7% faster.
|
||||
//
|
||||
// Compared the pq package, this package is faster in the HighContention
|
||||
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
|
||||
// around 200 levels; and in Recv, until around 750 levels.
|
||||
//
|
||||
// It's important to remember the memory requirement differences. A [pq.Q]
|
||||
// with capacity N can store N items; a [Q] with capacity N and P priority
|
||||
// levels can store N*P items.
|
||||
package npq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/mq"
|
||||
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||
"golang.org/x/exp/constraints"
|
||||
)
|
||||
|
||||
// so that godoc (kinda) works
|
||||
var _ *pq.Q[int, int]
|
||||
var _ *mq.Q[int]
|
||||
|
||||
// Q is a concurrent, dual-priority message queue.
|
||||
type Q[P constraints.Integer, T any] struct {
|
||||
*state[P, T]
|
||||
}
|
||||
|
||||
// Make a new queue.
|
||||
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
|
||||
numLevels := max - min + 1
|
||||
levels := make([]queue.Q[T], numLevels)
|
||||
for i := 0; i < int(numLevels); i++ {
|
||||
levels[i] = queue.Make[T](cap)
|
||||
}
|
||||
s := &state[P, T]{
|
||||
levels: levels,
|
||||
min: min,
|
||||
max: max,
|
||||
}
|
||||
canSend := make([]sync.Cond, numLevels)
|
||||
for i := 0; i < len(canSend); i++ {
|
||||
canSend[i].L = &s.mu
|
||||
}
|
||||
s.canRecv = sync.Cond{L: &s.mu}
|
||||
s.canSend = canSend
|
||||
return Q[P, T]{s}
|
||||
}
|
||||
|
||||
type state[P constraints.Integer, T any] struct {
|
||||
mu sync.Mutex
|
||||
levels []queue.Q[T]
|
||||
canSend []sync.Cond
|
||||
canRecv sync.Cond
|
||||
min P
|
||||
max P
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close marks the queue as closed.
|
||||
//
|
||||
// Attempting to close an already-closed queue results in a panic.
|
||||
func (s *state[P, T]) Close() {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
panic("close of closed queue")
|
||||
}
|
||||
s.closed = true
|
||||
s.mu.Unlock()
|
||||
s.canRecv.Broadcast()
|
||||
}
|
||||
|
||||
// Recv gets an item, blocking when empty until one is available.
|
||||
//
|
||||
// The returned bool will be true if the queue still has items or is open.
|
||||
// It will be false if the queue is empty and closed.
|
||||
func (s *state[P, T]) Recv() (T, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
var available int = -1
|
||||
findAvailable := func() {
|
||||
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||||
level := &s.levels[levelIdx]
|
||||
if level.CanPop() {
|
||||
available = levelIdx
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
findAvailable()
|
||||
for !s.closed && available == -1 {
|
||||
s.canRecv.Wait()
|
||||
findAvailable()
|
||||
}
|
||||
if s.closed && available == -1 {
|
||||
var empty T
|
||||
return empty, false
|
||||
}
|
||||
if available != -1 {
|
||||
level := &s.levels[available]
|
||||
value := level.PopFront()
|
||||
s.canSend[available].Broadcast()
|
||||
return value, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send adds an item with some priority, blocking if full.
|
||||
func (s *state[P, T]) Send(priority P, value T) {
|
||||
s.validatePriority(priority)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
levelIdx := priority - s.min
|
||||
level := &s.levels[levelIdx]
|
||||
for {
|
||||
for !s.closed && !level.CanPush() {
|
||||
s.canSend[levelIdx].Wait()
|
||||
}
|
||||
if s.closed {
|
||||
panic("send on closed queue")
|
||||
}
|
||||
if level.CanPush() {
|
||||
level.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TryRecv attempts to get an item from the queue, without blocking.
|
||||
//
|
||||
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||
// the queue is closed.
|
||||
func (s *state[P, T]) TryRecv() (value T, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||||
level := &s.levels[levelIdx]
|
||||
if level.CanPop() {
|
||||
value = level.PopFront()
|
||||
s.canSend[levelIdx].Broadcast()
|
||||
return
|
||||
}
|
||||
}
|
||||
if s.closed {
|
||||
err = priorityq.ErrClosed
|
||||
} else {
|
||||
err = priorityq.ErrEmpty
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// TrySend attempts to add an item with some priority, without blocking.
|
||||
//
|
||||
// Returns an error from the root priorityq package, or nil if successful.
|
||||
func (s *state[P, T]) TrySend(priority P, value T) error {
|
||||
s.validatePriority(priority)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.closed {
|
||||
return priorityq.ErrClosed
|
||||
}
|
||||
levelIdx := priority - s.min
|
||||
level := &s.levels[levelIdx]
|
||||
if !level.CanPush() {
|
||||
return priorityq.ErrFull
|
||||
}
|
||||
level.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *state[P, T]) validatePriority(priority P) {
|
||||
if priority < s.min || priority > s.max {
|
||||
panic(fmt.Errorf("priority %d out of range (%d, %d)",
|
||||
priority, s.min, s.max))
|
||||
}
|
||||
}
|
277
npq/lib_test.go
Normal file
277
npq/lib_test.go
Normal file
@ -0,0 +1,277 @@
|
||||
package npq_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/npq"
|
||||
)
|
||||
|
||||
func TestRecvHighFirst(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
q.Send(1, 1)
|
||||
q.Send(1, 2)
|
||||
q.Send(1, 3)
|
||||
q.Send(1, 4)
|
||||
q.Send(2, 5)
|
||||
q.Send(2, 6)
|
||||
q.Send(2, 7)
|
||||
q.Send(2, 8)
|
||||
checkRecv := func(n int) {
|
||||
if v, _ := q.Recv(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
checkRecv(5)
|
||||
checkRecv(6)
|
||||
checkRecv(7)
|
||||
checkRecv(8)
|
||||
checkRecv(1)
|
||||
checkRecv(2)
|
||||
checkRecv(3)
|
||||
checkRecv(4)
|
||||
}
|
||||
|
||||
func TestSendClosedPanic(t *testing.T) {
|
||||
t.Parallel()
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("sending to closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
q.Close()
|
||||
q.Send(1, 1)
|
||||
}
|
||||
|
||||
func TestTooLowPriorityPanic(t *testing.T) {
|
||||
testInvalidPriorityPanic(t, 0)
|
||||
}
|
||||
|
||||
func TestTooHighPriorityPanic(t *testing.T) {
|
||||
testInvalidPriorityPanic(t, 3)
|
||||
}
|
||||
|
||||
func testInvalidPriorityPanic(t *testing.T, invalid int) {
|
||||
t.Parallel()
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("sending with invalid priority did not panic")
|
||||
}
|
||||
}()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
q.Send(invalid, 1)
|
||||
|
||||
}
|
||||
|
||||
func TestRecvClosed(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
q.Send(1, 1)
|
||||
q.Close()
|
||||
_, ok := q.Recv()
|
||||
if !ok {
|
||||
t.Errorf("queue should have item to receive")
|
||||
}
|
||||
_, ok = q.Recv()
|
||||
if ok {
|
||||
t.Errorf("queue should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("closing a closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q.Close()
|
||||
q.Close()
|
||||
}
|
||||
|
||||
func TestTrySendRecv(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
assumeSendOk := func(n int, f func(int) error) {
|
||||
err := f(n)
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to send")
|
||||
}
|
||||
}
|
||||
assumeRecvOk := func(expected int) {
|
||||
actual, err := q.TryRecv()
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to receive")
|
||||
}
|
||||
if actual != expected {
|
||||
t.Errorf("expected %d, got %d", expected, actual)
|
||||
}
|
||||
}
|
||||
trySendLow := func(n int) error {
|
||||
return q.TrySend(1, n)
|
||||
}
|
||||
trySendHigh := func(n int) error {
|
||||
return q.TrySend(2, n)
|
||||
}
|
||||
assumeSendOk(1, trySendLow)
|
||||
assumeSendOk(2, trySendLow)
|
||||
assumeSendOk(3, trySendLow)
|
||||
assumeSendOk(4, trySendLow)
|
||||
err := trySendLow(5)
|
||||
if err == nil {
|
||||
t.Errorf("expected low buffer to be full")
|
||||
}
|
||||
assumeRecvOk(1)
|
||||
assumeRecvOk(2)
|
||||
assumeRecvOk(3)
|
||||
assumeRecvOk(4)
|
||||
|
||||
assumeSendOk(5, trySendHigh)
|
||||
assumeSendOk(6, trySendHigh)
|
||||
assumeSendOk(7, trySendHigh)
|
||||
assumeSendOk(8, trySendHigh)
|
||||
err = trySendHigh(5)
|
||||
if err == nil {
|
||||
t.Errorf("expected high buffer to be full")
|
||||
}
|
||||
assumeRecvOk(5)
|
||||
assumeRecvOk(6)
|
||||
assumeRecvOk(7)
|
||||
assumeRecvOk(8)
|
||||
|
||||
_, err = q.TryRecv()
|
||||
if err != priorityq.ErrEmpty {
|
||||
t.Errorf("expected queue to be empty")
|
||||
}
|
||||
q.Close()
|
||||
_, err = q.TryRecv()
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
err = q.TrySend(1, 5)
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcProducerConsumer(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := npq.Make[int, int](4, 1, 2)
|
||||
var wg sync.WaitGroup
|
||||
produceDone := make(chan struct{})
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
for i := 0; i < 10000; i++ {
|
||||
q.Send(rand.Intn(2)+1, i)
|
||||
}
|
||||
close(produceDone)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, ok = q.Recv()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
<-produceDone
|
||||
t.Logf("producer done, closing channel")
|
||||
q.Close()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
const highPriority = 2
|
||||
|
||||
func BenchmarkSend(b *testing.B) {
|
||||
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||
// randomize priorities to get amortized cost per op
|
||||
ps := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ps[i] = rand.Intn(highPriority) + 1
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(ps[i], i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRecv(b *testing.B) {
|
||||
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(rand.Intn(highPriority)+1, i)
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Recv()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkConcSendRecv(b *testing.B) {
|
||||
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||
// randomize priorities to get amortized cost per op
|
||||
ps := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ps[i] = rand.Intn(highPriority) + 1
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
start := make(chan struct{})
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(ps[i], i)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Recv()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkHighContention(b *testing.B) {
|
||||
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||
var wg sync.WaitGroup
|
||||
start := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
numProducers := runtime.NumCPU()
|
||||
sendsPerProducer := b.N / numProducers
|
||||
wg.Add(numProducers)
|
||||
for i := 0; i < numProducers; i++ {
|
||||
go func() {
|
||||
ps := make([]int, sendsPerProducer)
|
||||
for i := range ps {
|
||||
ps[i] = rand.Intn(highPriority) + 1
|
||||
}
|
||||
<-start
|
||||
for i := 0; i < sendsPerProducer; i++ {
|
||||
q.Send(ps[i], 1)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, ok = q.Recv()
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
q.Close()
|
||||
<-done
|
||||
}
|
155
pq/lib.go
Normal file
155
pq/lib.go
Normal file
@ -0,0 +1,155 @@
|
||||
// Package pq implements a concurrent priority queue.
|
||||
//
|
||||
// [Q] is similar to a buffered channel, except that senders attach to each
|
||||
// item a priority, and receivers always get the highest-priority item.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// import "gogs.humancabbage.net/sam/priorityq/pq"
|
||||
// q := pq.Make[int, string](8)
|
||||
// q.Send(1, "world")
|
||||
// q.Send(2, "hello")
|
||||
// _, word1, _ := pq.Recv()
|
||||
// _, word2, _ := pq.Recv()
|
||||
// fmt.Println(word1, word2)
|
||||
// q.Close()
|
||||
// // Output: hello world
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// Each queue has a [binary max-heap]. Sending and receiving items require
|
||||
// heap-up and heap-down operations, respectively.
|
||||
//
|
||||
// [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap
|
||||
package pq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/binheap"
|
||||
"golang.org/x/exp/constraints"
|
||||
)
|
||||
|
||||
// Q is a generic, concurrent priority queue.
|
||||
type Q[P constraints.Ordered, T any] struct {
|
||||
*state[P, T]
|
||||
}
|
||||
|
||||
// Make a new queue.
|
||||
func Make[P constraints.Ordered, T any](cap int) Q[P, T] {
|
||||
heap := binheap.Make[P, T](cap)
|
||||
s := &state[P, T]{
|
||||
heap: heap,
|
||||
}
|
||||
s.canRecv = sync.Cond{L: &s.mu}
|
||||
s.canSend = sync.Cond{L: &s.mu}
|
||||
return Q[P, T]{s}
|
||||
}
|
||||
|
||||
type state[P constraints.Ordered, T any] struct {
|
||||
mu sync.Mutex
|
||||
heap binheap.H[P, T]
|
||||
canSend sync.Cond
|
||||
canRecv sync.Cond
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close marks the queue as closed.
|
||||
//
|
||||
// Attempting to close an already-closed queue results in a panic.
|
||||
func (s *state[P, T]) Close() {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
panic("close of closed queue")
|
||||
}
|
||||
s.closed = true
|
||||
s.mu.Unlock()
|
||||
s.canRecv.Broadcast()
|
||||
}
|
||||
|
||||
// Recv gets an item, blocking when empty until one is available.
|
||||
//
|
||||
// This returns both the item itself and the its assigned priority.
|
||||
//
|
||||
// The returned bool will be true if the queue still has items or is open.
|
||||
// It will be false if the queue is empty and closed.
|
||||
func (s *state[P, T]) Recv() (P, T, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !s.heap.CanExtract() {
|
||||
s.canRecv.Wait()
|
||||
}
|
||||
if s.closed && !s.heap.CanExtract() {
|
||||
var emptyP P
|
||||
var emptyT T
|
||||
return emptyP, emptyT, false
|
||||
}
|
||||
if s.heap.CanExtract() {
|
||||
priority, value := s.heap.Extract()
|
||||
s.canSend.Broadcast()
|
||||
return priority, value, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send adds an item with some priority, blocking if full.
|
||||
func (s *state[P, T]) Send(priority P, value T) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !s.heap.CanInsert() {
|
||||
s.canSend.Wait()
|
||||
}
|
||||
if s.closed {
|
||||
panic("send on closed queue")
|
||||
}
|
||||
if s.heap.CanInsert() {
|
||||
s.heap.Insert(priority, value)
|
||||
s.canRecv.Broadcast()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TryRecv attempts to get an item without blocking.
|
||||
//
|
||||
// This returns both the item itself and the its assigned priority.
|
||||
//
|
||||
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||
// the queue is closed.
|
||||
func (s *state[P, T]) TryRecv() (priority P, value T, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.heap.CanExtract() {
|
||||
priority, value = s.heap.Extract()
|
||||
s.canSend.Broadcast()
|
||||
return
|
||||
}
|
||||
if s.closed {
|
||||
err = priorityq.ErrClosed
|
||||
} else {
|
||||
err = priorityq.ErrEmpty
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// TrySend attempts to add an item with some priority, without blocking.
|
||||
//
|
||||
// This method does not block. If there is space in the buffer, it returns
|
||||
// true. If the buffer is full, it returns false.
|
||||
func (s *state[P, T]) TrySend(priority P, value T) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.closed {
|
||||
return priorityq.ErrClosed
|
||||
}
|
||||
if !s.heap.CanInsert() {
|
||||
return priorityq.ErrFull
|
||||
}
|
||||
s.heap.Insert(priority, value)
|
||||
s.canRecv.Broadcast()
|
||||
return nil
|
||||
}
|
237
pq/lib_test.go
Normal file
237
pq/lib_test.go
Normal file
@ -0,0 +1,237 @@
|
||||
package pq_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq"
|
||||
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||
)
|
||||
|
||||
func TestRecvHighestFirst(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := pq.Make[int, int](8)
|
||||
q.Send(4, 4)
|
||||
q.Send(2, 2)
|
||||
q.Send(1, 1)
|
||||
q.Send(5, 5)
|
||||
q.Send(7, 7)
|
||||
q.Send(8, 8)
|
||||
q.Send(3, 3)
|
||||
q.Send(6, 6)
|
||||
checkRecv := func(n int) {
|
||||
if _, v, _ := q.Recv(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
checkRecv(8)
|
||||
checkRecv(7)
|
||||
checkRecv(6)
|
||||
checkRecv(5)
|
||||
checkRecv(4)
|
||||
checkRecv(3)
|
||||
checkRecv(2)
|
||||
checkRecv(1)
|
||||
}
|
||||
|
||||
func TestSendClosedPanic(t *testing.T) {
|
||||
t.Parallel()
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("sending to closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q := pq.Make[int, int](4)
|
||||
q.Close()
|
||||
q.Send(1, 1)
|
||||
}
|
||||
|
||||
func TestRecvClosed(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := pq.Make[int, int](4)
|
||||
q.Send(1, 1)
|
||||
q.Close()
|
||||
_, _, ok := q.Recv()
|
||||
if !ok {
|
||||
t.Errorf("queue should have item to receive")
|
||||
}
|
||||
_, _, ok = q.Recv()
|
||||
if ok {
|
||||
t.Errorf("queue should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := pq.Make[int, int](4)
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("closing a closed queue did not panic")
|
||||
}
|
||||
}()
|
||||
q.Close()
|
||||
q.Close()
|
||||
}
|
||||
|
||||
func TestTrySendRecv(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := pq.Make[int, int](4)
|
||||
assumeSendOk := func(n int) {
|
||||
err := q.TrySend(n, n)
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to send")
|
||||
}
|
||||
}
|
||||
assumeRecvOk := func(expected int) {
|
||||
_, actual, err := q.TryRecv()
|
||||
if err != nil {
|
||||
t.Errorf("expected to be able to receive")
|
||||
}
|
||||
if actual != expected {
|
||||
t.Errorf("expected %d, got %d", expected, actual)
|
||||
}
|
||||
}
|
||||
assumeSendOk(1)
|
||||
assumeSendOk(2)
|
||||
assumeSendOk(3)
|
||||
assumeSendOk(4)
|
||||
err := q.TrySend(5, 5)
|
||||
if err == nil {
|
||||
t.Errorf("expected queue to be full")
|
||||
}
|
||||
assumeRecvOk(4)
|
||||
assumeRecvOk(3)
|
||||
assumeRecvOk(2)
|
||||
assumeRecvOk(1)
|
||||
|
||||
_, _, err = q.TryRecv()
|
||||
if err != priorityq.ErrEmpty {
|
||||
t.Errorf("expected queue to be empty")
|
||||
}
|
||||
q.Close()
|
||||
_, _, err = q.TryRecv()
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
err = q.TrySend(1, 1)
|
||||
if err != priorityq.ErrClosed {
|
||||
t.Errorf("expected queue to be closed ")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcProducerConsumer(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := pq.Make[int, int](4)
|
||||
var wg sync.WaitGroup
|
||||
produceDone := make(chan struct{})
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
for i := 0; i < 10000; i++ {
|
||||
q.Send(rand.Int(), i)
|
||||
}
|
||||
close(produceDone)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, _, ok = q.Recv()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
<-produceDone
|
||||
t.Logf("producer done, closing channel")
|
||||
q.Close()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkSend(b *testing.B) {
|
||||
q := pq.Make[int, int](b.N)
|
||||
// randomize priorities to get amortized cost per op
|
||||
ps := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ps[i] = rand.Int()
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(ps[i], i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRecv(b *testing.B) {
|
||||
q := pq.Make[int, int](b.N)
|
||||
// randomize priorities to get amortized cost per op
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(rand.Int(), i)
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Recv()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkConcSendRecv(b *testing.B) {
|
||||
q := pq.Make[int, int](b.N)
|
||||
// randomize priorities to get amortized cost per op
|
||||
ps := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ps[i] = rand.Int()
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
start := make(chan struct{})
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Send(ps[i], i)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
<-start
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Recv()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkHighContention(b *testing.B) {
|
||||
q := pq.Make[int, int](b.N)
|
||||
var wg sync.WaitGroup
|
||||
start := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
numProducers := runtime.NumCPU()
|
||||
sendsPerProducer := b.N / numProducers
|
||||
wg.Add(numProducers)
|
||||
for i := 0; i < numProducers; i++ {
|
||||
go func() {
|
||||
ps := make([]int, sendsPerProducer)
|
||||
for i := 0; i < sendsPerProducer; i++ {
|
||||
ps[i] = rand.Int()
|
||||
}
|
||||
<-start
|
||||
for i := 0; i < sendsPerProducer; i++ {
|
||||
q.Send(ps[i], 1)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, _, ok = q.Recv()
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
b.ResetTimer()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
q.Close()
|
||||
<-done
|
||||
}
|
164
precise/lib.go
164
precise/lib.go
@ -1,164 +0,0 @@
|
||||
package precise
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq/circ"
|
||||
)
|
||||
|
||||
// Q is a precise, concurrent priority queue.
|
||||
//
|
||||
// Each queue has two internal buffers, high and low. This implementation
|
||||
// guarantees that when there are items in both buffers, consumers receive
|
||||
// ones from the high priority buffer first.
|
||||
//
|
||||
// Each buffer has the same capacity, set on initial construction. Sending to
|
||||
// a buffer will block if it is full, even if the other buffer has space.
|
||||
type Q[T any] struct {
|
||||
*state[T]
|
||||
}
|
||||
|
||||
// Make a new priority queue.
|
||||
func Make[T any](cap int) Q[T] {
|
||||
high := circ.Make[T](cap)
|
||||
low := circ.Make[T](cap)
|
||||
s := &state[T]{
|
||||
high: high,
|
||||
low: low,
|
||||
}
|
||||
s.canRecv = sync.NewCond(&s.mu)
|
||||
s.canSendHigh = sync.NewCond(&s.mu)
|
||||
s.canSendLow = sync.NewCond(&s.mu)
|
||||
return Q[T]{s}
|
||||
}
|
||||
|
||||
type state[T any] struct {
|
||||
mu sync.Mutex
|
||||
high circ.B[T]
|
||||
low circ.B[T]
|
||||
canSendHigh *sync.Cond
|
||||
canSendLow *sync.Cond
|
||||
canRecv *sync.Cond
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close marks the queue as closed.
|
||||
//
|
||||
// Subsequent attempts to send will panic. Subsequent calls to Recv will
|
||||
// continue to return the remaining items in the queue.
|
||||
func (s *state[T]) Close() {
|
||||
s.mu.Lock()
|
||||
s.closed = true
|
||||
s.mu.Unlock()
|
||||
s.canRecv.Broadcast()
|
||||
}
|
||||
|
||||
// Recv returns an item from the prioritized buffers, blocking if empty.
|
||||
//
|
||||
// The returned bool will be true if the queue still has items or is open.
|
||||
// It will be false if the queue is empty and closed.
|
||||
func (s *state[T]) Recv() (T, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||
s.canRecv.Wait()
|
||||
}
|
||||
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||
var empty T
|
||||
return empty, false
|
||||
}
|
||||
if s.high.CanPop() {
|
||||
value := s.high.PopFront()
|
||||
s.canSendHigh.Broadcast()
|
||||
return value, true
|
||||
}
|
||||
if s.low.CanPop() {
|
||||
value := s.low.PopFront()
|
||||
s.canSendLow.Broadcast()
|
||||
return value, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send is an alias for SendLow.
|
||||
func (s *state[T]) Send(value T) {
|
||||
s.SendLow(value)
|
||||
}
|
||||
|
||||
// SendHigh adds an item to the high priority buffer, blocking if full.
|
||||
func (s *state[T]) SendHigh(value T) {
|
||||
s.send(value, &s.high, s.canSendHigh)
|
||||
}
|
||||
|
||||
// SendLow adds an item to the low priority buffer, blocking if full.
|
||||
func (s *state[T]) SendLow(value T) {
|
||||
s.send(value, &s.low, s.canSendLow)
|
||||
}
|
||||
|
||||
// TryRecv attempts to return an item from the prioritized buffers.
|
||||
//
|
||||
// This method does not block. If there is an item in a buffer, it returns
|
||||
// true. If the buffer is empty, it returns false.
|
||||
func (s *state[T]) TryRecv() (value T, ok bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.high.CanPop() {
|
||||
value = s.high.PopFront()
|
||||
ok = true
|
||||
s.canSendHigh.Broadcast()
|
||||
return
|
||||
}
|
||||
if s.low.CanPop() {
|
||||
value = s.low.PopFront()
|
||||
ok = true
|
||||
s.canSendLow.Broadcast()
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// TrySendHigh attempts to add an item to the high priority buffer.
|
||||
//
|
||||
// This method does not block. If there is space in the buffer, it returns
|
||||
// true. If the buffer is full, it returns false.
|
||||
func (s *state[T]) TrySendHigh(value T) bool {
|
||||
return s.trySend(value, &s.high)
|
||||
}
|
||||
|
||||
// TrySendLow attempts to add an item to the low priority buffer.
|
||||
//
|
||||
// This method does not block. If there is space in the buffer, it returns
|
||||
// true. If the buffer is full, it returns false.
|
||||
func (s *state[T]) TrySendLow(value T) bool {
|
||||
return s.trySend(value, &s.low)
|
||||
}
|
||||
|
||||
func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for {
|
||||
for !s.closed && !buf.CanPush() {
|
||||
cond.Wait()
|
||||
}
|
||||
if s.closed {
|
||||
panic("send on closed queue")
|
||||
}
|
||||
if buf.CanPush() {
|
||||
buf.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *state[T]) trySend(value T, buf *circ.B[T]) bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !buf.CanPush() {
|
||||
return false
|
||||
}
|
||||
buf.PushBack(value)
|
||||
s.canRecv.Broadcast()
|
||||
return true
|
||||
}
|
75
queue/lib.go
Normal file
75
queue/lib.go
Normal file
@ -0,0 +1,75 @@
|
||||
// Package queue implements a non-concurrent queue.
|
||||
//
|
||||
// # Implementation
|
||||
//
|
||||
// [Q] is a classic ring buffer. It tracks its head, tail, and length. This
|
||||
// makes determining whether the queue is full or empty trivial.
|
||||
package queue
|
||||
|
||||
// Q is a non-concurrent queue.
|
||||
type Q[T any] struct {
|
||||
buf []T
|
||||
len int
|
||||
head int
|
||||
tail int
|
||||
}
|
||||
|
||||
// Make creates a new queue.
|
||||
func Make[T any](cap int) Q[T] {
|
||||
buf := make([]T, cap)
|
||||
return Q[T]{buf: buf}
|
||||
}
|
||||
|
||||
// Capacity returns the total capacity of the queue.
|
||||
func (b *Q[T]) Capacity() int {
|
||||
return cap(b.buf)
|
||||
}
|
||||
|
||||
// Len returns the number of items in the queue.
|
||||
func (b *Q[T]) Len() int {
|
||||
return b.len
|
||||
}
|
||||
|
||||
// CanPush returns true if the queue has space for new items.
|
||||
func (b *Q[T]) CanPush() bool {
|
||||
return cap(b.buf)-b.len != 0
|
||||
}
|
||||
|
||||
// CanPop returns true if the queue has one or more items.
|
||||
func (b *Q[T]) CanPop() bool {
|
||||
return b.len != 0
|
||||
}
|
||||
|
||||
// PopFront returns the front-most item from the queue.
|
||||
//
|
||||
// If the queue is empty, it panics.
|
||||
func (b *Q[T]) PopFront() T {
|
||||
if !b.CanPop() {
|
||||
panic("cannot pop from empty queue")
|
||||
}
|
||||
item := b.buf[b.head]
|
||||
// clear queue slot so as not to hold on to garbage
|
||||
var empty T
|
||||
b.buf[b.head] = empty
|
||||
b.len--
|
||||
b.head++
|
||||
if b.head == cap(b.buf) {
|
||||
b.head = 0
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// PushBack adds an item to the end of the queue.
|
||||
//
|
||||
// If the queue is full, it panics.
|
||||
func (b *Q[T]) PushBack(value T) {
|
||||
if !b.CanPush() {
|
||||
panic("cannot push back to full queue")
|
||||
}
|
||||
b.buf[b.tail] = value
|
||||
b.len++
|
||||
b.tail++
|
||||
if b.tail == cap(b.buf) {
|
||||
b.tail = 0
|
||||
}
|
||||
}
|
101
queue/lib_test.go
Normal file
101
queue/lib_test.go
Normal file
@ -0,0 +1,101 @@
|
||||
package queue_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||
)
|
||||
|
||||
func TestRepeatPushPop(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := queue.Make[int](4)
|
||||
if q.Capacity() != 4 {
|
||||
t.Errorf("wrong capacity")
|
||||
}
|
||||
for i := 0; i < 50; i++ {
|
||||
q.PushBack(1)
|
||||
q.PushBack(2)
|
||||
q.PushBack(3)
|
||||
q.PushBack(4)
|
||||
if q.Len() != 4 {
|
||||
t.Errorf("wrong length")
|
||||
}
|
||||
checkPop := func(n int) {
|
||||
if v := q.PopFront(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
checkPop(1)
|
||||
checkPop(2)
|
||||
checkPop(3)
|
||||
checkPop(4)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterleavedPushPop(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := queue.Make[int](4)
|
||||
checkPop := func(n int) {
|
||||
if v := q.PopFront(); v != n {
|
||||
t.Errorf("popped %d, expected %d", v, n)
|
||||
}
|
||||
}
|
||||
q.PushBack(1)
|
||||
q.PushBack(2)
|
||||
q.PushBack(3)
|
||||
checkPop(1)
|
||||
q.PushBack(4)
|
||||
q.PushBack(5)
|
||||
checkPop(2)
|
||||
}
|
||||
|
||||
func TestEmptyPopPanic(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("popping empty buffer did not panic")
|
||||
}
|
||||
}()
|
||||
t.Parallel()
|
||||
q := queue.Make[int](4)
|
||||
q.PopFront()
|
||||
}
|
||||
|
||||
func TestFullPushPanic(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("pushing full buffer did not panic")
|
||||
}
|
||||
}()
|
||||
t.Parallel()
|
||||
q := queue.Make[int](1)
|
||||
q.PushBack(1)
|
||||
q.PushBack(2)
|
||||
}
|
||||
|
||||
func BenchmarkPush(b *testing.B) {
|
||||
q := queue.Make[int](b.N)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
items := make([]int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
items[i] = r.Int()
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.PushBack(items[i])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPop(b *testing.B) {
|
||||
q := queue.Make[int](b.N)
|
||||
rs := rand.NewSource(0)
|
||||
r := rand.New(rs)
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.PushBack(r.Int())
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.PopFront()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user