blueloveTH 3 månader sedan
förälder
incheckning
450f0263a2
4 ändrade filer med 127 tillägg och 1 borttagningar
  1. 19 0
      build_g_threads.sh
  2. 4 0
      include/pocketpy/common/threads.h
  3. 53 1
      src/common/threads.c
  4. 51 0
      src2/test_threads.c

+ 19 - 0
build_g_threads.sh

@@ -0,0 +1,19 @@
set -e

# python prebuild.py

# Collect every C source file of the library.
SRC=$(find src/ -name "*.c")

FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"

SANITIZE_FLAGS="-fsanitize=undefined,thread -fno-sanitize=function"

# Apple's clang does not accept -fno-sanitize=function; drop it on macOS.
# NOTE: use POSIX `=` (not bash-only `==`) so the script also works under sh/dash.
if [ "$(uname)" = "Darwin" ]; then
    SANITIZE_FLAGS="-fsanitize=undefined,thread"
fi

# Entry-point translation unit; defaults to the thread-pool test driver.
SRC2=${1:-src2/test_threads.c}

echo "Compiling C files..."
# $SRC and $FLAGS are intentionally unquoted: they must word-split into
# multiple arguments. $SRC2 is a single path, so it is quoted.
clang $FLAGS $SANITIZE_FLAGS $SRC "$SRC2" -o main
+

+ 4 - 0
include/pocketpy/common/threads.h

@@ -54,6 +54,8 @@ typedef struct c11_thrdpool_tasks {
 } c11_thrdpool_tasks;
 
 typedef struct c11_thrdpool_worker {
+    int index;
+    atomic_int* p_ready_workers_num;
     c11_mutex_t* p_mutex;
     c11_cond_t* p_cond;
     c11_thrdpool_tasks* p_tasks;
@@ -62,6 +64,8 @@ typedef struct c11_thrdpool_worker {
 
 typedef struct c11_thrdpool {
     int length;
+    atomic_int ready_workers_num;
+
     c11_thrdpool_worker* workers;
 
     c11_mutex_t workers_mutex;

+ 53 - 1
src/common/threads.c

@@ -1,5 +1,6 @@
 #include "pocketpy/common/threads.h"
 #include "pocketpy/common/utils.h"
+#include <stdarg.h>
 
 #if PK_ENABLE_THREADS
 
@@ -71,15 +72,45 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
 
 #endif
 
+#define C11_THRDPOOL_DEBUG 0
+
+#if C11_THRDPOOL_DEBUG
+int64_t time_ns();
+
// Debug-only logger: prints "[<seconds> - Worker <index>] <message>".
// `index` is the worker's slot in the pool; -1 is used by pool-level code.
// Uses bounded snprintf/vsnprintf instead of sprintf/vsprintf so a long
// format expansion cannot overflow the 512-byte stack buffer.
static void c11_thrdpool_debug_log(int index, const char* format, ...) {
    va_list args;
    va_start(args, format);
    char buf[512];
    // time_ns() is defined elsewhere in the project (declared above).
    int n = snprintf(buf, sizeof(buf), "[%.6f - Worker %2d] ", time_ns() / 1e9, index);
    if(n < 0) n = 0;  // encoding error: fall back to an empty prefix
    if((size_t)n < sizeof(buf)) {
        // vsnprintf truncates (and NUL-terminates) instead of overflowing.
        vsnprintf(buf + n, sizeof(buf) - (size_t)n, format, args);
    }
    printf("%s\n", buf);
    va_end(args);
}
+#else
+#define c11_thrdpool_debug_log(...)                                                                \
+    do {                                                                                           \
+    } while(0)
+#endif
+
 static c11_thrd_retval_t _thrdpool_worker(void* arg) {
     c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
     c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
 
     while(true) {
+        c11_thrdpool_debug_log(p_worker->index, "Waiting for mutex lock...");
         c11_mutex__lock(p_worker->p_mutex);
+        atomic_fetch_add(p_worker->p_ready_workers_num, 1);
+        c11_thrdpool_debug_log(p_worker->index, "Mutex locked, checking for tasks...");
+        if(atomic_load(&p_tasks->sync_val) == -1) {
+            c11_mutex__unlock(p_worker->p_mutex);
+            return 0;  // force kill
+        }
         while(true) {
             c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
             int sync_val = atomic_load(&p_tasks->sync_val);
+            c11_thrdpool_debug_log(p_worker->index,
+                                   "Woke up from condition variable, sync_val=%d",
+                                   sync_val);
             if(sync_val == 1) break;
             if(sync_val == -1) {
                 c11_mutex__unlock(p_worker->p_mutex);
@@ -87,7 +118,9 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
             }
         }
         c11_mutex__unlock(p_worker->p_mutex);
+        atomic_fetch_sub(p_worker->p_ready_workers_num, 1);
 
+        c11_thrdpool_debug_log(p_worker->index, "Received tasks, starting execution...");
         // execute tasks
         while(true) {
             int arg_index = atomic_fetch_add(&p_tasks->current_index, 1);
@@ -100,15 +133,23 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
             }
         }
 
-        while(atomic_load(&p_tasks->sync_val) == 0) {
+        c11_thrdpool_debug_log(p_worker->index,
+                               "Execution complete, waiting for `sync_val` reset...");
+        while(true) {
+            int sync_val = atomic_load(&p_tasks->sync_val);
+            if(sync_val == 0) break;
+            if(sync_val == -1) return 0;  // force kill
             c11_thrd__yield();
         }
+        c11_thrdpool_debug_log(p_worker->index,
+                               "`sync_val` reset detected, waiting for next tasks...");
     }
     return 0;
 }
 
 void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
     pool->length = length;
+    atomic_store(&pool->ready_workers_num, 0);
     pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
 
     c11_mutex__ctor(&pool->workers_mutex);
@@ -118,6 +159,8 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
 
     for(int i = 0; i < length; i++) {
         c11_thrdpool_worker* p_worker = &pool->workers[i];
+        p_worker->index = i;
+        p_worker->p_ready_workers_num = &pool->ready_workers_num;
         p_worker->p_mutex = &pool->workers_mutex;
         p_worker->p_cond = &pool->workers_cond;
         p_worker->p_tasks = &pool->tasks;
@@ -129,6 +172,7 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
 void c11_thrdpool__dtor(c11_thrdpool* pool) {
     c11_mutex__lock(&pool->workers_mutex);
     atomic_store(&pool->tasks.sync_val, -1);
+    c11_thrdpool_debug_log(-1, "Terminating all workers...");
     c11_cond__broadcast(&pool->workers_cond);
     c11_mutex__unlock(&pool->workers_mutex);
 
@@ -144,6 +188,12 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) {
 
 void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
     if(num_tasks == 0) return;
+    c11_thrdpool_debug_log(-1, "c11_thrdpool__map() called on %d tasks...", num_tasks);
+    while(atomic_load(&pool->ready_workers_num) < pool->length) {
+        c11_thrd__yield();
+    }
+    c11_thrdpool_debug_log(-1, "All %d workers are ready.", pool->length);
+
     // assign tasks
     c11_mutex__lock(&pool->workers_mutex);
     pool->tasks.func = func;
@@ -155,10 +205,12 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args
     c11_cond__broadcast(&pool->workers_cond);
     c11_mutex__unlock(&pool->workers_mutex);
     // wait for complete
+    c11_thrdpool_debug_log(-1, "Waiting for %d tasks to complete...", num_tasks);
     while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
         c11_thrd__yield();
     }
     atomic_store(&pool->tasks.sync_val, 0);
+    c11_thrdpool_debug_log(-1, "All %d tasks completed, `sync_val` was reset.", num_tasks);
 }
 
 #endif  // PK_ENABLE_THREADS

+ 51 - 0
src2/test_threads.c

@@ -0,0 +1,51 @@
#include "pocketpy/common/threads.h"
#include <stdio.h>
#include <stdlib.h>
+
+int64_t time_ns();
+
// Worker task: read the slot's value one million times, accumulate the
// sum, then store the sum back into the slot (i.e. *arg becomes *arg * 1e6).
static void func(void* arg) {
    long long* slot = (long long*)arg;
    long long total = 0;
    int remaining = 1000000;
    while(remaining-- > 0) {
        total += *slot;  // *slot is only written after the loop finishes
    }
    *slot = total;
}
+
+int main(int argc, char** argv) {
+    int threads_num = 16;
+    if(argc == 2) threads_num = atoi(argv[1]);
+    printf("Using %d threads in the thread pool.\n", threads_num);
+
+    c11_thrdpool pool;
+    c11_thrdpool__ctor(&pool, threads_num);
+
+    int num_tasks = 2000;
+    long long* data = PK_MALLOC(sizeof(long long) * num_tasks);
+    void** args = PK_MALLOC(sizeof(void*) * num_tasks);
+
+    for(int i = 0; i < 4; i++) {
+        for(int i = 0; i < num_tasks; i++) {
+            data[i] = i;
+            args[i] = &data[i];
+        }
+
+        printf("==> %dth run\n", i + 1);
+        int64_t start_ns = time_ns();
+        c11_thrdpool__map(&pool, func, args, num_tasks);
+        int64_t end_ns = time_ns();
+        double elapsed = (end_ns - start_ns) / 1e9;
+        printf("  Results: %lld, %lld, %lld, %lld, %lld\n",
+               data[0],
+               data[1],
+               data[2],
+               data[100],
+               data[400]);
+        printf("  Elapsed time for %d tasks: %.6f seconds\n", num_tasks, elapsed);
+    }
+
+    c11_thrdpool__dtor(&pool);
+    PK_FREE(args);
+    PK_FREE(data);
+    return 0;
+}