diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h
index edc076fc04f6c3..98f620c59c6b24 100644
--- a/Include/internal/pycore_interp.h
+++ b/Include/internal/pycore_interp.h
@@ -57,8 +57,6 @@ struct _is {
         uint64_t next_unique_id;
         /* The linked list of threads, newest first. */
         PyThreadState *head;
-        /* Used in Modules/_threadmodule.c. */
-        long count;
         /* Support for runtime thread stack size tuning.
            A value of 0 means using the platform's default stack size
            or the size specified by the THREAD_STACK_SIZE macro. */
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 97165264b34bbe..222e096f57e3e0 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -345,7 +345,7 @@ def run(self):
 
     def test_limbo_cleanup(self):
         # Issue 7481: Failure to start thread should cleanup the limbo map.
-        def fail_new_thread(*args):
+        def fail_new_thread(*args, **kwargs):
             raise threading.ThreadError()
         _start_new_thread = threading._start_new_thread
         threading._start_new_thread = fail_new_thread
diff --git a/Lib/threading.py b/Lib/threading.py
index df273870fa4273..1d5d9dd98d5551 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -49,6 +49,10 @@
 except AttributeError:
     _CRLock = None
 TIMEOUT_MAX = _thread.TIMEOUT_MAX
+try:
+    _internal_after_fork = _thread._after_fork
+except AttributeError:
+    _internal_after_fork = None
 del _thread
 
 
@@ -968,7 +972,7 @@ def start(self):
         with _active_limbo_lock:
             _limbo[self] = self
         try:
-            _start_new_thread(self._bootstrap, ())
+            _start_new_thread(self._bootstrap, (), daemonic=self._daemonic)
         except Exception:
             with _active_limbo_lock:
                 del _limbo[self]
@@ -1677,4 +1681,6 @@ def _after_fork():
 
 
 if hasattr(_os, "register_at_fork"):
+    if _internal_after_fork is not None:
+        _os.register_at_fork(after_in_child=_internal_after_fork)
     _os.register_at_fork(after_in_child=_after_fork)
diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c
index 5d753b4a0ebc5e..7a71adc73e7997 100644
--- a/Modules/_threadmodule.c
+++ b/Modules/_threadmodule.c
@@ -22,7 +22,249 @@
 static struct PyModuleDef thread_module;
 
 
+/* state for threads owned by the module */
+
+/* Per-thread bookkeeping for a thread started by this module.
+   The status bits record how far the thread got in thread_run(). */
+struct module_thread {
+    PyThreadState *tstate;
+    int daemonic;
+    struct {
+        unsigned int initialized:1;
+        unsigned int started:1;
+        unsigned int func_started:1;
+        unsigned int func_ended:1;
+        unsigned int tstate_cleared:1;
+        /* padding to align to 4 bytes */
+        unsigned int :27;
+    } status;
+};
+
+/* Allocate a module_thread along with a new thread state for interp.
+   Returns NULL with an exception set on failure. */
+static struct module_thread *
+new_module_thread(PyInterpreterState *interp, int daemonic)
+{
+    PyThreadState *tstate = _PyThreadState_New(interp);
+    if (tstate == NULL) {
+        if (!PyErr_Occurred()) {
+            PyErr_NoMemory();
+        }
+        return NULL;
+    }
+
+    struct module_thread *mt = PyMem_RawMalloc(sizeof(struct module_thread));
+    if (mt == NULL) {
+        PyThreadState_Clear(tstate);
+        PyThreadState_Delete(tstate);
+        if (!PyErr_Occurred()) {
+            PyErr_NoMemory();
+        }
+        return NULL;
+    }
+
+    *mt = (struct module_thread){
+        .tstate = tstate,
+        .daemonic = daemonic,
+        .status = {
+            .initialized = 1,
+        },
+    };
+    return mt;
+}
+
+/* Free mt, clearing and deleting its thread state first
+   unless release_module_thread_tstate() already did so. */
+static void
+delete_module_thread(struct module_thread *mt)
+{
+    if (mt->tstate != NULL) {
+        assert(!mt->status.tstate_cleared);
+        PyThreadState_Clear(mt->tstate);
+        PyThreadState_Delete(mt->tstate);
+        mt->tstate = NULL;
+    }
+    PyMem_RawFree(mt);
+}
+
+
+/* Thread counts for the module; the helpers below update them under mutex. */
+struct module_threads {
+    PyThread_type_lock mutex;
+    struct {
+        long all;
+        long running;
+        long pyfuncs_running;
+    } counts;
+};
+
+/* Allocate the mutex and zero the counts.
+   Returns 0 on success, or -1 with an exception set. */
+static int
+module_threads_init(struct module_threads *threads)
+{
+    PyThread_type_lock lock = PyThread_allocate_lock();
+    if (lock == NULL) {
+        PyErr_NoMemory();
+        return -1;
+    }
+
+    *threads = (struct module_threads){
+        .mutex = lock,
+    };
+    return 0;
+}
+
+#ifdef HAVE_FORK
+/* Reinitialize the mutex and reset the counts; called from
+   thread__after_fork().  Returns 0, or -1 with ThreadError set. */
+static int
+module_threads_reinit(struct module_threads *threads)
+{
+#ifndef NDEBUG
+    PyThreadState *tstate = _PyThreadState_GET();
+#endif
+    assert(tstate->thread_id == PyThread_get_thread_ident());
+
+    PyThread_type_lock lock = threads->mutex;
+    assert(lock != NULL);
+    if (_PyThread_at_fork_reinit(&lock) < 0) {
+        PyErr_SetString(ThreadError, "failed to reinitialize lock at fork");
+        return -1;
+    }
+
+    *threads = (struct module_threads){
+        .mutex = lock,
+        // The counts are all reset to 0.
+    };
+    return 0;
+}
+#endif
+
+/* Free the mutex.  This does nothing when the mutex is NULL (e.g. when
+   module_threads_init() failed on still-zeroed module state), because
+   thread_module_free() calls this unconditionally. */
+static void
+module_threads_fini(struct module_threads *threads)
+{
+    if (threads->mutex == NULL) {
+        return;
+    }
+
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    // XXX Wait for all module threads to finish running thread_run().
+    // XXX assert(threads->counts.running == 0);
+
+    PyThread_release_lock(threads->mutex);
+
+    PyThread_free_lock(threads->mutex);
+    threads->mutex = NULL;
+}
+
+
+/* high-level helpers for threads owned by the module */
+
+/* Mark mt as started, bump the counts, then bind its thread state.
+   The caller must not have a current thread state yet. */
+static void
+bind_module_thread(struct module_threads *threads,
+                   struct module_thread *mt)
+{
+    assert(_PyThreadState_GET() == NULL);
+
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    assert(mt->status.initialized);
+    assert(!mt->status.started);
+    mt->status.started = 1;
+    threads->counts.all++;
+    threads->counts.running++;
+
+    PyThread_release_lock(threads->mutex);
+
+    _PyThreadState_Bind(mt->tstate);
+}
+
+/* Record that mt is about to run its Python function. */
+static void
+set_module_thread_starting(struct module_threads *threads,
+                           struct module_thread *mt)
+{
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    assert(mt->tstate == _PyThreadState_GET());
+    assert(mt->status.started);
+    assert(!mt->status.func_started);
+    mt->status.func_started = 1;
+    threads->counts.pyfuncs_running++;
+
+    PyThread_release_lock(threads->mutex);
+}
+
+/* Record that mt's Python function has returned. */
+static void
+set_module_thread_finished(struct module_threads *threads,
+                           struct module_thread *mt)
+{
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    assert(mt->tstate == _PyThreadState_GET());
+    assert(mt->status.func_started);
+    assert(!mt->status.func_ended);
+    mt->status.func_ended = 1;
+    threads->counts.pyfuncs_running--;
+
+    PyThread_release_lock(threads->mutex);
+
+    // Notify other threads that this one is done.
+    // XXX Do it explicitly here rather than via tstate.on_delete().
+}
+
+/* Detach the thread state from mt, then clear and delete it.
+   The current thread no longer holds the GIL afterward. */
+static void
+release_module_thread_tstate(struct module_threads *threads,
+                             struct module_thread *mt)
+{
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    assert(mt->tstate == _PyThreadState_GET());
+    assert(mt->status.func_ended);
+    assert(!mt->status.tstate_cleared);
+    PyThreadState *tstate = mt->tstate;
+    mt->tstate = NULL;
+    mt->status.tstate_cleared = 1;
+
+    PyThread_release_lock(threads->mutex);
+
+    PyThreadState_Clear(tstate);
+    // This releases the GIL.
+    _PyThreadState_DeleteCurrent(tstate);
+}
+
+/* Drop mt from the running count once its thread state is gone. */
+static void
+untrack_module_thread(struct module_threads *threads,
+                      struct module_thread *mt)
+{
+    assert(_PyThreadState_GET() == NULL);
+
+    PyThread_acquire_lock(threads->mutex, WAIT_LOCK);
+
+    assert(mt->status.func_ended);
+    assert(mt->status.tstate_cleared);
+    threads->counts.running--;
+
+    PyThread_release_lock(threads->mutex);
+}
+
+
+/* module state */
+
 typedef struct {
+    struct module_threads threads;
+
     PyTypeObject *excepthook_type;
     PyTypeObject *lock_type;
     PyTypeObject *local_type;
@@ -1048,12 +1290,11 @@ _localdummy_destroyed(PyObject *localweakref, PyObject *dummyweakref)
 /* Module functions */
 
 struct bootstate {
-    PyInterpreterState *interp;
+    thread_module_state *module_state;
+    struct module_thread *module_thread;
     PyObject *func;
     PyObject *args;
     PyObject *kwargs;
-    PyThreadState *tstate;
-    _PyRuntimeState *runtime;
 };
 
 
@@ -1071,30 +1312,43 @@ static void
 thread_run(void *boot_raw)
 {
     struct bootstate *boot = (struct bootstate *) boot_raw;
-    PyThreadState *tstate;
-
-    tstate = boot->tstate;
-    _PyThreadState_Bind(tstate);
-    PyEval_AcquireThread(tstate);
-    tstate->interp->threads.count++;
+    struct module_threads *threads = &boot->module_state->threads;
+    struct module_thread *mt = boot->module_thread;
+    PyObject *pyfunc = boot->func;
+    PyObject *pyargs = boot->args;
+    PyObject *pykwargs = boot->kwargs;
+
+    bind_module_thread(threads, mt);
+
+    PyEval_AcquireThread(mt->tstate);
+    // We free the boot state before running pyfunc
+    // since daemon threads can exit before PyObject_Call() returns.
+    // We can't do much about leaking pyfunc/pyargs/pykwargs though.
+    PyMem_Free(boot);
 
-    PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs);
+    // Run the Python function with the GIL held.
+    set_module_thread_starting(threads, mt);
+    PyObject *res = PyObject_Call(pyfunc, pyargs, pykwargs);
     if (res == NULL) {
         if (PyErr_ExceptionMatches(PyExc_SystemExit))
             /* SystemExit is ignored silently */
             PyErr_Clear();
         else {
-            _PyErr_WriteUnraisableMsg("in thread started by", boot->func);
+            _PyErr_WriteUnraisableMsg("in thread started by", pyfunc);
         }
     }
     else {
         Py_DECREF(res);
     }
+    set_module_thread_finished(threads, mt);
 
-    thread_bootstate_free(boot);
-    tstate->interp->threads.count--;
-    PyThreadState_Clear(tstate);
-    _PyThreadState_DeleteCurrent(tstate);
+    // Clean up everything we created in thread_PyThread_start_new_thread().
+    Py_DECREF(pyfunc);
+    Py_DECREF(pyargs);
+    Py_XDECREF(pykwargs);
+    release_module_thread_tstate(threads, mt);
+    untrack_module_thread(threads, mt);
+    delete_module_thread(mt);
 
     // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
     // the glibc, pthread_exit() can abort the whole process if dlopen() fails
@@ -1120,14 +1374,18 @@ Return True if daemon threads are allowed in the current interpreter,\n\
 and False otherwise.\n");
 
 static PyObject *
-thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
+thread_PyThread_start_new_thread(PyObject *self,
+                                 PyObject *fargs, PyObject *fkwargs)
 {
-    _PyRuntimeState *runtime = &_PyRuntime;
+    char *kwlist[] = {"", "", "", "daemonic", NULL};
     PyObject *func, *args, *kwargs = NULL;
-
-    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
-                           &func, &args, &kwargs))
+    int daemonic = 0;
+    if (!PyArg_ParseTupleAndKeywords(fargs, fkwargs,
+                                     "OO|Op:start_new_thread", kwlist,
+                                     &func, &args, &kwargs, &daemonic))
+    {
         return NULL;
+    }
     if (!PyCallable_Check(func)) {
         PyErr_SetString(PyExc_TypeError,
                         "first arg must be callable");
@@ -1155,21 +1413,20 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
                         "thread is not supported for isolated subinterpreters");
         return NULL;
     }
+    thread_module_state *state = get_thread_state(self);
+
+    struct module_thread *mt = new_module_thread(interp, daemonic);
+    if (mt == NULL) {
+        return NULL;
+    }
 
     struct bootstate *boot = PyMem_NEW(struct bootstate, 1);
     if (boot == NULL) {
+        delete_module_thread(mt);
         return PyErr_NoMemory();
     }
-    boot->interp = _PyInterpreterState_GET();
-    boot->tstate = _PyThreadState_New(boot->interp);
-    if (boot->tstate == NULL) {
-        PyMem_Free(boot);
-        if (!PyErr_Occurred()) {
-            return PyErr_NoMemory();
-        }
-        return NULL;
-    }
-    boot->runtime = runtime;
+    boot->module_state = state;
+    boot->module_thread = mt;
     boot->func = Py_NewRef(func);
     boot->args = Py_NewRef(args);
     boot->kwargs = Py_XNewRef(kwargs);
@@ -1177,7 +1434,7 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
     unsigned long ident = PyThread_start_new_thread(thread_run, (void*) boot);
     if (ident == PYTHREAD_INVALID_THREAD_ID) {
         PyErr_SetString(ThreadError, "can't start new thread");
-        PyThreadState_Clear(boot->tstate);
+        delete_module_thread(mt);
         thread_bootstate_free(boot);
         return NULL;
     }
@@ -1185,7 +1442,7 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
 }
 
 PyDoc_STRVAR(start_new_doc,
-"start_new_thread(function, args[, kwargs])\n\
+"start_new_thread(function, args[, kwargs], daemonic=0)\n\
 (start_new() is an obsolete synonym)\n\
 \n\
 Start a new thread and return its identifier. The thread will call the\n\
@@ -1291,8 +1548,8 @@ particular thread within a system.");
 static PyObject *
 thread__count(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
-    PyInterpreterState *interp = _PyInterpreterState_GET();
-    return PyLong_FromLong(interp->threads.count);
+    thread_module_state *state = get_thread_state(self);
+    return PyLong_FromLong(state->threads.counts.pyfuncs_running);
 }
 
 PyDoc_STRVAR(_count_doc,
@@ -1563,11 +1820,23 @@ PyDoc_STRVAR(excepthook_doc,
 \n\
 Handle uncaught Thread.run() exception.");
 
+#ifdef HAVE_FORK
+static PyObject *
+thread__after_fork(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+    thread_module_state *state = get_thread_state(module);
+    if (module_threads_reinit(&state->threads) < 0) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+#endif
+
 static PyMethodDef thread_methods[] = {
-    {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
-     METH_VARARGS, start_new_doc},
-    {"start_new", (PyCFunction)thread_PyThread_start_new_thread,
-     METH_VARARGS, start_new_doc},
+    {"start_new_thread", _PyCFunction_CAST(thread_PyThread_start_new_thread),
+     METH_VARARGS | METH_KEYWORDS, start_new_doc},
+    {"start_new", _PyCFunction_CAST(thread_PyThread_start_new_thread),
+     METH_VARARGS | METH_KEYWORDS, start_new_doc},
     {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed,
      METH_NOARGS, daemon_threads_allowed_doc},
     {"allocate_lock", thread_PyThread_allocate_lock,
@@ -1594,6 +1863,10 @@ static PyMethodDef thread_methods[] = {
      METH_NOARGS, _set_sentinel_doc},
     {"_excepthook", thread_excepthook,
      METH_O, excepthook_doc},
+#ifdef HAVE_FORK
+    {"_after_fork", (PyCFunction)thread__after_fork,
+     METH_NOARGS, NULL},
+#endif
     {NULL, NULL} /* sentinel */
 };
 
@@ -1609,6 +1882,11 @@ thread_module_exec(PyObject *module)
     // Initialize the C thread library
     PyThread_init_thread();
 
+    // Initialize the list of threads owned by this module.
+    if (module_threads_init(&state->threads) < 0) {
+        return -1;
+    }
+
     // Lock
     state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec);
     if (state->lock_type == NULL) {
@@ -1699,6 +1977,8 @@ thread_module_clear(PyObject *module)
 static void
 thread_module_free(void *module)
 {
+    thread_module_state *state = get_thread_state(module);
+    module_threads_fini(&state->threads);
     thread_module_clear((PyObject *)module);
 }
 