线程同步

一、实现线程同步的方式有

  • 1、线程锁
  • 2、使用Event同步
  • 3、使用Semaphore(信号量)

二、使用加锁的方式实现线程同步(代码见线程锁)

三、使用Event同步

  • 1、python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法waitclearset。 事件处理的机制:全局定义了一个Flag, 如果Flag值为False,那么当程序执行 event.wait 方法时就会阻塞,如果Flag值为True,那么event.wait方法时便不再阻塞。

    • clear:将Flag设置为False
    • set:将Flag设置为True
  • 2、具体实现代码案例

      import threading
      import time
    
      class Boss(threading.Thread):
          def run(self):
              print("BOSS:今晚大家都要加班到22:00。")
              print(event.isSet())
              event.set()
              print(event.isSet())
              time.sleep(5)
              print("BOSS:<20:00>可以下班了。")
              print(event.isSet())
              event.set()
    
      class Worker(threading.Thread):
          def run(self):
              event.wait()
              print("Worker:哎……命苦啊!")
              time.sleep(1)
              event.clear()
              event.wait()
              print("Worker:OhYeah!")
    
      if __name__ == "__main__":
          event = threading.Event()
          threads = []
          for i in range(5):
              threads.append(Worker())
              threads.append(Boss())
    
          for t in threads:
              t.start()
    
          for t in threads:
              t.join()
    

四、使用信号量(Semaphore)实现同步(有点类似线程池,同时最多多少线程抢占资源)

  • 1、信号量用来控制线程并发数的,BoundedSemaphoreSemaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1。计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()
  • 2、BoundedSemaphoreSemaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

  • 3、具体实现代码

      import threading
      import time
    
      class MyThread(threading.Thread):
              def __init__(self, sem):
                      super().__init__()
                      self.semaphore = sem
    
              def run(self):
                      self.semaphore.acquire()
                      print(self.name)
                      time.sleep(5)
                      self.semaphore.release()
    
      if __name__ == "__main__":
              # 限制多线程每次只能执行5个,有点类似今后介绍的线程池一样的
              semaphore = threading.Semaphore(5)
              thread_list = []
              for i in range(100):
                      thread_list.append(MyThread(semaphore))
    
              for t in thread_list:
                      t.start()
    
              """
              执行结果:每隔5秒打印5个,
              """
    
  • 4、使用信号量模仿一个爬虫抓取网页数据

      import threading
      import time
    
      class HtmlSpider(threading.Thread):
              """
              获取页面数据
              """
    
              def __init__(self, url, sem):
                      super().__init__()
                      self.url = url
                      self.sem = sem
    
              def run(self):
                      time.sleep(1)
                      print('获取页面内容成功:{}'.format(self.url))
                      self.sem.release()
    
      class UrlProducer(threading.Thread):
              """
              获取url链接的
              """
    
              def __init__(self, sem):
                      super().__init__()
                      self.sem = sem
    
              def run(self):
                      for i in range(20):
                              self.sem.acquire()
                              html_thread = HtmlSpider('https://www.baidu.com/{}'.format(i), self.sem)
                              html_thread.start()
    
      if __name__ == "__main__":
              sem = threading.Semaphore(3)
              url_producer = UrlProducer(sem)
              url_producer.start()
    

五、使用condition条件变量实现线程同步(复杂的线程间同步)

  • Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
  • 可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

  • Condition()的方法:

    • acquire(): 线程锁
    • release(): 释放锁
    • wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError
    • notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock
    • notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
  • 具体实现代码

      import threading
      import time
    
      num = 0
    
      con = threading.Condition()
    
      class Producer(threading.Thread):
              """
              定义一个生产者类
              """
    
              def run(self):
                      global num
                      con.acquire()
                      while True:
                              print('开始生产')
                              num += 1
                              time.sleep(1)
                              print('当前数量:{}'.format(num))
                              if num >= 5:
                                      print('当前数量已经到了5个,不能再添加了')
                                      # 唤醒在等待的线程
                                      con.notify()
                                      # 等待别的线程通知
                                      con.wait()
                      con.release()
    
      class Consumers(threading.Thread):
              """
              定义一个消费者的类
              """
    
              def run(self):
                      global num
                      con.acquire()
                      while True:
                              print('开始吃东西了')
                              num -= 1
                              time.sleep(2)
                              print('当前数量:{}'.format(num))
                              if num <= 0:
                                      print('已经吃完了,要通知生产')
                                      # 唤醒在等待的线程
                                      con.notify()
                                      # 等待别的线程通知
                                      con.wait()
                      con.release()
    
      if __name__ == "__main__":
              p = Producer()
              c = Consumers()
              p.start()
              c.start()
    

results matching ""

    No results matching ""