214 lines
5.2 KiB
Go
214 lines
5.2 KiB
Go
|
// Package npq implements a concurrent, n-priority message queue.
|
||
|
//
|
||
|
// [Q] is similar to a buffered channel, except that senders can assign one of
|
||
|
// some fixed number priority levels to each item. Receivers will always get
|
||
|
// the item with the highest priority.
|
||
|
//
|
||
|
// For example:
|
||
|
//
|
||
|
// q := npq.Make[int, string](8, 1, 2)
|
||
|
// q.Send(1, "world")
|
||
|
// q.Send(2, "hello")
|
||
|
// word1, _ := q.Recv()
|
||
|
// word2, _ := q.Recv()
|
||
|
// fmt.Println(word1, word2)
|
||
|
// q.Close()
|
||
|
// // Output: hello world
|
||
|
//
|
||
|
// # Implementation
|
||
|
//
|
||
|
// Each [Q] has an array of circular buffers, one for each priority level.
|
||
|
// Currently, the capacities for these are fixed and equal. If one buffer is
|
||
|
// full, attempts to send further items with its priority level will block
|
||
|
// ([Q.Send]) or fail ([Q.TrySend]).
|
||
|
//
|
||
|
// Compared to [mq.Q] , which has only two priority levels, [Q] performs
|
||
|
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
|
||
|
// though, this package is around 75% slower. Even more peculiar, however, in
|
||
|
// HighContention, it is 7% faster.
|
||
|
//
|
||
|
// Compared the pq package, this package is faster in the HighContention
|
||
|
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
|
||
|
// around 200 levels; and in Recv, until around 750 levels.
|
||
|
//
|
||
|
// It's important to remember the memory requirement differences. A [pq.Q]
|
||
|
// with capacity N can store N items; a [Q] with capacity N and P priority
|
||
|
// levels can store N*P items.
|
||
|
package npq
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"gogs.humancabbage.net/sam/priorityq"
|
||
|
"gogs.humancabbage.net/sam/priorityq/mq"
|
||
|
"gogs.humancabbage.net/sam/priorityq/pq"
|
||
|
"gogs.humancabbage.net/sam/priorityq/queue"
|
||
|
"golang.org/x/exp/constraints"
|
||
|
)
|
||
|
|
||
|
// so that godoc (kinda) works
|
||
|
var _ *pq.Q[int, int]
|
||
|
var _ *mq.Q[int]
|
||
|
|
||
|
// Q is a concurrent, dual-priority message queue.
|
||
|
type Q[P constraints.Integer, T any] struct {
|
||
|
*state[P, T]
|
||
|
}
|
||
|
|
||
|
// Make a new queue.
|
||
|
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
|
||
|
numLevels := max - min + 1
|
||
|
levels := make([]queue.Q[T], numLevels)
|
||
|
for i := 0; i < int(numLevels); i++ {
|
||
|
levels[i] = queue.Make[T](cap)
|
||
|
}
|
||
|
s := &state[P, T]{
|
||
|
levels: levels,
|
||
|
min: min,
|
||
|
max: max,
|
||
|
}
|
||
|
canSend := make([]sync.Cond, numLevels)
|
||
|
for i := 0; i < len(canSend); i++ {
|
||
|
canSend[i].L = &s.mu
|
||
|
}
|
||
|
s.canRecv = sync.Cond{L: &s.mu}
|
||
|
s.canSend = canSend
|
||
|
return Q[P, T]{s}
|
||
|
}
|
||
|
|
||
|
type state[P constraints.Integer, T any] struct {
|
||
|
mu sync.Mutex
|
||
|
levels []queue.Q[T]
|
||
|
canSend []sync.Cond
|
||
|
canRecv sync.Cond
|
||
|
min P
|
||
|
max P
|
||
|
closed bool
|
||
|
}
|
||
|
|
||
|
// Close marks the queue as closed.
|
||
|
//
|
||
|
// Attempting to close an already-closed queue results in a panic.
|
||
|
func (s *state[P, T]) Close() {
|
||
|
s.mu.Lock()
|
||
|
if s.closed {
|
||
|
panic("close of closed queue")
|
||
|
}
|
||
|
s.closed = true
|
||
|
s.mu.Unlock()
|
||
|
s.canRecv.Broadcast()
|
||
|
}
|
||
|
|
||
|
// Recv gets an item, blocking when empty until one is available.
|
||
|
//
|
||
|
// The returned bool will be true if the queue still has items or is open.
|
||
|
// It will be false if the queue is empty and closed.
|
||
|
func (s *state[P, T]) Recv() (T, bool) {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
for {
|
||
|
var available int = -1
|
||
|
findAvailable := func() {
|
||
|
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||
|
level := &s.levels[levelIdx]
|
||
|
if level.CanPop() {
|
||
|
available = levelIdx
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
findAvailable()
|
||
|
for !s.closed && available == -1 {
|
||
|
s.canRecv.Wait()
|
||
|
findAvailable()
|
||
|
}
|
||
|
if s.closed && available == -1 {
|
||
|
var empty T
|
||
|
return empty, false
|
||
|
}
|
||
|
if available != -1 {
|
||
|
level := &s.levels[available]
|
||
|
value := level.PopFront()
|
||
|
s.canSend[available].Broadcast()
|
||
|
return value, true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Send adds an item with some priority, blocking if full.
|
||
|
func (s *state[P, T]) Send(priority P, value T) {
|
||
|
s.validatePriority(priority)
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
levelIdx := priority - s.min
|
||
|
level := &s.levels[levelIdx]
|
||
|
for {
|
||
|
for !s.closed && !level.CanPush() {
|
||
|
s.canSend[levelIdx].Wait()
|
||
|
}
|
||
|
if s.closed {
|
||
|
panic("send on closed queue")
|
||
|
}
|
||
|
if level.CanPush() {
|
||
|
level.PushBack(value)
|
||
|
s.canRecv.Broadcast()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// TryRecv attempts to get an item from the queue, without blocking.
|
||
|
//
|
||
|
// The error indicates whether the attempt succeeded, the queue is empty, or
|
||
|
// the queue is closed.
|
||
|
func (s *state[P, T]) TryRecv() (value T, err error) {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
|
||
|
level := &s.levels[levelIdx]
|
||
|
if level.CanPop() {
|
||
|
value = level.PopFront()
|
||
|
s.canSend[levelIdx].Broadcast()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
if s.closed {
|
||
|
err = priorityq.ErrClosed
|
||
|
} else {
|
||
|
err = priorityq.ErrEmpty
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// TrySend attempts to add an item with some priority, without blocking.
|
||
|
//
|
||
|
// Returns an error from the root priorityq package, or nil if successful.
|
||
|
func (s *state[P, T]) TrySend(priority P, value T) error {
|
||
|
s.validatePriority(priority)
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
if s.closed {
|
||
|
return priorityq.ErrClosed
|
||
|
}
|
||
|
levelIdx := priority - s.min
|
||
|
level := &s.levels[levelIdx]
|
||
|
if !level.CanPush() {
|
||
|
return priorityq.ErrFull
|
||
|
}
|
||
|
level.PushBack(value)
|
||
|
s.canRecv.Broadcast()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *state[P, T]) validatePriority(priority P) {
|
||
|
if priority < s.min || priority > s.max {
|
||
|
panic(fmt.Errorf("priority %d out of range (%d, %d)",
|
||
|
priority, s.min, s.max))
|
||
|
}
|
||
|
}
|