blueloveTH 3 miesięcy temu
rodzic
commit
4fc18c921d
2 zmienionych plików z 30 dodań i 20 usunięć
  1. 6 4
      include/pocketpy/common/threads.h
  2. 24 16
      src/common/threads.c

+ 6 - 4
include/pocketpy/common/threads.h

@@ -53,20 +53,22 @@ typedef struct c11_thrdpool_tasks {
 } c11_thrdpool_tasks;
 
 typedef struct c11_thrdpool_worker {
-    c11_mutex_t mutex;
+    c11_mutex_t* p_mutex;
     c11_cond_t* p_cond;
-    c11_thrdpool_tasks* tasks;
-    bool should_exit;
+    c11_thrdpool_tasks* p_tasks;
 
+    bool should_exit;
     c11_thrd_t thread;
 } c11_thrdpool_worker;
 
 typedef struct c11_thrdpool {
     int length;
     c11_thrdpool_worker* workers;
-    c11_thrdpool_tasks tasks;
     atomic_bool is_busy;
+    
+    c11_mutex_t workers_mutex;
     c11_cond_t workers_cond;
+    c11_thrdpool_tasks tasks;
 } c11_thrdpool;
 
 void c11_thrdpool__ctor(c11_thrdpool* pool, int length);

+ 24 - 16
src/common/threads.c

@@ -73,26 +73,32 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
 
 static c11_thrd_retval_t _thrdpool_worker(void* arg) {
     c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
-    c11_thrdpool_tasks* tasks = p_worker->tasks;
 
     while(true) {
         // wait for tasks
-        c11_mutex__lock(&p_worker->mutex);
-        c11_cond__wait(p_worker->p_cond, &p_worker->mutex);
+        c11_mutex__lock(p_worker->p_mutex);
+        while(!p_worker->p_tasks && !p_worker->should_exit) {
+            c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
+        }
         if(p_worker->should_exit) {
-            c11_mutex__unlock(&p_worker->mutex);
+            c11_mutex__unlock(p_worker->p_mutex);
             break;
         }
-        c11_mutex__unlock(&p_worker->mutex);
+
+        c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
+        c11_mutex__unlock(p_worker->p_mutex);
 
         // execute tasks
         while(true) {
-            int arg_index = atomic_fetch_add(&tasks->current_index, 1);
-            if(arg_index < tasks->length) {
-                void* arg = tasks->args[arg_index];
-                tasks->func(arg);
-                atomic_fetch_add(&tasks->completed_count, 1);
+            int arg_index = atomic_fetch_add(&p_tasks->current_index, 1);
+            if(arg_index < p_tasks->length) {
+                void* arg = p_tasks->args[arg_index];
+                p_tasks->func(arg);
+                atomic_fetch_add(&p_tasks->completed_count, 1);
             } else {
+                c11_mutex__lock(p_worker->p_mutex);
+                p_worker->p_tasks = NULL;
+                c11_mutex__unlock(p_worker->p_mutex);
                 break;
             }
         }
@@ -104,14 +110,16 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
     pool->length = length;
     pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
     atomic_store(&pool->is_busy, false);
+
+    c11_mutex__ctor(&pool->workers_mutex);
     c11_cond__ctor(&pool->workers_cond);
 
     for(int i = 0; i < length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
 
-        c11_mutex__ctor(&p_worker->mutex);
+        p_worker->p_mutex = &pool->workers_mutex;
         p_worker->p_cond = &pool->workers_cond;
-        p_worker->tasks = &pool->tasks;
+        p_worker->p_tasks = &pool->tasks;
         p_worker->should_exit = false;
 
         bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
@@ -122,19 +130,17 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
 void c11_thrdpool__dtor(c11_thrdpool* pool) {
     for(int i = 0; i < pool->length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
-        c11_mutex__lock(&p_worker->mutex);
-        p_worker->should_exit = true;
-        c11_mutex__unlock(&p_worker->mutex);
+        atomic_store(&p_worker->should_exit, true);
     }
     c11_cond__broadcast(&pool->workers_cond);
 
     for(int i = 0; i < pool->length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
         c11_thrd__join(p_worker->thread);
-        c11_mutex__dtor(&p_worker->mutex);
     }
 
     PK_FREE(pool->workers);
+    c11_mutex__dtor(&pool->workers_mutex);
     c11_cond__dtor(&pool->workers_cond);
 }
 
@@ -146,6 +152,7 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args
         c11_thrd__yield();
     }
     // assign tasks
+    c11_mutex__lock(&pool->workers_mutex);
     pool->tasks.func = func;
     pool->tasks.args = args;
     pool->tasks.length = num_tasks;
@@ -153,6 +160,7 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args
     atomic_store(&pool->tasks.completed_count, 0);
     // wake up all workers
     c11_cond__broadcast(&pool->workers_cond);
+    c11_mutex__unlock(&pool->workers_mutex);
     // wait for complete
     while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
         c11_thrd__yield();