Przeglądaj źródła

test impl of `pkpy.ComputeThread`

blueloveTH 10 miesięcy temu
rodzic
commit
9448214317

+ 1 - 1
build.sh

@@ -20,7 +20,7 @@ SRC2=${1:-src2/main.c}
 
 echo "> Compiling and linking source files... "
 
-clang -std=c11 -O2 -Wfatal-errors -Iinclude -DNDEBUG -o main $SRC $SRC2 -lm -ldl
+clang -std=c11 -O2 -Wfatal-errors -Iinclude -DNDEBUG -o main $SRC $SRC2 -lm -ldl -lpthread
 
 if [ $? -eq 0 ]; then
     echo "Build completed. Type \"./main\" to enter REPL."

+ 1 - 1
build_g.sh

@@ -4,7 +4,7 @@ set -e
 
 SRC=$(find src/ -name "*.c")
 
-FLAGS="-std=c11 -lm -ldl -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"
+FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"
 
 SANITIZE_FLAGS="-fsanitize=address,leak,undefined"
 

+ 1 - 1
build_g_32.sh

@@ -4,7 +4,7 @@ set -e
 
 SRC=$(find src/ -name "*.c")
 
-FLAGS="-std=c11 -lm -ldl -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"
+FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"
 
 SANITIZE_FLAGS="-fsanitize=address,leak,undefined"
 

+ 3 - 0
include/pocketpy/common/str.h

@@ -68,6 +68,9 @@ int c11__u8_header(unsigned char c, bool suppress);
 int c11__u8_value(int u8bytes, const char* data);
 int c11__u32_to_u8(uint32_t utf32_char, char utf8_output[4]);
 
+char* c11_strdup(const char* str);
+unsigned char* c11_memdup(const unsigned char* data, int size);
+
 typedef enum IntParsingResult {
     IntParsing_SUCCESS,
     IntParsing_FAILURE,

+ 21 - 2
include/typings/pkpy.pyi

@@ -1,4 +1,4 @@
-from typing import Self
+from typing import Self, Literal
 from linalg import vec2, vec2i
 
 class TValue[T]:
@@ -16,4 +16,23 @@ def memory_usage() -> str:
     """Return a summary of the memory usage."""
 
 def is_user_defined_type(t: type) -> bool:
-    """Check if a type is user-defined. This means the type was created by executing python `class` statement."""
+    """Check if a type is user-defined. This means the type was created by executing python `class` statement."""
+
+def currentvm() -> int:
+    """Return the current VM index."""
+
+
+class ComputeThread:
+    def __init__(self, vm_index: Literal[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]): ...
+
+    @property
+    def is_done(self) -> bool: ...
+
+    def join(self) -> None: ...
+
+    def last_error(self) -> str | None: ...
+    def last_retval(self): ...
+
+    def exec(self, source: str) -> None: ...
+    def eval(self, source: str) -> None: ...
+    def call(self, eval_src: str, *args, **kwargs) -> None: ...

+ 14 - 0
src/common/str.c

@@ -348,6 +348,20 @@ int c11__u32_to_u8(uint32_t utf32_char, char utf8_output[4]) {
     return length;
 }
 
+char* c11_strdup(const char* str){
+    int len = strlen(str);
+    char* dst = PK_MALLOC(len + 1);
+    memcpy(dst, str, len);
+    dst[len] = '\0';
+    return dst;
+}
+
+unsigned char* c11_memdup(const unsigned char* src, int size) {
+    unsigned char* dst = PK_MALLOC(size);
+    memcpy(dst, src, size);
+    return dst;
+}
+
 IntParsingResult c11__parse_uint(c11_sv text, int64_t* out, int base) {
     *out = 0;
 

+ 309 - 0
src/modules/pkpy.c

@@ -7,6 +7,9 @@
 #include "pocketpy/common/sstream.h"
 #include "pocketpy/interpreter/vm.h"
 
+#include <threads.h>
+#include <stdatomic.h>
+
 #define DEF_TVALUE_METHODS(T, Field)                                                               \
     static bool TValue_##T##__new__(int argc, py_Ref argv) {                                       \
         PY_CHECK_ARGC(2);                                                                          \
@@ -63,6 +66,308 @@ static bool pkpy_is_user_defined_type(int argc, py_Ref argv) {
     return true;
 }
 
+static bool pkpy_currentvm(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(0);
+    py_newint(py_retval(), py_currentvm());
+    return true;
+}
+
+typedef struct c11_ComputeThread c11_ComputeThread;
+
+typedef struct {
+    c11_ComputeThread* self;
+    char* eval_src;
+    unsigned char* args_data;
+    int args_size;
+    unsigned char* kwargs_data;
+    int kwargs_size;
+} ComputeThreadJobCall;
+
+typedef struct {
+    c11_ComputeThread* self;
+    char* source;
+    enum py_CompileMode mode;
+} ComputeThreadJobExec;
+
+static void ComputeThreadJobCall__dtor(void* arg) {
+    ComputeThreadJobCall* self = arg;
+    PK_FREE(self->eval_src);
+    PK_FREE(self->args_data);
+    PK_FREE(self->kwargs_data);
+}
+
+static void ComputeThreadJobExec__dtor(void* arg) {
+    ComputeThreadJobExec* self = arg;
+    PK_FREE(self->source);
+}
+
+typedef struct c11_ComputeThread {
+    int vm_index;
+    atomic_bool is_done;
+    unsigned char* last_retval_data;
+    int last_retval_size;
+    char* last_error;
+
+    thrd_t thread;
+    void* job;
+    void (*job_dtor)(void*);
+} c11_ComputeThread;
+
+static void
+    c11_ComputeThread__reset_job(c11_ComputeThread* self, void* job, void (*job_dtor)(void*)) {
+    if(self->job) {
+        self->job_dtor(self->job);
+        PK_FREE(self->job);
+    }
+    self->job = job;
+    self->job_dtor = job_dtor;
+}
+
+static bool _pk_compute_thread_flags[16];
+
+static void c11_ComputeThread__dtor(c11_ComputeThread* self) {
+    if(!self->is_done) {
+        c11__abort("ComputeThread(%d) is not done yet!! But the object was deleted.",
+                   self->vm_index);
+    }
+    c11_ComputeThread__reset_job(self, NULL, NULL);
+    _pk_compute_thread_flags[self->vm_index] = false;
+}
+
+static void c11_ComputeThread__on_job_begin(c11_ComputeThread* self) {
+    if(self->last_retval_data) {
+        PK_FREE(self->last_retval_data);
+        self->last_retval_data = NULL;
+        self->last_retval_size = 0;
+    }
+    if(self->last_error) {
+        PK_FREE(self->last_error);
+        self->last_error = NULL;
+    }
+    py_switchvm(self->vm_index);
+}
+
+static bool ComputeThread__new__(int argc, py_Ref argv) {
+    c11_ComputeThread* self =
+        py_newobject(py_retval(), py_totype(argv), 0, sizeof(c11_ComputeThread));
+    self->vm_index = 0;
+    self->is_done = true;
+    self->last_retval_data = NULL;
+    self->last_retval_size = 0;
+    self->last_error = NULL;
+    self->job = NULL;
+    self->job_dtor = NULL;
+    return true;
+}
+
+static bool ComputeThread__init__(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(2);
+    PY_CHECK_ARG_TYPE(1, tp_int);
+    c11_ComputeThread* self = py_touserdata(py_arg(0));
+    int index = py_toint(py_arg(1));
+    if(index >= 1 && index < 16) {
+        if(_pk_compute_thread_flags[index]) {
+            return ValueError("vm_index %d is already in use", index);
+        }
+        _pk_compute_thread_flags[index] = true;
+        self->vm_index = index;
+    } else {
+        return ValueError("vm_index %d is out of range", index);
+    }
+    py_newnone(py_retval());
+    return true;
+}
+
+static bool ComputeThread_is_done(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(1);
+    c11_ComputeThread* self = py_touserdata(argv);
+    py_newbool(py_retval(), self->is_done);
+    return true;
+}
+
+static bool ComputeThread_join(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(1);
+    c11_ComputeThread* self = py_touserdata(argv);
+    while(!self->is_done)
+        thrd_yield();
+    py_newnone(py_retval());
+    return true;
+}
+
+static bool ComputeThread_last_error(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(1);
+    c11_ComputeThread* self = py_touserdata(argv);
+    if(!self->is_done) return OSError("thread is not done yet");
+    if(self->last_error) {
+        py_newstr(py_retval(), self->last_error);
+    } else {
+        py_newnone(py_retval());
+    }
+    return true;
+}
+
+static bool ComputeThread_last_retval(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(1);
+    c11_ComputeThread* self = py_touserdata(argv);
+    if(!self->is_done) return OSError("thread is not done yet");
+    if(self->last_retval_data == NULL) return ValueError("no retval available");
+    return py_pickle_loads(self->last_retval_data, self->last_retval_size);
+}
+
+static int ComputeThreadJob_call(void* arg) {
+    ComputeThreadJobCall* job = arg;
+    c11_ComputeThread* self = job->self;
+    c11_ComputeThread__on_job_begin(self);
+
+    py_StackRef p0 = py_peek(0);
+
+    if(!py_pusheval(job->eval_src, NULL)) goto __ERROR;
+    // [callable]
+    if(!py_pickle_loads(job->args_data, job->args_size)) goto __ERROR;
+    py_push(py_retval());
+    // [callable, args]
+    if(!py_pickle_loads(job->kwargs_data, job->kwargs_size)) goto __ERROR;
+    py_push(py_retval());
+    // [callable, args, kwargs]
+    if(!py_smarteval("_0(*_1, **_2)", NULL, py_peek(-3), py_peek(-2), py_peek(-1))) goto __ERROR;
+
+    py_shrink(3);
+    if(!py_pickle_dumps(py_retval())) goto __ERROR;
+    int retval_size;
+    unsigned char* retval_data = py_tobytes(py_retval(), &retval_size);
+    self->last_retval_data = c11_memdup(retval_data, retval_size);
+    self->last_retval_size = retval_size;
+    self->is_done = true;
+    return 0;
+
+__ERROR:
+    self->last_error = py_formatexc();
+    self->is_done = true;
+    py_clearexc(p0);
+    py_newnone(py_retval());
+    return 0;
+}
+
+static int ComputeThreadJob_exec(void* arg) {
+    ComputeThreadJobExec* job = arg;
+    c11_ComputeThread* self = job->self;
+    c11_ComputeThread__on_job_begin(self);
+
+    py_StackRef p0 = py_peek(0);
+    if(!py_exec(job->source, "<job>", job->mode, NULL)) goto __ERROR;
+    if(!py_pickle_dumps(py_retval())) goto __ERROR;
+    int retval_size;
+    unsigned char* retval_data = py_tobytes(py_retval(), &retval_size);
+    self->last_retval_data = c11_memdup(retval_data, retval_size);
+    self->last_retval_size = retval_size;
+    self->is_done = true;
+    return 0;
+
+__ERROR:
+    self->last_error = py_formatexc();
+    self->is_done = true;
+    py_clearexc(p0);
+    return 0;
+}
+
+static bool ComputeThread_exec(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(2);
+    c11_ComputeThread* self = py_touserdata(py_arg(0));
+    if(!self->is_done) return OSError("thread is not done yet");
+    PY_CHECK_ARG_TYPE(1, tp_str);
+    const char* source = py_tostr(py_arg(1));
+    /**************************/
+    ComputeThreadJobExec* job = PK_MALLOC(sizeof(ComputeThreadJobExec));
+    job->self = self;
+    job->source = c11_strdup(source);
+    job->mode = EXEC_MODE;
+    c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor);
+    /**************************/
+    self->is_done = false;
+    int res = thrd_create(&self->thread, ComputeThreadJob_exec, job);
+    if(res != thrd_success) {
+        self->is_done = true;
+        return OSError("thrd_create() failed");
+    }
+    py_newnone(py_retval());
+    return true;
+}
+
+static bool ComputeThread_eval(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(2);
+    c11_ComputeThread* self = py_touserdata(py_arg(0));
+    if(!self->is_done) return OSError("thread is not done yet");
+    PY_CHECK_ARG_TYPE(1, tp_str);
+    const char* source = py_tostr(py_arg(1));
+    /**************************/
+    ComputeThreadJobExec* job = PK_MALLOC(sizeof(ComputeThreadJobExec));
+    job->self = self;
+    job->source = c11_strdup(source);
+    job->mode = EVAL_MODE;
+    c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor);
+    /**************************/
+    self->is_done = false;
+    int res = thrd_create(&self->thread, ComputeThreadJob_exec, job);
+    if(res != thrd_success) {
+        self->is_done = true;
+        return OSError("thrd_create() failed");
+    }
+    py_newnone(py_retval());
+    return true;
+}
+
+static bool ComputeThread_call(int argc, py_Ref argv) {
+    PY_CHECK_ARGC(4);
+    c11_ComputeThread* self = py_touserdata(py_arg(0));
+    if(!self->is_done) return OSError("thread is not done yet");
+    PY_CHECK_ARG_TYPE(1, tp_str);
+    PY_CHECK_ARG_TYPE(2, tp_tuple);
+    PY_CHECK_ARG_TYPE(3, tp_dict);
+    // eval_src
+    const char* eval_src = py_tostr(py_arg(1));
+    // *args
+    if(!py_pickle_dumps(py_arg(2))) return false;
+    int args_size;
+    unsigned char* args_data = py_tobytes(py_retval(), &args_size);
+    // *kwargs
+    if(!py_pickle_dumps(py_arg(3))) return false;
+    int kwargs_size;
+    unsigned char* kwargs_data = py_tobytes(py_retval(), &kwargs_size);
+    /**************************/
+    ComputeThreadJobCall* job = PK_MALLOC(sizeof(ComputeThreadJobCall));
+    job->self = self;
+    job->eval_src = c11_strdup(eval_src);
+    job->args_data = c11_memdup(args_data, args_size);
+    job->args_size = args_size;
+    job->kwargs_data = c11_memdup(kwargs_data, kwargs_size);
+    job->kwargs_size = kwargs_size;
+    c11_ComputeThread__reset_job(self, job, ComputeThreadJobCall__dtor);
+    /**************************/
+    self->is_done = false;
+    int res = thrd_create(&self->thread, ComputeThreadJob_call, job);
+    if(res != thrd_success) {
+        self->is_done = true;
+        return OSError("thrd_create() failed");
+    }
+    py_newnone(py_retval());
+    return true;
+}
+
+static void pk_ComputeThread__register(py_Ref mod) {
+    py_Type type = py_newtype("ComputeThread", tp_object, mod, (py_Dtor)c11_ComputeThread__dtor);
+
+    py_bindmagic(type, __new__, ComputeThread__new__);
+    py_bindmagic(type, __init__, ComputeThread__init__);
+    py_bindproperty(type, "is_done", ComputeThread_is_done, NULL);
+    py_bindmethod(type, "join", ComputeThread_join);
+    py_bindmethod(type, "last_error", ComputeThread_last_error);
+    py_bindmethod(type, "last_retval", ComputeThread_last_retval);
+
+    py_bindmethod(type, "exec", ComputeThread_exec);
+    py_bindmethod(type, "eval", ComputeThread_eval);
+    py_bind(py_tpobject(type), "call(self, eval_src, *args, **kwargs)", ComputeThread_call);
+}
+
 void pk__add_module_pkpy() {
     py_Ref mod = py_newmodule("pkpy");
 
@@ -99,6 +404,10 @@ void pk__add_module_pkpy() {
 
     py_bindfunc(mod, "memory_usage", pkpy_memory_usage);
     py_bindfunc(mod, "is_user_defined_type", pkpy_is_user_defined_type);
+
+    py_bindfunc(mod, "currentvm", pkpy_currentvm);
+
+    pk_ComputeThread__register(mod);
 }
 
 #undef DEF_TVALUE_METHODS

+ 49 - 0
src2/multi_vm_isolate.c

@@ -0,0 +1,49 @@
+#include "pocketpy.h"
+#include "threads.h"
+#include <stdio.h>
+
+int run_huge_job_in_vm1(void* arg) {
+    py_switchvm(1);
+    bool ok = py_exec((const char*)arg, "<job>", EXEC_MODE, NULL);
+    if(!ok) {
+        py_printexc();
+        return 1;
+    }
+    return 0;
+}
+
+int main() {
+    py_initialize();
+
+    bool ok = py_exec("print('Hello world from VM0!')", "<string1>", EXEC_MODE, NULL);
+    if(!ok) {
+        py_printexc();
+        return 1;
+    }
+
+    printf("main vm index: %d\n", py_currentvm());
+
+    char* job_string =
+        "import time\n"
+        "res = 0\n"
+        "time.sleep(3)\n"
+        "res = 100\n"
+        "print('Huge job done!')\n"
+        "print('Result:', res)\n";
+
+    thrd_t thread1;
+    thrd_create(&thread1, run_huge_job_in_vm1, job_string);
+
+    for(int i = 0; i < 5; i++) {
+        thrd_sleep(&(struct timespec){.tv_sec = 1, .tv_nsec = 0}, NULL);
+        printf("main vm index: %d\n", py_currentvm());
+    }
+
+    int thrd_res;
+    thrd_join(thread1, &thrd_res);
+    printf("Thread result: %d\n", thrd_res);
+
+    py_finalize();
+
+    return 0;
+}

+ 0 - 0
tests/98_lz4.py → tests/72_lz4.py


+ 30 - 0
tests/98_thread.py

@@ -0,0 +1,30 @@
+from pkpy import ComputeThread
+import time
+
+thread_1 = ComputeThread(1)
+thread_2 = ComputeThread(2)
+
+for t in [thread_1, thread_2]:
+    t.exec('''
+def func(a):
+    from pkpy import currentvm
+    print("Hello from thread", currentvm(), "a =", a)
+    for i in range(500000):
+        if i % 100000 == 0:
+            print(i, "from thread", currentvm())
+    return a
+''')
+    
+thread_1.join()
+thread_2.join()
+
+thread_1.call('func', [1, 2, 3])
+thread_2.call('func', [4, 5, 6])
+
+while not thread_1.is_done or not thread_2.is_done:
+    print("Waiting for threads to finish...")
+    time.sleep(1)
+
+print("Thread 1 last return value:", thread_1.last_retval())
+print("Thread 2 last return value:", thread_2.last_retval())
+