用 Python 实现的线程池
davies 发表于 2006 年 10 月 9 日
为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。
下面是用Python实现的通用的线程池代码:
Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。
WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。
一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。
完成的程序可以在这里下载。
网友留言:
2. davies 发表于 2006 年 12 月 11 日 7:40 p.m.
你看一下代码吧,还是比较易读的。WorkerManager有timeout参数,socket.setdefaulttimeout()是与网络相关的,与线程没有关系。
开多少线程得看是做什么事情,如果是CPU密集型的,一个线程都会导致CPU 100%,如果是IO密集型,比如用urllib获取网页,则可以开多一些,我用过100多个,甚至可以更多。
3. fandatou 发表于 2006 年 12 月 12 日 6:21 p.m.
我就是用来抓网页。用perl写开30个线程要用2G内存,用python写5线程只用40M内存但是cpu是100%。另外关于timeout能明确说明下吗?刚从perl转python还不熟悉基本是拿人家代码修改。正好google到你这里来。
class WorkerManager:
def __init__( self, num_of_workers=10, timeout = 2):
Worker.py中的这两行中的timeout就是线程timeout的时间吗?
4. davies 发表于 2006 年 12 月 12 日 7:07 p.m.
对,这个 timeout 是线程等待任务的时间,如果在 timeout 时间内任务队列中没有要执行的任务,则会结束线程。这就要求在线程开始后,timeout 时间内添加足够的任务,让所有线程都在运行。
现在这样设计是不好的,只适合与事先知道要做什么这样的场合,要改成其它方式也容易
5. fandatou 发表于 2006 年 12 月 14 日 1:03 a.m.
原来是任务的timeout啊,我以为是每个线程的timeout时间呢。我用你的worker,往往会出现意外,最后只剩下一个线程再跑,只好在线程任务中每个关键点都加上try
6. davies 发表于 2006 年 12 月 14 日 8:25 p.m.
看看第28-29行,在执行任务出现意外时,会打印出错信息后继续执行下一个任务,只有当队列中没有其它任务时才会结束。有可能是你的timeout设得太短,而添加任务又相对较慢,导致任务还没加进去,就有线程结束了。
可以给WorkerManager加一个start函数,手动让所有线程开始工作。
7. 老熊 发表于 2007 年 01 月 23 日 12:12 a.m.
环境:简体XP+python 2.4+mysql
初学python,使用了你这个链接池,遇到个奇怪的问题
你这里的test_job,我又调用了自己写外部的一个类,test_job是将URL地址,给外部类,然后由外部类对URL处理,包括read()等,返回值,test_job再将返回值插入数据库。
现在问题是,这个外部类,在非多线程情况下运行正常,能正常处理URL,包括read,parse等。但是在多线程运行时候就抛出exception:
class exceptions.UnicodeDecodeError at 0x00B64F90
百思不得其解~~~
8. davies 发表于 2007 年 01 月 23 日 8:43 a.m.
看看是哪一句抛出的异常,可能你的外部类里有部分代码不是线程安全的,比如用到的第三方库之类
9. 老熊 发表于 2007 年 01 月 23 日 9:33 a.m.
呵呵,你真早。
外部类中没有第三方的库,仅仅是string的方法,发现抛出异常的代码是String.find()方法,后来将
String.decode('utf-8','ignore'),然后再执行find,就通过了。
但让我真正疑问的是,这样的decode与多线程有关?因为段代码在非多线程情况下测试是正确的
方便的话,加gtalk交流:-)1380127(#)gmail.com
10. 嘛呢 发表于 2007 年 09 月 11 日 1:58 a.m.
由于你这个只能完成设定好的工作,想把他改成能自动添加工作任务的。
增加class WorkPump,能够往WorkManager.workQueue添加工作任务。
那class WorkManager修改为:
# class WorkerManager:
# def __init__( self, num_of_workers=10, timeout = 1):
# self.workQueue = Queue.Queue()
# self.resultQueue = Queue.Queue()
# self.workers = []
# self.pump = WorkPump() 新增
# self.timeout = timeout
# self._recruitThreads( num_of_workers )
新增的这个我也想把它做成一个线程,我应该怎么调度增加工作任务的线程和其他下载任务的线程?
11. davies 发表于 2007 年 09 月 11 日 8:46 a.m.
现在已经是可以一边下载一遍添加任务,调用add_job方法就可以了,新添加的任务会自动执行,当所有任务添加完成后,再调用wait_for_complete。
12. 嘛呢 发表于 2007 年 09 月 11 日 10:05 a.m.
因为任务也不确定,只能说是一般添加一边下载
今天我试试用Condition
13. davies 发表于 2007 年 09 月 11 日 4:37 p.m.
只要构造一个函数,加参数就行了,支持任意的任务,但得是线程安全的。
可以把线程的等待时间改长一些,否则没任务了它就退出了。
14. rober 发表于 2008 年 03 月 6 日 2:06 p.m.
我运行的你的代码时候发生如下错误:
start testing
[ 1] (<class exceptions.IOError at 0x812744c>, <exceptions.IOError instance at 0x824222c>)
这到底是什么错误!
15. davies 发表于 2008 年 03 月 8 日 11:46 a.m.
估计是网络错误吧,可以用traceback模块把调用栈打印出来
16. robert 发表于 2008 年 03 月 11 日 1:26 p.m.
线程池的初始化是不是挺慢的,还有,我有很多不同url在mysql数据库里,
然后查询出来用你的上面的模块去并发抓取,好象还没有不用线程池的快呢!! 这个线程池对大量抓取网页有性能提高么?
17. davies 发表于 2008 年 03 月 14 日 10:45 a.m.
显然是有性能提高的,尤其是网友抓取。
如果你使用了没有效果,可能是某个地方用得不对,或者有隐藏的问题。
18. Robert 发表于 2008 年 03 月 22 日 2:42 p.m.
当现有的任务都完成之后,你的程序就会调用线程销毁的函数吧,能不能任务完成之后,10个并发的线程都进入睡眠状态,等待新的任务加入,程序应该做成无限循环吧。。。。。
while 1:
19. davies 发表于 2008 年 03 月 23 日 11:39 p.m.
很容易改一下就像你说的这样工作的,我实际使用时也是根据具体情况再改改。
20. joey 发表于 2008 年 08 月 3 日 1:31 a.m.
多谢大大的好东西,确实很好用
不过我有一个问题就是kwds是什么意思?
1. fandatou 发表于 2006 年 12 月 11 日 3:25 p.m.
楼主,请问你的线程持timeout怎么设置是socket.setdefaulttimeout(10) 还是worker.Worker.timeout = 20
另外请问下,我开5个线程cpu就是100%,似乎google能跑300个线程。