博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
进程中的生产者消费者模型
阅读量:6567 次
发布时间:2019-06-24

本文共 3375 字,大约阅读时间需要 11 分钟。

# 多进程的生产者消费者模型# 队列    # 队列是进程安全的,同时只能有一个进程从队列中取到数据# 生产者消费者模型    # 为什么要这个模型        # 这个模型经常性的解决数据的供需不平衡的问题    # 经常有两拨人,一拨是生产数据的,一拨是消费数据的。        # 消费者指的是使用数据处理数据的一端        # 生产数据的一端生产的数据过快        # 当生产数据过快时,消费数据过慢时,可以弄出一个进程队列,将生产的数据放入到队列中,消费数据端可以开多个进程从这个进程队列中消费数据        # 当生产数据过慢时,消费数据过快时,可以弄出一个进程队列,多几个进程作为生产者,将生产的数据放到队列中,消费者从队列中消费数据# 生产者 消费者 模型# 生产者,每一个生产者就是一个进程,每一个消费者就是一个进程# 下面的例子,当生产者生产的东西全部生产完了以后,消费者进程还一直阻塞在队列那里等待数据,让消费者进程结束使用的方法是,在阻塞等待生产者执行后了后,向队列中插入了# None值,消费者判断队列中的值为none后,则退出消费者进程。但是因为进程队列是进程安全的,多个进程一个时间点上只能有一个进程获取到队列中的数据,所以当有一个# 消费者拿到队列中的None后,队列中就没有了None,其他消费者就拿不到None无法退出消费者进程,必须要有多个消费者就要向队列中丢多少# 个None才能让消费者进程全部结束# import time# import random# from multiprocessing import Process, Queue### def producer(name, food, q):#     '''#     生产者#     name:谁生产#     food:生产了什么东西#     q:生产出来的东西放到哪里#     :return:#     '''#     for i in range(4):#         time.sleep(random.randint(1, 3))    # 模拟生产数据的时间#         f = ('%s 生产了%s %s' % (name, food, i))   # 模拟生产数据#         print(f)    # 查看生产的数据#         q.put(f)    # 将生产的数据放入到进程队列中## def consumer(q, name):#     '''#     消费者#     q:数据在哪里#     name:谁来消费#     :return:#     '''#     while 1:#         food = q.get()#         if food is None:#             print('%s 获取到了一个空' % name)#             break#         print('\033[31m%s 消费了 %s\033[0m' % (name, food))#         time.sleep(random.randint(1, 3))## if __name__ == '__main__':#     q = Queue(20)#     p = Process(target=producer, args=('Egon', '包子', q))#     p.start()##     p1 = Process(target=producer, args=('wusir', '泔水', q))#     p1.start()##     p2 = Process(target=consumer, args=(q, 'why'))#     p2.start()##     p3 = Process(target=consumer, args=(q, 'Jbox'))#     p3.start()##     p.join()   # 阻塞等待生产者1进程结束#     p1.join()   # 阻塞等待生产者2进程结束##     q.put(None)#     q.put(None)# 下面的例子使用JoinableQueue进程队列实现生产者消费者模型,放入数据到队列中时,内部多了一个计数+的机制,从队列中取数据后,调用队列的task_done方法,将计数-1# 然后在生产者中使用队列的join阻塞等待队列的数据全部为空,全部被消费者消费掉,如果数据全被消耗掉了,生产者进程才会结束import timeimport randomfrom multiprocessing import Process, JoinableQueuedef producer(name, food, q):    '''    生产者    name:谁生产    food:生产了什么东西    q:生产出来的东西放到哪里    :return:    '''    for i in range(4):        time.sleep(random.randint(1, 3))    # 模拟生产数据的时间        f = ('%s 生产了%s %s' % (name, food, i))   # 模拟生产数据        print(f)    # 查看生产的数据        q.put(f)    # 将生产的数据放入到进程队列中,每放入一个数据到队列中后,机制中的count就会被+1    q.join()    # 阻塞等待队列中的数据全部被取走,q队列中的数据全部被取走则不再阻塞在这里def consumer(q, name):    '''    消费者    q:数据在哪里    name:谁来消费    :return:    '''    while 1:        food = q.get()        if food is None:            print('%s 获取到了一个空' % name)            break        print('\033[31m%s 消费了 %s\033[0m' % (name, food))        time.sleep(random.randint(1, 3))        q.task_done()   # 每从队列中消费了一个数据后,让机制中的conut计数减一if __name__ == '__main__':    q = JoinableQueue(20)    p = Process(target=producer, args=('Egon', '包子', q))    p.start()    p1 = Process(target=producer, args=('wusir', '泔水', q))    p1.start()    p2 = Process(target=consumer, args=(q, 'why'))    p2.daemon = True    # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束    p2.start()    p3 = Process(target=consumer, args=(q, 'Jbox'))    p3.daemon = True    # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束    p3.start()    p.join()   # 阻塞等待生产者1进程结束    p1.join()   # 阻塞等待生产者2进程结束    print('生产者进程结束,同时主进程要结束,因为其他两个消费者进程被设置为守护进程,所以主进程结束后,那两个消费者守护进程也会结束')

 

转载于:https://www.cnblogs.com/whylinux/p/9824705.html

你可能感兴趣的文章
让批处理运行不显示窗口的两个方法
查看>>
江苏省环保厅数据中心同城灾备建设项目
查看>>
hadoop 安全模式
查看>>
我的友情链接
查看>>
新手教程:用.htaccess实现二级域名功能
查看>>
How to attack a windows domain
查看>>
安装完Arch后,要安装的软件
查看>>
洛谷——P2035 iCow
查看>>
空类,虚函数类,虚继承类的空间大小
查看>>
sysaux表空间数据库块损坏/游离块的修复
查看>>
OSPF中stub area配置实例
查看>>
Exchange 2010 OAB无法更新
查看>>
CentOS系统中PHP和MySQL的升级方法
查看>>
Excel 统计IP
查看>>
pptpd的远程连接成功并使用
查看>>
javascript操作cookie
查看>>
我的友情链接
查看>>
部署CFCA_RA本地测试环境
查看>>
JAVA取属性
查看>>
我的友情链接
查看>>