「进阶篇」使用 subprocess 管理子进程 您所在的位置:网站首页 获取pid进程标准输出方法 「进阶篇」使用 subprocess 管理子进程

「进阶篇」使用 subprocess 管理子进程

2024-05-24 15:52| 来源: 网络整理| 查看: 265

在进程基础篇系列文章中小鱼为大家介绍了进程有关的概念、进程和进程池技术以及进程间的通信方式。

如果你对进程感到陌生,那么可以阅读上述基础篇的文章进行学习。本篇文章我们来学习和进程有关的另一个 Python 标准库 subprocess 模块。

虽然 Python 解释器本身可能局限在一个 CPU 上面(GIL 全局解释器锁的存在),但通过 Python 启动的子进程可以平行地运行,使得我们可以充分利用计算机的每一个 CPU 核心,从而协调并驱动 CPU 密集型任务。

在进程「基础篇」的学习中,我们创建进程的方式大多是把需要由子进程完成的任务封装成函数,传递给子进程或者提交到进程池,也可以使用 os.fork 函数编排子进程需要执行的代码。

我们接下来要学习的 subprocess 模块允许我们在 Python 代码中创建子进程执行操作系统级别的命令,并且可以连接子进程的输入、输出及错误管道,以便获取它们的返回状态。

因此,借助 subprocess 模块我们可以把其他语言实现的命令行工具很好地拼接起来。

提示:你或许使用 os.popen 或 os.exec* 系列的函数运行子进程,但 subprocess 模块会更简洁。一. 使用 run 函数启动子进程

subprocess 模块提供的 run() 函数用来启动一个子进程,执行命令行,并返回一个 CompletedProcess 实例。

In [1]: import subprocess In [2]: result = subprocess.run(['echo', "Hello Python!"]) Hello Python! In [3]: result Out[3]: CompletedProcess(args=['echo', 'Hello Python!'], returncode=0)

函数 run 默认不会捕获子进程的 stdout 和 stderr 。可以指定 capture_output=True ,或者 stdout/stderr=subprocess.PIPE 来捕获它们。

标准输出将被捕获到 CompletedProcess 对象的 stdout 属性。

In [1]: result = subprocess.run(['python', "-m", "json.tool"], ...: input=b'{"name": "yamfish"}', ...: capture_output=True) In [2]: print(result.stdout.decode()) { "name": "yamfish" }

当提供错误的 JSON 格式的字符串时,标准错误将会被捕获到 CompletedProcess 的 stderr 属性中。

In [1]: result = subprocess.run(['python', "-m", "json.tool"], ...: input=b'{"name", "yamfish"}', ...: capture_output=True) In [2]: result.stderr Out[2]: b"Expecting ':' delimiter: line 1 column 8 (char 7)\n"

其中,可选的参数 input 用来将字符/字节串传递给子进程的 stdin ,当然你还可以直接使用 stdin 来传入一个文件描述符。

默认情况下,所有通信都以字节为单位,stdin 为字节串,相应的 stdout 和 stderr 也为字节串。通常,我们可以使用 encoding 指定编码方式来触发文本模式。

文本模式下,stdin 为字符串,传递给子进程时使用 encoding 指定的方式编码;相应的 stdout 和 stderr 将使用指定的编码方案解码为字符串。

In [1]: result = subprocess.run(['echo', "你好,山药鱼儿"], ...: capture_output=True, ...: encoding='utf-8') In [2]: result.stdout Out[2]: '你好,山药鱼儿\n'

调用 run 方法时还可以指定 timeout 参数,如果子进程没有在指定的时间内结束,子进程将会被杀死,并弹出 TimeoutExpired 异常。

In [1]: result = subprocess.run(['sleep', '10'], timeout=2) ... TimeoutExpired: Command '['sleep', '10']' timed out after 1.9998808240052313 seconds

你可能已经注意到 run 方法返回的 CompletedProcess 有一个 returncode 属性,用来保存子进程的退出状态码。

In [1]: result = subprocess.run( ...: ['python', "-m", "json.tool"], ...: input=b'{"name", "yamfish"}', ...: capture_output=True) In [2]: result.returncode Out[2]: 1 In [3]: result.check_returncode() ... CalledProcessError: Command '['python', '-m', 'json.tool']' returned non-zero exit status 1.

此外,你还可以在调用 run 函数时指定 check=True 来检查该状态码,如果退出状态码不为 0,主进程将抛出一个 CalledProcessError 异常。

In [1]: result = subprocess.run( ...: ['python', "-m", "json.tool"], ...: input=b'{"name", "yamfish"}', ...: capture_output=True, ...: check=True) CalledProcessError: Command '['python', '-m', 'json.tool']' returned non-zero exit status 1.

除了使用列表的形式传递需要子进程执行的命令行及其参数,还可以通过指定 shell=True ,以字符串的形式书写完整的命令行及参数。

In [1]: result = subprocess.run('echo "Hello Python"', shell=True) Hello Python In [2]: result = subprocess.run('exit 1', shell=True) In [3]: result Out[3]: CompletedProcess(args='exit 1', returncode=1)

当 shell 被指定为 True 时,将通过 Linux 系统中的 Shell 执行 args 参数接收的命令行字符串。

最后我们来看最核心的三个参数:stdin stdout stderr ,分别用来指定子进程的标准输入、标准输出和标准错误的文件句柄。通常它们的值会被设置为以下几种形式:

subprocess.PIPE 从管道读取标准输入,以及把标准输出/错误写入管道。

In [1]: result = subprocess.run(['echo', 'Hello Python!'], stdout=subprocess.PIPE) In [2]: result.stdout Out[2]: b'Hello Python!\n'

2. subprocess.DEVNULL 从 /dev/null 读取标准输入,以及把标准输出/错误重定向到 /dev/null 。

In [1]: result = subprocess.run(['echo', 'Hello Python!'], stdout=subprocess.DEVNULL) In [2]: result Out[2]: CompletedProcess(args=['echo', 'Hello Python!'], returncode=0)

空设备 dev/null: Linux 系统中一个特殊的文件,读取 /dev/null 将立即返回文件结束符 EOF,向它写入的任何数据将被彻底丢弃。该设备可以帮我们丢弃不需要的输出并提高系统的效率。

在上述交互式 Ipython Shell 中,我们不仅在 Shell 中看不到任何标准输出,就连返回的 result 也不具有 stdout 和 stderr 这样的属性。

3. 已经打开的文件描述符:从文件描述符读取输入,以及把标准输出/错误写入文件描述符。

In [1]: with open('data.json', 'wb') as f: ...: subprocess.run(['echo', '{"name": "yamfish"}'], stdout=f) ...:

查看当前目录,会看到生成了一个名为 data.json 的文件:

root@fish:/home/admin# cat data.json {"name": "yamfish"}

当然,我们也可以把该文件句柄做为参数传递给 stdin ,以便对其中的 JSON 字符串做格式化处理。

In [1]: with open('data.json', 'rb') as f: ...: result=subprocess.run(['python', '-m', 'json.tool'], stdin=f, capture_output=True) ...: In [2]: print(result.stdout.decode()) { "name": "yamfish" }

此外,还有一个 subprocess.STDOUT 可以传递给 stderr 参数,表示合并 stdout 与 stderr 的输出。为了演示合并输出,小鱼定义了一个 my.py 的 Python 脚本:

if __name__ == "__main__": print("Hello Python!") raise RuntimeError('Raised By Yam Fish!')

如果只捕获了标准输出,那么标准错误将会被打印在 Shell 中:

In [1]: result = subprocess.run(['python', 'my.py'], stdout=subprocess.PIPE) Traceback (most recent call last): File "my.py", line 3, in raise RuntimeError('Raised By Yam Fish!') RuntimeError: Raised By Yam Fish!

如果设置 stderr=subprocess.STDOUT ,标准错误将被合并到标准输出中。

In [1]: result = subprocess.run(['python', 'my.py'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE) In [2]: print(result.stdout.decode()) Hello Python! Traceback (most recent call last): File "my.py", line 3, in raise RuntimeError('Raised By Yam Fish!') RuntimeError: Raised By Yam Fish! In [3]: result.stderr

我们知道子进程可以独立于父进程而运行,比如 run 函数启动的子进程在运行命令行时,就独立于 Python 解释器所在的父进程。

不过 run 函数在启动子进程后,该函数会一直阻塞,直到命令行执行完毕。也就是说父进程必须等待子进程退出后,才可以执行 run 函数后面的语句。

其实,run 函数是通过封装 subprocess 模块中的 Popen 对象来实现的,如果我们需要实现类似 UNIX 管道的高级用法,那么就需要我们去操作更底层的 Popen 类来启动子进程了。

二. 实例化 Popen 对象

我们在前面介绍的 run 函数的方法大多都同样适用于 Popen 类的实例化方法,如 args stdin stdout stderr encoding 等。

只有个别参数是 run 方法所特有的。

subprocess.run( *popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs, )

因此,接下来我们将重点学习 Popen 的实例方法。

使用 Popen 启动子进程之后,父进程并不需要阻塞等待子进程结束,而是可以去做别的任务。使用 poll 方法可以查询子进程是否已终止。

import time import subprocess proc = subprocess.Popen(['sleep', '3']) while proc.poll() is None: print('Do something others...') time.sleep(1) print('Return Code: ', proc.poll())

运行结果:Popen 的实例方法 poll 将在子进程结束后设置并返回进程的 returncode 属性。

(venv) root@fish:/home/admin# python poll.py Do something others... Do something others... Do something others... Return Code: 0

值得注意的是,和 run 方法返回的 CompletedProcess 对象不同的是,如果没有对 Popen 对象调用 poll communicate 或 wait 等方法获取子进程退出状态,那么在父进程退出前,将会存在僵尸进程。

其实我们不必去害怕僵尸进程,可以思考一下系统为什么会有“僵尸进程”这一种状态?

无论是我们之前提到的 Process 还是这里的 Popen ,父进程在创建子进程时都是异步的,并不会知道子进程的运行状态,而是在子进程退出时收到一个 SIGCHLD 信号来通知父进程子进程已经退出了。

如果父进程没有事先设置 SIG_IGN 忽略子进程退出的话,系统进程默认就不会回收子进程的退出后留下的 PCB 信息。这也使得父进程可以有机会回收并保存子进程的退出信息,而不是把它们一直残留在系统中。

提示:僵尸进程会随着父进程的退出而变为孤儿进程,被系统进程所回收。关于如何避免僵尸进程的产生,小鱼在基础篇的文章中已经做了详细讲解,此处不再赘述。

了解了僵尸进程存在的意义,我们就可以在 Ipython Shell 中使用 wait 方法获取上述僵尸进程的退出状态,并回收其残留在系统中的 PCB 信息。

In [1]: proc = subprocess.Popen(['sleep', '10']) In [2]: proc.wait() Out[2]: 0

现在,再次查看 atop 就看不到 sleep 命令行产生的僵尸进程啦~

把子进程从父进程中剥离以后,可以让程序平行地运行多条子进程。比如,下面的示例中就用 Popen 启用了 10 条平行的子进程。

import time import subprocess if __name__ == "__main__": start = time.time() sleep_proc_list = [] for _ in range(10): proc = subprocess.Popen(['sleep', '30']) sleep_proc_list.append(proc) for proc in sleep_proc_list: proc.communicate() end = time.time() print(f'总耗时 {(end - start):.3f} 秒!') time.sleep(60)

在主进程中,我们可以调用每条子进程的 communicate 方法,等待子进程终止。可以看到系统中启动了 10 条 sleep 命令行进程:

运行结果:这 10 条进程表现出了平行的效果,加入是顺序执行,那么至少需要 30 * 10 = 300 秒!

(venv) root@fish:/home/admin# python process1.py 总耗时 30.013 秒!

此外,communicate() 方法还提供了一个 timeout 参数,让我们有机会把陷入不正常的子进程停掉。比如,有的子进程可能由于某种原因卡在输入端或输出端,而无法结束。

在调用 communicate 方式时,如果子进程没有在指定的时间内结束,该方法将会抛出 TimeoutExpired 异常。

from subprocess import Popen, TimeoutExpired proc = Popen(['sleep', '10']) try: proc.communicate(timeout=1) except TimeoutExpired: proc.terminate() proc.wait() print('子进程退出状态:', proc.poll())

运行结果:

(venv) root@fish:/home/admin# python process2.py 子进程退出状态: -15

其中,terminate 方法会向子进程发送一个 SIGTERM 信号,终止子进程的运行,因此我们可以看到子进程的退出状态为 -15 。

除了 timeout 以外,communicate 还提供了一个可选的 input 参数,用来向子进程的标准输入写入数据。默认情况下,发送(stdin)和接收的(返回值 stdout, stderr)均为 bytes 类型。

下面的示例程序中,通过 encoding='utf-8' 启用了文本模式,此模式下 communicate 函数传入和返回的均为字符串类型。

import subprocess import json d = {'name': '山药鱼儿', 'age': 17, 'motto': '人能弘道,非道弘人!'} proc = subprocess.Popen( ['python', '-m', 'json.tool'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding='utf-8') stdout, _ = proc.communicate(input=json.dumps(d)) print("格式化后的 JSON 字符串: ", stdout, sep='\n')

运行结果:

(venv) root@fish:/home/admin# python process3.py 格式化后的 JSON 字符串: { "name": "\u5c71\u836f\u9c7c\u513f", "age": 17, "motto": "\u4eba\u80fd\u5f18\u9053\uff0c\u975e\u9053\u5f18\u4eba\uff01" }

最后,我们再来看几个和信号有关的方法。

我们前面使用的 terminate() 方法会发送 SIGTERM 信号来终止子进程的执行,子进程可以捕获和处理该信号,执行相关的退出逻辑,关闭资源。

不过,对于 terminate() 无法关闭的进程,我们就需要用kill() 方法来向子进程发送 SIGKILL 信号以便强制杀死子进程,相当于我们在终端执行 kill -9 pid 命令。

In [1]: proc = subprocess.Popen(['sleep', '60']) In [2]: proc.kill()

注意:kill() 方法发送的 SIGKILL 信号无法被捕获/忽略,当程序收到该信号时,会立即终止进程,不会执行任何清理。

在前面介绍信号时,我们使用 os.kill(pid, signal) 函数向指定 PID 的进程发送一个指定的信号。Popen 为我们提供了一个实例方法 send_signal() 可以发送指定的信号给子进程。

In [1]: from signal import SIGTERM In [2]: proc = subprocess.Popen(['sleep', '60']) In [3]: proc.send_signal(SIGTERM) In [4]: proc.wait() Out[4]: -15

上述的 proc.send_signal(SIGTERM) 就相当于 proc.terminate() 。

三. 管道 PIPE 的高级应用

使用 subprocess 模块提供的 PIPE 管道,我们可以在 Python 程序中把数据通过管道发送给子进程所运行的外部命令,并通过管道将命令的输出结果读取到 Python 程序中。

命令行 openssl 是一个典型的具备输入、输出的命令行工具,可以对输入数据进行加密,输出加密后的结果。

root@fish:/home/admin# echo "hello Python!" > hello.txt root@fish:/home/admin# openssl aes-256-cbc -salt -pbkdf2 -a -in hello.txt -out hello.aes -pass pass:123456 root@fish:/home/admin# cat hello.aes U2FsdGVkX190jmTlsfh+CMr95vh0eETMjwOxvJIvR1c=

下面的示例程序中,小鱼使用 subprocess 模块中的 Popen 运行 openssl 这个命令行工具,并配置好相应的 IO 管道。

import os import subprocess def run_encrypt(input_data): env = os.environ.copy() env['password'] = 'YamFish' process = subprocess.Popen( ['openssl', 'aes-256-cbc', '-salt', '-pbkdf2', '-a', '-pass', 'env:password'], env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) process.stdin.write(input_data) process.stdin.flush() return process

其中写入标准输入的 input_data 应以字节序列的形式提供,proc.stdin.write 用于向管道写入数据,proc.stdin.flush() 确保子进程从管道读取到数据。

提示:命令行演示中加密密码使用了 pass:123456 明文的方式提供,代码中使用的是env:password,从环境变量 password 中读取。

为了测试上述定义的函数,我们来平行地运行三个子进程,并使用 communicate 方法获取子进程的标准输出。

if __name__ == "__main__": proc_list = [] for _ in range(3): data = os.urandom(10) print('原始字节序列:', data) proc = run_encrypt(data) proc_list.append(proc) for proc in proc_list: stdout, _ = proc.communicate() print('加密后的数据:', stdout)

其中,os.urandom(10) 用于生成一个长度为 10 的随机的字节序列,作为管道的输入数据。

提示:使用 Popen 时除了可以读取来自用户输入、文件句柄或网络套接字的数据,写入输入管道,也可以将某个文件描述符作为参数传递给 stdin 。

运行结果:

原始字节序列: b'\xecn\x8d\xe7\xdb\xb5\x08\x9ez\xca' 原始字节序列: b'\x8f\x0e\xa7\x1d\xca\x85\x18\xb3`\xfa' 原始字节序列: b'\xf5\xdb\xc3u\xf3:r\x1a]\xf6' 加密后的数据: b'U2FsdGVkX18eJVYU0WIYJxJMVv+L+166upg7es5YJ2I=\n' 加密后的数据: b'U2FsdGVkX19/bAh7Iq/LkVSQHsK8ZD5MgP4PZsoCYPI=\n' 加密后的数据: b'U2FsdGVkX1/ZVmID9XhYtA2Pwh10xEfMey/vvNaG+zc=\n'

经过测试,我们已经确定 run_encrypt 函数可以对输入的字节序列进行加密,并且我们可以从它返回的工作进程中读取加密结果。

下面,小鱼要介绍的重点来了!我们不仅可以向 PIPE 写入和读取数据,并且这种管道还可以像 UNIX 管道一样,能够把一条子进程的输出端同另一条子进程的输入端连接起来。

比如,我们将上述 openssl 加密后的结果通过管道,发送给 openssl dgst 计算 md5 值。

root@fish:/home/admin# openssl aes-256-cbc -salt -pbkdf2 -a -in hello.txt -pass pass:123456 | openssl dgst -md5 (stdin)= 0b2559e398faf960fe40465f991d3bd7

为了实现类似上述 UNIX 管道的操作,我们需要编写一个 run_md5 函数,使用输入的 input_stdin 作为子进程的标准输入。

def run_md5(input_stdin): return subprocess.Popen( ['openssl', 'dgst', '-md5'], stdin=input_stdin, stdout=subprocess.PIPE)

接下来,我们需要调整调用部分的代码:将上游进程 encrypt_proc 的标准输出作为下游进程的标准输入。

if __name__ == "__main__": encrypt_procs = [] md5_procs = [] for _ in range(3): data = os.urandom(10) print('原始字节序列:', data) encrypt_proc = run_encrypt(data) encrypt_procs.append(encrypt_proc) md5_proc = run_md5(encrypt_proc.stdout) md5_procs.append(md5_proc) encrypt_proc.stdout.close() encrypt_proc.stdout = None for proc in encrypt_procs: proc.communicate() assert proc.returncode == 0 for proc in md5_procs: stdout, _ = proc.communicate() assert proc.returncode == 0 print('最终结果:', stdout)

值得注意的是上游进程的 stdout 实例必须谨慎地处理,用它把相应的下游进程 md5_proc 启动之后,应将该实例及时关闭并设为 None 。

提示:及时关闭上游进程 的 stdout 实例,可以防止上游进程在调用 communicate() 方法时窃取下游进程的输入流。此外,如果下游进程死亡还允许 SIGPIPE 传播到上游,以终止进程。

提示:向一个没有读进程的管道写数据将触发 SIGPIPE 信号。

只要上下游的子进程都启动起来,它们之间的 IO 管道就会自动打通。我们只需要依次等待上游进程和下游进程结束,并从下游进程的标准输出读取最终结果即可。

运行结果如下:

(venv) root@fish:/home/admin# python process5.py 原始字节序列: b'q?q\xb4\xb0q/j\xa2\xca' 原始字节序列: b'H[!\xdb\xc3\x0b\xa4\x88}\xfa' 原始字节序列: b'\xf7XR\xc2i\xed/\x90\x81\xe3' 最终结果: b'(stdin)= 3ee6308d6a5bb43d7e9abc93d0b4a939\n' 最终结果: b'(stdin)= 62bad6e3bb3ac36640504bd5bce82e14\n' 最终结果: b'(stdin)= b82ee252f9614c0c2d1857bfaad80ba8\n'

通过 atop 命令,我们可以看到刚刚的程序平行地启动了六条 openssl 进程完成计算任务。

以上就是本篇文章的全部内容啦~

总的来说 subprocess 模块可以很方便地运行子进程执行系统命令,并允许我们管理子进程的输入流与输出流。因为这些子进程可以和 Python 解释器所在的进程并行,从而充分利用 CPU 的各个核心。

开启一个子进程最简单的方式是使用 subprocess.run函数,如果要使用类似 UNIX 管道的高级用法,或者平行地运行多条子进程,那么请移步至subprocess.Popen 。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有