소스 검색

fix some bugs

blueloveTH 3 달 전
부모
커밋
276d2173f1
2개의 변경된 파일64개의 추가작업 그리고 42개의 파일을 삭제
  1. 19 8
      include/pocketpy/common/threads.h
  2. 45 34
      src/common/threads.c

+ 19 - 8
include/pocketpy/common/threads.h

@@ -40,26 +40,37 @@ void c11_cond__ctor(c11_cond_t* cond);
 void c11_cond__dtor(c11_cond_t* cond);
 void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex);
 void c11_cond__signal(c11_cond_t* cond);
+void c11_cond__broadcast(c11_cond_t* cond);
 
-typedef struct c11_thrdpool_worker {
-    c11_thrd_t thread;
-    c11_mutex_t mutex;
-    c11_cond_t cond;
+typedef void (*c11_thrdpool_func_t)(void* arg);
 
-    c11_thrd_func_t func;
-    void* arg;
+typedef struct c11_thrdpool_tasks {
+    c11_thrdpool_func_t func;
+    void** args;
+    int length;
+    atomic_int current_index;
+    atomic_int completed_count;
+} c11_thrdpool_tasks;
 
+typedef struct c11_thrdpool_worker {
+    c11_mutex_t mutex;
+    c11_cond_t* p_cond;
+    c11_thrdpool_tasks* tasks;
     bool should_exit;
+
+    c11_thrd_t thread;
 } c11_thrdpool_worker;
 
 typedef struct c11_thrdpool {
     int length;
     c11_thrdpool_worker* workers;
-    c11_thrd_t main_thread;
+    c11_thrdpool_tasks tasks;
+    atomic_bool is_busy;
+    c11_cond_t workers_cond;
 } c11_thrdpool;
 
 void c11_thrdpool__ctor(c11_thrdpool* pool, int length);
 void c11_thrdpool__dtor(c11_thrdpool* pool);
-bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg);
+void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks);
 
 #endif

+ 45 - 34
src/common/threads.c

@@ -34,6 +34,8 @@ void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { pthread_cond_wait(co
 
 void c11_cond__signal(c11_cond_t* cond) { pthread_cond_signal(cond); }
 
+void c11_cond__broadcast(c11_cond_t* cond) { pthread_cond_broadcast(cond); }
+
 #else
 
 bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) {
@@ -65,28 +67,35 @@ void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { cnd_wait(cond, mutex
 
 void c11_cond__signal(c11_cond_t* cond) { cnd_signal(cond); }
 
+void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
+
 #endif
 
 static c11_thrd_retval_t _thrdpool_worker(void* arg) {
     c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
+    c11_thrdpool_tasks* tasks = p_worker->tasks;
+
     while(true) {
+        // wait for tasks
         c11_mutex__lock(&p_worker->mutex);
-        while(p_worker->func == NULL && !p_worker->should_exit) {
-            c11_cond__wait(&p_worker->cond, &p_worker->mutex);
-        }
-
+        c11_cond__wait(p_worker->p_cond, &p_worker->mutex);
         if(p_worker->should_exit) {
             c11_mutex__unlock(&p_worker->mutex);
             break;
         }
-
-        c11_thrd_func_t func = p_worker->func;
-        void* arg = p_worker->arg;
-        p_worker->func = NULL;
-        p_worker->arg = NULL;
         c11_mutex__unlock(&p_worker->mutex);
 
-        func(arg);
+        // execute tasks
+        while(true) {
+            int arg_index = atomic_fetch_add(&tasks->current_index, 1);
+            if(arg_index < tasks->length) {
+                void* arg = tasks->args[arg_index];
+                tasks->func(arg);
+                atomic_fetch_add(&tasks->completed_count, 1);
+            } else {
+                break;
+            }
+        }
     }
     return 0;
 }
@@ -94,14 +103,15 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
 void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
     pool->length = length;
     pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
-    pool->main_thread = c11_thrd__current();
+    atomic_store(&pool->is_busy, false);
+    c11_cond__ctor(&pool->workers_cond);
+
     for(int i = 0; i < length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
 
         c11_mutex__ctor(&p_worker->mutex);
-        c11_cond__ctor(&p_worker->cond);
-        p_worker->func = NULL;
-        p_worker->arg = NULL;
+        p_worker->p_cond = &pool->workers_cond;
+        p_worker->tasks = &pool->tasks;
         p_worker->should_exit = false;
 
         bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
@@ -114,39 +124,40 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
         c11_mutex__lock(&p_worker->mutex);
         p_worker->should_exit = true;
-        c11_cond__signal(&p_worker->cond);
         c11_mutex__unlock(&p_worker->mutex);
     }
+    c11_cond__broadcast(&pool->workers_cond);
 
     for(int i = 0; i < pool->length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
         c11_thrd__join(p_worker->thread);
         c11_mutex__dtor(&p_worker->mutex);
-        c11_cond__dtor(&p_worker->cond);
     }
 
     PK_FREE(pool->workers);
+    c11_cond__dtor(&pool->workers_cond);
 }
 
-bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg) {
-    // must be called from the main thread
-    c11_thrd_t curr_thread = c11_thrd__current();
-    c11__rtassert(c11_thrd__equal(curr_thread, pool->main_thread));
-
-    // find the 1st idle worker
-    for(int i = 0; i < pool->length; i++) {
-        c11_thrdpool_worker* p_worker = &pool->workers[i];
-        c11_mutex__lock(&p_worker->mutex);
-        if(p_worker->func == NULL) {
-            p_worker->func = func;
-            p_worker->arg = arg;
-            c11_cond__signal(&p_worker->cond);
-            c11_mutex__unlock(&p_worker->mutex);
-            return true;  // Task assigned
-        }
-        c11_mutex__unlock(&p_worker->mutex);
+void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
+    if(num_tasks == 0) return;
+    bool expected = false;
+    while(!atomic_compare_exchange_weak(&pool->is_busy, &expected, true)) {
+        expected = false;
+        c11_thrd__yield();
+    }
+    // assign tasks
+    pool->tasks.func = func;
+    pool->tasks.args = args;
+    pool->tasks.length = num_tasks;
+    atomic_store(&pool->tasks.current_index, 0);
+    atomic_store(&pool->tasks.completed_count, 0);
+    // wake up all workers
+    c11_cond__broadcast(&pool->workers_cond);
+    // wait for complete
+    while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
+        c11_thrd__yield();
     }
-    return false;  // no idle worker found
+    atomic_store(&pool->is_busy, false);
 }
 
 #endif  // PK_ENABLE_THREADS