Compare commits
10 Commits
c4e92faaf7
...
a20c8908de
Author | SHA1 | Date | |
---|---|---|---|
a20c8908de | |||
9d1228988d | |||
0840eb9272 | |||
9f8d0760f1 | |||
f7474fb673 | |||
b00fe25128 | |||
b3b491d9a9 | |||
5e23a92314 | |||
ab364c31bb | |||
0759aaa2cd |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
benchresults.txt
|
||||||
|
coverage
|
11
Makefile
Normal file
11
Makefile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
.PHONY: test
|
||||||
|
test:
|
||||||
|
go test -count=1 -coverprofile=coverage ./...
|
||||||
|
|
||||||
|
.PHONY: bench
|
||||||
|
bench:
|
||||||
|
go test -bench=. ./...
|
||||||
|
|
||||||
|
.PHONY: longbench
|
||||||
|
longbench:
|
||||||
|
go test -count=30 -bench=. ./... | tee benchresults.txt
|
72
README.md
72
README.md
@ -1,12 +1,69 @@
|
|||||||
# priorityq - generic priority queue in Go
|
# priorityq - generic prioritized queues in Go
|
||||||
|
|
||||||
|
In Go, the builtin buffered channels provide a concurrent FIFO queue for
|
||||||
|
passing messages between goroutines. Sometimes, however, it's convenient to be
|
||||||
|
able to assign priority levels to messages, so that they get delivered to
|
||||||
|
consumers more promptly.
|
||||||
|
|
||||||
|
The `mq` package in this module implements a concurrent, dual-priority message
|
||||||
|
queue that guarantees receipt of a high-priority items before low-priority
|
||||||
|
ones. There is a pattern using two channels and select statements to achieve
|
||||||
|
similar functionality, but it's not exactly equivalent. (See the
|
||||||
|
[Background](#background) section for more details.)
|
||||||
|
|
||||||
|
Additionally, the `pq` package implements a concurrent, traditional priority
|
||||||
|
queue, using a binary max-heap. This is more general than `mq`, because it
|
||||||
|
allows multiple levels of priority. This, of course, also makes operations
|
||||||
|
slower.
|
||||||
|
|
||||||
|
Lastly, the `npq` package implements a concurrent, n-priority message queue.
|
||||||
|
It's similar to `mq`, except that it an arbitrary fixed number of priority
|
||||||
|
levels. It can have better performance than `pq` for several hundred levels.
|
||||||
|
|
||||||
|
## Benchmarks
|
||||||
|
|
||||||
|
Here are some benchmark results from running on a Mac Studio/M1 Ultra.
|
||||||
|
|
||||||
|
pkg: gogs.humancabbage.net/sam/priorityq/mq
|
||||||
|
Send-20 13.93n ± 0%
|
||||||
|
SendChan-20 13.19n ± 0%
|
||||||
|
Recv-20 13.64n ± 1%
|
||||||
|
RecvChan-20 13.29n ± 1%
|
||||||
|
ConcSendRecv-20 97.60n ± 1%
|
||||||
|
ConcSendRecvChan-20 171.8n ± 5%
|
||||||
|
HighContention-20 128.2n ± 0%
|
||||||
|
HighContentionChan-20 47.27n ± 0%
|
||||||
|
|
||||||
|
pkg: gogs.humancabbage.net/sam/priorityq/npq
|
||||||
|
Send-20 13.56n ± 0%
|
||||||
|
Recv-20 13.51n ± 0%
|
||||||
|
ConcSendRecv-20 176.3n ± 8%
|
||||||
|
HighContention-20 121.0n ± 0%
|
||||||
|
|
||||||
|
pkg: gogs.humancabbage.net/sam/priorityq/pq
|
||||||
|
Send-20 18.79n ± 1%
|
||||||
|
Recv-20 268.1n ± 3%
|
||||||
|
ConcSendRecv-20 199.2n ± 2%
|
||||||
|
HighContention-20 440.0n ± 1%
|
||||||
|
|
||||||
|
pkg: gogs.humancabbage.net/sam/priorityq/binheap
|
||||||
|
Insert-20 11.92n ± 0%
|
||||||
|
Extract-20 261.6n ± 2%
|
||||||
|
RepeatedInsertExtract-20 25.68n ± 1%
|
||||||
|
|
||||||
|
pkg: gogs.humancabbage.net/sam/priorityq/circ
|
||||||
|
Push-20 2.196n ± 1%
|
||||||
|
Pop-20 2.187n ± 0%
|
||||||
|
|
||||||
|
## Background
|
||||||
|
|
||||||
This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked
|
This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked
|
||||||
how to implement a priority queue in Go. A fantastic solution was [provided by
|
how to implement a prioritized message queue in Go. A fantastic solution was
|
||||||
/u/Ploobers][sol]. That's probably right for 99 out of 100 use cases, but it's
|
[provided by /u/Ploobers][sol]. That's probably right for 99 out of 100 use
|
||||||
not completely precise.
|
cases, but it's not completely precise.
|
||||||
|
|
||||||
Particularly, the second select block does not guarantee that an item from the
|
Particularly, the second select block does not guarantee that an item from the
|
||||||
priority queue will be taken if there is also an item in the regular queue.
|
prioritized queue will be taken if there is also an item in the regular queue.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
select {
|
select {
|
||||||
@ -26,11 +83,6 @@ From the [Go Language Specification][go_select]:
|
|||||||
Thus, it is possible for the second case to be chosen even if the first case is
|
Thus, it is possible for the second case to be chosen even if the first case is
|
||||||
also ready.
|
also ready.
|
||||||
|
|
||||||
The `precise` package in this module implements a concurrent priority queue that
|
|
||||||
guarantees receipt of a high-priority items before low-priority ones. This is
|
|
||||||
primarily a fun exercise, I cannot recommend that anyone actually use this in a
|
|
||||||
real project.
|
|
||||||
|
|
||||||
[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/
|
[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/
|
||||||
[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/
|
[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/
|
||||||
[go_select]: https://go.dev/ref/spec#Select_statements
|
[go_select]: https://go.dev/ref/spec#Select_statements
|
||||||
|
115
binheap/lib.go
Normal file
115
binheap/lib.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
// Package binheap implements a non-concurrent binary max-heap.
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// [H] is parameterized over two types, one for the priority levels, one for
|
||||||
|
// the elements. Internally, there are two equally-sized buffers for these
|
||||||
|
// types. Re-heaping operations swap corresponding entries in these buffers
|
||||||
|
// in lock-step.
|
||||||
|
package binheap
|
||||||
|
|
||||||
|
import "golang.org/x/exp/constraints"
|
||||||
|
|
||||||
|
// H is a binary max-heap.
|
||||||
|
//
|
||||||
|
// `P` is the type of the priority levels, and `E` the type of the elements.
|
||||||
|
type H[P constraints.Ordered, E any] struct {
|
||||||
|
prs []P
|
||||||
|
els []E
|
||||||
|
len int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make creates a new heap.
|
||||||
|
func Make[P constraints.Ordered, E any](cap int) H[P, E] {
|
||||||
|
priorities := make([]P, cap)
|
||||||
|
elements := make([]E, cap)
|
||||||
|
h := H[P, E]{prs: priorities, els: elements}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity returns the total capacity of the heap.
|
||||||
|
func (h *H[P, E]) Capacity() int {
|
||||||
|
return cap(h.prs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of items in the heap.
|
||||||
|
func (h *H[P, E]) Len() int {
|
||||||
|
return h.len
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanExtract returns true if the heap has any item, otherwise false.
|
||||||
|
func (h *H[P, E]) CanExtract() bool {
|
||||||
|
return h.len != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanInsert returns true if the heap has unused capacity, otherwise false.
|
||||||
|
func (h *H[P, E]) CanInsert() bool {
|
||||||
|
return cap(h.prs)-h.len != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract returns the current heap root, then performs a heap-down pass.
|
||||||
|
//
|
||||||
|
// If the heap is empty, it panics.
|
||||||
|
func (h *H[P, E]) Extract() (P, E) {
|
||||||
|
if !h.CanExtract() {
|
||||||
|
panic("heap is empty")
|
||||||
|
}
|
||||||
|
// extract root
|
||||||
|
priority := h.prs[0]
|
||||||
|
element := h.els[0]
|
||||||
|
// move last entry to root position
|
||||||
|
h.prs[0] = h.prs[h.len-1]
|
||||||
|
h.els[0] = h.els[h.len-1]
|
||||||
|
// clear the former last entry position,
|
||||||
|
// so as not to hold onto garbage
|
||||||
|
var emptyPriority P
|
||||||
|
var emptyElem E
|
||||||
|
h.prs[h.len-1] = emptyPriority
|
||||||
|
h.els[h.len-1] = emptyElem
|
||||||
|
// heap-down
|
||||||
|
h.len--
|
||||||
|
idx := 0
|
||||||
|
for {
|
||||||
|
left := idx<<1 + 1
|
||||||
|
right := idx<<1 + 2
|
||||||
|
largest := idx
|
||||||
|
if left < h.len && h.prs[left] > h.prs[largest] {
|
||||||
|
largest = left
|
||||||
|
}
|
||||||
|
if right < h.len && h.prs[right] > h.prs[largest] {
|
||||||
|
largest = right
|
||||||
|
}
|
||||||
|
if largest == idx {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
h.prs[idx], h.prs[largest] = h.prs[largest], h.prs[idx]
|
||||||
|
h.els[idx], h.els[largest] = h.els[largest], h.els[idx]
|
||||||
|
idx = largest
|
||||||
|
}
|
||||||
|
|
||||||
|
return priority, element
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert adds an item to the heap, then performs a heap-up pass.
|
||||||
|
//
|
||||||
|
// If the heap is full, it panics.
|
||||||
|
func (h *H[P, E]) Insert(priority P, elem E) {
|
||||||
|
if !h.CanInsert() {
|
||||||
|
panic("heap is full")
|
||||||
|
}
|
||||||
|
// insert new item into last position
|
||||||
|
idx := h.len
|
||||||
|
h.prs[idx] = priority
|
||||||
|
h.els[idx] = elem
|
||||||
|
// heap-up
|
||||||
|
h.len++
|
||||||
|
for {
|
||||||
|
parent := (idx - 1) >> 1
|
||||||
|
if parent < 0 || h.prs[parent] >= h.prs[idx] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
h.prs[parent], h.prs[idx] = h.prs[idx], h.prs[parent]
|
||||||
|
h.els[parent], h.els[idx] = h.els[idx], h.els[parent]
|
||||||
|
idx = parent
|
||||||
|
}
|
||||||
|
}
|
131
binheap/lib_test.go
Normal file
131
binheap/lib_test.go
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
package binheap_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/binheap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSmoke(t *testing.T) {
|
||||||
|
h := binheap.Make[int, int](10)
|
||||||
|
if h.Capacity() != 10 {
|
||||||
|
t.Errorf("expected heap capacity to be 10")
|
||||||
|
}
|
||||||
|
h.Insert(1, 1)
|
||||||
|
h.Insert(2, 2)
|
||||||
|
h.Insert(3, 3)
|
||||||
|
h.Insert(4, 4)
|
||||||
|
if h.Len() != 4 {
|
||||||
|
t.Errorf("expected heap length to be 4")
|
||||||
|
}
|
||||||
|
checkExtract := func(n int) {
|
||||||
|
_, extracted := h.Extract()
|
||||||
|
if extracted != n {
|
||||||
|
t.Errorf("expected to extract %d, got %d", n, extracted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkExtract(4)
|
||||||
|
checkExtract(3)
|
||||||
|
checkExtract(2)
|
||||||
|
checkExtract(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInsertFullPanic(t *testing.T) {
|
||||||
|
h := binheap.Make[int, int](4)
|
||||||
|
h.Insert(1, 1)
|
||||||
|
h.Insert(2, 2)
|
||||||
|
h.Insert(3, 3)
|
||||||
|
h.Insert(4, 4)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("expected final insert to panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
h.Insert(5, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractEmptyPanic(t *testing.T) {
|
||||||
|
h := binheap.Make[int, int](4)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("expected extract to panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
h.Extract()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRandomized(t *testing.T) {
|
||||||
|
h := binheap.Make[int, int](8192)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
// insert a bunch of random integers
|
||||||
|
for i := 0; i < h.Capacity(); i++ {
|
||||||
|
n := r.Int()
|
||||||
|
h.Insert(n, n)
|
||||||
|
}
|
||||||
|
// ensure that each extracted integer is <= the last extracted integer
|
||||||
|
var extracted []int
|
||||||
|
for h.CanExtract() {
|
||||||
|
id, item := h.Extract()
|
||||||
|
if id != item {
|
||||||
|
t.Errorf("id / item mismatch: %d %d", id, item)
|
||||||
|
}
|
||||||
|
lastIdx := len(extracted) - 1
|
||||||
|
extracted = append(extracted, item)
|
||||||
|
if lastIdx < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if item > extracted[lastIdx] {
|
||||||
|
t.Errorf("newly extracted %d is greater than %d",
|
||||||
|
item, extracted[lastIdx])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkInsert(b *testing.B) {
|
||||||
|
h := binheap.Make[int, int](b.N)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
items := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
items[i] = r.Int()
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
h.Insert(items[i], items[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkExtract(b *testing.B) {
|
||||||
|
h := binheap.Make[int, int](b.N)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
n := r.Int()
|
||||||
|
h.Insert(n, n)
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
h.Extract()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRepeatedInsertExtract(b *testing.B) {
|
||||||
|
h := binheap.Make[int, int](128)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
items := make([]int, b.N)
|
||||||
|
for i := 0; i < h.Capacity()-1; i++ {
|
||||||
|
n := r.Int()
|
||||||
|
h.Insert(n, n)
|
||||||
|
}
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
items[i] = r.Int()
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
h.Insert(items[i], items[i])
|
||||||
|
h.Extract()
|
||||||
|
}
|
||||||
|
}
|
59
circ/lib.go
59
circ/lib.go
@ -1,59 +0,0 @@
|
|||||||
package circ
|
|
||||||
|
|
||||||
// B is a generic, non-concurrent circular FIFO buffer.
|
|
||||||
type B[T any] struct {
|
|
||||||
buf []T
|
|
||||||
len int
|
|
||||||
head int
|
|
||||||
tail int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make creates a new circular buffer.
|
|
||||||
func Make[T any](cap int) B[T] {
|
|
||||||
buf := make([]T, cap)
|
|
||||||
return B[T]{buf: buf}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CanPush returns true if the buffer has space for new items.
|
|
||||||
func (b *B[T]) CanPush() bool {
|
|
||||||
return cap(b.buf)-b.len != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// CanPop returns true if the buffer has one or more items.
|
|
||||||
func (b *B[T]) CanPop() bool {
|
|
||||||
return b.len != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// PopFront returns the front-most item from the buffer.
|
|
||||||
//
|
|
||||||
// If the buffer is empty, it panics.
|
|
||||||
func (b *B[T]) PopFront() T {
|
|
||||||
if !b.CanPop() {
|
|
||||||
panic("cannot pop from empty buffer")
|
|
||||||
}
|
|
||||||
var empty T
|
|
||||||
item := b.buf[b.head]
|
|
||||||
// clear buffer slot so that we don't hold on to garbage
|
|
||||||
b.buf[b.head] = empty
|
|
||||||
b.len--
|
|
||||||
b.head++
|
|
||||||
if b.head == cap(b.buf) {
|
|
||||||
b.head = 0
|
|
||||||
}
|
|
||||||
return item
|
|
||||||
}
|
|
||||||
|
|
||||||
// PushBack adds an item to the end of the buffer.
|
|
||||||
//
|
|
||||||
// If the buffer is full, it panics.
|
|
||||||
func (b *B[T]) PushBack(value T) {
|
|
||||||
if !b.CanPush() {
|
|
||||||
panic("cannot push back to full buffer")
|
|
||||||
}
|
|
||||||
b.buf[b.tail] = value
|
|
||||||
b.len++
|
|
||||||
b.tail++
|
|
||||||
if b.tail == cap(b.buf) {
|
|
||||||
b.tail = 0
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,67 +0,0 @@
|
|||||||
package circ_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"gogs.humancabbage.net/sam/priorityq/circ"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRepeatPushPop(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
cb := circ.Make[int](4)
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
cb.PushBack(1)
|
|
||||||
cb.PushBack(2)
|
|
||||||
cb.PushBack(3)
|
|
||||||
cb.PushBack(4)
|
|
||||||
checkPop := func(n int) {
|
|
||||||
if v := cb.PopFront(); v != n {
|
|
||||||
t.Errorf("popped %d, expected %d", v, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
checkPop(1)
|
|
||||||
checkPop(2)
|
|
||||||
checkPop(3)
|
|
||||||
checkPop(4)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInterleavedPushPop(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
cb := circ.Make[int](4)
|
|
||||||
checkPop := func(n int) {
|
|
||||||
if v := cb.PopFront(); v != n {
|
|
||||||
t.Errorf("popped %d, expected %d", v, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cb.PushBack(1)
|
|
||||||
cb.PushBack(2)
|
|
||||||
cb.PushBack(3)
|
|
||||||
checkPop(1)
|
|
||||||
cb.PushBack(4)
|
|
||||||
cb.PushBack(5)
|
|
||||||
checkPop(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEmptyPopPanic(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r == nil {
|
|
||||||
t.Errorf("popping empty buffer did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
t.Parallel()
|
|
||||||
cb := circ.Make[int](4)
|
|
||||||
cb.PopFront()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFullPushPanic(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r == nil {
|
|
||||||
t.Errorf("pushing full buffer did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
t.Parallel()
|
|
||||||
cb := circ.Make[int](1)
|
|
||||||
cb.PushBack(1)
|
|
||||||
cb.PushBack(2)
|
|
||||||
}
|
|
2
go.mod
2
go.mod
@ -1,3 +1,5 @@
|
|||||||
module gogs.humancabbage.net/sam/priorityq
|
module gogs.humancabbage.net/sam/priorityq
|
||||||
|
|
||||||
go 1.20
|
go 1.20
|
||||||
|
|
||||||
|
require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
|
||||||
|
2
go.sum
2
go.sum
@ -0,0 +1,2 @@
|
|||||||
|
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
|
||||||
|
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
41
lib.go
Normal file
41
lib.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Package priorityq provides generic implementations of various concurrent,
|
||||||
|
// prioritized queues.
|
||||||
|
//
|
||||||
|
// # Behavior
|
||||||
|
//
|
||||||
|
// All types of queues in this module act similarly to buffered Go channels.
|
||||||
|
//
|
||||||
|
// - They are bounded to a fixed capacity, set at construction.
|
||||||
|
// - Closing and sending to an already-closed queue causes a panic.
|
||||||
|
// - Receivers can continue getting items after closure, and can use a final
|
||||||
|
// bool to determine when there are none remaining.
|
||||||
|
// - They are safe for multiple concurrent senders and receivers.
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// All data structures in this module use [generics], introduced in Go 1.18.
|
||||||
|
//
|
||||||
|
// All of the concurrent data structures in this package use a [sync.Mutex]
|
||||||
|
// and a few [sync.Cond] variables.
|
||||||
|
//
|
||||||
|
// [generics]: https://go.dev/blog/intro-generics
|
||||||
|
package priorityq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrEmpty means that an operation failed because the queue was empty.
|
||||||
|
var ErrEmpty error
|
||||||
|
|
||||||
|
// ErrFull means that an operation failed because the queue was full.
|
||||||
|
var ErrFull error
|
||||||
|
|
||||||
|
// ErrClosed means that an operation failed because the queue was closed.
|
||||||
|
var ErrClosed error
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
ErrEmpty = fmt.Errorf("queue is empty")
|
||||||
|
ErrFull = fmt.Errorf("queue is full")
|
||||||
|
ErrClosed = fmt.Errorf("queue is closed")
|
||||||
|
}
|
201
mq/lib.go
Normal file
201
mq/lib.go
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
// Package mq implements a concurrent, dual-priority message queue.
|
||||||
|
//
|
||||||
|
// [Q] is similar to a buffered channel, except that senders can assign one of
|
||||||
|
// two priority levels to each item, "high" or "low." Receivers will always
|
||||||
|
// get a high-priority item ahead of any low-priority ones.
|
||||||
|
//
|
||||||
|
// For example:
|
||||||
|
//
|
||||||
|
// q := mq.Make[string](8)
|
||||||
|
// q.SendLow("world")
|
||||||
|
// q.SendHigh("hello")
|
||||||
|
// word1, _ := mq.Recv()
|
||||||
|
// word2, _ := mq.Recv()
|
||||||
|
// fmt.Println(word1, word2)
|
||||||
|
// q.Close()
|
||||||
|
// // Output: hello world
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// Each [Q] has two circular buffers, one for each priority level. Currently,
|
||||||
|
// the capacities for these are fixed and equal. If one buffer is full,
|
||||||
|
// attempts to send further items with its priority level will block
|
||||||
|
// ([Q.Send]) or fail ([Q.TrySend]).
|
||||||
|
//
|
||||||
|
// Compared [pq.Q], the limitation on priority levels increases performance,
|
||||||
|
// as its circular buffers are much less expensive than the heap operations of
|
||||||
|
// a traditional priority queue.
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
// so that godoc (kinda) works
|
||||||
|
var _ *pq.Q[int, int]
|
||||||
|
|
||||||
|
// Q is a concurrent, dual-priority message queue.
|
||||||
|
type Q[T any] struct {
|
||||||
|
*state[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new queue.
|
||||||
|
func Make[T any](cap int) Q[T] {
|
||||||
|
high := queue.Make[T](cap)
|
||||||
|
low := queue.Make[T](cap)
|
||||||
|
s := &state[T]{
|
||||||
|
high: high,
|
||||||
|
low: low,
|
||||||
|
}
|
||||||
|
s.canRecv = sync.Cond{L: &s.mu}
|
||||||
|
s.canSendHigh = sync.Cond{L: &s.mu}
|
||||||
|
s.canSendLow = sync.Cond{L: &s.mu}
|
||||||
|
return Q[T]{s}
|
||||||
|
}
|
||||||
|
|
||||||
|
type state[T any] struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
high queue.Q[T]
|
||||||
|
low queue.Q[T]
|
||||||
|
canSendHigh sync.Cond
|
||||||
|
canSendLow sync.Cond
|
||||||
|
canRecv sync.Cond
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close marks the queue as closed.
|
||||||
|
//
|
||||||
|
// Attempting to close an already-closed queue results in a panic.
|
||||||
|
func (s *state[T]) Close() {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.closed {
|
||||||
|
s.mu.Unlock()
|
||||||
|
panic("close of closed queue")
|
||||||
|
}
|
||||||
|
s.closed = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recv gets an item, blocking when empty until one is available.
|
||||||
|
//
|
||||||
|
// The returned bool will be true if the queue still has items or is open.
|
||||||
|
// It will be false if the queue is empty and closed.
|
||||||
|
func (s *state[T]) Recv() (T, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for {
|
||||||
|
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||||
|
s.canRecv.Wait()
|
||||||
|
}
|
||||||
|
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
||||||
|
var empty T
|
||||||
|
return empty, false
|
||||||
|
}
|
||||||
|
if s.high.CanPop() {
|
||||||
|
value := s.high.PopFront()
|
||||||
|
s.canSendHigh.Broadcast()
|
||||||
|
return value, true
|
||||||
|
}
|
||||||
|
if s.low.CanPop() {
|
||||||
|
value := s.low.PopFront()
|
||||||
|
s.canSendLow.Broadcast()
|
||||||
|
return value, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send is an alias for SendLow.
|
||||||
|
func (s *state[T]) Send(value T) {
|
||||||
|
s.SendLow(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendHigh adds an item with high priority, blocking if full.
|
||||||
|
func (s *state[T]) SendHigh(value T) {
|
||||||
|
s.send(value, &s.high, &s.canSendHigh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendLow adds an item with low buffer, blocking if full.
|
||||||
|
func (s *state[T]) SendLow(value T) {
|
||||||
|
s.send(value, &s.low, &s.canSendLow)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryRecv attempts to get an item from the queue, without blocking.
|
||||||
|
//
|
||||||
|
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||||
|
// the queue is closed.
|
||||||
|
func (s *state[T]) TryRecv() (value T, err error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.high.CanPop() {
|
||||||
|
value = s.high.PopFront()
|
||||||
|
s.canSendHigh.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.low.CanPop() {
|
||||||
|
value = s.low.PopFront()
|
||||||
|
s.canSendLow.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
err = priorityq.ErrClosed
|
||||||
|
} else {
|
||||||
|
err = priorityq.ErrEmpty
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySend is an alias for TrySendLow.
|
||||||
|
func (s *state[T]) TrySend(value T) error {
|
||||||
|
return s.trySend(value, &s.low)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySendHigh attempts to add an item with high priority, without blocking.
|
||||||
|
//
|
||||||
|
// Returns an error from the root priorityq package, or nil if successful.
|
||||||
|
func (s *state[T]) TrySendHigh(value T) error {
|
||||||
|
return s.trySend(value, &s.high)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySendLow attempts to add an item with low priority, without blocking.
|
||||||
|
//
|
||||||
|
// Returns an error from the root priorityq package, or nil if successful.
|
||||||
|
func (s *state[T]) TrySendLow(value T) error {
|
||||||
|
return s.trySend(value, &s.low)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *state[T]) send(value T, buf *queue.Q[T], cond *sync.Cond) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for {
|
||||||
|
for !s.closed && !buf.CanPush() {
|
||||||
|
cond.Wait()
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
panic("send on closed queue")
|
||||||
|
}
|
||||||
|
if buf.CanPush() {
|
||||||
|
buf.PushBack(value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *state[T]) trySend(value T, buf *queue.Q[T]) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.closed {
|
||||||
|
return priorityq.ErrClosed
|
||||||
|
}
|
||||||
|
if !buf.CanPush() {
|
||||||
|
return priorityq.ErrFull
|
||||||
|
}
|
||||||
|
buf.PushBack(value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,16 +1,18 @@
|
|||||||
package precise_test
|
package mq_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gogs.humancabbage.net/sam/priorityq/precise"
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/mq"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRecvHighFirst(t *testing.T) {
|
func TestRecvHighFirst(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := precise.Make[int](4)
|
q := mq.Make[int](4)
|
||||||
q.Send(1)
|
q.Send(1)
|
||||||
q.Send(2)
|
q.Send(2)
|
||||||
q.Send(3)
|
q.Send(3)
|
||||||
@ -41,14 +43,14 @@ func TestSendClosedPanic(t *testing.T) {
|
|||||||
t.Errorf("sending to closed queue did not panic")
|
t.Errorf("sending to closed queue did not panic")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
q := precise.Make[int](4)
|
q := mq.Make[int](4)
|
||||||
q.Close()
|
q.Close()
|
||||||
q.Send(1)
|
q.Send(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRecvClosed(t *testing.T) {
|
func TestRecvClosed(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := precise.Make[int](4)
|
q := mq.Make[int](4)
|
||||||
q.Send(1)
|
q.Send(1)
|
||||||
q.Close()
|
q.Close()
|
||||||
_, ok := q.Recv()
|
_, ok := q.Recv()
|
||||||
@ -61,18 +63,30 @@ func TestRecvClosed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDoubleClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := mq.Make[int](4)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("closing a closed queue did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q.Close()
|
||||||
|
q.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func TestTrySendRecv(t *testing.T) {
|
func TestTrySendRecv(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := precise.Make[int](4)
|
q := mq.Make[int](4)
|
||||||
assumeSendOk := func(n int, f func(int) bool) {
|
assumeSendOk := func(n int, f func(int) error) {
|
||||||
ok := f(n)
|
err := f(n)
|
||||||
if !ok {
|
if err != nil {
|
||||||
t.Errorf("expected to be able to send")
|
t.Errorf("expected to be able to send")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assumeRecvOk := func(expected int) {
|
assumeRecvOk := func(expected int) {
|
||||||
actual, ok := q.TryRecv()
|
actual, err := q.TryRecv()
|
||||||
if !ok {
|
if err != nil {
|
||||||
t.Errorf("expected to be able to receive")
|
t.Errorf("expected to be able to receive")
|
||||||
}
|
}
|
||||||
if actual != expected {
|
if actual != expected {
|
||||||
@ -81,10 +95,10 @@ func TestTrySendRecv(t *testing.T) {
|
|||||||
}
|
}
|
||||||
assumeSendOk(1, q.TrySendLow)
|
assumeSendOk(1, q.TrySendLow)
|
||||||
assumeSendOk(2, q.TrySendLow)
|
assumeSendOk(2, q.TrySendLow)
|
||||||
assumeSendOk(3, q.TrySendLow)
|
assumeSendOk(3, q.TrySend)
|
||||||
assumeSendOk(4, q.TrySendLow)
|
assumeSendOk(4, q.TrySendLow)
|
||||||
ok := q.TrySendLow(5)
|
err := q.TrySendLow(5)
|
||||||
if ok {
|
if err == nil {
|
||||||
t.Errorf("expected low buffer to be full")
|
t.Errorf("expected low buffer to be full")
|
||||||
}
|
}
|
||||||
assumeRecvOk(1)
|
assumeRecvOk(1)
|
||||||
@ -96,8 +110,8 @@ func TestTrySendRecv(t *testing.T) {
|
|||||||
assumeSendOk(6, q.TrySendHigh)
|
assumeSendOk(6, q.TrySendHigh)
|
||||||
assumeSendOk(7, q.TrySendHigh)
|
assumeSendOk(7, q.TrySendHigh)
|
||||||
assumeSendOk(8, q.TrySendHigh)
|
assumeSendOk(8, q.TrySendHigh)
|
||||||
ok = q.TrySendHigh(5)
|
err = q.TrySendHigh(5)
|
||||||
if ok {
|
if err == nil {
|
||||||
t.Errorf("expected high buffer to be full")
|
t.Errorf("expected high buffer to be full")
|
||||||
}
|
}
|
||||||
assumeRecvOk(5)
|
assumeRecvOk(5)
|
||||||
@ -105,15 +119,24 @@ func TestTrySendRecv(t *testing.T) {
|
|||||||
assumeRecvOk(7)
|
assumeRecvOk(7)
|
||||||
assumeRecvOk(8)
|
assumeRecvOk(8)
|
||||||
|
|
||||||
_, ok = q.TryRecv()
|
_, err = q.TryRecv()
|
||||||
if ok {
|
if err != priorityq.ErrEmpty {
|
||||||
t.Errorf("expected queue to be empty")
|
t.Errorf("expected queue to be empty")
|
||||||
}
|
}
|
||||||
|
q.Close()
|
||||||
|
_, err = q.TryRecv()
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
|
err = q.TrySend(5)
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcProducerConsumer(t *testing.T) {
|
func TestConcProducerConsumer(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := precise.Make[int](4)
|
q := mq.Make[int](4)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
produceDone := make(chan struct{})
|
produceDone := make(chan struct{})
|
||||||
wg.Add(2)
|
wg.Add(2)
|
||||||
@ -142,7 +165,7 @@ func TestConcProducerConsumer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSend(b *testing.B) {
|
func BenchmarkSend(b *testing.B) {
|
||||||
q := precise.Make[int](b.N)
|
q := mq.Make[int](b.N)
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
q.Send(i)
|
q.Send(i)
|
||||||
@ -158,7 +181,7 @@ func BenchmarkSendChan(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkRecv(b *testing.B) {
|
func BenchmarkRecv(b *testing.B) {
|
||||||
q := precise.Make[int](b.N)
|
q := mq.Make[int](b.N)
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
q.Send(i)
|
q.Send(i)
|
||||||
}
|
}
|
||||||
@ -180,7 +203,7 @@ func BenchmarkRecvChan(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkConcSendRecv(b *testing.B) {
|
func BenchmarkConcSendRecv(b *testing.B) {
|
||||||
q := precise.Make[int](b.N)
|
q := mq.Make[int](b.N)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(2)
|
wg.Add(2)
|
||||||
start := make(chan struct{})
|
start := make(chan struct{})
|
||||||
@ -226,3 +249,64 @@ func BenchmarkConcSendRecvChan(b *testing.B) {
|
|||||||
close(start)
|
close(start)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkHighContention(b *testing.B) {
|
||||||
|
q := mq.Make[int](b.N)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
start := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
numProducers := runtime.NumCPU()
|
||||||
|
sendsPerProducer := b.N / numProducers
|
||||||
|
wg.Add(numProducers)
|
||||||
|
for i := 0; i < numProducers; i++ {
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < sendsPerProducer; i++ {
|
||||||
|
q.Send(1)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
_, ok = q.Recv()
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
q.Close()
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkHighContentionChan(b *testing.B) {
|
||||||
|
c := make(chan int, b.N)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
start := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
numProducers := runtime.NumCPU()
|
||||||
|
sendsPerProducer := b.N / numProducers
|
||||||
|
wg.Add(numProducers)
|
||||||
|
for i := 0; i < numProducers; i++ {
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < sendsPerProducer; i++ {
|
||||||
|
c <- 1
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for n := range c {
|
||||||
|
_ = n
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
close(c)
|
||||||
|
<-done
|
||||||
|
}
|
214
npq/lib.go
Normal file
214
npq/lib.go
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
// Package npq implements a concurrent, n-priority message queue.
|
||||||
|
//
|
||||||
|
// [Q] is similar to a buffered channel, except that senders can assign one of
|
||||||
|
// some fixed number priority levels to each item. Receivers will always get
|
||||||
|
// the item with the highest priority.
|
||||||
|
//
|
||||||
|
// For example:
|
||||||
|
//
|
||||||
|
// q := npq.Make[int, string](8, 1, 2)
|
||||||
|
// q.Send(1, "world")
|
||||||
|
// q.Send(2, "hello")
|
||||||
|
// word1, _ := q.Recv()
|
||||||
|
// word2, _ := q.Recv()
|
||||||
|
// fmt.Println(word1, word2)
|
||||||
|
// q.Close()
|
||||||
|
// // Output: hello world
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// Each [Q] has an array of circular buffers, one for each priority level.
|
||||||
|
// Currently, the capacities for these are fixed and equal. If one buffer is
|
||||||
|
// full, attempts to send further items with its priority level will block
|
||||||
|
// ([Q.Send]) or fail ([Q.TrySend]).
|
||||||
|
//
|
||||||
|
// Compared to [mq.Q] , which has only two priority levels, [Q] performs
|
||||||
|
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
|
||||||
|
// though, this package is around 75% slower. Even more peculiar, however, in
|
||||||
|
// HighContention, it is 7% faster.
|
||||||
|
//
|
||||||
|
// Compared the pq package, this package is faster in the HighContention
|
||||||
|
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
|
||||||
|
// around 200 levels; and in Recv, until around 750 levels.
|
||||||
|
//
|
||||||
|
// It's important to remember the memory requirement differences. A [pq.Q]
|
||||||
|
// with capacity N can store N items; a [Q] with capacity N and P priority
|
||||||
|
// levels can store N*P items.
|
||||||
|
package npq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/mq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
|
)
|
||||||
|
|
||||||
|
// so that godoc (kinda) works
|
||||||
|
var _ *pq.Q[int, int]
|
||||||
|
var _ *mq.Q[int]
|
||||||
|
|
||||||
|
// Q is a concurrent, dual-priority message queue.
|
||||||
|
type Q[P constraints.Integer, T any] struct {
|
||||||
|
*state[P, T]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new queue.
|
||||||
|
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
|
||||||
|
numLevels := max - min + 1
|
||||||
|
levels := make([]queue.Q[T], numLevels)
|
||||||
|
for i := 0; i < int(numLevels); i++ {
|
||||||
|
levels[i] = queue.Make[T](cap)
|
||||||
|
}
|
||||||
|
s := &state[P, T]{
|
||||||
|
levels: levels,
|
||||||
|
min: min,
|
||||||
|
max: max,
|
||||||
|
}
|
||||||
|
canSend := make([]sync.Cond, numLevels)
|
||||||
|
for i := 0; i < len(canSend); i++ {
|
||||||
|
canSend[i].L = &s.mu
|
||||||
|
}
|
||||||
|
s.canRecv = sync.Cond{L: &s.mu}
|
||||||
|
s.canSend = canSend
|
||||||
|
return Q[P, T]{s}
|
||||||
|
}
|
||||||
|
|
||||||
|
type state[P constraints.Integer, T any] struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
levels []queue.Q[T]
|
||||||
|
canSend []sync.Cond
|
||||||
|
canRecv sync.Cond
|
||||||
|
min P
|
||||||
|
max P
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close marks the queue as closed.
|
||||||
|
//
|
||||||
|
// Attempting to close an already-closed queue results in a panic.
|
||||||
|
func (s *state[P, T]) Close() {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.closed {
|
||||||
|
s.mu.Unlock()
|
||||||
|
panic("close of closed queue")
|
||||||
|
}
|
||||||
|
s.closed = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recv gets an item, blocking when empty until one is available.
|
||||||
|
//
|
||||||
|
// The returned bool will be true if the queue still has items or is open.
|
||||||
|
// It will be false if the queue is empty and closed.
|
||||||
|
func (s *state[P, T]) Recv() (T, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for {
|
||||||
|
var available int = -1
|
||||||
|
findAvailable := func() {
|
||||||
|
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||||||
|
level := &s.levels[levelIdx]
|
||||||
|
if level.CanPop() {
|
||||||
|
available = levelIdx
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
findAvailable()
|
||||||
|
for !s.closed && available == -1 {
|
||||||
|
s.canRecv.Wait()
|
||||||
|
findAvailable()
|
||||||
|
}
|
||||||
|
if s.closed && available == -1 {
|
||||||
|
var empty T
|
||||||
|
return empty, false
|
||||||
|
}
|
||||||
|
if available != -1 {
|
||||||
|
level := &s.levels[available]
|
||||||
|
value := level.PopFront()
|
||||||
|
s.canSend[available].Broadcast()
|
||||||
|
return value, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send adds an item with some priority, blocking if full.
|
||||||
|
func (s *state[P, T]) Send(priority P, value T) {
|
||||||
|
s.validatePriority(priority)
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
levelIdx := priority - s.min
|
||||||
|
level := &s.levels[levelIdx]
|
||||||
|
for {
|
||||||
|
for !s.closed && !level.CanPush() {
|
||||||
|
s.canSend[levelIdx].Wait()
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
panic("send on closed queue")
|
||||||
|
}
|
||||||
|
if level.CanPush() {
|
||||||
|
level.PushBack(value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryRecv attempts to get an item from the queue, without blocking.
|
||||||
|
//
|
||||||
|
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||||
|
// the queue is closed.
|
||||||
|
func (s *state[P, T]) TryRecv() (value T, err error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||||||
|
level := &s.levels[levelIdx]
|
||||||
|
if level.CanPop() {
|
||||||
|
value = level.PopFront()
|
||||||
|
s.canSend[levelIdx].Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
err = priorityq.ErrClosed
|
||||||
|
} else {
|
||||||
|
err = priorityq.ErrEmpty
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySend attempts to add an item with some priority, without blocking.
|
||||||
|
//
|
||||||
|
// Returns an error from the root priorityq package, or nil if successful.
|
||||||
|
func (s *state[P, T]) TrySend(priority P, value T) error {
|
||||||
|
s.validatePriority(priority)
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.closed {
|
||||||
|
return priorityq.ErrClosed
|
||||||
|
}
|
||||||
|
levelIdx := priority - s.min
|
||||||
|
level := &s.levels[levelIdx]
|
||||||
|
if !level.CanPush() {
|
||||||
|
return priorityq.ErrFull
|
||||||
|
}
|
||||||
|
level.PushBack(value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *state[P, T]) validatePriority(priority P) {
|
||||||
|
if priority < s.min || priority > s.max {
|
||||||
|
panic(fmt.Errorf("priority %d out of range (%d, %d)",
|
||||||
|
priority, s.min, s.max))
|
||||||
|
}
|
||||||
|
}
|
277
npq/lib_test.go
Normal file
277
npq/lib_test.go
Normal file
@ -0,0 +1,277 @@
|
|||||||
|
package npq_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/npq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRecvHighFirst(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
q.Send(1, 1)
|
||||||
|
q.Send(1, 2)
|
||||||
|
q.Send(1, 3)
|
||||||
|
q.Send(1, 4)
|
||||||
|
q.Send(2, 5)
|
||||||
|
q.Send(2, 6)
|
||||||
|
q.Send(2, 7)
|
||||||
|
q.Send(2, 8)
|
||||||
|
checkRecv := func(n int) {
|
||||||
|
if v, _ := q.Recv(); v != n {
|
||||||
|
t.Errorf("popped %d, expected %d", v, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkRecv(5)
|
||||||
|
checkRecv(6)
|
||||||
|
checkRecv(7)
|
||||||
|
checkRecv(8)
|
||||||
|
checkRecv(1)
|
||||||
|
checkRecv(2)
|
||||||
|
checkRecv(3)
|
||||||
|
checkRecv(4)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendClosedPanic(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("sending to closed queue did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
q.Close()
|
||||||
|
q.Send(1, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTooLowPriorityPanic(t *testing.T) {
|
||||||
|
testInvalidPriorityPanic(t, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTooHighPriorityPanic(t *testing.T) {
|
||||||
|
testInvalidPriorityPanic(t, 3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testInvalidPriorityPanic(t *testing.T, invalid int) {
|
||||||
|
t.Parallel()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("sending with invalid priority did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
q.Send(invalid, 1)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRecvClosed(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
q.Send(1, 1)
|
||||||
|
q.Close()
|
||||||
|
_, ok := q.Recv()
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("queue should have item to receive")
|
||||||
|
}
|
||||||
|
_, ok = q.Recv()
|
||||||
|
if ok {
|
||||||
|
t.Errorf("queue should be closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDoubleClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("closing a closed queue did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q.Close()
|
||||||
|
q.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrySendRecv(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
assumeSendOk := func(n int, f func(int) error) {
|
||||||
|
err := f(n)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected to be able to send")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assumeRecvOk := func(expected int) {
|
||||||
|
actual, err := q.TryRecv()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected to be able to receive")
|
||||||
|
}
|
||||||
|
if actual != expected {
|
||||||
|
t.Errorf("expected %d, got %d", expected, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
trySendLow := func(n int) error {
|
||||||
|
return q.TrySend(1, n)
|
||||||
|
}
|
||||||
|
trySendHigh := func(n int) error {
|
||||||
|
return q.TrySend(2, n)
|
||||||
|
}
|
||||||
|
assumeSendOk(1, trySendLow)
|
||||||
|
assumeSendOk(2, trySendLow)
|
||||||
|
assumeSendOk(3, trySendLow)
|
||||||
|
assumeSendOk(4, trySendLow)
|
||||||
|
err := trySendLow(5)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected low buffer to be full")
|
||||||
|
}
|
||||||
|
assumeRecvOk(1)
|
||||||
|
assumeRecvOk(2)
|
||||||
|
assumeRecvOk(3)
|
||||||
|
assumeRecvOk(4)
|
||||||
|
|
||||||
|
assumeSendOk(5, trySendHigh)
|
||||||
|
assumeSendOk(6, trySendHigh)
|
||||||
|
assumeSendOk(7, trySendHigh)
|
||||||
|
assumeSendOk(8, trySendHigh)
|
||||||
|
err = trySendHigh(5)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected high buffer to be full")
|
||||||
|
}
|
||||||
|
assumeRecvOk(5)
|
||||||
|
assumeRecvOk(6)
|
||||||
|
assumeRecvOk(7)
|
||||||
|
assumeRecvOk(8)
|
||||||
|
|
||||||
|
_, err = q.TryRecv()
|
||||||
|
if err != priorityq.ErrEmpty {
|
||||||
|
t.Errorf("expected queue to be empty")
|
||||||
|
}
|
||||||
|
q.Close()
|
||||||
|
_, err = q.TryRecv()
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
|
err = q.TrySend(1, 5)
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcProducerConsumer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := npq.Make[int, int](4, 1, 2)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
produceDone := make(chan struct{})
|
||||||
|
wg.Add(2)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
q.Send(rand.Intn(2)+1, i)
|
||||||
|
}
|
||||||
|
close(produceDone)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
_, ok = q.Recv()
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
<-produceDone
|
||||||
|
t.Logf("producer done, closing channel")
|
||||||
|
q.Close()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
const highPriority = 2
|
||||||
|
|
||||||
|
func BenchmarkSend(b *testing.B) {
|
||||||
|
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||||
|
// randomize priorities to get amortized cost per op
|
||||||
|
ps := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ps[i] = rand.Intn(highPriority) + 1
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(ps[i], i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRecv(b *testing.B) {
|
||||||
|
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(rand.Intn(highPriority)+1, i)
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Recv()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkConcSendRecv(b *testing.B) {
|
||||||
|
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||||
|
// randomize priorities to get amortized cost per op
|
||||||
|
ps := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ps[i] = rand.Intn(highPriority) + 1
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
start := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(ps[i], i)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Recv()
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkHighContention(b *testing.B) {
|
||||||
|
q := npq.Make[int, int](b.N, 1, highPriority)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
start := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
numProducers := runtime.NumCPU()
|
||||||
|
sendsPerProducer := b.N / numProducers
|
||||||
|
wg.Add(numProducers)
|
||||||
|
for i := 0; i < numProducers; i++ {
|
||||||
|
go func() {
|
||||||
|
ps := make([]int, sendsPerProducer)
|
||||||
|
for i := range ps {
|
||||||
|
ps[i] = rand.Intn(highPriority) + 1
|
||||||
|
}
|
||||||
|
<-start
|
||||||
|
for i := 0; i < sendsPerProducer; i++ {
|
||||||
|
q.Send(ps[i], 1)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
_, ok = q.Recv()
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
q.Close()
|
||||||
|
<-done
|
||||||
|
}
|
155
pq/lib.go
Normal file
155
pq/lib.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
// Package pq implements a concurrent priority queue.
|
||||||
|
//
|
||||||
|
// [Q] is similar to a buffered channel, except that senders attach to each
|
||||||
|
// item a priority, and receivers always get the highest-priority item.
|
||||||
|
//
|
||||||
|
// For example:
|
||||||
|
//
|
||||||
|
// import "gogs.humancabbage.net/sam/priorityq/pq"
|
||||||
|
// q := pq.Make[int, string](8)
|
||||||
|
// q.Send(1, "world")
|
||||||
|
// q.Send(2, "hello")
|
||||||
|
// _, word1, _ := pq.Recv()
|
||||||
|
// _, word2, _ := pq.Recv()
|
||||||
|
// fmt.Println(word1, word2)
|
||||||
|
// q.Close()
|
||||||
|
// // Output: hello world
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// Each queue has a [binary max-heap]. Sending and receiving items require
|
||||||
|
// heap-up and heap-down operations, respectively.
|
||||||
|
//
|
||||||
|
// [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap
|
||||||
|
package pq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/binheap"
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Q is a generic, concurrent priority queue.
|
||||||
|
type Q[P constraints.Ordered, T any] struct {
|
||||||
|
*state[P, T]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new queue.
|
||||||
|
func Make[P constraints.Ordered, T any](cap int) Q[P, T] {
|
||||||
|
heap := binheap.Make[P, T](cap)
|
||||||
|
s := &state[P, T]{
|
||||||
|
heap: heap,
|
||||||
|
}
|
||||||
|
s.canRecv = sync.Cond{L: &s.mu}
|
||||||
|
s.canSend = sync.Cond{L: &s.mu}
|
||||||
|
return Q[P, T]{s}
|
||||||
|
}
|
||||||
|
|
||||||
|
type state[P constraints.Ordered, T any] struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
heap binheap.H[P, T]
|
||||||
|
canSend sync.Cond
|
||||||
|
canRecv sync.Cond
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close marks the queue as closed.
|
||||||
|
//
|
||||||
|
// Attempting to close an already-closed queue results in a panic.
|
||||||
|
func (s *state[P, T]) Close() {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.closed {
|
||||||
|
s.mu.Unlock()
|
||||||
|
panic("close of closed queue")
|
||||||
|
}
|
||||||
|
s.closed = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recv gets an item, blocking when empty until one is available.
|
||||||
|
//
|
||||||
|
// This returns both the item itself and the its assigned priority.
|
||||||
|
//
|
||||||
|
// The returned bool will be true if the queue still has items or is open.
|
||||||
|
// It will be false if the queue is empty and closed.
|
||||||
|
func (s *state[P, T]) Recv() (P, T, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for {
|
||||||
|
for !s.closed && !s.heap.CanExtract() {
|
||||||
|
s.canRecv.Wait()
|
||||||
|
}
|
||||||
|
if s.closed && !s.heap.CanExtract() {
|
||||||
|
var emptyP P
|
||||||
|
var emptyT T
|
||||||
|
return emptyP, emptyT, false
|
||||||
|
}
|
||||||
|
if s.heap.CanExtract() {
|
||||||
|
priority, value := s.heap.Extract()
|
||||||
|
s.canSend.Broadcast()
|
||||||
|
return priority, value, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send adds an item with some priority, blocking if full.
|
||||||
|
func (s *state[P, T]) Send(priority P, value T) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for {
|
||||||
|
for !s.closed && !s.heap.CanInsert() {
|
||||||
|
s.canSend.Wait()
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
panic("send on closed queue")
|
||||||
|
}
|
||||||
|
if s.heap.CanInsert() {
|
||||||
|
s.heap.Insert(priority, value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryRecv attempts to get an item without blocking.
|
||||||
|
//
|
||||||
|
// This returns both the item itself and the its assigned priority.
|
||||||
|
//
|
||||||
|
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||||||
|
// the queue is closed.
|
||||||
|
func (s *state[P, T]) TryRecv() (priority P, value T, err error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.heap.CanExtract() {
|
||||||
|
priority, value = s.heap.Extract()
|
||||||
|
s.canSend.Broadcast()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.closed {
|
||||||
|
err = priorityq.ErrClosed
|
||||||
|
} else {
|
||||||
|
err = priorityq.ErrEmpty
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySend attempts to add an item with some priority, without blocking.
|
||||||
|
//
|
||||||
|
// This method does not block. If there is space in the buffer, it returns
|
||||||
|
// true. If the buffer is full, it returns false.
|
||||||
|
func (s *state[P, T]) TrySend(priority P, value T) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.closed {
|
||||||
|
return priorityq.ErrClosed
|
||||||
|
}
|
||||||
|
if !s.heap.CanInsert() {
|
||||||
|
return priorityq.ErrFull
|
||||||
|
}
|
||||||
|
s.heap.Insert(priority, value)
|
||||||
|
s.canRecv.Broadcast()
|
||||||
|
return nil
|
||||||
|
}
|
237
pq/lib_test.go
Normal file
237
pq/lib_test.go
Normal file
@ -0,0 +1,237 @@
|
|||||||
|
package pq_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq"
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRecvHighestFirst(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := pq.Make[int, int](8)
|
||||||
|
q.Send(4, 4)
|
||||||
|
q.Send(2, 2)
|
||||||
|
q.Send(1, 1)
|
||||||
|
q.Send(5, 5)
|
||||||
|
q.Send(7, 7)
|
||||||
|
q.Send(8, 8)
|
||||||
|
q.Send(3, 3)
|
||||||
|
q.Send(6, 6)
|
||||||
|
checkRecv := func(n int) {
|
||||||
|
if _, v, _ := q.Recv(); v != n {
|
||||||
|
t.Errorf("popped %d, expected %d", v, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkRecv(8)
|
||||||
|
checkRecv(7)
|
||||||
|
checkRecv(6)
|
||||||
|
checkRecv(5)
|
||||||
|
checkRecv(4)
|
||||||
|
checkRecv(3)
|
||||||
|
checkRecv(2)
|
||||||
|
checkRecv(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendClosedPanic(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("sending to closed queue did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q := pq.Make[int, int](4)
|
||||||
|
q.Close()
|
||||||
|
q.Send(1, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRecvClosed(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := pq.Make[int, int](4)
|
||||||
|
q.Send(1, 1)
|
||||||
|
q.Close()
|
||||||
|
_, _, ok := q.Recv()
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("queue should have item to receive")
|
||||||
|
}
|
||||||
|
_, _, ok = q.Recv()
|
||||||
|
if ok {
|
||||||
|
t.Errorf("queue should be closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDoubleClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := pq.Make[int, int](4)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("closing a closed queue did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
q.Close()
|
||||||
|
q.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrySendRecv(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := pq.Make[int, int](4)
|
||||||
|
assumeSendOk := func(n int) {
|
||||||
|
err := q.TrySend(n, n)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected to be able to send")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assumeRecvOk := func(expected int) {
|
||||||
|
_, actual, err := q.TryRecv()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected to be able to receive")
|
||||||
|
}
|
||||||
|
if actual != expected {
|
||||||
|
t.Errorf("expected %d, got %d", expected, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assumeSendOk(1)
|
||||||
|
assumeSendOk(2)
|
||||||
|
assumeSendOk(3)
|
||||||
|
assumeSendOk(4)
|
||||||
|
err := q.TrySend(5, 5)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected queue to be full")
|
||||||
|
}
|
||||||
|
assumeRecvOk(4)
|
||||||
|
assumeRecvOk(3)
|
||||||
|
assumeRecvOk(2)
|
||||||
|
assumeRecvOk(1)
|
||||||
|
|
||||||
|
_, _, err = q.TryRecv()
|
||||||
|
if err != priorityq.ErrEmpty {
|
||||||
|
t.Errorf("expected queue to be empty")
|
||||||
|
}
|
||||||
|
q.Close()
|
||||||
|
_, _, err = q.TryRecv()
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
|
err = q.TrySend(1, 1)
|
||||||
|
if err != priorityq.ErrClosed {
|
||||||
|
t.Errorf("expected queue to be closed ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcProducerConsumer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := pq.Make[int, int](4)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
produceDone := make(chan struct{})
|
||||||
|
wg.Add(2)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
q.Send(rand.Int(), i)
|
||||||
|
}
|
||||||
|
close(produceDone)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
_, _, ok = q.Recv()
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
<-produceDone
|
||||||
|
t.Logf("producer done, closing channel")
|
||||||
|
q.Close()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSend(b *testing.B) {
|
||||||
|
q := pq.Make[int, int](b.N)
|
||||||
|
// randomize priorities to get amortized cost per op
|
||||||
|
ps := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ps[i] = rand.Int()
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(ps[i], i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRecv(b *testing.B) {
|
||||||
|
q := pq.Make[int, int](b.N)
|
||||||
|
// randomize priorities to get amortized cost per op
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(rand.Int(), i)
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Recv()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkConcSendRecv(b *testing.B) {
|
||||||
|
q := pq.Make[int, int](b.N)
|
||||||
|
// randomize priorities to get amortized cost per op
|
||||||
|
ps := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ps[i] = rand.Int()
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
start := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Send(ps[i], i)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Recv()
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkHighContention(b *testing.B) {
|
||||||
|
q := pq.Make[int, int](b.N)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
start := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
numProducers := runtime.NumCPU()
|
||||||
|
sendsPerProducer := b.N / numProducers
|
||||||
|
wg.Add(numProducers)
|
||||||
|
for i := 0; i < numProducers; i++ {
|
||||||
|
go func() {
|
||||||
|
ps := make([]int, sendsPerProducer)
|
||||||
|
for i := 0; i < sendsPerProducer; i++ {
|
||||||
|
ps[i] = rand.Int()
|
||||||
|
}
|
||||||
|
<-start
|
||||||
|
for i := 0; i < sendsPerProducer; i++ {
|
||||||
|
q.Send(ps[i], 1)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
_, _, ok = q.Recv()
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
b.ResetTimer()
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
q.Close()
|
||||||
|
<-done
|
||||||
|
}
|
164
precise/lib.go
164
precise/lib.go
@ -1,164 +0,0 @@
|
|||||||
package precise
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"gogs.humancabbage.net/sam/priorityq/circ"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Q is a precise, concurrent priority queue.
|
|
||||||
//
|
|
||||||
// Each queue has two internal buffers, high and low. This implementation
|
|
||||||
// guarantees that when there are items in both buffers, consumers receive
|
|
||||||
// ones from the high priority buffer first.
|
|
||||||
//
|
|
||||||
// Each buffer has the same capacity, set on initial construction. Sending to
|
|
||||||
// a buffer will block if it is full, even if the other buffer has space.
|
|
||||||
type Q[T any] struct {
|
|
||||||
*state[T]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make a new priority queue.
|
|
||||||
func Make[T any](cap int) Q[T] {
|
|
||||||
high := circ.Make[T](cap)
|
|
||||||
low := circ.Make[T](cap)
|
|
||||||
s := &state[T]{
|
|
||||||
high: high,
|
|
||||||
low: low,
|
|
||||||
}
|
|
||||||
s.canRecv = sync.NewCond(&s.mu)
|
|
||||||
s.canSendHigh = sync.NewCond(&s.mu)
|
|
||||||
s.canSendLow = sync.NewCond(&s.mu)
|
|
||||||
return Q[T]{s}
|
|
||||||
}
|
|
||||||
|
|
||||||
type state[T any] struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
high circ.B[T]
|
|
||||||
low circ.B[T]
|
|
||||||
canSendHigh *sync.Cond
|
|
||||||
canSendLow *sync.Cond
|
|
||||||
canRecv *sync.Cond
|
|
||||||
closed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close marks the queue as closed.
|
|
||||||
//
|
|
||||||
// Subsequent attempts to send will panic. Subsequent calls to Recv will
|
|
||||||
// continue to return the remaining items in the queue.
|
|
||||||
func (s *state[T]) Close() {
|
|
||||||
s.mu.Lock()
|
|
||||||
s.closed = true
|
|
||||||
s.mu.Unlock()
|
|
||||||
s.canRecv.Broadcast()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recv returns an item from the prioritized buffers, blocking if empty.
|
|
||||||
//
|
|
||||||
// The returned bool will be true if the queue still has items or is open.
|
|
||||||
// It will be false if the queue is empty and closed.
|
|
||||||
func (s *state[T]) Recv() (T, bool) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
for {
|
|
||||||
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
|
||||||
s.canRecv.Wait()
|
|
||||||
}
|
|
||||||
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
|
||||||
var empty T
|
|
||||||
return empty, false
|
|
||||||
}
|
|
||||||
if s.high.CanPop() {
|
|
||||||
value := s.high.PopFront()
|
|
||||||
s.canSendHigh.Broadcast()
|
|
||||||
return value, true
|
|
||||||
}
|
|
||||||
if s.low.CanPop() {
|
|
||||||
value := s.low.PopFront()
|
|
||||||
s.canSendLow.Broadcast()
|
|
||||||
return value, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send is an alias for SendLow.
|
|
||||||
func (s *state[T]) Send(value T) {
|
|
||||||
s.SendLow(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendHigh adds an item to the high priority buffer, blocking if full.
|
|
||||||
func (s *state[T]) SendHigh(value T) {
|
|
||||||
s.send(value, &s.high, s.canSendHigh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendLow adds an item to the low priority buffer, blocking if full.
|
|
||||||
func (s *state[T]) SendLow(value T) {
|
|
||||||
s.send(value, &s.low, s.canSendLow)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TryRecv attempts to return an item from the prioritized buffers.
|
|
||||||
//
|
|
||||||
// This method does not block. If there is an item in a buffer, it returns
|
|
||||||
// true. If the buffer is empty, it returns false.
|
|
||||||
func (s *state[T]) TryRecv() (value T, ok bool) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
if s.high.CanPop() {
|
|
||||||
value = s.high.PopFront()
|
|
||||||
ok = true
|
|
||||||
s.canSendHigh.Broadcast()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.low.CanPop() {
|
|
||||||
value = s.low.PopFront()
|
|
||||||
ok = true
|
|
||||||
s.canSendLow.Broadcast()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TrySendHigh attempts to add an item to the high priority buffer.
|
|
||||||
//
|
|
||||||
// This method does not block. If there is space in the buffer, it returns
|
|
||||||
// true. If the buffer is full, it returns false.
|
|
||||||
func (s *state[T]) TrySendHigh(value T) bool {
|
|
||||||
return s.trySend(value, &s.high)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TrySendLow attempts to add an item to the low priority buffer.
|
|
||||||
//
|
|
||||||
// This method does not block. If there is space in the buffer, it returns
|
|
||||||
// true. If the buffer is full, it returns false.
|
|
||||||
func (s *state[T]) TrySendLow(value T) bool {
|
|
||||||
return s.trySend(value, &s.low)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
for {
|
|
||||||
for !s.closed && !buf.CanPush() {
|
|
||||||
cond.Wait()
|
|
||||||
}
|
|
||||||
if s.closed {
|
|
||||||
panic("send on closed queue")
|
|
||||||
}
|
|
||||||
if buf.CanPush() {
|
|
||||||
buf.PushBack(value)
|
|
||||||
s.canRecv.Broadcast()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *state[T]) trySend(value T, buf *circ.B[T]) bool {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
if !buf.CanPush() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
buf.PushBack(value)
|
|
||||||
s.canRecv.Broadcast()
|
|
||||||
return true
|
|
||||||
}
|
|
75
queue/lib.go
Normal file
75
queue/lib.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
// Package queue implements a non-concurrent queue.
|
||||||
|
//
|
||||||
|
// # Implementation
|
||||||
|
//
|
||||||
|
// [Q] is a classic ring buffer. It tracks its head, tail, and length. This
|
||||||
|
// makes determining whether the queue is full or empty trivial.
|
||||||
|
package queue
|
||||||
|
|
||||||
|
// Q is a non-concurrent queue.
|
||||||
|
type Q[T any] struct {
|
||||||
|
buf []T
|
||||||
|
len int
|
||||||
|
head int
|
||||||
|
tail int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make creates a new queue.
|
||||||
|
func Make[T any](cap int) Q[T] {
|
||||||
|
buf := make([]T, cap)
|
||||||
|
return Q[T]{buf: buf}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity returns the total capacity of the queue.
|
||||||
|
func (b *Q[T]) Capacity() int {
|
||||||
|
return cap(b.buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of items in the queue.
|
||||||
|
func (b *Q[T]) Len() int {
|
||||||
|
return b.len
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanPush returns true if the queue has space for new items.
|
||||||
|
func (b *Q[T]) CanPush() bool {
|
||||||
|
return cap(b.buf)-b.len != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanPop returns true if the queue has one or more items.
|
||||||
|
func (b *Q[T]) CanPop() bool {
|
||||||
|
return b.len != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// PopFront returns the front-most item from the queue.
|
||||||
|
//
|
||||||
|
// If the queue is empty, it panics.
|
||||||
|
func (b *Q[T]) PopFront() T {
|
||||||
|
if !b.CanPop() {
|
||||||
|
panic("cannot pop from empty queue")
|
||||||
|
}
|
||||||
|
item := b.buf[b.head]
|
||||||
|
// clear queue slot so as not to hold on to garbage
|
||||||
|
var empty T
|
||||||
|
b.buf[b.head] = empty
|
||||||
|
b.len--
|
||||||
|
b.head++
|
||||||
|
if b.head == cap(b.buf) {
|
||||||
|
b.head = 0
|
||||||
|
}
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushBack adds an item to the end of the queue.
|
||||||
|
//
|
||||||
|
// If the queue is full, it panics.
|
||||||
|
func (b *Q[T]) PushBack(value T) {
|
||||||
|
if !b.CanPush() {
|
||||||
|
panic("cannot push back to full queue")
|
||||||
|
}
|
||||||
|
b.buf[b.tail] = value
|
||||||
|
b.len++
|
||||||
|
b.tail++
|
||||||
|
if b.tail == cap(b.buf) {
|
||||||
|
b.tail = 0
|
||||||
|
}
|
||||||
|
}
|
101
queue/lib_test.go
Normal file
101
queue/lib_test.go
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
package queue_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gogs.humancabbage.net/sam/priorityq/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRepeatPushPop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := queue.Make[int](4)
|
||||||
|
if q.Capacity() != 4 {
|
||||||
|
t.Errorf("wrong capacity")
|
||||||
|
}
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
q.PushBack(1)
|
||||||
|
q.PushBack(2)
|
||||||
|
q.PushBack(3)
|
||||||
|
q.PushBack(4)
|
||||||
|
if q.Len() != 4 {
|
||||||
|
t.Errorf("wrong length")
|
||||||
|
}
|
||||||
|
checkPop := func(n int) {
|
||||||
|
if v := q.PopFront(); v != n {
|
||||||
|
t.Errorf("popped %d, expected %d", v, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkPop(1)
|
||||||
|
checkPop(2)
|
||||||
|
checkPop(3)
|
||||||
|
checkPop(4)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInterleavedPushPop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := queue.Make[int](4)
|
||||||
|
checkPop := func(n int) {
|
||||||
|
if v := q.PopFront(); v != n {
|
||||||
|
t.Errorf("popped %d, expected %d", v, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
q.PushBack(1)
|
||||||
|
q.PushBack(2)
|
||||||
|
q.PushBack(3)
|
||||||
|
checkPop(1)
|
||||||
|
q.PushBack(4)
|
||||||
|
q.PushBack(5)
|
||||||
|
checkPop(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEmptyPopPanic(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("popping empty buffer did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
t.Parallel()
|
||||||
|
q := queue.Make[int](4)
|
||||||
|
q.PopFront()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFullPushPanic(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Errorf("pushing full buffer did not panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
t.Parallel()
|
||||||
|
q := queue.Make[int](1)
|
||||||
|
q.PushBack(1)
|
||||||
|
q.PushBack(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPush(b *testing.B) {
|
||||||
|
q := queue.Make[int](b.N)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
items := make([]int, b.N)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
items[i] = r.Int()
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.PushBack(items[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPop(b *testing.B) {
|
||||||
|
q := queue.Make[int](b.N)
|
||||||
|
rs := rand.NewSource(0)
|
||||||
|
r := rand.New(rs)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.PushBack(r.Int())
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.PopFront()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user