曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。 这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。 线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。 纵然Python中提供 了众多的同步和互斥机制,如 mutex 、 condition 、 event 等. 但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷人死锁状态或者威胁线程安全。
来看一个经典的多线程同步问题:生产者和消费者模型。 如果用Python来实现,会怎么写?大概思路是这样的:分别创建消费者和生产者线程, 生产者往队列里去放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。
当队列满的时候消费者进入等待状态,当队列空的时候生产者进入等待状态。看一个具体的Python实现:
import queue
import threading
import random
创建锁对象用于控制输出:
writelock = threading.Lock()
class Producer(threading.Thread):
def __init__(self,q,con,name):
super(Producer, self).__init__()
self.q = q
self.name = name
self.con = con
print ("Producer"+self.name+"Started")
def tun (self):
while 1:
global writelock
self.con.acquire () # 获取锁对象
if self.q.full(): # 队列满
with writelock: # 输出信息
print('Queue is full,producer wait!')
self.con.wait() # 等待资源
else:
value = random.randint(0,10)
with writelock:
print (self.name +" put value "+ self.name+":"+ str(value)+"into queue")
self.q.put((self.name+" : "+str(value))) # 放入队列中
self.con.notify() # 通知消费者
self.con. release() # 释放锁对象
class Consumer(threading.Thread):
def __init__(self, q,con,name):
super(Consumer, self).__init__()
self.q = q
self.con = con
self.name = name
print ("Consumer " + self.name +"started \n ")
def run(self):
while 1:
global writelock
self.con.acquire()
if self.q.empty():
with writelock:
print('queue is empty,consumer wait!')
self.con.wait()
else:
value = self.q.get()
with writelock:
print (self.name +"get value"+value+"from queue")
self.con.notify()
self.con.release()
q = queue.Queue(10)
con = threading.Condition() # 条件变量锁
p = Producer(q,con,"P1")
p.start()
p1 = Producer(q,con,"P2")
p1.start()
c1 = Consumer(q,con,"C1")
c1.start()
ProducerP1Started ProducerP2Started Consumer C1started queue is empty,consumer wait!
上面的程序实现有什么问题吗?回答这个问题之前。先来了解一下 Queue 模块的基本知识。
Queue 模块
Python中的 Queue 模块提供了3种队列:
Queue.Queue(maxsize):先进先出,maxsize为队列大小,其值为非正数的时候为无限循环队列。Queue.LifoQueue(maxsize):后进先出,相当于栈。Queue.PriorityQueue(maxsize):优先级队列。
这3种队列支持以下方法:
Queue.qsize():返回近似的队列大小。注意,这里之所以加“近似”二字, 是因为当该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。Queue.empty():列队为空的时候返回True,否则返回False。Queue.full():当设定了队列大小的情况下,如果队列满则返回True,否则返回False。Queue.put(item[, block[,timeout]]):往队列中添加元素item,block设置为False的时候。 如果队列满则抛出Full异常。如果block设置为True,timeout为None的时候则会一直等待直到有空位置, 否则会根据timeout的设定超时后抛出Full异常。Queue.put_nowait(item):等价于put(item,False).block设置为False的时候, 如果队列空则抛出Empty异常。如果block设置为True、timeout为None的时候则会一直等待直到有元素可用,否则会根据timeout的设定起时后抛出Empty异常。Queue.get([block[,timeout]]):从队列中删除元素并返回该元素的值。Queue.get_nowait():等价于get(False)。Queue.task_done():发送信号表明入列任务已经完成,经常在消费者线程中用到。Queue.join():阻塞直至队列中所有的元素处理完毕。
Queue 模块更安全
Queue 模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的时候特别有用, 因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,它的线程是安全的。 需要注意的是 Queue 模块中的列队利 collections.deque 所表示的队列并不一样, 前者主要用于不同线程之间的通信,它内部实现了线程的锁机制; 而后者主要是数据结构上的概念,因此支持 in 方法。 再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显, 作用于 queue 操作的条件变量完全是不需要的,因为 queue 本身能够保证线程安全, 因此不需要额外的同步机制。那么,该如何修改呢?请读者自行思考。 下面的多线程下载的例子也许有助于完成上面程序的修改。
import os
import queue
import threading
import urllib.request
class DownloadThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
url = self.queue.get() # 从队列中取出一个url元素
print(self.name+"begin download"+ url +"...")
self.download_file(url) # 进行文件下载
self.queue.task_done() # 下载完毕发送信号
print(self.name + "download comleted!!!")
def download_file(self,url):
urlhandler = urllib.request.urlopen(url)
fname = os .path .basename (url)+".html " #文件名称
with open (fname,"wb") as f: # 打开文件
while True:
chunk = urlhandler.read(1024)
if not chunk: break
f.write(chunk)
# if __name__ == "__main__":
# urls = [ "http://wiki.python.org/moin/WebProgramming",
# "https://www.createspace.com/3611970",
# "http ://wiki.python.org/moin/Documentation"
# ]
# queue = queue.Queue()
# # create a thread pool and give them a queue
# for i in range(5):
# t=DownloadThread(queue) #启动5个技程同时进行下4E
# t.setDaemon(True)
# t.start()
# # giva the queue some data
# for url in urls:
# queue.put(url)
# # wait for the queue to finish
# queue.join()