曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。
这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。
线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。
纵然Python中提供 了众多的同步和互斥机制,如 mutex
、 condition
、 event
等.
但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷人死锁状态或者威胁线程安全。
来看一个经典的多线程同步问题:生产者和消费者模型。 如果用Python来实现,会怎么写?大概思路是这样的:分别创建消费者和生产者线程, 生产者往队列里去放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。
当队列满的时候消费者进入等待状态,当队列空的时候生产者进入等待状态。看一个具体的Python实现:
import queue
import threading
import random
创建锁对象用于控制输出:
writelock = threading.Lock()
class Producer(threading.Thread):
def __init__(self,q,con,name):
super(Producer, self).__init__()
self.q = q
self.name = name
self.con = con
print ("Producer"+self.name+"Started")
def tun (self):
while 1:
global writelock
self.con.acquire () # 获取锁对象
if self.q.full(): # 队列满
with writelock: # 输出信息
print('Queue is full,producer wait!')
self.con.wait() # 等待资源
else:
value = random.randint(0,10)
with writelock:
print (self.name +" put value "+ self.name+":"+ str(value)+"into queue")
self.q.put((self.name+" : "+str(value))) # 放入队列中
self.con.notify() # 通知消费者
self.con. release() # 释放锁对象
class Consumer(threading.Thread):
def __init__(self, q,con,name):
super(Consumer, self).__init__()
self.q = q
self.con = con
self.name = name
print ("Consumer " + self.name +"started \n ")
def run(self):
while 1:
global writelock
self.con.acquire()
if self.q.empty():
with writelock:
print('queue is empty,consumer wait!')
self.con.wait()
else:
value = self.q.get()
with writelock:
print (self.name +"get value"+value+"from queue")
self.con.notify()
self.con.release()
q = queue.Queue(10)
con = threading.Condition() # 条件变量锁
p = Producer(q,con,"P1")
p.start()
p1 = Producer(q,con,"P2")
p1.start()
c1 = Consumer(q,con,"C1")
c1.start()
ProducerP1Started ProducerP2Started Consumer C1started queue is empty,consumer wait!
上面的程序实现有什么问题吗?回答这个问题之前。先来了解一下 Queue
模块的基本知识。
Queue
模块
Python中的 Queue
模块提供了3种队列:
Queue.Queue(maxsize)
:先进先出,maxsize
为队列大小,其值为非正数的时候为无限循环队列。Queue.LifoQueue(maxsize)
:后进先出,相当于栈。Queue.PriorityQueue(maxsize)
:优先级队列。
这3种队列支持以下方法:
Queue.qsize()
:返回近似的队列大小。注意,这里之所以加“近似”二字, 是因为当该值>0的时候并不保证并发执行的时候get()
方法不被阻塞,同样,对于put()
方法有效。Queue.empty()
:列队为空的时候返回True
,否则返回False
。Queue.full()
:当设定了队列大小的情况下,如果队列满则返回True
,否则返回False
。Queue.put(item[, block[,timeout]])
:往队列中添加元素item
,block
设置为False
的时候。 如果队列满则抛出Full
异常。如果block
设置为True
,timeout
为None
的时候则会一直等待直到有空位置, 否则会根据timeout
的设定超时后抛出Full
异常。Queue.put_nowait(item)
:等价于put(item,False).block
设置为False
的时候, 如果队列空则抛出Empty
异常。如果block
设置为True
、timeout
为None
的时候则会一直等待直到有元素可用,否则会根据timeout
的设定起时后抛出Empty
异常。Queue.get([block[,timeout]])
:从队列中删除元素并返回该元素的值。Queue.get_nowait()
:等价于get(False)
。Queue.task_done()
:发送信号表明入列任务已经完成,经常在消费者线程中用到。Queue.join()
:阻塞直至队列中所有的元素处理完毕。
Queue
模块更安全
Queue
模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的时候特别有用,
因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,它的线程是安全的。
需要注意的是 Queue
模块中的列队利 collections.deque
所表示的队列并不一样,
前者主要用于不同线程之间的通信,它内部实现了线程的锁机制;
而后者主要是数据结构上的概念,因此支持 in
方法。
再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显,
作用于 queue
操作的条件变量完全是不需要的,因为 queue
本身能够保证线程安全,
因此不需要额外的同步机制。那么,该如何修改呢?请读者自行思考。
下面的多线程下载的例子也许有助于完成上面程序的修改。
import os
import queue
import threading
import urllib.request
class DownloadThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
url = self.queue.get() # 从队列中取出一个url元素
print(self.name+"begin download"+ url +"...")
self.download_file(url) # 进行文件下载
self.queue.task_done() # 下载完毕发送信号
print(self.name + "download comleted!!!")
def download_file(self,url):
urlhandler = urllib.request.urlopen(url)
fname = os .path .basename (url)+".html " #文件名称
with open (fname,"wb") as f: # 打开文件
while True:
chunk = urlhandler.read(1024)
if not chunk: break
f.write(chunk)
# if __name__ == "__main__":
# urls = [ "http://wiki.python.org/moin/WebProgramming",
# "https://www.createspace.com/3611970",
# "http ://wiki.python.org/moin/Documentation"
# ]
# queue = queue.Queue()
# # create a thread pool and give them a queue
# for i in range(5):
# t=DownloadThread(queue) #启动5个技程同时进行下4E
# t.setDaemon(True)
# t.start()
# # giva the queue some data
# for url in urls:
# queue.put(url)
# # wait for the queue to finish
# queue.join()
/tmp/ipykernel_7180/1556062644.py:10: DeprecationWarning: setDaemon() is deprecated, set the daemon attribute instead t.setDaemon(True) Exception in thread Thread-9: Traceback (most recent call last): File "/opt/conda/lib/python3.12/threading.py", line 1075, in _bootstrap_inner self.run() File "/tmp/ipykernel_7180/670852231.py", line 9, in run File "/tmp/ipykernel_7180/670852231.py", line 13, in download_file File "/opt/conda/lib/python3.12/urllib/request.py", line 215, in urlopen return opener.open(url, data, timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/conda/lib/python3.12/urllib/request.py", line 515, in open response = self._open(req, data) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/conda/lib/python3.12/urllib/request.py", line 537, in _open return self._call_chain(self.handle_open, 'unknown', ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/conda/lib/python3.12/urllib/request.py", line 492, in _call_chain result = func(*args) ^^^^^^^^^^^ File "/opt/conda/lib/python3.12/urllib/request.py", line 1420, in unknown_open raise URLError('unknown url type: %s' % type) urllib.error.URLError: <urlopen error unknown url type: http >
Thread-10begin downloadhttp://wiki.python.org/moin/WebProgramming... Thread-8begin downloadhttps://www.createspace.com/3611970... Thread-9begin downloadhttp ://wiki.python.org/moin/Documentation... Thread-10download comleted!!!