diff --git a/src/queue.rs b/src/queue.rs index 0893308..a681149 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -216,8 +216,8 @@ mod tests { #[test] fn test_fill_and_empty_repeatedly( - test_size in 1..10_000usize, - num_repeats in 1..100usize + test_size in 1..1_000usize, + num_repeats in 1..10usize ) { let q = Queue::new(); @@ -323,6 +323,53 @@ mod tests { prop_assert!(expected.is_empty()); } + #[test] + fn test_single_concurrent_enqueuer_multiple_concurrent_dequeuers( + num_threads in 2..10usize, + mut items in vec(any::(), 1..1_000) + ) { + let q = Arc::new(Queue::new()); + + let (tx, rx) = std::sync::mpsc::channel(); + let enqueuing = Arc::new(AtomicBool::new(true)); + + for _ in 0..num_threads { + let q = q.clone(); + let tx = tx.clone(); + let enqueuing = enqueuing.clone(); + + thread::spawn(move || { + loop { + while let Some(item) = q.dequeue() { + tx.send(item).unwrap(); + } + + if !enqueuing.load(Ordering::Relaxed) { + break; + } + } + }); + } + + drop(tx); + + for item in items.clone() { + q.enqueue(item); + } + + enqueuing.store(false, Ordering::Relaxed); + + let mut res = vec![]; + while let Ok(item) = rx.recv() { + res.push(item); + } + + items.sort(); + res.sort(); + + prop_assert_eq!(res, items); + } + #[test] fn test_multiple_concurrent_enqueuers_multiple_concurrent_dequeuers( num_threads in 2..10usize,