package precise import ( "sync" "gogs.humancabbage.net/sam/priorityq/circ" ) // Q is a precise, concurrent priority queue. // // Each queue has two internal buffers, high and low. This implementation // guarantees that when there are items in both buffers, consumers receive // ones from the high priority buffer first. // // Each buffer has the same capacity, set on initial construction. Sending to // a buffer will block if it is full, even if the other buffer has space. type Q[T any] struct { *state[T] } // Make a new priority queue. func Make[T any](cap int) Q[T] { high := circ.Make[T](cap) low := circ.Make[T](cap) s := &state[T]{ high: high, low: low, } s.canRecv = sync.NewCond(&s.mu) s.canSendHigh = sync.NewCond(&s.mu) s.canSendLow = sync.NewCond(&s.mu) return Q[T]{s} } type state[T any] struct { mu sync.Mutex high circ.B[T] low circ.B[T] canSendHigh *sync.Cond canSendLow *sync.Cond canRecv *sync.Cond closed bool } // Close marks the queue as closed. // // Subsequent attempts to send will panic. Subsequent calls to Recv will // continue to return the remaining items in the queue. func (s *state[T]) Close() { s.mu.Lock() s.closed = true s.mu.Unlock() s.canRecv.Broadcast() } // Recv returns an item from the prioritized buffers, blocking if empty. // // The returned bool will be true if the queue still has items or is open. // It will be false if the queue is empty and closed. func (s *state[T]) Recv() (T, bool) { s.mu.Lock() defer s.mu.Unlock() for { for !s.closed && !s.high.CanPop() && !s.low.CanPop() { s.canRecv.Wait() } if s.closed && !s.high.CanPop() && !s.low.CanPop() { var empty T return empty, false } if s.high.CanPop() { value := s.high.PopFront() s.canSendHigh.Broadcast() return value, true } if s.low.CanPop() { value := s.low.PopFront() s.canSendLow.Broadcast() return value, true } } } // Send is an alias for SendLow. func (s *state[T]) Send(value T) { s.SendLow(value) } // SendHigh adds an item to the high priority buffer, blocking if full. func (s *state[T]) SendHigh(value T) { s.send(value, &s.high, s.canSendHigh) } // SendLow adds an item to the low priority buffer, blocking if full. func (s *state[T]) SendLow(value T) { s.send(value, &s.low, s.canSendLow) } // TryRecv attempts to return an item from the prioritized buffers. // // This method does not block. If there is an item in a buffer, it returns // true. If the buffer is empty, it returns false. func (s *state[T]) TryRecv() (value T, ok bool) { s.mu.Lock() defer s.mu.Unlock() if s.high.CanPop() { value = s.high.PopFront() ok = true s.canSendHigh.Broadcast() return } if s.low.CanPop() { value = s.low.PopFront() ok = true s.canSendLow.Broadcast() return } return } // TrySendHigh attempts to add an item to the high priority buffer. // // This method does not block. If there is space in the buffer, it returns // true. If the buffer is full, it returns false. func (s *state[T]) TrySendHigh(value T) bool { return s.trySend(value, &s.high) } // TrySendLow attempts to add an item to the low priority buffer. // // This method does not block. If there is space in the buffer, it returns // true. If the buffer is full, it returns false. func (s *state[T]) TrySendLow(value T) bool { return s.trySend(value, &s.low) } func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) { s.mu.Lock() defer s.mu.Unlock() for { if s.closed { panic("send on closed queue") } for !buf.CanPush() { cond.Wait() } if buf.CanPush() { buf.PushBack(value) s.canRecv.Broadcast() return } } } func (s *state[T]) trySend(value T, buf *circ.B[T]) bool { s.mu.Lock() defer s.mu.Unlock() if !buf.CanPush() { return false } buf.PushBack(value) s.canRecv.Broadcast() return true }