
【Medium Python】Part 3: Why can't Python multithreading run in parallel?


Python's multithreading is an old chestnut, and there is no shortage of material about it online. The built-in threading module does provide multithreading support, but in practice multiple threading.Thread instances cannot run in parallel (note: this does not mean they cannot run concurrently!).

The answer in one sentence: Python threads are native operating-system threads, and for a thread to execute Python code it must hold the lock of the corresponding interpreter, the GIL. Normally a Python program runs with exactly one interpreter, so all threads have to acquire the same lock before executing their own code; they mutually exclude each other, and therefore their code cannot run at the same time.

With that in hand, let's walk through the story behind this one-sentence answer.

Testing multithreaded parallelism

First, let's test with a bit of code whether multiple threads really run in parallel:

import threading
import datetime
import time

COUNT = int(1e8)

def _count_task(start, end):
    start_time = datetime.datetime.now()
    while start < end:
        start += 1
    cost = (datetime.datetime.now() - start_time).total_seconds()
    print(f"count task finished, cost: {cost:.2f}s")

Timing _count_task(0, COUNT) in a single thread, and then timing the same range split across two threading.Thread instances (each counting half of the range, started with start() and waited for with join()), gives essentially the same total wall-clock time: the two threads clearly do not run in parallel.

So where does a Python thread actually come from? On the Python side, Thread.start() hands the job over to _start_new_thread, whose C implementation is thread_PyThread_start_new_thread:

// _threadmodule.c
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    _PyRuntimeState *runtime = &_PyRuntime;
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;

    // unpack fargs and check that the arguments are valid
    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    // (callable / tuple / dict checks on func, args and keyw omitted)

    PyInterpreterState *interp = _PyInterpreterState_GET();
    if (interp->config._isolated_interpreter) {
        PyErr_SetString(PyExc_RuntimeError,
                        "thread is not supported for isolated subinterpreters");
        return NULL;
    }

    // set up the bootstate instance
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = _PyInterpreterState_GET();
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    boot->runtime = runtime;
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);

    // start the thread, passing t_bootstrap and the bootstate instance
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == PYTHREAD_INVALID_THREAD_ID) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyLong_FromUnsignedLong(ident);
}

The _start_new_thread called from Python corresponds to thread_PyThread_start_new_thread at the C level; the two arguments passed in from the Python side, self._bootstrap and an empty tuple, arrive packed inside fargs (the C parameter self is the _thread module itself). Roughly, _start_new_thread goes through the following steps:

1. Unpack fargs and check that the arguments are valid. Since an empty tuple was passed in, there is not much to analyze here for now.
2. Set up the bootstate instance boot. A bootstate bundles fargs with the corresponding thread state, interpreter state and runtime state; it carries everything the new thread needs to start.
3. Call PyThread_start_new_thread, passing in the bootstate instance together with a callback function, t_bootstrap. The return value is the thread's ident, which on the Python side we can also read from the thread instance's ident attribute (the short example after this list shows it). The t_bootstrap callback is what will run inside the newly started child thread!
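To look at this C function from the Python side, we can call the _thread module directly. A minimal sketch (the task function, its tag argument and the closing sleep are only for illustration):

import _thread
import threading
import time

def task(tag):
    # this body runs inside the newly created OS thread
    print(tag, "running in thread", threading.get_ident())

# start_new_thread is the Python-facing name of thread_PyThread_start_new_thread;
# it returns the new thread's ident, the same value Thread.ident exposes.
ident = _thread.start_new_thread(task, ("hello",))
print("started a raw thread with ident", ident)

time.sleep(0.5)  # give the raw thread time to finish before the main thread exits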

PyThread_start_new_thread is defined differently for each operating system. Taking Windows as an example, its definition is as follows:

// thread_nt.h
/* thunker to call adapt between the function type used by the system's
   thread start function and the internally used one. */
static unsigned __stdcall
bootstrap(void *call)
{
    callobj *obj = (callobj*)call;
    void (*func)(void*) = obj->func;
    void *arg = obj->arg;
    HeapFree(GetProcessHeap(), 0, obj);
    func(arg);
    return 0;
}

unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    HANDLE hThread;
    unsigned threadID;
    callobj *obj;

    dprintf(("%lu: PyThread_start_new_thread called\n",
             PyThread_get_thread_ident()));
    if (!initialized)
        PyThread_init_thread();

    obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
    if (!obj)
        return PYTHREAD_INVALID_THREAD_ID;
    obj->func = func;
    obj->arg = arg;
    PyThreadState *tstate = _PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    hThread = (HANDLE)_beginthreadex(0,
                      Py_SAFE_DOWNCAST(stacksize, Py_ssize_t, unsigned int),
                      bootstrap, obj,
                      0, &threadID);
    if (hThread == 0) {
        /* I've seen errno == EAGAIN here, which means "there are
         * too many threads".
         */
        int e = errno;
        dprintf(("%lu: PyThread_start_new_thread failed, errno %d\n",
                 PyThread_get_thread_ident(), e));
        threadID = (unsigned)-1;
        HeapFree(GetProcessHeap(), 0, obj);
    }
    else {
        dprintf(("%lu: PyThread_start_new_thread succeeded: %p\n",
                 PyThread_get_thread_ident(), (void*)hThread));
        CloseHandle(hThread);
    }
    return threadID;
}

The parameters func and arg here are the t_bootstrap callback and the bootstate instance. To match the interface that Windows' _beginthreadex expects, t_bootstrap and the bootstate instance are packed once more into a callobj, which is then passed to _beginthreadex together with the adapter function bootstrap.

At this point we can already be certain: the new thread that Python starts is a native operating-system thread.
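We can also see this from pure Python. A quick check (threading.get_native_id() requires Python 3.8+; the printed numbers will differ from run to run):

import threading

def show():
    # ident is the Python-level thread id; native_id is the id assigned by the OS kernel
    print("ident:", threading.get_ident(), "native_id:", threading.get_native_id())

t = threading.Thread(target=show)
t.start()
t.join()
# the native id is the same thread id that OS tools (Task Manager, htop, ...) display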

When the new thread comes to life, it runs bootstrap, which unpacks the callobj and calls func(arg), that is, t_bootstrap(boot):

// _threadmodule.c
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();   // reset thread ID
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);   // take gil for executing thread task
    tstate->interp->num_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        if (PyErr_ExceptionMatches(PyExc_SystemExit))
            /* SystemExit is ignored silently */
            PyErr_Clear();
        else {
            _PyErr_WriteUnraisableMsg("in thread started by", boot->func);
        }
    }
    else {
        Py_DECREF(res);
    }
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    _PyThreadState_DeleteCurrent(tstate);
    // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
    // the glibc, pthread_exit() can abort the whole process if dlopen() fails
    // to open the libgcc_s.so library (ex: EMFILE error).
}

Back in t_bootstrap, we can see that it eventually takes func and args out of boot and invokes func(args) through PyObject_Call. Looking back at the earlier Python code, this func(args) is exactly self._bootstrap() on the Python side:

class Thread:
    def _bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # _bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # _bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

    def _bootstrap_inner(self):
        try:
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            self._started.set()
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                self._invoke_excepthook(self)
        finally:
            with _active_limbo_lock:
                try:
                    # We don't call self._delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[get_ident()]
                except:
                    pass

self._bootstrap_inner() roughly does the following:

1. Set (notify) self._started, which lets the start() call on the Python side, still waiting on this event, run to completion.
2. Remove itself from the "pending" set _limbo and register itself in the _active table.
3. Execute self.run(), which kicks off the thread's actual work.

With this, the full picture of how a new thread is started in Python is laid out in front of us. Beyond where threads come from, the answers to many basic questions about threads (for instance, why you should not call self.run() directly) also become self-evident, as the small example below shows.
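A minimal illustration of that last point: calling run() directly is just an ordinary method call in the current thread, while only start() goes through _start_new_thread and actually creates a new OS thread.

import threading

def where_am_i():
    # report which thread actually executes the target
    print("executing in:", threading.current_thread().name)

# run() is a plain method call: the target executes in the caller's (main) thread
threading.Thread(target=where_am_i, name="worker-1").run()   # prints MainThread

# start() goes through _start_new_thread -> t_bootstrap -> _bootstrap_inner -> run()
t = threading.Thread(target=where_am_i, name="worker-2")
t.start()                                                     # prints worker-2
t.join()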

How a thread executes code

In the previous section we learned where Python's new threads come from. However, only by dissecting how a thread executes its code can we pin down why Python threads cannot run in parallel.

Whatever task a thread performs, it ultimately comes down to the run() method. Let's first use Python's built-in disassembly module dis to look at the opcodes behind Thread.run, so that we can push the analysis further through the interpreter's handling of the corresponding opcode:

class Thread:
    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        """
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

The line that actually executes the target, self._target(*self._args, **self._kwargs), corresponds to the following opcodes:

910           8 LOAD_FAST                0 (self)
             10 LOAD_ATTR                0 (_target)
             12 LOAD_FAST                0 (self)
             14 LOAD_ATTR                1 (_args)
             16 BUILD_MAP                0
             18 LOAD_FAST                0 (self)
             20 LOAD_ATTR                2 (_kwargs)
             22 DICT_MERGE               1
             24 CALL_FUNCTION_EX         1
             26 POP_TOP
        >>   28 POP_BLOCK

Clearly CALL_FUNCTION_EX, the opcode that performs the call, is the one we need to follow.
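The listing above can be reproduced in a couple of lines (exact line numbers and byte offsets depend on the Python version):

import dis
import threading

# disassemble Thread.run and look for the CALL_FUNCTION_EX opcode
dis.dis(threading.Thread.run)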

// ceval.c
PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyThreadState *tstate, PyFrameObject *f, int throwflag)
{
    // (a great many lines omitted)
    switch (opcode) {
        // (a great many lines omitted)
        case TARGET(CALL_FUNCTION_EX): {
            // check the function and its arguments
            PREDICTED(CALL_FUNCTION_EX);
            PyObject *func, *callargs, *kwargs = NULL, *result;
            if (oparg & 0x01) {
                kwargs = POP();
                if (!PyDict_CheckExact(kwargs)) {
                    PyObject *d = PyDict_New();
                    if (d == NULL)
                        goto error;
                    if (_PyDict_MergeEx(d, kwargs, 2) < 0) {
                        // (error handling omitted)
                    }
                    // (lines omitted)
                }
            }
            // (lines omitted: the callable and its arguments end up being
            // handed to the generic call machinery, i.e. _PyObject_Call)
        }
        // (lines omitted)
    }
}

// call.c
PyObject *
_PyObject_Call(PyThreadState *tstate, PyObject *callable,
               PyObject *args, PyObject *kwargs)
{
    ternaryfunc call;
    PyObject *result;

    // (assertions omitted)
    if (PyVectorcall_Function(callable) != NULL) {
        return PyVectorcall_Call(callable, args, kwargs);
    }
    else {
        call = Py_TYPE(callable)->tp_call;
        if (call == NULL) {
            _PyErr_Format(tstate, PyExc_TypeError,
                          "'%.200s' object is not callable",
                          Py_TYPE(callable)->tp_name);
            return NULL;
        }

        if (_Py_EnterRecursiveCall(tstate, " while calling a Python object")) {
            return NULL;
        }

        result = (*call)(callable, args, kwargs);

        _Py_LeaveRecursiveCall(tstate);

        return _Py_CheckFunctionResult(tstate, callable, result, NULL);
    }
}

In _PyObject_Call, the call is ultimately dispatched through one of two generic paths, vectorcall or Py_TYPE(callable)->tp_call, which tells us that different kinds of callables may need different callers to handle them. With that in mind, we can simply debug the Thread class's run() method (running it directly in the main thread is enough) to watch how a thread's run call unfolds. The test code is as follows:

from threading import Thread

def _stat(a, b):
    print(a + b)

t = Thread(target=_stat, args=(2, 5))
t.run()

The line self._target(*self._args, **self._kwargs) inside t.run triggers the PyVectorcall_Call branch in _PyObject_Call. Stepping into it all the way down, we eventually land in the _PyEval_EvalFrame function:

static inline PyObject*
_PyEval_EvalFrame(PyThreadState *tstate, PyFrameObject *f, int throwflag)
{
    return tstate->interp->eval_frame(tstate, f, throwflag);
}

A frame is the unit instance on Python's call stack (similar to Lua's CallInfo); it holds all the information about one function call. eval_frame parses and executes the code object stored in the frame. The interpreter it uses is tstate->interp, and from the thread-startup logic we saw earlier, thread_PyThread_start_new_thread hands the main thread's interp to the child thread, so no matter how many threads we create, they all share one and the same interpreter. And what is that interpreter's eval_frame? After going around in a circle, we are back at the huge function _PyEval_EvalFrameDefault.
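One observable consequence of "all threads share a single interpreter" is that this one interpreter can enumerate the current frame of every thread. A small sketch:

import sys
import threading
import time

def worker():
    # park the thread so that its frame is still alive while we look at it
    time.sleep(1)

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

# sys._current_frames() asks the (single) interpreter for the topmost frame of
# every thread it knows about, keyed by thread ident
frames = sys._current_frames()
print(len(frames), "threads visible to the interpreter")  # typically 4: main + 3 workers

for t in threads:
    t.join()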

Following the main_loop goto label inside _PyEval_EvalFrameDefault, we find the endless loop that processes opcodes. But before the switch on the opcode, there is a check:

// ceval.c
PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyThreadState *tstate, PyFrameObject *f, int throwflag)
{
    // (lines above omitted)
main_loop:
    for (;;) {
        // (lines above omitted)
        if (_Py_atomic_load_relaxed(eval_breaker)) {
            opcode = _Py_OPCODE(*next_instr);
            if (opcode == SETUP_FINALLY ||
                opcode == SETUP_WITH ||
                opcode == BEFORE_ASYNC_WITH ||
                opcode == YIELD_FROM) {
                /* Few cases where we skip running signal handlers and other
                   pending calls:
                   - If we're about to enter the 'with:'. It will prevent
                     emitting a resource warning in the common idiom
                     'with open(path) as file:'.
                   - If we're about to enter the 'async with:'.
                   - If we're about to enter the 'try:' of a try/finally (not
                     *very* useful, but might help in some cases and it's
                     traditional)
                   - If we're resuming a chain of nested 'yield from' or
                     'await' calls, then each frame is parked with YIELD_FROM
                     as its next opcode. If the user hit control-C we want to
                     wait until we've reached the innermost frame before
                     running the signal handler and raising KeyboardInterrupt
                     (see bpo-30039).
                */
                goto fast_next_opcode;
            }

            if (eval_frame_handle_pending(tstate) != 0) {
                goto error;
            }
        }
        // (lines below omitted)
    }
    // (lines below omitted)
}

This code first checks whether the interruption condition eval_breaker has been raised; if it has, execution may go into eval_frame_handle_pending to deal with the interruption:

// ceval.c
/* Handle signals, pending calls, GIL drop request and asynchronous
   exception */
static int
eval_frame_handle_pending(PyThreadState *tstate)
{
    _PyRuntimeState * const runtime = &_PyRuntime;
    struct _ceval_runtime_state *ceval = &runtime->ceval;

    /* Pending signals */
    if (_Py_atomic_load_relaxed(&ceval->signals_pending)) {
        if (handle_signals(tstate) != 0) {
            return -1;
        }
    }

    /* Pending calls */
    struct _ceval_state *ceval2 = &tstate->interp->ceval;
    if (_Py_atomic_load_relaxed(&ceval2->pending.calls_to_do)) {
        if (make_pending_calls(tstate) != 0) {
            return -1;
        }
    }

    /* GIL drop request */
    if (_Py_atomic_load_relaxed(&ceval2->gil_drop_request)) {
        /* Give another thread a chance */
        if (_PyThreadState_Swap(&runtime->gilstate, NULL) != tstate) {
            Py_FatalError("tstate mix-up");
        }
        drop_gil(ceval, ceval2, tstate);

        /* Other threads may run now */

        take_gil(tstate);

        if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) {
            Py_FatalError("orphan tstate");
        }
    }

    /* Check for asynchronous exception. */
    if (tstate->async_exc != NULL) {
        PyObject *exc = tstate->async_exc;
        tstate->async_exc = NULL;
        UNSIGNAL_ASYNC_EXC(tstate->interp);
        _PyErr_SetNone(tstate, exc);
        Py_DECREF(exc);
        return -1;
    }

#ifdef MS_WINDOWS
    // bpo-42296: On Windows, _PyEval_SignalReceived() can be called in a
    // different thread than the Python thread, in which case
    // _Py_ThreadCanHandleSignals() is wrong. Recompute eval_breaker in the
    // current Python thread with the correct _Py_ThreadCanHandleSignals()
    // value. It prevents to interrupt the eval loop at every instruction if
    // the current Python thread cannot handle signals (if
    // _Py_ThreadCanHandleSignals() is false).
    COMPUTE_EVAL_BREAKER(tstate->interp, ceval, ceval2);
#endif

    return 0;
}

eval_frame_handle_pending handles several conditions that interrupt opcode evaluation. Here we can see that whichever thread reaches this point, if it finds a gil_drop_request it has to drop_gil and give another thread a chance, and afterwards call take_gil to compete for the interpreter lock again. Back when we walked through thread startup, the t_bootstrap function executed by a new thread contained the line PyEval_AcquireThread(tstate), and the take_gil logic lives inside that call. Let's see what take_gil actually does:

// ceval_gil.h
/* Take the GIL.

   The function saves errno at entry and restores its value at exit.

   tstate must be non-NULL. */
static void
take_gil(PyThreadState *tstate)
{
    // (lines above omitted)

    if (!_Py_atomic_load_relaxed(&gil->locked)) {
        goto _ready;
    }

    while (_Py_atomic_load_relaxed(&gil->locked)) {
        // the case where we have not obtained the gil yet
        unsigned long saved_switchnum = gil->switch_number;

        unsigned long interval = (gil->interval >= 1 ? gil->interval : 1);
        int timed_out = 0;
        COND_TIMED_WAIT(gil->cond, gil->mutex, interval, timed_out);

        /* If we timed out and no switch occurred in the meantime, it is time
           to ask the GIL-holding thread to drop it. */
        if (timed_out &&
            _Py_atomic_load_relaxed(&gil->locked) &&
            gil->switch_number == saved_switchnum)
        {
            if (tstate_must_exit(tstate)) {
                MUTEX_UNLOCK(gil->mutex);
                PyThread_exit_thread();
            }
            assert(is_tstate_valid(tstate));

            SET_GIL_DROP_REQUEST(interp);
        }
    }

_ready:
    // (FORCE_SWITCHING-related code omitted)

    /* We now hold the GIL */
    _Py_atomic_store_relaxed(&gil->locked, 1);
    _Py_ANNOTATE_RWLOCK_ACQUIRED(&gil->locked, /*is_write=*/1);

    if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) {
        _Py_atomic_store_relaxed(&gil->last_holder, (uintptr_t)tstate);
        ++gil->switch_number;
    }

    // (FORCE_SWITCHING-related code omitted)
    // (lines below omitted)
}

Look at this logic: while the GIL is held by someone else, the thread enters COND_TIMED_WAIT inside the while loop and waits on the gil->cond signal. That signal is emitted from inside drop_gil, i.e. it fires when another thread executes drop_gil. And since Python threads are native OS threads, digging into the implementation of COND_TIMED_WAIT would show that it is really the operating system that schedules the wake-up of the waiting thread once the signal fires. The wait lasts gil->interval (that is, sys.getswitchinterval(), the thread switch interval); if the GIL is still held by the same thread after that much time, the waiter forcibly triggers the SET_GIL_DROP_REQUEST logic:

static inline void
SET_GIL_DROP_REQUEST(PyInterpreterState *interp)
{
    struct _ceval_state *ceval2 = &interp->ceval;
    _Py_atomic_store_relaxed(&ceval2->gil_drop_request, 1);
    _Py_atomic_store_relaxed(&ceval2->eval_breaker, 1);
}

SET_GIL_DROP_REQUEST forcibly raises gil_drop_request and eval_breaker. The thread that holds the GIL then notices, while evaluating a frame, that eval_breaker is set, goes into eval_frame_handle_pending, sees the gil_drop_request, and calls drop_gil to release the interpreter lock. The other thread, in one of its COND_TIMED_WAIT iterations after issuing SET_GIL_DROP_REQUEST, may now be signalled early, find that the GIL is no longer locked, and proceed to take it. In the end the other thread has won the right to execute code, while the thread that just gave up the GIL calls take_gil again inside eval_frame_handle_pending and is now the one competing for the execution right. And so it goes, round and round.
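The interval that drives this hand-over is exposed to Python code. A small sketch of reading and tuning it (0.05 is just an illustrative value; raising the interval only reduces switching overhead for CPU-bound threads, it does not make them run in parallel):

import sys

# gil->interval in the C code, expressed in seconds (default: 0.005, i.e. 5 ms)
print("current switch interval:", sys.getswitchinterval())

# make GIL hand-overs less frequent: a waiting thread now waits 50 ms before
# issuing SET_GIL_DROP_REQUEST to the thread that holds the GIL
sys.setswitchinterval(0.05)
print("new switch interval:", sys.getswitchinterval())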

Summary

Having dissected how threads are started, how their code is executed, and how they are scheduled around the GIL, we can now answer the question "why can't Python multithreading run in parallel" in a few lines: every Python thread is a native OS thread, but all of them share a single interpreter, and a thread must hold that interpreter's GIL to execute bytecode; the eval loop periodically forces the GIL holder to drop the lock so that waiting threads get their turn. The threads therefore take turns on the interpreter instead of running at the same time.



