Skip to content

Commit

Permalink
test_single_concurrent_enqueuer_multiple_concurrent_dequeuers
Browse files Browse the repository at this point in the history
  • Loading branch information
harryscholes committed Feb 21, 2024
1 parent 07e89ac commit 22140bd
Showing 1 changed file with 49 additions and 2 deletions.
51 changes: 49 additions & 2 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ mod tests {

#[test]
fn test_fill_and_empty_repeatedly(
test_size in 1..10_000usize,
num_repeats in 1..100usize
test_size in 1..1_000usize,
num_repeats in 1..10usize
) {
let q = Queue::new();

Expand Down Expand Up @@ -323,6 +323,53 @@ mod tests {
prop_assert!(expected.is_empty());
}

#[test]
fn test_single_concurrent_enqueuer_multiple_concurrent_dequeuers(
num_threads in 2..10usize,
mut items in vec(any::<usize>(), 1..1_000)
) {
let q = Arc::new(Queue::new());

let (tx, rx) = std::sync::mpsc::channel();
let enqueuing = Arc::new(AtomicBool::new(true));

for _ in 0..num_threads {
let q = q.clone();
let tx = tx.clone();
let enqueuing = enqueuing.clone();

thread::spawn(move || {
loop {
while let Some(item) = q.dequeue() {
tx.send(item).unwrap();
}

if !enqueuing.load(Ordering::Relaxed) {
break;
}
}
});
}

drop(tx);

for item in items.clone() {
q.enqueue(item);
}

enqueuing.store(false, Ordering::Relaxed);

let mut res = vec![];
while let Ok(item) = rx.recv() {
res.push(item);
}

items.sort();
res.sort();

prop_assert_eq!(res, items);
}

#[test]
fn test_multiple_concurrent_enqueuers_multiple_concurrent_dequeuers(
num_threads in 2..10usize,
Expand Down

0 comments on commit 22140bd

Please sign in to comment.