// Package npq implements a concurrent, n-priority message queue. // // [Q] is similar to a buffered channel, except that senders can assign one of // some fixed number priority levels to each item. Receivers will always get // the item with the highest priority. // // For example: // // q := npq.Make[int, string](8, 1, 2) // q.Send(1, "world") // q.Send(2, "hello") // word1, _ := q.Recv() // word2, _ := q.Recv() // fmt.Println(word1, word2) // q.Close() // // Output: hello world // // # Implementation // // Each [Q] has an array of circular buffers, one for each priority level. // Currently, the capacities for these are fixed and equal. If one buffer is // full, attempts to send further items with its priority level will block // ([Q.Send]) or fail ([Q.TrySend]). // // Compared to [mq.Q] , which has only two priority levels, [Q] performs // similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark, // though, this package is around 75% slower. Even more peculiar, however, in // HighContention, it is 7% faster. // // Compared the pq package, this package is faster in the HighContention // benchmark until around 145-150 priority levels; in ConcSendRecv, until // around 200 levels; and in Recv, until around 750 levels. // // It's important to remember the memory requirement differences. A [pq.Q] // with capacity N can store N items; a [Q] with capacity N and P priority // levels can store N*P items. package npq import ( "fmt" "iter" "sync" "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/mq" "gogs.humancabbage.net/sam/priorityq/pq" "gogs.humancabbage.net/sam/priorityq/queue" "golang.org/x/exp/constraints" ) // so that godoc (kinda) works var _ *pq.Q[int, int] var _ *mq.Q[int] // Q is a concurrent, dual-priority message queue. type Q[P constraints.Integer, T any] struct { *state[P, T] } // Make a new queue. func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] { numLevels := max - min + 1 levels := make([]queue.Q[T], numLevels) for i := 0; i < int(numLevels); i++ { levels[i] = queue.Make[T](cap) } s := &state[P, T]{ levels: levels, min: min, max: max, } canSend := make([]sync.Cond, numLevels) for i := 0; i < len(canSend); i++ { canSend[i].L = &s.mu } s.canRecv = sync.Cond{L: &s.mu} s.canSend = canSend return Q[P, T]{s} } type state[P constraints.Integer, T any] struct { mu sync.Mutex levels []queue.Q[T] canSend []sync.Cond canRecv sync.Cond min P max P closed bool } // Close marks the queue as closed. // // Attempting to close an already-closed queue results in a panic. func (s *state[P, T]) Close() { s.mu.Lock() if s.closed { s.mu.Unlock() panic("close of closed queue") } s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } // Recv gets an item, blocking when empty until one is available. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. func (s *state[P, T]) Recv() (T, bool) { s.mu.Lock() defer s.mu.Unlock() for { var available int = -1 findAvailable := func() { for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- { level := &s.levels[levelIdx] if level.CanPop() { available = levelIdx break } } } findAvailable() for !s.closed && available == -1 { s.canRecv.Wait() findAvailable() } if s.closed && available == -1 { var empty T return empty, false } if available != -1 { level := &s.levels[available] value := level.PopFront() s.canSend[available].Broadcast() return value, true } } } // Send adds an item with some priority, blocking if full. func (s *state[P, T]) Send(priority P, value T) { s.validatePriority(priority) s.mu.Lock() defer s.mu.Unlock() levelIdx := priority - s.min level := &s.levels[levelIdx] for { for !s.closed && !level.CanPush() { s.canSend[levelIdx].Wait() } if s.closed { panic("send on closed queue") } if level.CanPush() { level.PushBack(value) s.canRecv.Broadcast() return } } } // TryRecv attempts to get an item from the queue, without blocking. // // The error indicates whether the attempt succeeded, the queue is empty, or // the queue is closed. func (s *state[P, T]) TryRecv() (value T, err error) { s.mu.Lock() defer s.mu.Unlock() for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- { level := &s.levels[levelIdx] if level.CanPop() { value = level.PopFront() s.canSend[levelIdx].Broadcast() return } } if s.closed { err = priorityq.ErrClosed } else { err = priorityq.ErrEmpty } return } // TrySend attempts to add an item with some priority, without blocking. // // Returns an error from the root priorityq package, or nil if successful. func (s *state[P, T]) TrySend(priority P, value T) error { s.validatePriority(priority) s.mu.Lock() defer s.mu.Unlock() if s.closed { return priorityq.ErrClosed } levelIdx := priority - s.min level := &s.levels[levelIdx] if !level.CanPush() { return priorityq.ErrFull } level.PushBack(value) s.canRecv.Broadcast() return nil } func (s *state[P, T]) validatePriority(priority P) { if priority < s.min || priority > s.max { panic(fmt.Errorf("priority %d out of range (%d, %d)", priority, s.min, s.max)) } } // Iter returns an iterator that consumes values until the queue is closed. func (s *state[P, T]) Iter() iter.Seq[T] { return func(yield func(T) bool) { for { t, ok := s.Recv() if !ok { return } if !yield(t) { return } } } }