package priorityq import ( "sync" "gogs.humancabbage.net/sam/priorityq/binheap" "golang.org/x/exp/constraints" ) // Q is a generic, concurrent priority queue. type Q[P constraints.Ordered, T any] struct { *state[P, T] } // Make a new queue. func Make[P constraints.Ordered, T any](cap int) Q[P, T] { heap := binheap.Make[P, T](cap) s := &state[P, T]{ heap: heap, } s.canRecv = sync.NewCond(&s.mu) s.canSend = sync.NewCond(&s.mu) return Q[P, T]{s} } type state[P constraints.Ordered, T any] struct { mu sync.Mutex heap binheap.H[P, T] canSend *sync.Cond canRecv *sync.Cond closed bool } // Close marks the queue as closed. // // Subsequent attempts to send will panic. Subsequent calls to Recv will // continue to return the remaining items in the queue. func (s *state[P, T]) Close() { s.mu.Lock() s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } // Recv returns an item from the prioritized buffers, blocking if empty. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. func (s *state[P, T]) Recv() (P, T, bool) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.heap.CanExtract() { s.canRecv.Wait() } if s.closed && !s.heap.CanExtract() { var emptyP P var emptyT T return emptyP, emptyT, false } if s.heap.CanExtract() { priority, value := s.heap.Extract() s.canSend.Broadcast() return priority, value, true } } } // Send adds an item to the queue, blocking if full. func (s *state[P, T]) Send(priority P, value T) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.heap.CanInsert() { s.canSend.Wait() } if s.closed { panic("send on closed queue") } if s.heap.CanInsert() { s.heap.Insert(priority, value) s.canRecv.Broadcast() return } } } // TryRecv attempts to return an item from the queue. // // This method does not block. If there is an item in the queue, it returns // true. If the queue is empty, it returns false. func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) { s.mu.Lock() defer s.mu.Unlock() if s.heap.CanExtract() { priority, value = s.heap.Extract() ok = true s.canSend.Broadcast() return } return } // TrySend attempts to add an item to the high priority buffer. // // This method does not block. If there is space in the buffer, it returns // true. If the buffer is full, it returns false. func (s *state[P, T]) TrySend(priority P, value T) bool { s.mu.Lock() defer s.mu.Unlock() if !s.heap.CanInsert() { return false } s.heap.Insert(priority, value) s.canRecv.Broadcast() return true }