// Package mq_test contains black-box tests and benchmarks for the mq
// priority queue, with built-in buffered channels used as a baseline in the
// benchmarks.
package mq_test

import (
	"errors"
	"math/rand"
	"runtime"
	"sync"
	"testing"

	"gogs.humancabbage.net/sam/priorityq"
	"gogs.humancabbage.net/sam/priorityq/mq"
)

// TestRecvHighFirst checks that items sent with SendHigh are received before
// items sent with Send, and that each group comes out in the order it was
// sent.
func TestRecvHighFirst(t *testing.T) {
	t.Parallel()
	q := mq.Make[int](4)
	for _, v := range []int{1, 2, 3, 4} {
		q.Send(v)
	}
	for _, v := range []int{5, 6, 7, 8} {
		q.SendHigh(v)
	}

	checkRecv := func(want int) {
		t.Helper() // report failures at the call site, not inside the closure
		got, ok := q.Recv()
		if !ok {
			t.Errorf("queue reported closed, expected %d", want)
			return
		}
		if got != want {
			t.Errorf("popped %d, expected %d", got, want)
		}
	}

	// High-priority items first, then the regular ones.
	for _, want := range []int{5, 6, 7, 8, 1, 2, 3, 4} {
		checkRecv(want)
	}
}

// TestSendClosedPanic checks that Send on a closed queue panics.
func TestSendClosedPanic(t *testing.T) {
	t.Parallel()
	defer func() {
		if r := recover(); r == nil {
			t.Errorf("sending to closed queue did not panic")
		}
	}()
	q := mq.Make[int](4)
	q.Close()
	q.Send(1)
}

// TestRecvClosed checks that a closed queue still yields its buffered item
// and only afterwards reports ok == false from Recv.
func TestRecvClosed(t *testing.T) {
	t.Parallel()
	q := mq.Make[int](4)
	q.Send(1)
	q.Close()

	if _, ok := q.Recv(); !ok {
		t.Errorf("queue should have item to receive")
	}
	if _, ok := q.Recv(); ok {
		t.Errorf("queue should be closed")
	}
}

// TestDoubleClose checks that closing an already-closed queue panics.
func TestDoubleClose(t *testing.T) {
	t.Parallel()
	q := mq.Make[int](4)
	defer func() {
		if r := recover(); r == nil {
			t.Errorf("closing a closed queue did not panic")
		}
	}()
	q.Close()
	q.Close()
}

// TestTrySendRecv exercises the non-blocking API: TrySend, TrySendLow,
// TrySendHigh and TryRecv, including the full, empty and closed cases.
func TestTrySendRecv(t *testing.T) {
	t.Parallel()
	q := mq.Make[int](4)

	assumeSendOk := func(n int, send func(int) error) {
		t.Helper()
		if err := send(n); err != nil {
			t.Errorf("expected to be able to send")
		}
	}
	assumeRecvOk := func(expected int) {
		t.Helper()
		actual, err := q.TryRecv()
		if err != nil {
			t.Errorf("expected to be able to receive")
		}
		if actual != expected {
			t.Errorf("expected %d, got %d", expected, actual)
		}
	}

	// Four sends fill the queue for this priority; the test expects TrySend
	// to count against the same buffer as TrySendLow, so the fifth fails.
	assumeSendOk(1, q.TrySendLow)
	assumeSendOk(2, q.TrySendLow)
	assumeSendOk(3, q.TrySend)
	assumeSendOk(4, q.TrySendLow)
	if err := q.TrySendLow(5); err == nil {
		t.Errorf("expected low buffer to be full")
	}
	for _, want := range []int{1, 2, 3, 4} {
		assumeRecvOk(want)
	}

	// Same again for the high-priority side.
	for _, v := range []int{5, 6, 7, 8} {
		assumeSendOk(v, q.TrySendHigh)
	}
	if err := q.TrySendHigh(5); err == nil {
		t.Errorf("expected high buffer to be full")
	}
	for _, want := range []int{5, 6, 7, 8} {
		assumeRecvOk(want)
	}

	// Drained but still open: TryRecv reports ErrEmpty.
	if _, err := q.TryRecv(); !errors.Is(err, priorityq.ErrEmpty) {
		t.Errorf("expected queue to be empty")
	}

	// After Close both directions report ErrClosed.
	q.Close()
	if _, err := q.TryRecv(); !errors.Is(err, priorityq.ErrClosed) {
		t.Errorf("expected queue to be closed ")
	}
	if err := q.TrySend(5); !errors.Is(err, priorityq.ErrClosed) {
		t.Errorf("expected queue to be closed ")
	}
}

// TestConcProducerConsumer runs one producer, which randomly alternates
// between Send and SendHigh, against one consumer that receives until the
// queue is closed. The test passes if both goroutines finish.
func TestConcProducerConsumer(t *testing.T) {
	t.Parallel()
	q := mq.Make[int](4)
	var wg sync.WaitGroup
	produceDone := make(chan struct{})
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := 0; i < 10000; i++ {
			if rand.Intn(2) == 0 {
				q.Send(i)
			} else {
				q.SendHigh(i)
			}
		}
		close(produceDone)
	}()

	go func() {
		defer wg.Done()
		for ok := true; ok; {
			_, ok = q.Recv()
		}
	}()

	<-produceDone
	t.Logf("producer done, closing channel")
	q.Close()
	wg.Wait()
}

// BenchmarkSend measures b.N Send calls into a queue made with capacity b.N.
func BenchmarkSend(b *testing.B) {
	q := mq.Make[int](b.N)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		q.Send(i)
	}
}

// BenchmarkSendChan is the built-in channel baseline for BenchmarkSend.
func BenchmarkSendChan(b *testing.B) {
	c := make(chan int, b.N)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c <- i
	}
}

// BenchmarkRecv measures b.N Recv calls from a queue pre-filled with b.N
// items; the fill happens before the timer is reset.
func BenchmarkRecv(b *testing.B) {
	q := mq.Make[int](b.N)
	for i := 0; i < b.N; i++ {
		q.Send(i)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		q.Recv()
	}
}

// BenchmarkRecvChan is the built-in channel baseline for BenchmarkRecv.
func BenchmarkRecvChan(b *testing.B) {
	c := make(chan int, b.N)
	for i := 0; i < b.N; i++ {
		c <- i
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		<-c
	}
}

// BenchmarkConcSendRecv measures one sender and one receiver goroutine
// exchanging b.N items concurrently. Both wait on start so that goroutine
// startup is excluded from the timed region.
func BenchmarkConcSendRecv(b *testing.B) {
	q := mq.Make[int](b.N)
	var wg sync.WaitGroup
	wg.Add(2)
	start := make(chan struct{})

	go func() {
		defer wg.Done()
		<-start
		for i := 0; i < b.N; i++ {
			q.Send(i)
		}
	}()
	go func() {
		defer wg.Done()
		<-start
		for i := 0; i < b.N; i++ {
			q.Recv()
		}
	}()

	b.ResetTimer()
	close(start)
	wg.Wait()
}

// BenchmarkConcSendRecvChan is the built-in channel baseline for
// BenchmarkConcSendRecv.
func BenchmarkConcSendRecvChan(b *testing.B) {
	c := make(chan int, b.N)
	var wg sync.WaitGroup
	wg.Add(2)
	start := make(chan struct{})

	go func() {
		defer wg.Done()
		<-start
		for i := 0; i < b.N; i++ {
			c <- i
		}
	}()
	go func() {
		defer wg.Done()
		<-start
		for i := 0; i < b.N; i++ {
			<-c
		}
	}()

	b.ResetTimer()
	close(start)
	wg.Wait()
}

// BenchmarkHighContention measures runtime.NumCPU() producers sending into
// one queue while a single consumer drains it until the queue is closed.
//
// The b.N sends are split across the producers, with the remainder handed out
// one apiece to the first few, so exactly b.N sends are performed and the
// reported per-op time is not skewed when b.N is not a multiple of the
// producer count.
func BenchmarkHighContention(b *testing.B) {
	q := mq.Make[int](b.N)
	var wg sync.WaitGroup
	start := make(chan struct{})
	done := make(chan struct{})

	numProducers := runtime.NumCPU()
	sendsPerProducer := b.N / numProducers
	remainder := b.N % numProducers

	wg.Add(numProducers)
	for i := 0; i < numProducers; i++ {
		sends := sendsPerProducer
		if i < remainder {
			sends++
		}
		go func(sends int) {
			defer wg.Done()
			<-start
			for j := 0; j < sends; j++ {
				q.Send(1)
			}
		}(sends)
	}

	go func() {
		defer close(done)
		for ok := true; ok; {
			_, ok = q.Recv()
		}
	}()

	b.ResetTimer()
	close(start)
	wg.Wait()
	q.Close()
	<-done
}

// BenchmarkHighContentionChan is the built-in channel baseline for
// BenchmarkHighContention, using the same split of b.N sends across
// runtime.NumCPU() producers.
func BenchmarkHighContentionChan(b *testing.B) {
	c := make(chan int, b.N)
	var wg sync.WaitGroup
	start := make(chan struct{})
	done := make(chan struct{})

	numProducers := runtime.NumCPU()
	sendsPerProducer := b.N / numProducers
	remainder := b.N % numProducers

	wg.Add(numProducers)
	for i := 0; i < numProducers; i++ {
		sends := sendsPerProducer
		if i < remainder {
			sends++
		}
		go func(sends int) {
			defer wg.Done()
			<-start
			for j := 0; j < sends; j++ {
				c <- 1
			}
		}(sends)
	}

	go func() {
		defer close(done)
		for n := range c {
			_ = n
		}
	}()

	b.ResetTimer()
	close(start)
	wg.Wait()
	close(c)
	<-done
}