我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python进程间通讯与进程池超详细讲解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python进程间通讯与进程池超详细讲解

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;
  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;
  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;
  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;
  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;
  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;
  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;
  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;
  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;
  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);
  • terminate():立即终止所有任务;
  • join():等待工作进程完成(必须已close或terminate了);
def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()

到此这篇关于Python进程间通讯与进程池超详细讲解的文章就介绍到这了,更多相关Python进程间通讯内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python进程间通讯与进程池超详细讲解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Python进程间通讯与进程池超详细讲解

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式,队列:队列类似于一条管道,元素先进先出,需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态
2022-12-23

Python多进程并发与同步机制超详细讲解

进程(Process),顾名思义,就是进行中的程序。有一句话说得好:程序是一个没有生命的实体,只有处理器赋予程序生命时,它才能成为一个活动的实体。进程是资源分配的最小单元,也就是说每个进程都有其单独的内存空间
2022-12-23

Python进程间通信之共享内存详解

前一篇博客说了怎样通过命名管道实现进程间通信,但是要在windows是使用命名管道,需要使用python调研windows api,太麻烦,于是想到是不是可以通过共享内存的方式来实现。查了一下,Python中可以使用mmap模块来实现这一功
2022-06-04

Android Activity与Service通信(不同进程之间)详解

在Android中,Activity主要负责前台页面的展示,Service主要负责需要长期运行的任务,所以在我们实际开发中,就会常常遇到Activity与Service之间的通信,我们一般在Activity中启动后台Service,通过In
2022-06-06

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录