Skip to content

Commit

Permalink
Threadpool improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Dec 30, 2024
1 parent 7ed7053 commit 23043d9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ env:
CARGO_TERM_COLOR: always

jobs:
Format:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Build
run: cargo fmt --check

Lint:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Build
run: cargo clippy --check

Test:

runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion crates/tf_idf/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl TfIdf {
};
let mut guard = remaining.lock().unwrap();
*guard -= 1;
})
})?
}

let mut finished = false;
Expand Down
52 changes: 44 additions & 8 deletions crates/tf_idf/src/threadpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::{
thread,
};

use crate::error::{RFSeeError, RFSeeResult};

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct Worker {
_id: usize,
_thread: thread::JoinHandle<()>,
thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
Expand All @@ -26,14 +28,14 @@ impl Worker {
});
Self {
_id: id,
_thread: thread,
thread: Some(thread),
}
}
}

pub struct ThreadPool {
_workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
Expand All @@ -46,16 +48,50 @@ impl ThreadPool {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
_workers: workers,
sender,
workers,
sender: Some(sender),
}
}

pub fn execute<F>(&self, f: F)
pub fn execute<F>(&self, f: F) -> RFSeeResult<()>
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap()
match self.sender.as_ref() {
Some(sender) => sender
.send(job)
.map_err(|e| RFSeeError::RuntimeError(e.to_string())),
None => Err(RFSeeError::RuntimeError("No sender available".to_string())),
}
}
}

impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());

for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

#[cfg(test)]
mod tests {
use super::ThreadPool;

#[test]
fn test_single_thread_completes_work() {
let pool = ThreadPool::new(1);

let job = || {
let _ = 1 + 1;
};

pool.execute(job).unwrap();
drop(pool);
}
}

0 comments on commit 23043d9

Please sign in to comment.