blueloveTH 3 ماه پیش
والد
کامیت
18d1585440
2 فایلهای تغییر یافته به همراه 22 افزوده شده و 14 حذف شده
  1. 19 11
      src/common/threads.c
  2. 3 3
      src2/test_threads.c

+ 19 - 11
src/common/threads.c

@@ -99,15 +99,16 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
     while(true) {
         c11_thrdpool_debug_log(p_worker->index, "Waiting for mutex lock...");
         c11_mutex__lock(p_worker->p_mutex);
-        atomic_fetch_add(p_worker->p_ready_workers_num, 1);
+        atomic_fetch_add_explicit(p_worker->p_ready_workers_num, 1, memory_order_relaxed);
         c11_thrdpool_debug_log(p_worker->index, "Mutex locked, checking for tasks...");
-        if(atomic_load(&p_tasks->sync_val) == -1) {
+
+        if(atomic_load_explicit(&p_tasks->sync_val, memory_order_relaxed) == -1) {
             c11_mutex__unlock(p_worker->p_mutex);
             return 0;  // force kill
         }
         while(true) {
             c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
-            int sync_val = atomic_load(&p_tasks->sync_val);
+            int sync_val = atomic_load_explicit(&p_tasks->sync_val, memory_order_relaxed);
             c11_thrdpool_debug_log(p_worker->index,
                                    "Woke up from condition variable, sync_val=%d",
                                    sync_val);
@@ -117,21 +118,25 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
                 return 0;  // force kill
             }
         }
+
+        atomic_fetch_sub_explicit(p_worker->p_ready_workers_num, 1, memory_order_relaxed);
         c11_mutex__unlock(p_worker->p_mutex);
-        atomic_fetch_sub(p_worker->p_ready_workers_num, 1);
 
         c11_thrdpool_debug_log(p_worker->index, "Received tasks, starting execution...");
         // execute tasks
+        int completed_count = 0;
         while(true) {
-            int arg_index = atomic_fetch_add(&p_tasks->current_index, 1);
+            int arg_index =
+                atomic_fetch_add_explicit(&p_tasks->current_index, 1, memory_order_relaxed);
             if(arg_index < p_tasks->length) {
                 void* arg = p_tasks->args[arg_index];
                 p_tasks->func(arg);
-                atomic_fetch_add(&p_tasks->completed_count, 1);
+                completed_count++;
             } else {
                 break;
             }
         }
+        atomic_fetch_add_explicit(&p_tasks->completed_count, completed_count, memory_order_relaxed);
 
         c11_thrdpool_debug_log(p_worker->index,
                                "Execution complete, waiting for `sync_val` reset...");
@@ -189,7 +194,7 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) {
 void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
     if(num_tasks == 0) return;
     c11_thrdpool_debug_log(-1, "c11_thrdpool__map() called on %d tasks...", num_tasks);
-    while(atomic_load(&pool->ready_workers_num) < pool->length) {
+    while(atomic_load_explicit(&pool->ready_workers_num, memory_order_relaxed) < pool->length) {
         c11_thrd__yield();
     }
     c11_thrdpool_debug_log(-1, "All %d workers are ready.", pool->length);
@@ -199,16 +204,19 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args
     pool->tasks.func = func;
     pool->tasks.args = args;
     pool->tasks.length = num_tasks;
-    atomic_store(&pool->tasks.sync_val, 1);
-    atomic_store(&pool->tasks.current_index, 0);
-    atomic_store(&pool->tasks.completed_count, 0);
+    atomic_store_explicit(&pool->tasks.sync_val, 1, memory_order_relaxed);
+    atomic_store_explicit(&pool->tasks.current_index, 0, memory_order_relaxed);
+    atomic_store_explicit(&pool->tasks.completed_count, 0, memory_order_relaxed);
     c11_cond__broadcast(&pool->workers_cond);
     c11_mutex__unlock(&pool->workers_mutex);
+
     // wait for complete
     c11_thrdpool_debug_log(-1, "Waiting for %d tasks to complete...", num_tasks);
-    while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
+    while(atomic_load_explicit(&pool->tasks.completed_count, memory_order_relaxed) < num_tasks) {
         c11_thrd__yield();
     }
+
+    // the last sync point
     atomic_store(&pool->tasks.sync_val, 0);
     c11_thrdpool_debug_log(-1, "All %d tasks completed, `sync_val` was reset.", num_tasks);
 }

+ 3 - 3
src2/test_threads.c

@@ -6,7 +6,7 @@ int64_t time_ns();
 static void func(void* arg) {
     long long* val = (long long*)arg;
     long long sum = 0;
-    for(int i = 0; i < 1000000; i++) {
+    for(int i = 0; i < 100000; i++) {
         sum += *val;
     }
     *val = sum;
@@ -20,11 +20,11 @@ int main(int argc, char** argv) {
     c11_thrdpool pool;
     c11_thrdpool__ctor(&pool, threads_num);
 
-    int num_tasks = 2000;
+    int num_tasks = 10000;
     long long* data = PK_MALLOC(sizeof(long long) * num_tasks);
     void** args = PK_MALLOC(sizeof(void*) * num_tasks);
 
-    for(int i = 0; i < 4; i++) {
+    for(int i = 0; i < 3; i++) {
         for(int i = 0; i < num_tasks; i++) {
             data[i] = i;
             args[i] = &data[i];