【Medium Python】第三话:python多线程为什么不能并行?

【Medium Python】第三话:python多线程为什么不能并行?

2024-06-23 07:21| 来源: 网络整理| 查看: 265

python的多线程,这是个老生常谈的话题了,网上资料也一大把。python默认的threading模块对多线程提供了支持,但实际上多个threading.Thread实例无法并行(parallel)运行(注意,不是无法并发(concurrent)哦!)。

一句话概括答案:python的线程实质上是操作系统原生的线程,而每个线程要执行python代码的话,都必须先获得所属解释器的全局解释器锁(GIL)。一般我们运行python程序时都只有一个解释器,这样不同线程需要获得同一把锁才能执行各自的代码,彼此互斥,于是代码就不能同时运行了。

好的,接下来我们细细讲解这句话背后的故事:



import threading import datetime import time COUNT = int(1e8) def _count_task(start, end): start_time = datetime.datetime.now() while start config._isolated_interpreter) { PyErr_SetString(PyExc_RuntimeError, "thread is not supported for isolated subinterpreters"); return NULL; } // 设置bootstate实例 boot = PyMem_NEW(struct bootstate, 1); if (boot == NULL) return PyErr_NoMemory(); boot->interp = _PyInterpreterState_GET(); boot->func = func; boot->args = args; boot->keyw = keyw; boot->tstate = _PyThreadState_Prealloc(boot->interp); boot->runtime = runtime; if (boot->tstate == NULL) { PyMem_DEL(boot); return PyErr_NoMemory(); } Py_INCREF(func); Py_INCREF(args); Py_XINCREF(keyw); // 启动线程,传参t_bootstrap跟bootstate实例 ident = PyThread_start_new_thread(t_bootstrap, (void*) boot); if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "can't start new thread"); Py_DECREF(func); Py_DECREF(args); Py_XDECREF(keyw); PyThreadState_Clear(boot->tstate); PyMem_DEL(boot); return NULL; } return PyLong_FromUnsignedLong(ident); }


1. 解包fargs,并检查合法性。这里由于传进来的是空的tuple,所以暂时不需要过多分析。
2. 设置bootstate实例boot。一个bootstate是fargs以及对应的thread state、interpreter state和runtime state的打包,囊括了启动新线程需要有的信息。
3. 调用PyThread_start_new_thread函数,把bootstate实例以及一个回调函数t_bootstrap传进去。
   - 其返回值是线程的实例ID,在python端,我们也可以通过线程实例的ident属性得到。
   - t_bootstrap回调函数,是需要在新启动的子线程里运行的!


// thread_nt.h /* thunker to call adapt between the function type used by the system's thread start function and the internally used one. */ static unsigned __stdcall bootstrap(void *call) { callobj *obj = (callobj*)call; void (*func)(void*) = obj->func; void *arg = obj->arg; HeapFree(GetProcessHeap(), 0, obj); func(arg); return 0; } unsigned long PyThread_start_new_thread(void (*func)(void *), void *arg) { HANDLE hThread; unsigned threadID; callobj *obj; dprintf(("%lu: PyThread_start_new_thread called\n", PyThread_get_thread_ident())); if (!initialized) PyThread_init_thread(); obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj)); if (!obj) return PYTHREAD_INVALID_THREAD_ID; obj->func = func; obj->arg = arg; PyThreadState *tstate = _PyThreadState_GET(); size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0; hThread = (HANDLE)_beginthreadex(0, Py_SAFE_DOWNCAST(stacksize, Py_ssize_t, unsigned int), bootstrap, obj, 0, &threadID); if (hThread == 0) { /* I've seen errno == EAGAIN here, which means "there are * too many threads". */ int e = errno; dprintf(("%lu: PyThread_start_new_thread failed, errno %d\n", PyThread_get_thread_ident(), e)); threadID = (unsigned)-1; HeapFree(GetProcessHeap(), 0, obj); } else { dprintf(("%lu: PyThread_start_new_thread succeeded: %p\n", PyThread_get_thread_ident(), (void*)hThread)); CloseHandle(hThread); } return threadID; }




// _threadmodule.c static void t_bootstrap(void *boot_raw) { struct bootstate *boot = (struct bootstate *) boot_raw; PyThreadState *tstate; PyObject *res; tstate = boot->tstate; tstate->thread_id = PyThread_get_thread_ident(); // reset thread ID _PyThreadState_Init(tstate); PyEval_AcquireThread(tstate); // take gil for executing thread task tstate->interp->num_threads++; res = PyObject_Call(boot->func, boot->args, boot->keyw); if (res == NULL) { if (PyErr_ExceptionMatches(PyExc_SystemExit)) /* SystemExit is ignored silently */ PyErr_Clear(); else { _PyErr_WriteUnraisableMsg("in thread started by", boot->func); } } else { Py_DECREF(res); } Py_DECREF(boot->func); Py_DECREF(boot->args); Py_XDECREF(boot->keyw); PyMem_DEL(boot_raw); tstate->interp->num_threads--; PyThreadState_Clear(tstate); _PyThreadState_DeleteCurrent(tstate); // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with // the glibc, pthread_exit() can abort the whole process if dlopen() fails // to open the libgcc_s.so library (ex: EMFILE error). }


class Thread:
    def _bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # _bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # _bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self._bootstrap_inner()
        except:
            # Swallow the exception only for a daemonic thread after the
            # module-level _sys reference has been cleared to None;
            # otherwise re-raise it.
            if self._daemonic and _sys is None:
                return
            raise

    def _bootstrap_inner(self):
        """Register the thread as active, call ``self.run()``, then unregister it.

        Exceptions raised by ``run()`` are passed to ``self._invoke_excepthook``.
        The entry in ``_active`` is removed in the ``finally`` clause, whether
        or not ``run()`` succeeded.
        """
        try:
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            # Signal the _started event now that the ident/lock setup is done.
            self._started.set()
            # Move this thread from the _limbo table to the _active table.
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                self._invoke_excepthook(self)
        finally:
            with _active_limbo_lock:
                try:
                    # We don't call self._delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[get_ident()]
                except:
                    pass


1. notify self._started,这样先前python端的start函数流程就完成了。
2. 把自己从准备态_limbo中移除,并把自己加到_active态里。
3. 执行self.run,开始线程逻辑。





class Thread:
    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        """
        try:
            # Nothing is called when no (truthy) target was supplied.
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

其中真正执行函数的一行self._target(*self._args, **self._kwargs),对应的opcodes是:

910 8 LOAD_FAST 0 (self) 10 LOAD_ATTR 0 (_target) 12 LOAD_FAST 0 (self) 14 LOAD_ATTR 1 (_args) 16 BUILD_MAP 0 18 LOAD_FAST 0 (self) 20 LOAD_ATTR 2 (_kwargs) 22 DICT_MERGE 1 24 CALL_FUNCTION_EX 1 26 POP_TOP >> 28 POP_BLOCK


// ceval.c PyObject* _Py_HOT_FUNCTION _PyEval_EvalFrameDefault(PyThreadState *tstate, PyFrameObject *f, int throwflag) { // 省略超多行 switch (opcode) { // 省略超多行 case TARGET(CALL_FUNCTION_EX): { // 检查函数跟参数 PREDICTED(CALL_FUNCTION_EX); PyObject *func, *callargs, *kwargs = NULL, *result; if (oparg & 0x01) { kwargs = POP(); if (!PyDict_CheckExact(kwargs)) { PyObject *d = PyDict_New(); if (d == NULL) goto error; if (_PyDict_MergeEx(d, kwargs, 2) tp_name); return NULL; } if (_Py_EnterRecursiveCall(tstate, " while calling a Python object")) { return NULL; } result = (*call)(callable, args, kwargs); _Py_LeaveRecursiveCall(tstate); return _Py_CheckFunctionResult(tstate, callable, result, NULL); } }


from threading import Thread


def _stat(a, b):
    """Print the result of ``a + b``."""
    total = a + b
    print(total)


# run() is invoked directly (rather than start()), so the target executes
# synchronously in the calling thread and no new OS thread is created.
t = Thread(args=(2, 5), target=_stat)
t.run()

t.run中的self._target(*self._args, **self._kwargs)一行触发了_PyObject_Call中PyVectorcall_Call分支。一路step into下去,最终来到了_PyEval_EvalFrame函数:

/* Evaluate frame f by dispatching to the eval_frame function pointer stored
   on the thread's interpreter state (tstate->interp->eval_frame), and return
   whatever that function returns. */
static inline PyObject*
_PyEval_EvalFrame(PyThreadState *tstate, PyFrameObject *f, int throwflag)
{
    return tstate->interp->eval_frame(tstate, f, throwflag);
}


从_PyEval_EvalFrameDefault的main_loop这个goto标签(label)往下,就是无限循环处理opcode了。但在进入switch (opcode)之前,有一段判断逻辑:

// ceval.c PyObject* _Py_HOT_FUNCTION _PyEval_EvalFrameDefault(PyThreadState *tstate, PyFrameObject *f, int throwflag) { // 省略上面 main_loop: for (;;) { // 省略上面 if (_Py_atomic_load_relaxed(eval_breaker)) { opcode = _Py_OPCODE(*next_instr); if (opcode == SETUP_FINALLY || opcode == SETUP_WITH || opcode == BEFORE_ASYNC_WITH || opcode == YIELD_FROM) { /* Few cases where we skip running signal handlers and other pending calls: - If we're about to enter the 'with:'. It will prevent emitting a resource warning in the common idiom 'with open(path) as file:'. - If we're about to enter the 'async with:'. - If we're about to enter the 'try:' of a try/finally (not *very* useful, but might help in some cases and it's traditional) - If we're resuming a chain of nested 'yield from' or 'await' calls, then each frame is parked with YIELD_FROM as its next opcode. If the user hit control-C we want to wait until we've reached the innermost frame before running the signal handler and raising KeyboardInterrupt (see bpo-30039). */ goto fast_next_opcode; } if (eval_frame_handle_pending(tstate) != 0) { goto error; } } // 省略下面 } // 省略下面 }


// ceval.c /* Handle signals, pending calls, GIL drop request and asynchronous exception */ static int eval_frame_handle_pending(PyThreadState *tstate) { _PyRuntimeState * const runtime = &_PyRuntime; struct _ceval_runtime_state *ceval = &runtime->ceval; /* Pending signals */ if (_Py_atomic_load_relaxed(&ceval->signals_pending)) { if (handle_signals(tstate) != 0) { return -1; } } /* Pending calls */ struct _ceval_state *ceval2 = &tstate->interp->ceval; if (_Py_atomic_load_relaxed(&ceval2->pending.calls_to_do)) { if (make_pending_calls(tstate) != 0) { return -1; } } /* GIL drop request */ if (_Py_atomic_load_relaxed(&ceval2->gil_drop_request)) { /* Give another thread a chance */ if (_PyThreadState_Swap(&runtime->gilstate, NULL) != tstate) { Py_FatalError("tstate mix-up"); } drop_gil(ceval, ceval2, tstate); /* Other threads may run now */ take_gil(tstate); if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) { Py_FatalError("orphan tstate"); } } /* Check for asynchronous exception. */ if (tstate->async_exc != NULL) { PyObject *exc = tstate->async_exc; tstate->async_exc = NULL; UNSIGNAL_ASYNC_EXC(tstate->interp); _PyErr_SetNone(tstate, exc); Py_DECREF(exc); return -1; } #ifdef MS_WINDOWS // bpo-42296: On Windows, _PyEval_SignalReceived() can be called in a // different thread than the Python thread, in which case // _Py_ThreadCanHandleSignals() is wrong. Recompute eval_breaker in the // current Python thread with the correct _Py_ThreadCanHandleSignals() // value. It prevents to interrupt the eval loop at every instruction if // the current Python thread cannot handle signals (if // _Py_ThreadCanHandleSignals() is false). COMPUTE_EVAL_BREAKER(tstate->interp, ceval, ceval2); #endif return 0; }

eval_frame_handle_pending处理了多种需要中断opcode解析的场景。在这里我们可以看到,不论是哪个线程跑到这里,只要发现gil_drop_request被置位,就必须先drop_gil把锁让给其他线程,之后再尝试take_gil,重新参与解释器锁的竞争。

在先前讲解线程启动逻辑的时候,新线程调用的t_bootstrap函数里,有一句PyEval_AcquireThread(tstate),这里面就包含了take_gil的逻辑。我们可以看一下take_gil到底干了什么事情:

// ceval_gil.h /* Take the GIL. The function saves errno at entry and restores its value at exit. tstate must be non-NULL. */ static void take_gil(PyThreadState *tstate) { // 省略上面 if (!_Py_atomic_load_relaxed(&gil->locked)) { goto _ready; } while (_Py_atomic_load_relaxed(&gil->locked)) { // 没有拿到gil的情况 unsigned long saved_switchnum = gil->switch_number; unsigned long interval = (gil->interval >= 1 ? gil->interval : 1); int timed_out = 0; COND_TIMED_WAIT(gil->cond, gil->mutex, interval, timed_out); /* If we timed out and no switch occurred in the meantime, it is time to ask the GIL-holding thread to drop it. */ if (timed_out && _Py_atomic_load_relaxed(&gil->locked) && gil->switch_number == saved_switchnum) { if (tstate_must_exit(tstate)) { MUTEX_UNLOCK(gil->mutex); PyThread_exit_thread(); } assert(is_tstate_valid(tstate)); SET_GIL_DROP_REQUEST(interp); } } _ready: // 省略FORCE_SWITCHING宏相关代码 /* We now hold the GIL */ _Py_atomic_store_relaxed(&gil->locked, 1); _Py_ANNOTATE_RWLOCK_ACQUIRED(&gil->locked, /*is_write=*/1); if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) { _Py_atomic_store_relaxed(&gil->last_holder, (uintptr_t)tstate); ++gil->switch_number; } // 省略FORCE_SWITCHING宏相关代码 // 省略下面 }


/* Set both the gil_drop_request flag and the eval_breaker flag of the given
   interpreter's ceval state to 1 (relaxed atomic stores). */
static inline void
SET_GIL_DROP_REQUEST(PyInterpreterState *interp)
{
    struct _ceval_state *ceval2 = &interp->ceval;
    _Py_atomic_store_relaxed(&ceval2->gil_drop_request, 1);
    _Py_atomic_store_relaxed(&ceval2->eval_breaker, 1);
}










        CopyRight 2018-2019 实验室设备网 版权所有