使用队列的方式创建一个生产者与消费者

一、使用队列来构造一个生产者和消费者模型

生产者和消费者模式是设计模式中常见的一种设计模式,其中原理大致可以这样理解

  • 1、一个方法(或者类)负责产生数据,往一个媒介(现在我们使用的是队列)中添加数据
  • 2、消费者就不停的监听媒介中是否有数据,有就直径消费
  • 3、生产者也监听媒介中,如果没有数据了就制作数据,往媒介中添加

为了方便大家理解,可以参考生活中的场景: 我们去饭点吃饭

  • 你是一个消费者,服务器是一个媒介,厨房的厨师是生产者
  • 你下单,传递给服务员,服务员把单下给厨师,
  • 当厨师做好了,告诉服务员,服务员然后给你上菜

  • 1、定义一个生产者(生产包子)

      def Producer(name):
          count = 0
          while count < 10:
              print('开始制作包子')
              # 休眠当做制作包子需要时间
              time.sleep(random.randrange(3))
              q.put(count)
              print('生产者: %s 已经生产%s个包子' % (name, count))
              count += 1
              print('生产完成')
    
  • 2、定义一个消费者(吃包子)

      def Consumer(name):
          count = 0
          while count < 10:
              time.sleep(random.randrange(4))
              if not q.empty():
                  data = q.get()
                  print('消费者: %s 吃了%s包子' % (name, count))
              else:
                  print('目前还没生产包子')
              count += 1
    
  • 3、媒介(负责生产者和消费者之间通讯的,上面代码中已经使用了队列)

  • 4、完整代码

      import time
      import random
      import queue
      import threading
    
      def Producer(name):
          count = 0
          while count < 10:
              print('开始制作包子')
              # 休眠当做制作包子需要时间
              time.sleep(random.randrange(3))
              q.put(count)
              print('生产者: %s 已经生产%s个包子' % (name, count))
              count += 1
              print('生产完成')
    
      def Consumer(name):
          count = 0
          while count < 10:
              time.sleep(random.randrange(4))
              if not q.empty():
                  data = q.get()
                  print('消费者: %s 吃了%s包子' % (name, count))
              else:
                  print('目前还没生产包子')
              count += 1
    
      if __name__ == "__main__":
          q = queue.Queue()
          # 创建一个生产者
          p1 = threading.Thread(target=Producer, args=('生产工1',))
    
          # 创建一个消费者
          c1 = threading.Thread(target=Consumer, args=('消费者1',))
    
          p1.start()
          c1.start()
    

二、改造上面的生产者与消费者模型(增加几个消费者)

  • 1、具体代码实现(生产者就忙不过来了)

      import time
      import random
      import queue
      import threading
    
      def Producer(name):
          count = 0
          while count < 10:
              print('开始制作包子')
              # 休眠当做制作包子需要时间
              time.sleep(random.randrange(3))
              q.put(count)
              print('生产者: %s 已经生产%s个包子' % (name, count))
              count += 1
              print('生产完成')
    
      def Consumer(name):
          count = 0
          while count < 10:
              time.sleep(random.randrange(4))
              if not q.empty():
                  data = q.get()
                  print('消费者: %s 吃了%s包子' % (name, count))
              else:
                  print('目前还没生产包子')
              count += 1
    
      if __name__ == "__main__":
          q = queue.Queue()
          # 创建一个生产者
          p1 = threading.Thread(target=Producer, args=('生产工1',))
    
          # 创建一个消费者
          consumerList = []
          for i in range(5):
              consumerList.append(threading.Thread(target=Consumer, args=('消费者{0}'.format(i),)))
    
          for t in consumerList:
              t.start()
    
          p1.start()
    

三、在生产者与消费者模型中使用信号量

  • 1、上面介绍队列的时候讲到q.task_done()q.join()方法
  • 2、具体代码实现

      import time
      import random
      import queue
      import threading
    
      def Producer(name):
          count = 0
          while count < 10:
              print('开始制作包子')
              # 休眠当做制作包子需要时间
              time.sleep(random.randrange(3))
              q.put(count)
              print('生产者: %s 已经生产%s个包子' % (name, count))
              count += 1
              # 发送一个信号告诉媒介已经生产好了
              q.task_done()
              print('生产完成')
    
      def Consumer(name):
          count = 0
          while count < 10:
              time.sleep(random.randrange(4))
              if not q.empty():
                  # 接收信号等待
                  q.join()
                  data = q.get()
    
                  print('消费者: %s 吃了%s包子' % (name, count))
              else:
                  print('目前还没生产包子')
              count += 1
    
      if __name__ == "__main__":
          q = queue.Queue()
          # 创建一个生产者
          p1 = threading.Thread(target=Producer, args=('生产工1',))
    
          # 创建一个消费者
          consumerList = []
          for i in range(5):
              consumerList.append(threading.Thread(target=Consumer, args=('消费者{0}'.format(i),)))
    
          for t in consumerList:
              t.start()
    
          p1.start()
    

results matching ""

    No results matching ""