// Package mq implements a concurrent, dual-priority message queue. // // [Q] is similar to a buffered channel, except that senders can assign one of // two priority levels to each item, "high" or "low." Receivers will always // get a high-priority item ahead of any low-priority ones. // // For example: // // q := mq.Make[string](8) // mq.SendLow("world") // mq.SendHigh("hello") // word1, _ := mq.Recv() // word2, _ := mq.Recv() // fmt.Println(word1, word2) // q.Close() // // Output: hello world // // # Implementation // // Each queue has two circular buffers, one for each priority level. // Currently, the capacities for these are fixed and equal. If one buffer is // full, attempts to send further items with its priority level will block // ([Q.Send]) or fail ([Q.TrySend]). // // Compared the pq package, the limitation on priority levels increases // performance, as its circular buffers are much less expensive than the heap // operations of a traditional priority queue. package mq import ( "sync" "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/circ" ) // Q is a concurrent, dual-priority message queue. type Q[T any] struct { *state[T] } // Make a new queue. func Make[T any](cap int) Q[T] { high := circ.Make[T](cap) low := circ.Make[T](cap) s := &state[T]{ high: high, low: low, } s.canRecv = sync.Cond{L: &s.mu} s.canSendHigh = sync.Cond{L: &s.mu} s.canSendLow = sync.Cond{L: &s.mu} return Q[T]{s} } type state[T any] struct { mu sync.Mutex high circ.B[T] low circ.B[T] canSendHigh sync.Cond canSendLow sync.Cond canRecv sync.Cond closed bool } // Close marks the queue as closed. // // Attempting to close an already-closed queue results in a panic. func (s *state[T]) Close() { s.mu.Lock() if s.closed { panic("close of closed queue") } s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } // Recv gets an item, blocking when empty until one is available. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. func (s *state[T]) Recv() (T, bool) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.high.CanPop() && !s.low.CanPop() { s.canRecv.Wait() } if s.closed && !s.high.CanPop() && !s.low.CanPop() { var empty T return empty, false } if s.high.CanPop() { value := s.high.PopFront() s.canSendHigh.Broadcast() return value, true } if s.low.CanPop() { value := s.low.PopFront() s.canSendLow.Broadcast() return value, true } } } // Send is an alias for SendLow. func (s *state[T]) Send(value T) { s.SendLow(value) } // SendHigh adds an item with high priority, blocking if full. func (s *state[T]) SendHigh(value T) { s.send(value, &s.high, &s.canSendHigh) } // SendLow adds an item with low buffer, blocking if full. func (s *state[T]) SendLow(value T) { s.send(value, &s.low, &s.canSendLow) } // TryRecv attempts to get an item from the queue, without blocking. // // The error indicates whether the attempt succeeded, the queue is empty, or // the queue is closed. func (s *state[T]) TryRecv() (value T, err error) { s.mu.Lock() defer s.mu.Unlock() if s.high.CanPop() { value = s.high.PopFront() s.canSendHigh.Broadcast() return } if s.low.CanPop() { value = s.low.PopFront() s.canSendLow.Broadcast() return } if s.closed { err = priorityq.ErrClosed } else { err = priorityq.ErrEmpty } return } // TrySend is an alias for TrySendLow. func (s *state[T]) TrySend(value T) error { return s.trySend(value, &s.low) } // TrySendHigh attempts to add an item with high priority, without blocking. // // Returns an error from the root priorityq package, or nil if successful. func (s *state[T]) TrySendHigh(value T) error { return s.trySend(value, &s.high) } // TrySendLow attempts to add an item with low priority, without blocking. // // Returns an error from the root priorityq package, or nil if successful. func (s *state[T]) TrySendLow(value T) error { return s.trySend(value, &s.low) } func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !buf.CanPush() { cond.Wait() } if s.closed { panic("send on closed queue") } if buf.CanPush() { buf.PushBack(value) s.canRecv.Broadcast() return } } } func (s *state[T]) trySend(value T, buf *circ.B[T]) error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return priorityq.ErrClosed } if !buf.CanPush() { return priorityq.ErrFull } buf.PushBack(value) s.canRecv.Broadcast() return nil }