ソースを参照

add threads pool impl

blueloveTH 3 ヶ月 前
コミット
10c81469c0
4 ファイル変更178 行追加11 行削除
  1. 42 2
      include/pocketpy/common/threads.h
  2. 1 1
      src/common/name.c
  3. 131 4
      src/common/threads.c
  4. 4 4
      src/modules/pkpy.c

+ 42 - 2
include/pocketpy/common/threads.h

@@ -12,14 +12,54 @@
 #define PK_USE_PTHREADS 1
 typedef pthread_t c11_thrd_t;
 typedef void* c11_thrd_retval_t;
+typedef pthread_mutex_t c11_mutex_t;
+typedef pthread_cond_t c11_cond_t;
 #else
 #include <threads.h>
 #define PK_USE_PTHREADS 0
 typedef thrd_t c11_thrd_t;
 typedef int c11_thrd_retval_t;
+typedef mtx_t c11_mutex_t;
+typedef cnd_t c11_cond_t;
 #endif
 
-bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg);
-void c11_thrd_yield();
+typedef c11_thrd_retval_t (*c11_thrd_func_t)(void*);
+
+bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg);
+void c11_thrd__yield();
+void c11_thrd__join(c11_thrd_t thrd);
+c11_thrd_t c11_thrd__current();
+bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b);
+
+void c11_mutex__ctor(c11_mutex_t* mutex);
+void c11_mutex__dtor(c11_mutex_t* mutex);
+void c11_mutex__lock(c11_mutex_t* mutex);
+void c11_mutex__unlock(c11_mutex_t* mutex);
+
+void c11_cond__ctor(c11_cond_t* cond);
+void c11_cond__dtor(c11_cond_t* cond);
+void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex);
+void c11_cond__signal(c11_cond_t* cond);
+
+typedef struct c11_thrdpool_worker {
+    c11_thrd_t thread;
+    c11_mutex_t mutex;
+    c11_cond_t cond;
+
+    c11_thrd_func_t func;
+    void* arg;
+
+    bool should_exit;
+} c11_thrdpool_worker;
+
+typedef struct c11_thrdpool {
+    int length;
+    c11_thrdpool_worker* workers;
+    c11_thrd_t main_thread;
+} c11_thrdpool;
+
+void c11_thrdpool__ctor(c11_thrdpool* pool, int length);
+void c11_thrdpool__dtor(c11_thrdpool* pool);
+bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg);
 
 #endif

+ 1 - 1
src/common/name.c

@@ -47,7 +47,7 @@ void pk_names_finalize() {
 py_Name py_namev(c11_sv name) {
 #if PK_ENABLE_THREADS
     while(atomic_flag_test_and_set(&pk_string_table.lock)) {
-        c11_thrd_yield();
+        c11_thrd__yield();
     }
 #endif
     uint64_t hash = c11_sv__hash(name);

+ 131 - 4
src/common/threads.c

@@ -1,25 +1,152 @@
 #include "pocketpy/common/threads.h"
+#include "pocketpy/common/utils.h"
 
 #if PK_ENABLE_THREADS
 
 #if PK_USE_PTHREADS
 
-bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg) {
+bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) {
     int res = pthread_create(thrd, NULL, func, arg);
     return res == 0;
 }
 
-void c11_thrd_yield() { sched_yield(); }
+void c11_thrd__yield() { sched_yield(); }
+
+void c11_thrd__join(c11_thrd_t thrd) { pthread_join(thrd, NULL); }
+
+c11_thrd_t c11_thrd__current() { return pthread_self(); }
+
+bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return pthread_equal(a, b); }
+
+void c11_mutex__ctor(c11_mutex_t* mutex) { pthread_mutex_init(mutex, NULL); }
+
+void c11_mutex__dtor(c11_mutex_t* mutex) { pthread_mutex_destroy(mutex); }
+
+void c11_mutex__lock(c11_mutex_t* mutex) { pthread_mutex_lock(mutex); }
+
+void c11_mutex__unlock(c11_mutex_t* mutex) { pthread_mutex_unlock(mutex); }
+
+void c11_cond__ctor(c11_cond_t* cond) { pthread_cond_init(cond, NULL); }
+
+void c11_cond__dtor(c11_cond_t* cond) { pthread_cond_destroy(cond); }
+
+void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { pthread_cond_wait(cond, mutex); }
+
+void c11_cond__signal(c11_cond_t* cond) { pthread_cond_signal(cond); }
 
 #else
 
-bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg) {
+bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) {
     int res = thrd_create(thrd, func, arg);
     return res == thrd_success;
 }
 
-void c11_thrd_yield() { thrd_yield(); }
+void c11_thrd__yield() { thrd_yield(); }
+
+void c11_thrd__join(c11_thrd_t thrd) { thrd_join(thrd, NULL); }
+
+c11_thrd_t c11_thrd__current() { return thrd_current(); }
+
+bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return thrd_equal(a, b); }
+
+void c11_mutex__ctor(c11_mutex_t* mutex) { mtx_init(mutex, mtx_plain); }
+
+void c11_mutex__dtor(c11_mutex_t* mutex) { mtx_destroy(mutex); }
+
+void c11_mutex__lock(c11_mutex_t* mutex) { mtx_lock(mutex); }
+
+void c11_mutex__unlock(c11_mutex_t* mutex) { mtx_unlock(mutex); }
+
+void c11_cond__ctor(c11_cond_t* cond) { cnd_init(cond); }
+
+void c11_cond__dtor(c11_cond_t* cond) { cnd_destroy(cond); }
+
+void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { cnd_wait(cond, mutex); }
+
+void c11_cond__signal(c11_cond_t* cond) { cnd_signal(cond); }
 
 #endif
 
+static c11_thrd_retval_t _thrdpool_worker(void* arg) {
+    c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
+    while(true) {
+        c11_mutex__lock(&p_worker->mutex);
+        while(p_worker->func == NULL && !p_worker->should_exit) {
+            c11_cond__wait(&p_worker->cond, &p_worker->mutex);
+        }
+
+        if(p_worker->should_exit) {
+            c11_mutex__unlock(&p_worker->mutex);
+            break;
+        }
+
+        c11_thrd_func_t func = p_worker->func;
+        void* arg = p_worker->arg;
+        p_worker->func = NULL;
+        p_worker->arg = NULL;
+        c11_mutex__unlock(&p_worker->mutex);
+
+        func(arg);
+    }
+    return 0;
+}
+
+void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
+    pool->length = length;
+    pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
+    pool->main_thread = c11_thrd__current();
+    for(int i = 0; i < length; i++) {
+        c11_thrdpool_worker* p_worker = &pool->workers[i];
+
+        c11_mutex__ctor(&p_worker->mutex);
+        c11_cond__ctor(&p_worker->cond);
+        p_worker->func = NULL;
+        p_worker->arg = NULL;
+        p_worker->should_exit = false;
+
+        bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
+        c11__rtassert(ok);
+    }
+}
+
+void c11_thrdpool__dtor(c11_thrdpool* pool) {
+    for(int i = 0; i < pool->length; i++) {
+        c11_thrdpool_worker* p_worker = &pool->workers[i];
+        c11_mutex__lock(&p_worker->mutex);
+        p_worker->should_exit = true;
+        c11_cond__signal(&p_worker->cond);
+        c11_mutex__unlock(&p_worker->mutex);
+    }
+
+    for(int i = 0; i < pool->length; i++) {
+        c11_thrdpool_worker* p_worker = &pool->workers[i];
+        c11_thrd__join(p_worker->thread);
+        c11_mutex__dtor(&p_worker->mutex);
+        c11_cond__dtor(&p_worker->cond);
+    }
+
+    PK_FREE(pool->workers);
+}
+
+bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg) {
+    // must be called from the main thread
+    c11_thrd_t curr_thread = c11_thrd__current();
+    c11__rtassert(c11_thrd__equal(curr_thread, pool->main_thread));
+
+    // find the 1st idle worker
+    for(int i = 0; i < pool->length; i++) {
+        c11_thrdpool_worker* p_worker = &pool->workers[i];
+        c11_mutex__lock(&p_worker->mutex);
+        if(p_worker->func == NULL) {
+            p_worker->func = func;
+            p_worker->arg = arg;
+            c11_cond__signal(&p_worker->cond);
+            c11_mutex__unlock(&p_worker->mutex);
+            return true;  // Task assigned
+        }
+        c11_mutex__unlock(&p_worker->mutex);
+    }
+    return false;  // no idle worker found
+}
+
 #endif  // PK_ENABLE_THREADS

+ 4 - 4
src/modules/pkpy.c

@@ -212,7 +212,7 @@ static bool ComputeThread_wait_for_done(int argc, py_Ref argv) {
     PY_CHECK_ARGC(1);
     c11_ComputeThread* self = py_touserdata(argv);
     while(!atomic_load(&self->is_done)) {
-        c11_thrd_yield();
+        c11_thrd__yield();
     }
     py_newnone(py_retval());
     return true;
@@ -308,7 +308,7 @@ static bool ComputeThread_submit_exec(int argc, py_Ref argv) {
     c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor);
     /**************************/
     atomic_store(&self->is_done, false);
-    bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_exec, job);
+    bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_exec, job);
     if(!ok) {
         atomic_store(&self->is_done, true);
         return OSError("thrd_create() failed");
@@ -331,7 +331,7 @@ static bool ComputeThread_submit_eval(int argc, py_Ref argv) {
     c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor);
     /**************************/
     atomic_store(&self->is_done, false);
-    bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_exec, job);
+    bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_exec, job);
     if(!ok) {
         atomic_store(&self->is_done, true);
         return OSError("thrd_create() failed");
@@ -368,7 +368,7 @@ static bool ComputeThread_submit_call(int argc, py_Ref argv) {
     c11_ComputeThread__reset_job(self, job, ComputeThreadJobCall__dtor);
     /**************************/
     atomic_store(&self->is_done, false);
-    bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_call, job);
+    bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_call, job);
     if(!ok) {
         atomic_store(&self->is_done, true);
         return OSError("thrd_create() failed");