Add npq package.

This commit is contained in:
Sam Fredrickson 2023-03-03 21:18:31 -08:00
parent f7474fb673
commit 9f8d0760f1
5 changed files with 493 additions and 9 deletions

View File

@ -16,6 +16,10 @@ queue, using a binary max-heap. This is more general than `mq`, because it
allows multiple levels of priority. This, of course, also makes operations allows multiple levels of priority. This, of course, also makes operations
slower. slower.
Lastly, the `npq` package implements a concurrent, n-priority message queue.
It's similar to `mq`, except that it an arbitrary fixed number of priority
levels. It can have better performance than `pq` for several hundred levels.
## Benchmarks ## Benchmarks
Here are some benchmark results from running on a Mac Studio/M1 Ultra. Here are some benchmark results from running on a Mac Studio/M1 Ultra.
@ -30,6 +34,12 @@ Here are some benchmark results from running on a Mac Studio/M1 Ultra.
HighContention-20 128.2n ± 0% HighContention-20 128.2n ± 0%
HighContentionChan-20 47.27n ± 0% HighContentionChan-20 47.27n ± 0%
pkg: gogs.humancabbage.net/sam/priorityq/npq
Send-20 13.56n ± 0%
Recv-20 13.51n ± 0%
ConcSendRecv-20 176.3n ± 8%
HighContention-20 121.0n ± 0%
pkg: gogs.humancabbage.net/sam/priorityq/pq pkg: gogs.humancabbage.net/sam/priorityq/pq
Send-20 18.79n ± 1% Send-20 18.79n ± 1%
Recv-20 268.1n ± 3% Recv-20 268.1n ± 3%

View File

@ -1,4 +1,4 @@
// Package binheap implements a binary max-heap. // Package binheap implements a non-concurrent binary max-heap.
// //
// # Implementation // # Implementation
// //

View File

@ -7,8 +7,8 @@
// For example: // For example:
// //
// q := mq.Make[string](8) // q := mq.Make[string](8)
// mq.SendLow("world") // q.SendLow("world")
// mq.SendHigh("hello") // q.SendHigh("hello")
// word1, _ := mq.Recv() // word1, _ := mq.Recv()
// word2, _ := mq.Recv() // word2, _ := mq.Recv()
// fmt.Println(word1, word2) // fmt.Println(word1, word2)
@ -17,23 +17,27 @@
// //
// # Implementation // # Implementation
// //
// Each queue has two circular buffers, one for each priority level. // Each [Q] has two circular buffers, one for each priority level. Currently,
// Currently, the capacities for these are fixed and equal. If one buffer is // the capacities for these are fixed and equal. If one buffer is full,
// full, attempts to send further items with its priority level will block // attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]). // ([Q.Send]) or fail ([Q.TrySend]).
// //
// Compared the pq package, the limitation on priority levels increases // Compared [pq.Q], the limitation on priority levels increases performance,
// performance, as its circular buffers are much less expensive than the heap // as its circular buffers are much less expensive than the heap operations of
// operations of a traditional priority queue. // a traditional priority queue.
package mq package mq
import ( import (
"sync" "sync"
"gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/pq"
"gogs.humancabbage.net/sam/priorityq/queue" "gogs.humancabbage.net/sam/priorityq/queue"
) )
// so that godoc (kinda) works
var _ *pq.Q[int, int]
// Q is a concurrent, dual-priority message queue. // Q is a concurrent, dual-priority message queue.
type Q[T any] struct { type Q[T any] struct {
*state[T] *state[T]

213
npq/lib.go Normal file
View File

@ -0,0 +1,213 @@
// Package npq implements a concurrent, n-priority message queue.
//
// [Q] is similar to a buffered channel, except that senders can assign one of
// some fixed number priority levels to each item. Receivers will always get
// the item with the highest priority.
//
// For example:
//
// q := npq.Make[int, string](8, 1, 2)
// q.Send(1, "world")
// q.Send(2, "hello")
// word1, _ := q.Recv()
// word2, _ := q.Recv()
// fmt.Println(word1, word2)
// q.Close()
// // Output: hello world
//
// # Implementation
//
// Each [Q] has an array of circular buffers, one for each priority level.
// Currently, the capacities for these are fixed and equal. If one buffer is
// full, attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]).
//
// Compared to [mq.Q] , which has only two priority levels, [Q] performs
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
// though, this package is around 75% slower. Even more peculiar, however, in
// HighContention, it is 7% faster.
//
// Compared the pq package, this package is faster in the HighContention
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
// around 200 levels; and in Recv, until around 750 levels.
//
// It's important to remember the memory requirement differences. A [pq.Q]
// with capacity N can store N items; a [Q] with capacity N and P priority
// levels can store N*P items.
package npq
import (
"fmt"
"sync"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/mq"
"gogs.humancabbage.net/sam/priorityq/pq"
"gogs.humancabbage.net/sam/priorityq/queue"
"golang.org/x/exp/constraints"
)
// so that godoc (kinda) works
var _ *pq.Q[int, int]
var _ *mq.Q[int]
// Q is a concurrent, dual-priority message queue.
type Q[P constraints.Integer, T any] struct {
*state[P, T]
}
// Make a new queue.
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
numLevels := max - min + 1
levels := make([]queue.Q[T], numLevels)
for i := 0; i < int(numLevels); i++ {
levels[i] = queue.Make[T](cap)
}
s := &state[P, T]{
levels: levels,
min: min,
max: max,
}
canSend := make([]sync.Cond, numLevels)
for i := 0; i < len(canSend); i++ {
canSend[i].L = &s.mu
}
s.canRecv = sync.Cond{L: &s.mu}
s.canSend = canSend
return Q[P, T]{s}
}
type state[P constraints.Integer, T any] struct {
mu sync.Mutex
levels []queue.Q[T]
canSend []sync.Cond
canRecv sync.Cond
min P
max P
closed bool
}
// Close marks the queue as closed.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[P, T]) Close() {
s.mu.Lock()
if s.closed {
panic("close of closed queue")
}
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv gets an item, blocking when empty until one is available.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[P, T]) Recv() (T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
var available int = -1
findAvailable := func() {
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
level := &s.levels[levelIdx]
if level.CanPop() {
available = levelIdx
break
}
}
}
findAvailable()
for !s.closed && available == -1 {
s.canRecv.Wait()
findAvailable()
}
if s.closed && available == -1 {
var empty T
return empty, false
}
if available != -1 {
level := &s.levels[available]
value := level.PopFront()
s.canSend[available].Broadcast()
return value, true
}
}
}
// Send adds an item with some priority, blocking if full.
func (s *state[P, T]) Send(priority P, value T) {
s.validatePriority(priority)
s.mu.Lock()
defer s.mu.Unlock()
levelIdx := priority - s.min
level := &s.levels[levelIdx]
for {
for !s.closed && !level.CanPush() {
s.canSend[levelIdx].Wait()
}
if s.closed {
panic("send on closed queue")
}
if level.CanPush() {
level.PushBack(value)
s.canRecv.Broadcast()
return
}
}
}
// TryRecv attempts to get an item from the queue, without blocking.
//
// The error indicates whether the attempt succeeded, the queue is empty, or
// the queue is closed.
func (s *state[P, T]) TryRecv() (value T, err error) {
s.mu.Lock()
defer s.mu.Unlock()
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
level := &s.levels[levelIdx]
if level.CanPop() {
value = level.PopFront()
s.canSend[levelIdx].Broadcast()
return
}
}
if s.closed {
err = priorityq.ErrClosed
} else {
err = priorityq.ErrEmpty
}
return
}
// TrySend attempts to add an item with some priority, without blocking.
//
// Returns an error from the root priorityq package, or nil if successful.
func (s *state[P, T]) TrySend(priority P, value T) error {
s.validatePriority(priority)
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return priorityq.ErrClosed
}
levelIdx := priority - s.min
level := &s.levels[levelIdx]
if !level.CanPush() {
return priorityq.ErrFull
}
level.PushBack(value)
s.canRecv.Broadcast()
return nil
}
func (s *state[P, T]) validatePriority(priority P) {
if priority < s.min || priority > s.max {
panic(fmt.Errorf("priority %d out of range (%d, %d)",
priority, s.min, s.max))
}
}

257
npq/lib_test.go Normal file
View File

@ -0,0 +1,257 @@
package npq_test
import (
"math/rand"
"runtime"
"sync"
"testing"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/npq"
)
func TestRecvHighFirst(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
q.Send(1, 1)
q.Send(1, 2)
q.Send(1, 3)
q.Send(1, 4)
q.Send(2, 5)
q.Send(2, 6)
q.Send(2, 7)
q.Send(2, 8)
checkRecv := func(n int) {
if v, _ := q.Recv(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
checkRecv(5)
checkRecv(6)
checkRecv(7)
checkRecv(8)
checkRecv(1)
checkRecv(2)
checkRecv(3)
checkRecv(4)
}
func TestSendClosedPanic(t *testing.T) {
t.Parallel()
defer func() {
if r := recover(); r == nil {
t.Errorf("sending to closed queue did not panic")
}
}()
q := npq.Make[int, int](4, 1, 2)
q.Close()
q.Send(1, 1)
}
func TestRecvClosed(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
q.Send(1, 1)
q.Close()
_, ok := q.Recv()
if !ok {
t.Errorf("queue should have item to receive")
}
_, ok = q.Recv()
if ok {
t.Errorf("queue should be closed")
}
}
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
assumeSendOk := func(n int, f func(int) error) {
err := f(n)
if err != nil {
t.Errorf("expected to be able to send")
}
}
assumeRecvOk := func(expected int) {
actual, err := q.TryRecv()
if err != nil {
t.Errorf("expected to be able to receive")
}
if actual != expected {
t.Errorf("expected %d, got %d", expected, actual)
}
}
trySendLow := func(n int) error {
return q.TrySend(1, n)
}
trySendHigh := func(n int) error {
return q.TrySend(2, n)
}
assumeSendOk(1, trySendLow)
assumeSendOk(2, trySendLow)
assumeSendOk(3, trySendLow)
assumeSendOk(4, trySendLow)
err := trySendLow(5)
if err == nil {
t.Errorf("expected low buffer to be full")
}
assumeRecvOk(1)
assumeRecvOk(2)
assumeRecvOk(3)
assumeRecvOk(4)
assumeSendOk(5, trySendHigh)
assumeSendOk(6, trySendHigh)
assumeSendOk(7, trySendHigh)
assumeSendOk(8, trySendHigh)
err = trySendHigh(5)
if err == nil {
t.Errorf("expected high buffer to be full")
}
assumeRecvOk(5)
assumeRecvOk(6)
assumeRecvOk(7)
assumeRecvOk(8)
_, err = q.TryRecv()
if err != priorityq.ErrEmpty {
t.Errorf("expected queue to be empty")
}
q.Close()
_, err = q.TryRecv()
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
err = q.TrySend(1, 5)
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
}
func TestConcProducerConsumer(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
var wg sync.WaitGroup
produceDone := make(chan struct{})
wg.Add(2)
go func() {
for i := 0; i < 10000; i++ {
q.Send(rand.Intn(2)+1, i)
}
close(produceDone)
wg.Done()
}()
go func() {
ok := true
for ok {
_, ok = q.Recv()
}
wg.Done()
}()
<-produceDone
t.Logf("producer done, closing channel")
q.Close()
wg.Wait()
}
const highPriority = 2
func BenchmarkSend(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Intn(highPriority) + 1
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
}
func BenchmarkRecv(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
for i := 0; i < b.N; i++ {
q.Send(rand.Intn(highPriority)+1, i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Recv()
}
}
func BenchmarkConcSendRecv(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Intn(highPriority) + 1
}
var wg sync.WaitGroup
wg.Add(2)
start := make(chan struct{})
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
wg.Done()
}()
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Recv()
}
wg.Done()
}()
b.ResetTimer()
close(start)
wg.Wait()
}
func BenchmarkHighContention(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
var wg sync.WaitGroup
start := make(chan struct{})
done := make(chan struct{})
numProducers := runtime.NumCPU()
sendsPerProducer := b.N / numProducers
wg.Add(numProducers)
for i := 0; i < numProducers; i++ {
go func() {
ps := make([]int, sendsPerProducer)
for i := range ps {
ps[i] = rand.Intn(highPriority) + 1
}
<-start
for i := 0; i < sendsPerProducer; i++ {
q.Send(ps[i], 1)
}
wg.Done()
}()
}
go func() {
ok := true
for ok {
_, ok = q.Recv()
}
close(done)
}()
b.ResetTimer()
close(start)
wg.Wait()
q.Close()
<-done
}