##language:zh
#pragma section-numbers off
##含有章节索引导航的 ZPyUG 文章通用模板
<<TableOfContents>>
## 默许导航,请保留
<<Include(ZPyUGnav)>>

= 测试线程应用 =
{{{
沈崴 <wileishn@gmail.com>
reply-to eurasia-users@googlegroups.com
to eurasia-users <eurasia-users@googlegroups.com>
date Fri, Dec 26, 2008 at 15:11
}}}
subject:[[http://groups.google.com/group/eurasia-users/browse_thread/thread/dd0e33563295105b|使用线程池解决数据库阻塞问题]] ^ - eurasia-users | Google 网上论坛^

##startInc
== for Stackless Python ==
利用 Stackless Python 的 ThreadPool :)

下面这个例子把处理事务的线程也打印出来, 可以看出所有线程都在工作。

{{{#!python
import sys, stackless
from Queue import Queue
from traceback import print_exc
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent, schedule, tasklet

class ThreadPool:
    def __init__(self, n=32):
        self.queue = Queue()
        for i in xrange(n):
            start_new_thread(self.pipe, (i,))

    def __call__(self, func):
        def wrapper(*args, **kw):
            e = channel()
            self.queue.put((e, func, args, kw))
            errno, e = e.receive()
            if errno == 0:
                return e
            raise e
        return wrapper

    def pipe(self, i):
        while True:
            rst, func, args, kw = self.queue.get()
            try:
                result = func(i, *args, **kw)
            except BaseException, e:
                rst.send((-1, e))
            else:
                rst.send((0, result))

nonblock = ThreadPool(32)

@nonblock
def test(i, n):
    print 'thread%d: %d' %(i, n)

for i in xrange(1000):
    test(i)

stackless.run()
}}}

##endInc

----
'''反馈'''

创建 by -- ZoomQuiet [<<DateTime(2008-12-26T07:23:32Z)>>]