# 多进程的生产者消费者模型# 队列 # 队列是进程安全的,同时只能有一个进程从队列中取到数据# 生产者消费者模型 # 为什么要这个模型 # 这个模型经常性的解决数据的供需不平衡的问题 # 经常有两拨人,一拨是生产数据的,一拨是消费数据的。 # 消费者指的是使用数据处理数据的一端 # 生产数据的一端生产的数据过快 # 当生产数据过快时,消费数据过慢时,可以弄出一个进程队列,将生产的数据放入到队列中,消费数据端可以开多个进程从这个进程队列中消费数据 # 当生产数据过慢时,消费数据过快时,可以弄出一个进程队列,多几个进程作为生产者,将生产的数据放到队列中,消费者从队列中消费数据# 生产者 消费者 模型# 生产者,每一个生产者就是一个进程,每一个消费者就是一个进程# 下面的例子,当生产者生产的东西全部生产完了以后,消费者进程还一直阻塞在队列那里等待数据,让消费者进程结束使用的方法是,在阻塞等待生产者执行后了后,向队列中插入了# None值,消费者判断队列中的值为none后,则退出消费者进程。但是因为进程队列是进程安全的,多个进程一个时间点上只能有一个进程获取到队列中的数据,所以当有一个# 消费者拿到队列中的None后,队列中就没有了None,其他消费者就拿不到None无法退出消费者进程,必须要有多个消费者就要向队列中丢多少# 个None才能让消费者进程全部结束# import time# import random# from multiprocessing import Process, Queue### def producer(name, food, q):# '''# 生产者# name:谁生产# food:生产了什么东西# q:生产出来的东西放到哪里# :return:# '''# for i in range(4):# time.sleep(random.randint(1, 3)) # 模拟生产数据的时间# f = ('%s 生产了%s %s' % (name, food, i)) # 模拟生产数据# print(f) # 查看生产的数据# q.put(f) # 将生产的数据放入到进程队列中## def consumer(q, name):# '''# 消费者# q:数据在哪里# name:谁来消费# :return:# '''# while 1:# food = q.get()# if food is None:# print('%s 获取到了一个空' % name)# break# print('\033[31m%s 消费了 %s\033[0m' % (name, food))# time.sleep(random.randint(1, 3))## if __name__ == '__main__':# q = Queue(20)# p = Process(target=producer, args=('Egon', '包子', q))# p.start()## p1 = Process(target=producer, args=('wusir', '泔水', q))# p1.start()## p2 = Process(target=consumer, args=(q, 'why'))# p2.start()## p3 = Process(target=consumer, args=(q, 'Jbox'))# p3.start()## p.join() # 阻塞等待生产者1进程结束# p1.join() # 阻塞等待生产者2进程结束## q.put(None)# q.put(None)# 下面的例子使用JoinableQueue进程队列实现生产者消费者模型,放入数据到队列中时,内部多了一个计数+的机制,从队列中取数据后,调用队列的task_done方法,将计数-1# 然后在生产者中使用队列的join阻塞等待队列的数据全部为空,全部被消费者消费掉,如果数据全被消耗掉了,生产者进程才会结束import timeimport randomfrom multiprocessing import Process, JoinableQueuedef producer(name, food, q): ''' 生产者 name:谁生产 food:生产了什么东西 q:生产出来的东西放到哪里 :return: ''' for i in range(4): time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 f = ('%s 生产了%s %s' % (name, food, i)) # 模拟生产数据 print(f) # 查看生产的数据 q.put(f) # 将生产的数据放入到进程队列中,每放入一个数据到队列中后,机制中的count就会被+1 q.join() # 阻塞等待队列中的数据全部被取走,q队列中的数据全部被取走则不再阻塞在这里def consumer(q, name): ''' 消费者 q:数据在哪里 name:谁来消费 :return: ''' while 1: food = q.get() if food is None: print('%s 获取到了一个空' % name) break print('\033[31m%s 消费了 %s\033[0m' % (name, food)) time.sleep(random.randint(1, 3)) q.task_done() # 每从队列中消费了一个数据后,让机制中的conut计数减一if __name__ == '__main__': q = JoinableQueue(20) p = Process(target=producer, args=('Egon', '包子', q)) p.start() p1 = Process(target=producer, args=('wusir', '泔水', q)) p1.start() p2 = Process(target=consumer, args=(q, 'why')) p2.daemon = True # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束 p2.start() p3 = Process(target=consumer, args=(q, 'Jbox')) p3.daemon = True # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束 p3.start() p.join() # 阻塞等待生产者1进程结束 p1.join() # 阻塞等待生产者2进程结束 print('生产者进程结束,同时主进程要结束,因为其他两个消费者进程被设置为守护进程,所以主进程结束后,那两个消费者守护进程也会结束')