/*
  Simple DirectMedia Layer
  Copyright (C) 1997-2025 Sam Lantinga <slouken@libsdl.org>

  This software is provided 'as-is', without any express or implied
  warranty.  In no event will the authors be held liable for any damages
  arising from the use of this software.

  Permission is granted to anyone to use this software for any purpose,
  including commercial applications, and to alter it and redistribute it
  freely, subject to the following restrictions:

  1. The origin of this software must not be misrepresented; you must not
     claim that you wrote the original software. If you use this software
     in a product, an acknowledgment in the product documentation would be
     appreciated but is not required.
  2. Altered source versions must be plainly marked as such, and must not be
     misrepresented as being the original software.
  3. This notice may not be removed or altered from any source distribution.
*/
// The generic backend uses a threadpool to block on synchronous i/o.
// This is not ideal, it's meant to be used if there isn't a platform-specific
// backend that can do something more efficient!

#include "SDL_internal.h"
#include "../SDL_sysasyncio.h"

// on Emscripten without threads, async i/o is synchronous. Sorry. Almost
// everything is MEMFS, so it's just a memcpy anyhow, and the Emscripten
// filesystem APIs don't offer async. In theory, directly accessing
// persistent storage _does_ offer async APIs at the browser level, but
// that's not exposed in Emscripten's filesystem abstraction.
#if defined(SDL_PLATFORM_EMSCRIPTEN) && !defined(__EMSCRIPTEN_PTHREADS__)
#define SDL_ASYNCIO_USE_THREADPOOL 0
#else
#define SDL_ASYNCIO_USE_THREADPOOL 1
#endif
  33. typedef struct GenericAsyncIOQueueData
  34. {
  35. SDL_Mutex *lock;
  36. SDL_Condition *condition;
  37. SDL_AsyncIOTask completed_tasks;
  38. } GenericAsyncIOQueueData;
  39. typedef struct GenericAsyncIOData
  40. {
  41. SDL_Mutex *lock; // !!! FIXME: we can skip this lock if we have an equivalent of pread/pwrite
  42. SDL_IOStream *io;
  43. } GenericAsyncIOData;
  44. static void AsyncIOTaskComplete(SDL_AsyncIOTask *task)
  45. {
  46. SDL_assert(task->queue);
  47. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) task->queue->userdata;
  48. SDL_LockMutex(data->lock);
  49. LINKED_LIST_PREPEND(task, data->completed_tasks, queue);
  50. SDL_SignalCondition(data->condition); // wake a thread waiting on the queue.
  51. SDL_UnlockMutex(data->lock);
  52. }
  53. // synchronous i/o is offloaded onto the threadpool. This function does the threaded work.
  54. // This is called directly, without a threadpool, if !SDL_ASYNCIO_USE_THREADPOOL.
  55. static void SynchronousIO(SDL_AsyncIOTask *task)
  56. {
  57. SDL_assert(task->result != SDL_ASYNCIO_CANCELED); // shouldn't have gotten in here if canceled!
  58. GenericAsyncIOData *data = (GenericAsyncIOData *) task->asyncio->userdata;
  59. SDL_IOStream *io = data->io;
  60. const size_t size = (size_t) task->requested_size;
  61. void *ptr = task->buffer;
  62. // this seek won't work if two tasks are reading from the same file at the same time,
  63. // so we lock here. This makes multiple reads from a single file serialize, but different
  64. // files will still run in parallel. An app can also open the same file twice to avoid this.
  65. SDL_LockMutex(data->lock);
  66. if (task->type == SDL_ASYNCIO_TASK_CLOSE) {
  67. bool okay = true;
  68. if (task->flush) {
  69. okay = SDL_FlushIO(data->io);
  70. }
  71. okay = SDL_CloseIO(data->io) && okay;
  72. task->result = okay ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE;
  73. } else if (SDL_SeekIO(io, (Sint64) task->offset, SDL_IO_SEEK_SET) < 0) {
  74. task->result = SDL_ASYNCIO_FAILURE;
  75. } else {
  76. const bool writing = (task->type == SDL_ASYNCIO_TASK_WRITE);
  77. task->result_size = (Uint64) (writing ? SDL_WriteIO(io, ptr, size) : SDL_ReadIO(io, ptr, size));
  78. if (task->result_size == task->requested_size) {
  79. task->result = SDL_ASYNCIO_COMPLETE;
  80. } else {
  81. if (writing) {
  82. task->result = SDL_ASYNCIO_FAILURE; // it's always a failure on short writes.
  83. } else {
  84. const SDL_IOStatus status = SDL_GetIOStatus(io);
  85. SDL_assert(status != SDL_IO_STATUS_READY); // this should have either failed or been EOF.
  86. SDL_assert(status != SDL_IO_STATUS_NOT_READY); // these should not be non-blocking reads!
  87. task->result = (status == SDL_IO_STATUS_EOF) ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE;
  88. }
  89. }
  90. }
  91. SDL_UnlockMutex(data->lock);
  92. AsyncIOTaskComplete(task);
  93. }
#if SDL_ASYNCIO_USE_THREADPOOL
// Global threadpool state shared by all generic async i/o queues and files.
// Everything here (except threadpool_init itself) is protected by threadpool_lock.
static SDL_InitState threadpool_init;              // one-time init/quit gate for the pool.
static SDL_Mutex *threadpool_lock = NULL;          // guards all of the state below.
static bool stop_threadpool = false;               // set at shutdown; workers exit when they see it.
static SDL_AsyncIOTask threadpool_tasks;           // list of tasks waiting for a worker thread.
static SDL_Condition *threadpool_condition = NULL; // signaled when work arrives (also reused at shutdown).
static int max_threadpool_threads = 0;             // upper bound on pool size (set in PrepareThreadpool).
static int running_threadpool_threads = 0;         // worker threads currently alive.
static int idle_threadpool_threads = 0;            // workers currently blocked waiting for work.
static int threadpool_threads_spun = 0;            // total workers ever created (used only for thread names).
// Threadpool worker: pull tasks off threadpool_tasks and run them until told
// to stop. Surplus idle workers terminate themselves so the pool can shrink.
// The worker holds threadpool_lock except while actually running a task.
static int SDLCALL AsyncIOThreadpoolWorker(void *data)
{
    SDL_LockMutex(threadpool_lock);
    while (!stop_threadpool) {
        SDL_AsyncIOTask *task = LINKED_LIST_START(threadpool_tasks, threadpool);
        if (!task) {
            // if we go 30 seconds without a new task, terminate unless we're the only thread left.
            idle_threadpool_threads++;
            const bool rc = SDL_WaitConditionTimeout(threadpool_condition, threadpool_lock, 30000);
            idle_threadpool_threads--;
            if (!rc) {
                // decide if we have too many idle threads, and if so, quit to let thread pool shrink when not busy.
                if (idle_threadpool_threads) { // someone else is still idle, so this thread is surplus.
                    break;
                }
            }
            continue;
        }
        LINKED_LIST_UNLINK(task, threadpool);
        SDL_UnlockMutex(threadpool_lock);
        // bookkeeping is done, so we drop the mutex and fire the work.
        SynchronousIO(task);
        SDL_LockMutex(threadpool_lock); // take the lock again and see if there's another task (if not, we'll wait on the Condition).
    }
    running_threadpool_threads--;
    // this is kind of a hack, but this lets us reuse threadpool_condition to block on shutdown until all threads have exited.
    if (stop_threadpool) {
        SDL_BroadcastCondition(threadpool_condition);
    }
    SDL_UnlockMutex(threadpool_lock);
    return 0;
}
  136. static bool MaybeSpinNewWorkerThread(void)
  137. {
  138. // if all existing threads are busy and the pool of threads isn't maxed out, make a new one.
  139. if ((idle_threadpool_threads == 0) && (running_threadpool_threads < max_threadpool_threads)) {
  140. char threadname[32];
  141. SDL_snprintf(threadname, sizeof (threadname), "SDLasyncio%d", threadpool_threads_spun);
  142. SDL_Thread *thread = SDL_CreateThread(AsyncIOThreadpoolWorker, threadname, NULL);
  143. if (thread == NULL) {
  144. return false;
  145. }
  146. SDL_DetachThread(thread); // these terminate themselves when idle too long, so we never WaitThread.
  147. running_threadpool_threads++;
  148. threadpool_threads_spun++;
  149. }
  150. return true;
  151. }
  152. static void QueueAsyncIOTask(SDL_AsyncIOTask *task)
  153. {
  154. SDL_assert(task != NULL);
  155. SDL_LockMutex(threadpool_lock);
  156. if (stop_threadpool) { // just in case.
  157. task->result = SDL_ASYNCIO_CANCELED;
  158. AsyncIOTaskComplete(task);
  159. } else {
  160. LINKED_LIST_PREPEND(task, threadpool_tasks, threadpool);
  161. MaybeSpinNewWorkerThread(); // okay if this fails or the thread pool is maxed out. Something will get there eventually.
  162. // tell idle threads to get to work.
  163. // This is a broadcast because we want someone from the thread pool to wake up, but
  164. // also shutdown might also be blocking on this. One of the threads will grab
  165. // it, the others will go back to sleep.
  166. SDL_BroadcastCondition(threadpool_condition);
  167. }
  168. SDL_UnlockMutex(threadpool_lock);
  169. }
  170. // We don't initialize async i/o at all until it's used, so
  171. // JUST IN CASE two things try to start at the same time,
  172. // this will make sure everything gets the same mutex.
  173. static bool PrepareThreadpool(void)
  174. {
  175. bool okay = true;
  176. if (SDL_ShouldInit(&threadpool_init)) {
  177. max_threadpool_threads = (SDL_GetNumLogicalCPUCores() * 2) + 1; // !!! FIXME: this should probably have a hint to override.
  178. max_threadpool_threads = SDL_clamp(max_threadpool_threads, 1, 8); // 8 is probably more than enough.
  179. okay = (okay && ((threadpool_lock = SDL_CreateMutex()) != NULL));
  180. okay = (okay && ((threadpool_condition = SDL_CreateCondition()) != NULL));
  181. okay = (okay && MaybeSpinNewWorkerThread()); // make sure at least one thread is going, since we'll need it.
  182. if (!okay) {
  183. if (threadpool_condition) {
  184. SDL_DestroyCondition(threadpool_condition);
  185. threadpool_condition = NULL;
  186. }
  187. if (threadpool_lock) {
  188. SDL_DestroyMutex(threadpool_lock);
  189. threadpool_lock = NULL;
  190. }
  191. }
  192. SDL_SetInitialized(&threadpool_init, okay);
  193. }
  194. return okay;
  195. }
// Tear down the threadpool: cancel still-pending tasks, wake every worker,
// wait for them all to exit, then free the shared lock/condition and reset
// the globals so a later PrepareThreadpool() starts fresh.
static void ShutdownThreadpool(void)
{
    if (SDL_ShouldQuit(&threadpool_init)) {
        SDL_LockMutex(threadpool_lock);

        // cancel anything that's still pending.
        SDL_AsyncIOTask *task;
        while ((task = LINKED_LIST_START(threadpool_tasks, threadpool)) != NULL) {
            LINKED_LIST_UNLINK(task, threadpool);
            task->result = SDL_ASYNCIO_CANCELED;
            AsyncIOTaskComplete(task);
        }

        stop_threadpool = true;
        SDL_BroadcastCondition(threadpool_condition); // tell the whole threadpool to wake up and quit.
        while (running_threadpool_threads > 0) {
            // each threadpool thread will broadcast this condition before it terminates if stop_threadpool is set.
            // we can't just join the threads because they are detached, so the thread pool can automatically shrink as necessary.
            SDL_WaitCondition(threadpool_condition, threadpool_lock);
        }
        SDL_UnlockMutex(threadpool_lock);

        SDL_DestroyMutex(threadpool_lock);
        threadpool_lock = NULL;
        SDL_DestroyCondition(threadpool_condition);
        threadpool_condition = NULL;

        // reset everything so the pool can be reinitialized later.
        max_threadpool_threads = running_threadpool_threads = idle_threadpool_threads = threadpool_threads_spun = 0;
        stop_threadpool = false;

        SDL_SetInitialized(&threadpool_init, false);
    }
}
#endif
  225. static Sint64 generic_asyncio_size(void *userdata)
  226. {
  227. GenericAsyncIOData *data = (GenericAsyncIOData *) userdata;
  228. return SDL_GetIOSize(data->io);
  229. }
  230. static bool generic_asyncio_io(void *userdata, SDL_AsyncIOTask *task)
  231. {
  232. return task->queue->iface.queue_task(task->queue->userdata, task);
  233. }
  234. static void generic_asyncio_destroy(void *userdata)
  235. {
  236. GenericAsyncIOData *data = (GenericAsyncIOData *) userdata;
  237. SDL_DestroyMutex(data->lock);
  238. SDL_free(data);
  239. }
// Queue backend: run the task via the threadpool, or inline when the
// platform has no threads. Always reports success; failures surface through
// the task's result instead.
static bool generic_asyncioqueue_queue_task(void *userdata, SDL_AsyncIOTask *task)
{
#if SDL_ASYNCIO_USE_THREADPOOL
    QueueAsyncIOTask(task);
#else
    SynchronousIO(task); // oh well. Get a better platform.
#endif
    return true;
}
// Best-effort cancelation: i/o that a worker already picked up can't be
// stopped, but a task still sitting in the threadpool's pending list can be
// pulled out before it ever starts.
static void generic_asyncioqueue_cancel_task(void *userdata, SDL_AsyncIOTask *task)
{
#if !SDL_ASYNCIO_USE_THREADPOOL // in theory, this was all synchronous and should never call this, but just in case.
    task->result = SDL_ASYNCIO_CANCELED;
    AsyncIOTaskComplete(task);
#else
    // we can't stop i/o that's in-flight, but we _can_ just refuse to start it if the threadpool hadn't picked it up yet.
    SDL_LockMutex(threadpool_lock);
    if (LINKED_LIST_PREV(task, threadpool) != NULL) { // still in the queue waiting to be run? Take it out.
        LINKED_LIST_UNLINK(task, threadpool);
        task->result = SDL_ASYNCIO_CANCELED;
        AsyncIOTaskComplete(task);
    }
    SDL_UnlockMutex(threadpool_lock);
#endif
}
  265. static SDL_AsyncIOTask *generic_asyncioqueue_get_results(void *userdata)
  266. {
  267. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
  268. SDL_LockMutex(data->lock);
  269. SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue);
  270. if (task) {
  271. LINKED_LIST_UNLINK(task, queue);
  272. }
  273. SDL_UnlockMutex(data->lock);
  274. return task;
  275. }
  276. static SDL_AsyncIOTask *generic_asyncioqueue_wait_results(void *userdata, Sint32 timeoutMS)
  277. {
  278. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
  279. SDL_LockMutex(data->lock);
  280. SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue);
  281. if (!task) {
  282. SDL_WaitConditionTimeout(data->condition, data->lock, timeoutMS);
  283. task = LINKED_LIST_START(data->completed_tasks, queue);
  284. }
  285. if (task) {
  286. LINKED_LIST_UNLINK(task, queue);
  287. }
  288. SDL_UnlockMutex(data->lock);
  289. return task;
  290. }
  291. static void generic_asyncioqueue_signal(void *userdata)
  292. {
  293. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
  294. SDL_LockMutex(data->lock);
  295. SDL_BroadcastCondition(data->condition);
  296. SDL_UnlockMutex(data->lock);
  297. }
  298. static void generic_asyncioqueue_destroy(void *userdata)
  299. {
  300. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
  301. SDL_DestroyMutex(data->lock);
  302. SDL_DestroyCondition(data->condition);
  303. SDL_free(data);
  304. }
  305. bool SDL_SYS_CreateAsyncIOQueue_Generic(SDL_AsyncIOQueue *queue)
  306. {
  307. #if SDL_ASYNCIO_USE_THREADPOOL
  308. if (!PrepareThreadpool()) {
  309. return false;
  310. }
  311. #endif
  312. GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) SDL_calloc(1, sizeof (*data));
  313. if (!data) {
  314. return false;
  315. }
  316. data->lock = SDL_CreateMutex();
  317. if (!data->lock) {
  318. SDL_free(data);
  319. return false;
  320. }
  321. data->condition = SDL_CreateCondition();
  322. if (!data->condition) {
  323. SDL_DestroyMutex(data->lock);
  324. SDL_free(data);
  325. return false;
  326. }
  327. static const SDL_AsyncIOQueueInterface SDL_AsyncIOQueue_Generic = {
  328. generic_asyncioqueue_queue_task,
  329. generic_asyncioqueue_cancel_task,
  330. generic_asyncioqueue_get_results,
  331. generic_asyncioqueue_wait_results,
  332. generic_asyncioqueue_signal,
  333. generic_asyncioqueue_destroy
  334. };
  335. SDL_copyp(&queue->iface, &SDL_AsyncIOQueue_Generic);
  336. queue->userdata = data;
  337. return true;
  338. }
  339. bool SDL_SYS_AsyncIOFromFile_Generic(const char *file, const char *mode, SDL_AsyncIO *asyncio)
  340. {
  341. #if SDL_ASYNCIO_USE_THREADPOOL
  342. if (!PrepareThreadpool()) {
  343. return false;
  344. }
  345. #endif
  346. GenericAsyncIOData *data = (GenericAsyncIOData *) SDL_calloc(1, sizeof (*data));
  347. if (!data) {
  348. return false;
  349. }
  350. data->lock = SDL_CreateMutex();
  351. if (!data->lock) {
  352. SDL_free(data);
  353. return false;
  354. }
  355. data->io = SDL_IOFromFile(file, mode);
  356. if (!data->io) {
  357. SDL_DestroyMutex(data->lock);
  358. SDL_free(data);
  359. return false;
  360. }
  361. static const SDL_AsyncIOInterface SDL_AsyncIOFile_Generic = {
  362. generic_asyncio_size,
  363. generic_asyncio_io,
  364. generic_asyncio_io,
  365. generic_asyncio_io,
  366. generic_asyncio_destroy
  367. };
  368. SDL_copyp(&asyncio->iface, &SDL_AsyncIOFile_Generic);
  369. asyncio->userdata = data;
  370. return true;
  371. }
// Global shutdown for the generic backend: tears down the threadpool when
// one is in use; a no-op on threadless platforms.
void SDL_SYS_QuitAsyncIO_Generic(void)
{
#if SDL_ASYNCIO_USE_THREADPOOL
    ShutdownThreadpool();
#endif
}
#if SDL_ASYNCIO_ONLY_HAVE_GENERIC
// When no platform-specific backend exists, the generic implementation *is*
// the platform implementation: forward the SDL_SYS_* entry points directly.
bool SDL_SYS_AsyncIOFromFile(const char *file, const char *mode, SDL_AsyncIO *asyncio)
{
    return SDL_SYS_AsyncIOFromFile_Generic(file, mode, asyncio);
}

bool SDL_SYS_CreateAsyncIOQueue(SDL_AsyncIOQueue *queue)
{
    return SDL_SYS_CreateAsyncIOQueue_Generic(queue);
}

void SDL_SYS_QuitAsyncIO(void)
{
    SDL_SYS_QuitAsyncIO_Generic();
}
#endif