一文读懂Python多进程 您所在的位置:网站首页 python简单加法程序 一文读懂Python多进程

一文读懂Python多进程

2023-04-08 23:43| 来源: 网络整理| 查看: 265

大家好,这里是Hard Open数据分析,我会分析一些数据分析相关的知识,也会偶尔发一些带货文(为了生活),欢迎大家点赞关注。(一)多进程的作用

有个客户找我写代码,需要对几十万条数据进行科学计算。客户已经写完代码了,但是几十万条数据实在跑不完,于是让我帮忙搞多进程。

多进程是将一个任务分成多个独立的子任务(进程),每个进程都有自己独立的内存空间和系统资源,可以并行运行。举个例子,明天要结课了,但你所有的作业都还没写,于是你发动影分身之术,变成100个你。一个去刷网课、一个去写第一章作业、一个去写第二章作业……最终,你本来需要3个月完成的任务,在一个晚上就完成了。

在这个任务中,多进程同样能派上用场。第一个进程处理1~10000个数据,第二个处理10001~20000个数据,依次类推……

不能泄露客户隐私,文章的代码例子为ChatGPT生成,经检查无误。

(二)python的多进程模块

在python中,我们用multiprocessing来处理多线程问题,multiprocessing基本用法如下:

导入 multiprocessing 模块中的 Process 类。定义一个函数,作为子进程要执行的任务。创建 Process 对象,指定要执行的任务和参数。调用 start() 方法启动子进程,开始执行任务。调用 join() 方法等待子进程执行完成,并回收资源。

下面是一个简单的利用 multiprocessing 模块创建两个子进程并进行加法计算的例子:

import multiprocessing ​ # 子进程要执行的任务 def add(a, b): result = a + b print(f"{a} + {b} = {result}") ​ if __name__ == '__main__': # 创建两个子进程,分别执行两次加法计算 p1 = multiprocessing.Process(target=add, args=(2, 3)) p2 = multiprocessing.Process(target=add, args=(5, 7)) ​ # 启动子进程 p1.start() p2.start() ​ # 等待子进程执行完成,并回收资源 p1.join() p2.join()

在这个例子中,首先定义了一个简单的加法计算函数 add(),然后创建了两个 Process 对象,分别指定要执行的任务和参数。接着调用 start() 方法启动子进程,开始执行相应的加法计算。最后调用 join() 方法等待子进程执行完毕,并回收资源。

需要注意的是,在 Windows 系统上,multiprocessing 模块需要使用 if __name__ == '__main__': 来保证主程序不会被子进程重复执行导致错误。

(三)进程间通信

进程间通信是多进程编程中一个非常重要的话题,因为在多进程环境下,不同进程之间需要共享数据、交换信息等。Python 中提供了多种进程间通信的机制,包括共享内存、队列、管道、文件锁等。

共享内存

共享内存是一种通过在不同进程之间共享相同的物理内存来实现通信的机制。在 Python 中,可以使用 multiprocessing.Value() 和 multiprocessing.Array() 函数创建共享内存对象。

Value() 函数用于创建一个指定类型的共享变量,并赋予一个初始值。例如:

import multiprocessing ​ # 创建一个整数型的共享变量,并赋值为0 counter = multiprocessing.Value('i', 0)

Array() 函数用于创建一个共享数组对象。例如:

import multiprocessing ​ # 创建一个包含10个整数的共享数组对象 arr = multiprocessing.Array('i', range(10))

以下是一个共享内存的简单例子:

import multiprocessing ​ # 子进程函数 def worker(counter): for i in range(5): counter.value += 1 ​ if __name__ == '__main__': # 创建一个共享整数型变量,并赋值为0 counter = multiprocessing.Value('i', 0) ​ # 创建多个子进程,并将共享变量传递给它们 processes = [multiprocessing.Process(target=worker, args=(counter,)) for i in range(4)] for p in processes: p.start() ​ # 等待所有子进程执行完成 for p in processes: p.join() ​ # 输出共享变量的值 print(counter.value)

在这个例子中,首先通过 Value() 函数创建了一个指定类型(此处为整数型)的共享变量 counter,并赋初值为0。然后创建了4个子进程,并将共享变量传递给它们。子进程中通过循环计数器加1。主进程等待所有子进程执行完成后,输出共享变量的值。需要注意的是,在使用共享内存时需要特别注意数据竞争和死锁等问题。

该例子实现了一个简单的计数器功能,多个进程同时对 counter 进行读写操作。因为多个进程之间共享同一块物理内存,所以在不同进程之间修改 counter 的值,相当于对同一块内存进行读写操作。因此,最终输出的结果应该是 20,即4个子进程每个执行了5次加1操作。

使用共享内存时需要特别注意多个进程同时访问同一块内存区域可能导致的数据竞争和死锁等问题。

队列(能用就好,不深入讲,容易混乱)

多个进程可以通过队列来传递消息和数据,常见的队列类型有 multiprocessing.Queue()

管道(能用就好,不深入讲,容易混乱)

两个进程之间可以通过管道进行双向通信。Python 中的 multiprocessing.Pipe() 函数可以创建管道对象,并返回两个连接对象,分别用于读取和写入数据。

文件锁

文件锁是一种基于文件系统的同步机制,可以防止多个进程同时对同一文件进行读写操作,从而保护数据的完整性。在 Python 中,可以使用 multiprocessing.Lock() 类创建文件锁对象,用于控制临界区代码的访问。

例如:

import multiprocessing ​ # 创建一个文件锁对象 lock = multiprocessing.Lock()

在需要保护的临界区代码块中,可以使用文件锁来保证同一时刻只有一个进程能够访问该区域。例如:

import multiprocessing ​ # 加锁的临界区代码 def critical_section(lock, data): lock.acquire() # 访问共享资源 print(f"data: {data}") lock.release() ​ if __name__ == '__main__': # 创建一个文件锁对象,并启动两个子进程 lock = multiprocessing.Lock() p1 = multiprocessing.Process(target=critical_section, args=(lock, 1)) p2 = multiprocessing.Process(target=critical_section, args=(lock, 2)) p1.start() p2.start() ​ # 等待子进程执行完成 p1.join() p2.join()

在这个例子中,创建了一个文件锁对象,并将其传递给两个子进程。critical_section() 函数代表一个需要加锁的临界区代码块,在访问共享资源前获取锁,访问完成后释放锁。需要注意的是,在使用文件锁时需要非常小心,避免出现死锁和竞争条件等问题。

(四)进程池和进程管理器进程池

进程池是一种用于处理批量运算任务的机制,它可以让多个进程共同完成一个耗时的计算任务。在 Python 中,可以使用 multiprocessing.Pool() 类创建进程池对象,该对象可以接受多个任务,并利用可用的 CPU 核心数自动分配并发执行这些任务,从而提高程序的运行效率。

import multiprocessing ​ # 定义任务函数 def task(x): return x*x ​ if __name__ == '__main__': # 创建一个拥有4个进程的进程池对象 pool = multiprocessing.Pool(processes=4) ​ # 向进程池提交10个任务 # 下面用的是map,在很多任务中还可以使用apply tasks = range(10) results = pool.map(task, tasks) ​ # 输出结果 print(results)

在这个例子中,首先通过 Pool() 函数创建了一个拥有4个进程的进程池对象 pool。然后通过 map() 方法向进程池提交10个任务,每个任务都是对一个数进行平方运算。进程池会自动将这些任务分配给可用的进程执行,并返回每个任务的结果。需要注意的是,在使用进程池时需要特别注意内存占用和任务分配等问题,避免出现阻塞或死锁等问题。

进程管理器

Manager() 是 Python multiprocessing 模块中提供的一个进程间通信机制,可以用于在不同进程之间共享数据。Manager() 函数创建的是一个进程间服务进程池,该进程池可以管理多个共享对象(如共享列表、字典等),并提供了一些方法来对这些共享对象进行访问和修改。

使用 Manager() 实现共享内存的流程如下:

创建一个进程间服务管道。在服务进程池中创建一个共享对象,例如共享列表。将共享对象传递给多个子进程,以实现数据共享。使用共享对象的方法对数据进行操作。关闭进程间服务管道,释放资源。

下面是一个简单的例子,演示如何使用 Manager() 实现共享内存:

import multiprocessing ​ # 子进程函数 def worker(lst): for i in range(len(lst)): lst[i] += 1 ​ if __name__ == '__main__': # 创建一个进程间服务管道,并在服务进程池中创建一个共享列表对象 manager = multiprocessing.Manager() lst = manager.list(range(10)) ​ # 创建多个子进程,并将共享列表对象传递给它们 processes = [multiprocessing.Process(target=worker, args=(lst,)) for i in range(4)] for p in processes: p.start() ​ # 等待所有子进程执行完成 for p in processes: p.join() ​ # 输出共享列表对象的内容 print(lst)

在这个例子中,首先通过 Manager() 函数创建了一个进程间服务管道,并在服务进程池中创建了一个共享列表对象 lst。然后创建了4个子进程,并将共享列表对象传递给它们。子进程中通过循环遍历共享列表对象,并将每个元素加1。主进程等待所有子进程执行完成后,输出共享列表对象的内容。需要注意的是,在程序结束时需要关闭进程间服务管道并释放资源,可以通过调用共享对象的 __del__() 方法来实现。

使用 Manager() 虽然方便,但是其性能可能会受到影响,因为在服务进程池中维护多个共享对象会消耗更多的资源。同时,由于数据的复制和传递,使用 Manager() 进行数据共享也可能会导致一定的延迟。因此,在实际应用中需要权衡利弊,选择合适的进程间通信机制。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有