threads.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. #include "pocketpy/common/threads.h"
  2. #include "pocketpy/common/utils.h"
  3. #include <stdarg.h>
  4. #if PK_ENABLE_THREADS
  5. #if PK_USE_PTHREADS
  6. bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) {
  7. int res = pthread_create(thrd, NULL, func, arg);
  8. return res == 0;
  9. }
  // Offer the CPU to other runnable threads (pthreads backend: sched_yield()).
  // The status returned by sched_yield() is intentionally ignored.
  void c11_thrd__yield() {
      (void)sched_yield();
  }
  11. void c11_thrd__join(c11_thrd_t thrd) { pthread_join(thrd, NULL); }
  12. c11_thrd_t c11_thrd__current() { return pthread_self(); }
  13. bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return pthread_equal(a, b); }
  14. void c11_mutex__ctor(c11_mutex_t* mutex) { pthread_mutex_init(mutex, NULL); }
  15. void c11_mutex__dtor(c11_mutex_t* mutex) { pthread_mutex_destroy(mutex); }
  16. void c11_mutex__lock(c11_mutex_t* mutex) { pthread_mutex_lock(mutex); }
  17. void c11_mutex__unlock(c11_mutex_t* mutex) { pthread_mutex_unlock(mutex); }
  18. void c11_cond__ctor(c11_cond_t* cond) { pthread_cond_init(cond, NULL); }
  19. void c11_cond__dtor(c11_cond_t* cond) { pthread_cond_destroy(cond); }
  20. void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { pthread_cond_wait(cond, mutex); }
  21. void c11_cond__signal(c11_cond_t* cond) { pthread_cond_signal(cond); }
  22. void c11_cond__broadcast(c11_cond_t* cond) { pthread_cond_broadcast(cond); }
  23. #else
  24. bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) {
  25. int res = thrd_create(thrd, func, arg);
  26. return res == thrd_success;
  27. }
  // Offer the CPU to other runnable threads (C11 backend: thrd_yield()).
  void c11_thrd__yield() {
      thrd_yield();
  }
  29. void c11_thrd__join(c11_thrd_t thrd) { thrd_join(thrd, NULL); }
  30. c11_thrd_t c11_thrd__current() { return thrd_current(); }
  31. bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return thrd_equal(a, b); }
  32. void c11_mutex__ctor(c11_mutex_t* mutex) { mtx_init(mutex, mtx_plain); }
  33. void c11_mutex__dtor(c11_mutex_t* mutex) { mtx_destroy(mutex); }
  34. void c11_mutex__lock(c11_mutex_t* mutex) { mtx_lock(mutex); }
  35. void c11_mutex__unlock(c11_mutex_t* mutex) { mtx_unlock(mutex); }
  36. void c11_cond__ctor(c11_cond_t* cond) { cnd_init(cond); }
  37. void c11_cond__dtor(c11_cond_t* cond) { cnd_destroy(cond); }
  38. void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { cnd_wait(cond, mutex); }
  39. void c11_cond__signal(c11_cond_t* cond) { cnd_signal(cond); }
  40. void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
  41. #endif
  // Set to 1 to compile in timestamped tracing of the thread pool.
  // When 0, c11_thrdpool_debug_log(...) expands to an empty statement and its
  // arguments are not evaluated.
  #define C11_THRDPOOL_DEBUG 0

  #if C11_THRDPOOL_DEBUG
  int64_t time_ns();

  // Print one trace line of the form "[<seconds> - Worker <index>] <message>".
  //   index  - worker index shown in the prefix (call sites in this file pass
  //            -1 for the thread that drives the pool).
  //   format - printf-style format for the message, followed by its arguments.
  // The line is assembled in a fixed 512-byte buffer; both formatting steps
  // are bounded, so an over-long message is truncated instead of overflowing
  // the stack buffer.
  static void c11_thrdpool_debug_log(int index, const char* format, ...) {
      char buf[512];
      int n = snprintf(buf, sizeof(buf), "[%.6f - Worker %2d] ", time_ns() / 1e9, index);
      if(n < 0) {
          // Encoding error while writing the prefix: start from an empty line.
          buf[0] = '\0';
          n = 0;
      }
      if((size_t)n < sizeof(buf)) {
          va_list args;
          va_start(args, format);
          vsnprintf(buf + n, sizeof(buf) - (size_t)n, format, args);
          va_end(args);
      }
      printf("%s\n", buf);
  }
  #else
  #define c11_thrdpool_debug_log(...)                                                                \
      do {                                                                                           \
      } while(0)
  #endif
  59. static c11_thrd_retval_t _thrdpool_worker(void* arg) {
  60. c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
  61. c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
  62. while(true) {
  63. c11_thrdpool_debug_log(p_worker->index, "Waiting for mutex lock...");
  64. c11_mutex__lock(p_worker->p_mutex);
  65. atomic_fetch_add_explicit(p_worker->p_ready_workers_num, 1, memory_order_relaxed);
  66. c11_thrdpool_debug_log(p_worker->index, "Mutex locked, checking for tasks...");
  67. if(atomic_load_explicit(&p_tasks->sync_val, memory_order_relaxed) == -1) {
  68. c11_mutex__unlock(p_worker->p_mutex);
  69. return 0; // force kill
  70. }
  71. while(true) {
  72. c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
  73. int sync_val = atomic_load_explicit(&p_tasks->sync_val, memory_order_relaxed);
  74. c11_thrdpool_debug_log(p_worker->index,
  75. "Woke up from condition variable, sync_val=%d",
  76. sync_val);
  77. if(sync_val == 1) break;
  78. if(sync_val == -1) {
  79. c11_mutex__unlock(p_worker->p_mutex);
  80. return 0; // force kill
  81. }
  82. }
  83. atomic_fetch_sub_explicit(p_worker->p_ready_workers_num, 1, memory_order_relaxed);
  84. c11_mutex__unlock(p_worker->p_mutex);
  85. c11_thrdpool_debug_log(p_worker->index, "Received tasks, starting execution...");
  86. // execute tasks
  87. int completed_count = 0;
  88. while(true) {
  89. int arg_index =
  90. atomic_fetch_add_explicit(&p_tasks->current_index, 1, memory_order_relaxed);
  91. if(arg_index < p_tasks->length) {
  92. void* arg = p_tasks->args[arg_index];
  93. p_tasks->func(arg);
  94. completed_count++;
  95. } else {
  96. break;
  97. }
  98. }
  99. // sync point
  100. atomic_fetch_add_explicit(&p_tasks->completed_count, completed_count, memory_order_release);
  101. c11_thrdpool_debug_log(p_worker->index,
  102. "Execution complete, waiting for `sync_val` reset...");
  103. while(true) {
  104. int sync_val = atomic_load_explicit(&p_tasks->sync_val, memory_order_relaxed);
  105. if(sync_val == 0) break;
  106. if(sync_val == -1) return 0; // force kill
  107. c11_thrd__yield();
  108. }
  109. c11_thrdpool_debug_log(p_worker->index,
  110. "`sync_val` reset detected, waiting for next tasks...");
  111. }
  112. return 0;
  113. }
  114. void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
  115. pool->length = length;
  116. atomic_store_explicit(&pool->ready_workers_num, 0, memory_order_relaxed);
  117. pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
  118. c11_mutex__ctor(&pool->workers_mutex);
  119. c11_cond__ctor(&pool->workers_cond);
  120. atomic_store_explicit(&pool->tasks.sync_val, 0, memory_order_relaxed);
  121. for(int i = 0; i < length; i++) {
  122. c11_thrdpool_worker* p_worker = &pool->workers[i];
  123. p_worker->index = i;
  124. p_worker->p_ready_workers_num = &pool->ready_workers_num;
  125. p_worker->p_mutex = &pool->workers_mutex;
  126. p_worker->p_cond = &pool->workers_cond;
  127. p_worker->p_tasks = &pool->tasks;
  128. bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
  129. c11__rtassert(ok);
  130. }
  131. }
  132. void c11_thrdpool__dtor(c11_thrdpool* pool) {
  133. c11_mutex__lock(&pool->workers_mutex);
  134. atomic_store_explicit(&pool->tasks.sync_val, -1, memory_order_relaxed);
  135. c11_thrdpool_debug_log(-1, "Terminating all workers...");
  136. c11_cond__broadcast(&pool->workers_cond);
  137. c11_mutex__unlock(&pool->workers_mutex);
  138. for(int i = 0; i < pool->length; i++) {
  139. c11_thrdpool_worker* p_worker = &pool->workers[i];
  140. c11_thrd__join(p_worker->thread);
  141. }
  142. c11_mutex__dtor(&pool->workers_mutex);
  143. c11_cond__dtor(&pool->workers_cond);
  144. PK_FREE(pool->workers);
  145. }
  146. void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
  147. if(num_tasks == 0) return;
  148. c11_thrdpool_debug_log(-1, "c11_thrdpool__map() called on %d tasks...", num_tasks);
  149. while(atomic_load_explicit(&pool->ready_workers_num, memory_order_relaxed) < pool->length) {
  150. c11_thrd__yield();
  151. }
  152. c11_thrdpool_debug_log(-1, "All %d workers are ready.", pool->length);
  153. // assign tasks
  154. c11_mutex__lock(&pool->workers_mutex);
  155. pool->tasks.func = func;
  156. pool->tasks.args = args;
  157. pool->tasks.length = num_tasks;
  158. atomic_store_explicit(&pool->tasks.sync_val, 1, memory_order_relaxed);
  159. atomic_store_explicit(&pool->tasks.current_index, 0, memory_order_relaxed);
  160. atomic_store_explicit(&pool->tasks.completed_count, 0, memory_order_relaxed);
  161. c11_cond__broadcast(&pool->workers_cond);
  162. c11_mutex__unlock(&pool->workers_mutex);
  163. // wait for complete
  164. c11_thrdpool_debug_log(-1, "Waiting for %d tasks to complete...", num_tasks);
  165. while(atomic_load_explicit(&pool->tasks.completed_count, memory_order_acquire) < num_tasks) {
  166. c11_thrd__yield();
  167. }
  168. atomic_store_explicit(&pool->tasks.sync_val, 0, memory_order_relaxed);
  169. c11_thrdpool_debug_log(-1, "All %d tasks completed, `sync_val` was reset.", num_tasks);
  170. }
  171. #endif // PK_ENABLE_THREADS