Python进程间通信 您所在的位置:网站首页 多进程通信 Python进程间通信

Python进程间通信

2023-04-11 17:19| 来源: 网络整理| 查看: 265

1. 前言

进程是操作系统进行资源分配的最小单位,进程之间是相互独立的。在多进程开发过程中,往往需要实现进程之间的通信。在本文,将讲述进程之间进行通信的方法。

2. 队列2.1 Queue

Queue类用于进程间资源共享的队列,能够在多进程之间进行数据传递。支持多个生产者和消费者,并且内部自动实现了同步机制,保证线程安全。

maxsize为队列的最大长度,即队列中最多能放入的元素个数。若构造时maxsize省略,则队列大小无限制,在内存等限制内,可以一直向Queue中放置元素。

方法:

put方法

put(self, obj, block=True, timeout=None)

用于向Queue中放置元素,向队列中放置的元素类型不要求是同一数据类型。block表示阻塞等待,timeout为超时时间,默认为None,表示永不超时。block为True,timeout为None,表示向队列中放入元素时,若队列已满,则会一直阻塞等待,直到队列中有空间;block为True,timeout为N(N > 0),表示向队列中放入元素时,若队列已满,则会进行阻塞等待,若等待N秒后队列仍然没有空间,则会抛出Queue.Full异常。若block为False,表示非阻塞等待,若向队列中放入元素,且队列已满,则会立即抛出Queue.Full异常。

get方法

get(self, block=True, timeout=None)

用于从Queue中取出元素。block表示阻塞等待,timeout为超时时间,默认为None,表示永不超时。block为True,timeout为None,表示尝试从队列中取元素时,若队列已空,则会一直阻塞等待,直到队列中有元素;block为True,timeout为N(N > 0),表示尝试从队列中取出元素时,若队列已空,则会进行阻塞等待,若等待N秒后队列仍然没有元素,则会抛出Queue.Empty异常。若block为False,表示非阻塞等待,若从队列中取出元素,但队列为空,则会立即抛出Queue.Empty异常。

其他方法

get_nowait(self) # 同get(self, block=False) put_nowait(self, obj) # 同put(self, obj, block=False) qsize(self) # 获取queue中元素个数 empty(self) # 队列是否为空 full(self) # 队列是否满 close(self) # 关闭队列

示例:

import os import time from multiprocessing import Process, Queue def producer_function(q: Queue): for i in range(3): if not q.full(): msg = f'msg: {i}' q.put(msg) print(f'producer:{os.getpid()} put {msg}') time.sleep(1) for i in range(3): if not q.full(): msg = f'msg: {i}' test_list = [msg] q.put(test_list) print(f'producer:{os.getpid()} put {test_list}') time.sleep(1) def consumer_function(q: Queue): try: while True: msg = q.get(block=True, timeout=3) print(f'consumer: {os.getpid()} get {msg}') time.sleep(2) except Exception as e: print(f'queue: {q} is empty') if __name__ == '__main__': print(f'begin test ...') q = Queue() producer = Process(target=producer_function, args=(q, ), name='producer') producer1 = Process(target=producer_function, args=(q, ), name='producer1') consumer = Process(target=consumer_function, args=(q, ), name='consumer') consumer1 = Process(target=consumer_function, args=(q, ), name='consumer1') producer.start() producer1.start() consumer.start() consumer1.start() producer.join() producer1.join() consumer.join() consumer1.join() print(f'finish test ...')

2.2 SimpleQueue

SimpleQueue是一个轻量级队列,只提供了get、put、empty、close方法,仅用于在单个生产者和单个消费者之间传递消息。SimpleQueue有更好的性能和内存效率。

2.3 JoinableQueue

JoinableQueue继承自Queue,重写了方法put、join,新增了task_done方法。

3. 管道

构造函数:

def Pipe(duplex: bool = ...) -> tuple[_ConnectionBase, _ConnectionBase]: ...

duplex为True,表明创建的管道为全双工管道,数据可以在管道内双向流动,可以在管道的两端分别进行读写;duplex为False,表明创建的管道为半双工管道,数据在管道内只能单向流动,只能在管道一端写入数据,另外一段读取数据。创建管道的函数返回的是两个_ConnectionBase对象,可以通过这两个_ConnectionBase对管道进行操作。

方法

send(obj):通过连接发送对象。obj是与序列化兼容的任意对象 def send(self, obj): """Send a (picklable) object""" recv():接收send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError def recv(self): """Receive a (picklable) object"""close():关闭连接。fileno():返回连接使用的文件描述符或者连接的句柄 def fileno(self): """File descriptor or handle of the connection"""poll([timeout]):如果连接有数据可读取,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 def poll(self, timeout=0.0): """Whether there is any input available to be read"""send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用recv_bytes()函数进行接收recv_bytes([maxlength]):接收send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

Queue与Pipe的区别

Queue是在Pipe的基础之上实现的;Queue只能单向,Pipe可以是全双工的;Queue有超时机制,Pipe没有;Queue可以由用户设置,而Pipe则是只能由操作系统设定


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有