diff --git a/mq/lib.go b/mq/lib.go index f93bda3..ccf704e 100644 --- a/mq/lib.go +++ b/mq/lib.go @@ -28,6 +28,7 @@ package mq import ( + "iter" "sync" "gogs.humancabbage.net/sam/priorityq" @@ -199,3 +200,18 @@ func (s *state[T]) trySend(value T, buf *queue.Q[T]) error { s.canRecv.Broadcast() return nil } + +// Iter returns an iterator that consumes values until the queue is closed. +func (s *state[T]) Iter() iter.Seq[T] { + return func(yield func(T) bool) { + for { + t, ok := s.Recv() + if !ok { + return + } + if !yield(t) { + return + } + } + } +} diff --git a/mq/lib_test.go b/mq/lib_test.go index 5b3eb0a..dfcfa5b 100644 --- a/mq/lib_test.go +++ b/mq/lib_test.go @@ -164,6 +164,30 @@ func TestConcProducerConsumer(t *testing.T) { wg.Wait() } +func TestIter(t *testing.T) { + t.Parallel() + q := mq.Make[int](4) + q.Send(1) + q.Send(2) + q.Send(3) + q.SendHigh(0) + q.Close() + i := 0 + for v := range q.Iter() { + if v != i { + t.Errorf("expected %d, got %d", i, v) + } + i++ + } + + // to test yield() returning false + q = mq.Make[int](4) + q.Send(3) + for _ = range q.Iter() { + break + } +} + func BenchmarkSend(b *testing.B) { q := mq.Make[int](b.N) b.ResetTimer() diff --git a/npq/lib.go b/npq/lib.go index 2e0d79f..229e911 100644 --- a/npq/lib.go +++ b/npq/lib.go @@ -38,6 +38,7 @@ package npq import ( "fmt" + "iter" "sync" "gogs.humancabbage.net/sam/priorityq" @@ -212,3 +213,18 @@ func (s *state[P, T]) validatePriority(priority P) { priority, s.min, s.max)) } } + +// Iter returns an iterator that consumes values until the queue is closed. +func (s *state[P, T]) Iter() iter.Seq[T] { + return func(yield func(T) bool) { + for { + t, ok := s.Recv() + if !ok { + return + } + if !yield(t) { + return + } + } + } +} diff --git a/npq/lib_test.go b/npq/lib_test.go index 026a701..e24ac90 100644 --- a/npq/lib_test.go +++ b/npq/lib_test.go @@ -186,6 +186,30 @@ func TestConcProducerConsumer(t *testing.T) { wg.Wait() } +func TestIter(t *testing.T) { + t.Parallel() + q := npq.Make[int, int](4, 0, 16) + q.Send(4, 0) + q.Send(3, 1) + q.Send(2, 2) + q.Send(1, 3) + q.Close() + i := 0 + for v := range q.Iter() { + if v != i { + t.Errorf("expected %d, got %d", i, v) + } + i++ + } + + // to test yield() returning false + q = npq.Make[int, int](4, 0, 16) + q.Send(1, 3) + for _ = range q.Iter() { + break + } +} + const highPriority = 2 func BenchmarkSend(b *testing.B) {