Skip to content

Commit

Permalink
Merge pull request #4541 from Waqar144/work/thread-pool-use-queue
Browse files Browse the repository at this point in the history
Use a proper Queue in thread.Pool
  • Loading branch information
gingerBill authored Dec 2, 2024
2 parents 7d5ac2a + 8a27042 commit af8122e
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions core/thread/thread_pool.odin
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package thread
import "base:intrinsics"
import "core:sync"
import "core:mem"
import "core:container/queue"

Task_Proc :: #type proc(task: Task)

Expand Down Expand Up @@ -40,7 +41,7 @@ Pool :: struct {
threads: []^Thread,


tasks: [dynamic]Task,
tasks: queue.Queue(Task),
tasks_done: [dynamic]Task,
}

Expand Down Expand Up @@ -69,13 +70,13 @@ pool_thread_runner :: proc(t: ^Thread) {
}

// Once initialized, the pool's memory address is not allowed to change until
// it is destroyed.
// it is destroyed.
//
// The thread pool requires an allocator which it either owns, or which is thread safe.
pool_init :: proc(pool: ^Pool, allocator: mem.Allocator, thread_count: int) {
context.allocator = allocator
pool.allocator = allocator
pool.tasks = make([dynamic]Task)
queue.init(&pool.tasks)
pool.tasks_done = make([dynamic]Task)
pool.threads = make([]^Thread, max(thread_count, 1))

Expand All @@ -92,7 +93,7 @@ pool_init :: proc(pool: ^Pool, allocator: mem.Allocator, thread_count: int) {
}

pool_destroy :: proc(pool: ^Pool) {
delete(pool.tasks)
queue.destroy(&pool.tasks)
delete(pool.tasks_done)

for &t in pool.threads {
Expand Down Expand Up @@ -140,11 +141,11 @@ pool_join :: proc(pool: ^Pool) {
// the thread pool. You can even add tasks from inside other tasks.
//
// Each task also needs an allocator which it either owns, or which is thread
// safe.
// safe.
pool_add_task :: proc(pool: ^Pool, allocator: mem.Allocator, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
sync.guard(&pool.mutex)

append(&pool.tasks, Task{
queue.push_back(&pool.tasks, Task{
procedure = procedure,
data = data,
user_index = user_index,
Expand Down Expand Up @@ -288,10 +289,10 @@ pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
sync.guard(&pool.mutex)

if len(pool.tasks) != 0 {
if queue.len(pool.tasks) != 0 {
intrinsics.atomic_sub(&pool.num_waiting, 1)
intrinsics.atomic_add(&pool.num_in_processing, 1)
task = pop_front(&pool.tasks)
task = queue.pop_front(&pool.tasks)
got_task = true
}

Expand Down

0 comments on commit af8122e

Please sign in to comment.