202 lines
4.6 KiB
Go
202 lines
4.6 KiB
Go
// Package mq implements a concurrent, dual-priority message queue.
|
|
//
|
|
// [Q] is similar to a buffered channel, except that senders can assign one of
|
|
// two priority levels to each item, "high" or "low." Receivers will always
|
|
// get a high-priority item ahead of any low-priority ones.
|
|
//
|
|
// For example:
|
|
//
|
|
// q := mq.Make[string](8)
|
|
// q.SendLow("world")
|
|
// q.SendHigh("hello")
|
|
// word1, _ := q.Recv()
|
|
// word2, _ := q.Recv()
|
|
// fmt.Println(word1, word2)
|
|
// q.Close()
|
|
// // Output: hello world
|
|
//
|
|
// # Implementation
|
|
//
|
|
// Each [Q] has two circular buffers, one for each priority level. Currently,
|
|
// the capacities for these are fixed and equal. If one buffer is full,
|
|
// attempts to send further items with its priority level will block
|
|
// ([Q.Send]) or fail ([Q.TrySend]).
|
|
//
|
|
// Compared to [pq.Q], the limitation on priority levels increases performance,
|
|
// as its circular buffers are much less expensive than the heap operations of
|
|
// a traditional priority queue.
|
|
package mq
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"gogs.humancabbage.net/sam/priorityq"
|
|
"gogs.humancabbage.net/sam/priorityq/pq"
|
|
"gogs.humancabbage.net/sam/priorityq/queue"
|
|
)
|
|
|
|
// so that godoc (kinda) works
|
|
var _ *pq.Q[int, int]
|
|
|
|
// Q is a concurrent, dual-priority message queue.
|
|
type Q[T any] struct {
|
|
*state[T]
|
|
}
|
|
|
|
// Make a new queue.
|
|
func Make[T any](cap int) Q[T] {
|
|
high := queue.Make[T](cap)
|
|
low := queue.Make[T](cap)
|
|
s := &state[T]{
|
|
high: high,
|
|
low: low,
|
|
}
|
|
s.canRecv = sync.Cond{L: &s.mu}
|
|
s.canSendHigh = sync.Cond{L: &s.mu}
|
|
s.canSendLow = sync.Cond{L: &s.mu}
|
|
return Q[T]{s}
|
|
}
|
|
|
|
type state[T any] struct {
|
|
mu sync.Mutex
|
|
high queue.Q[T]
|
|
low queue.Q[T]
|
|
canSendHigh sync.Cond
|
|
canSendLow sync.Cond
|
|
canRecv sync.Cond
|
|
closed bool
|
|
}
|
|
|
|
// Close marks the queue as closed.
|
|
//
|
|
// Attempting to close an already-closed queue results in a panic.
|
|
func (s *state[T]) Close() {
|
|
s.mu.Lock()
|
|
if s.closed {
|
|
s.mu.Unlock()
|
|
panic("close of closed queue")
|
|
}
|
|
s.closed = true
|
|
s.mu.Unlock()
|
|
s.canRecv.Broadcast()
|
|
}
|
|
|
|
// Recv gets an item, blocking when empty until one is available.
|
|
//
|
|
// The returned bool will be true if the queue still has items or is open.
|
|
// It will be false if the queue is empty and closed.
|
|
func (s *state[T]) Recv() (T, bool) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for {
|
|
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
|
s.canRecv.Wait()
|
|
}
|
|
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
|
|
var empty T
|
|
return empty, false
|
|
}
|
|
if s.high.CanPop() {
|
|
value := s.high.PopFront()
|
|
s.canSendHigh.Broadcast()
|
|
return value, true
|
|
}
|
|
if s.low.CanPop() {
|
|
value := s.low.PopFront()
|
|
s.canSendLow.Broadcast()
|
|
return value, true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send is an alias for SendLow.
|
|
func (s *state[T]) Send(value T) {
|
|
s.SendLow(value)
|
|
}
|
|
|
|
// SendHigh adds an item with high priority, blocking if full.
|
|
func (s *state[T]) SendHigh(value T) {
|
|
s.send(value, &s.high, &s.canSendHigh)
|
|
}
|
|
|
|
// SendLow adds an item with low buffer, blocking if full.
|
|
func (s *state[T]) SendLow(value T) {
|
|
s.send(value, &s.low, &s.canSendLow)
|
|
}
|
|
|
|
// TryRecv attempts to get an item from the queue, without blocking.
|
|
//
|
|
// The error indicates whether the attempt succeeded, the queue is empty, or
|
|
// the queue is closed.
|
|
func (s *state[T]) TryRecv() (value T, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.high.CanPop() {
|
|
value = s.high.PopFront()
|
|
s.canSendHigh.Broadcast()
|
|
return
|
|
}
|
|
if s.low.CanPop() {
|
|
value = s.low.PopFront()
|
|
s.canSendLow.Broadcast()
|
|
return
|
|
}
|
|
if s.closed {
|
|
err = priorityq.ErrClosed
|
|
} else {
|
|
err = priorityq.ErrEmpty
|
|
}
|
|
return
|
|
}
|
|
|
|
// TrySend is an alias for TrySendLow.
|
|
func (s *state[T]) TrySend(value T) error {
|
|
return s.trySend(value, &s.low)
|
|
}
|
|
|
|
// TrySendHigh attempts to add an item with high priority, without blocking.
|
|
//
|
|
// Returns an error from the root priorityq package, or nil if successful.
|
|
func (s *state[T]) TrySendHigh(value T) error {
|
|
return s.trySend(value, &s.high)
|
|
}
|
|
|
|
// TrySendLow attempts to add an item with low priority, without blocking.
|
|
//
|
|
// Returns an error from the root priorityq package, or nil if successful.
|
|
func (s *state[T]) TrySendLow(value T) error {
|
|
return s.trySend(value, &s.low)
|
|
}
|
|
|
|
func (s *state[T]) send(value T, buf *queue.Q[T], cond *sync.Cond) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for {
|
|
for !s.closed && !buf.CanPush() {
|
|
cond.Wait()
|
|
}
|
|
if s.closed {
|
|
panic("send on closed queue")
|
|
}
|
|
if buf.CanPush() {
|
|
buf.PushBack(value)
|
|
s.canRecv.Broadcast()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *state[T]) trySend(value T, buf *queue.Q[T]) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return priorityq.ErrClosed
|
|
}
|
|
if !buf.CanPush() {
|
|
return priorityq.ErrFull
|
|
}
|
|
buf.PushBack(value)
|
|
s.canRecv.Broadcast()
|
|
return nil
|
|
}
|