测试线程应用

沈崴 <[email protected]>
reply-to        [email protected]
to      eurasia-users <[email protected]>
date    Fri, Dec 26, 2008 at 15:11

subject:使用线程池解决数据库阻塞问题 - eurasia-users | Google 网上论坛

for Stackless Python

利用 Stackless Python 的 ThreadPool :) 下面这个例子把处理事务的线程也打印出来, 可以看出所有线程都在工作。

   1 import sys, stackless
   2 from Queue import Queue
   3 from traceback import print_exc
   4 from thread import start_new_thread
   5 from exceptions import BaseException
   6 from stackless import channel, getcurrent, schedule, tasklet
   7 
   8 class ThreadPool:
   9        def __init__(self, n=32):
  10                self.queue = Queue()
  11 
  12                for i in xrange(n):
  13                        start_new_thread(self.pipe, (i,))
  14 
  15        def __call__(self, func):
  16                def wrapper(*args, **kw):
  17                        e = channel()
  18                        self.queue.put((e, func, args, kw))
  19 
  20                        errno, e = e.receive()
  21                        if errno == 0:
  22                                return e
  23 
  24                        raise e
  25 
  26                return wrapper
  27 
  28        def pipe(self, i):
  29                while True:
  30                        rst, func, args, kw = self.queue.get()
  31                        try:
  32                                result = func(i, *args, **kw)
  33 
  34                        except BaseException, e:
  35                                rst.send((-1, e))
  36                        else:
  37                                rst.send((0, result))
  38 
  39 nonblock = ThreadPool(32)
  40 
  41 @nonblock
  42 def test(i, n):
  43        print 'thread%d: %d' %(i, n)
  44 
  45 for i in xrange(1000):
  46        test(i)
  47 
  48 stackless.run()


反馈

创建 by -- ZoomQuiet [2008-12-26 07:23:32]