灵感的来临,没有任何预兆;灵感的消失,也不会有告别仪式;用文字记下她们吧,让灵感永存……

用 Python 实现的线程池

davies 发表于 2006 年 10 月 9 日

为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。

下面是用Python实现的通用的线程池代码:

Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。

WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。

完成的程序可以在这里下载

网友留言:

Re: 用 Python 实现的线程池1. 发表于 2006 年 12 月 11 日 3:25 p.m.

楼主,请问你的线程持timeout怎么设置是socket.setdefaulttimeout(10) 还是worker.Worker.timeout = 20

另外请问下,我开5个线程cpu就是100%,似乎google能跑300个线程。

Re: 用 Python 实现的线程池2. 发表于 2006 年 12 月 11 日 7:40 p.m.

你看一下代码吧,还是比较易读的。WorkerManager有timeout参数,socket.setdefaulttimeout()是与网络相关的,与线程没有关系。

开多少线程得看是做什么事情,如果是CPU密集型的,一个线程都会导致CPU 100%,如果是IO密集型,比如用urllib获取网页,则可以开多一些,我用过100多个,甚至可以更多。

Re: 用 Python 实现的线程池3. 发表于 2006 年 12 月 12 日 6:21 p.m.

我就是用来抓网页。用perl写开30个线程要用2G内存,用python写5线程只用40M内存但是cpu是100%。另外关于timeout能明确说明下吗?刚从perl转python还不熟悉基本是拿人家代码修改。正好google到你这里来。

class WorkerManager:

def __init__( self, num_of_workers=10, timeout = 2):

Worker.py中的这两行中的timeout就是线程timeout的时间吗?

Re: 用 Python 实现的线程池4. 发表于 2006 年 12 月 12 日 7:07 p.m.

对,这个 timeout 是线程等待任务的时间,如果在 timeout 时间内任务队列中没有要执行的任务,则会结束线程。这就要求在线程开始后,timeout 时间内添加足够的任务,让所有线程都在运行。

现在这样设计是不好的,只适合与事先知道要做什么这样的场合,要改成其它方式也容易

Re: 用 Python 实现的线程池5. 发表于 2006 年 12 月 14 日 1:03 a.m.

原来是任务的timeout啊,我以为是每个线程的timeout时间呢。我用你的worker,往往会出现意外,最后只剩下一个线程再跑,只好在线程任务中每个关键点都加上try

Re: 用 Python 实现的线程池6. 发表于 2006 年 12 月 14 日 8:25 p.m.

看看第28-29行,在执行任务出现意外时,会打印出错信息后继续执行下一个任务,只有当队列中没有其它任务时才会结束。有可能是你的timeout设得太短,而添加任务又相对较慢,导致任务还没加进去,就有线程结束了。

可以给WorkerManager加一个start函数,手动让所有线程开始工作。

Re: 用 Python 实现的线程池7. 发表于 2007 年 01 月 23 日 12:12 a.m.

环境:简体XP+python 2.4+mysql

初学python,使用了你这个链接池,遇到个奇怪的问题

你这里的test_job,我又调用了自己写外部的一个类,test_job是将URL地址,给外部类,然后由外部类对URL处理,包括read()等,返回值,test_job再将返回值插入数据库。

现在问题是,这个外部类,在非多线程情况下运行正常,能正常处理URL,包括read,parse等。但是在多线程运行时候就抛出exception:

class exceptions.UnicodeDecodeError at 0x00B64F90

百思不得其解~~~

Re: 用 Python 实现的线程池8. 发表于 2007 年 01 月 23 日 8:43 a.m.

看看是哪一句抛出的异常,可能你的外部类里有部分代码不是线程安全的,比如用到的第三方库之类

Re: 用 Python 实现的线程池9. 发表于 2007 年 01 月 23 日 9:33 a.m.

呵呵,你真早。

外部类中没有第三方的库,仅仅是string的方法,发现抛出异常的代码是String.find()方法,后来将

String.decode('utf-8','ignore'),然后再执行find,就通过了。

但让我真正疑问的是,这样的decode与多线程有关?因为段代码在非多线程情况下测试是正确的

方便的话,加gtalk交流:-)1380127(#)gmail.com

Re: 用 Python 实现的线程池10. 发表于 2007 年 09 月 11 日 1:58 a.m.

由于你这个只能完成设定好的工作,想把他改成能自动添加工作任务的。

增加class WorkPump,能够往WorkManager.workQueue添加工作任务。

那class WorkManager修改为:

# class WorkerManager:

# def __init__( self, num_of_workers=10, timeout = 1):

# self.workQueue = Queue.Queue()

# self.resultQueue = Queue.Queue()

# self.workers = []

# self.pump = WorkPump() 新增

# self.timeout = timeout

# self._recruitThreads( num_of_workers )

新增的这个我也想把它做成一个线程,我应该怎么调度增加工作任务的线程和其他下载任务的线程?

Re: 用 Python 实现的线程池11. 发表于 2007 年 09 月 11 日 8:46 a.m.

现在已经是可以一边下载一遍添加任务,调用add_job方法就可以了,新添加的任务会自动执行,当所有任务添加完成后,再调用wait_for_complete。

Re: 用 Python 实现的线程池12. 发表于 2007 年 09 月 11 日 10:05 a.m.

因为任务也不确定,只能说是一般添加一边下载

今天我试试用Condition

Re: 用 Python 实现的线程池13. 发表于 2007 年 09 月 11 日 4:37 p.m.

只要构造一个函数,加参数就行了,支持任意的任务,但得是线程安全的。

可以把线程的等待时间改长一些,否则没任务了它就退出了。

我来留言