##language:zh
#pragma section-numbers off
##含有章节索引导航的 ZPyUG 文章通用模板
<<TableOfContents>>
## 默认导航,请保留
<<Include(ZPyUGnav)>>


= 测试线程应用 =
{{{
沈崴 <wileishn@gmail.com>
reply-to	eurasia-users@googlegroups.com
to	eurasia-users <eurasia-users@googlegroups.com>
date	Fri, Dec 26, 2008 at 15:11
}}}
subject:[[http://groups.google.com/group/eurasia-users/browse_thread/thread/dd0e33563295105b|使用线程池解决数据库阻塞问题]] ^ - eurasia-users | Google 网上论坛^


##startInc
== for Stackless Python ==
利用 Stackless Python 的 channel 实现的线程池 (ThreadPool) :) 下面这个例子把处理事务的线程编号也打印出来, 可以看出所有线程都在工作。
{{{#!python
import sys, stackless
from Queue import Queue
from traceback import print_exc
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent, schedule, tasklet

class ThreadPool:
       def __init__(self, n=32):
               self.queue = Queue()

               for i in xrange(n):
                       start_new_thread(self.pipe, (i,))

       def __call__(self, func):
               def wrapper(*args, **kw):
                       e = channel()
                       self.queue.put((e, func, args, kw))

                       errno, e = e.receive()
                       if errno == 0:
                               return e

                       raise e

               return wrapper

       def pipe(self, i):
               while True:
                       rst, func, args, kw = self.queue.get()
                       try:
                               result = func(i, *args, **kw)

                       except BaseException, e:
                               rst.send((-1, e))
                       else:
                               rst.send((0, result))

nonblock = ThreadPool(32)

@nonblock
def test(i, n):
       print 'thread%d: %d' %(i, n)

for i in xrange(1000):
       test(i)

stackless.run()
}}}
##endInc

----
'''反馈'''

创建 by -- ZoomQuiet [<<DateTime(2008-12-26T07:23:32Z)>>]