Eurasia:使用线程池解决数据库阻塞问题

沈崴 <[email protected]>
reply-to        [email protected]
to      eurasia-users <[email protected]>
date    Mon, Dec 8, 2008 at 14:24
subject [eurasia-users] 使用线程池解决数据库阻塞问题

程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会造成阻塞, 这都是需要解决的。

   1 from Queue import Queue
   2 from thread import start_new_thread
   3 from stackless import channel, getcurrent
   4 
   5 class ThreadPool:
   6        def __init__(self, n=32):
   7                self.queue = Queue()
   8 
   9                for i in xrange(n):
  10                        start_new_thread(self.pipe, ())
  11 
  12        def __call__(self, func):
  13                def wrapper(*args, **kw):
  14                        rst = channel()
  15                        self.queue.put((getcurrent(), rst, func, args, kw))
  16 
  17                        return rst.receive()
  18 
  19                return wrapper
  20 
  21        def pipe(self):
  22                while True:
  23                        curr, rst, func, args, kw = self.queue.get()
  24                        try:
  25                                result = func(*args, **kw)
  26 
  27                        except Exception, e:
  28                                curr.raise_exception(e)
  29                        else:
  30                                rst.send(result)

下面假定对某 SQL 客户端的 Cursor 类, 进行线程池包装, 使数据库的 commit 操作不会阻塞掉整个程序。

   1 nonblock = ThreadPool()
   2 
   3 class Cursor(sqlite3.Cursor):
   4        @nonblock
   5        def commit(self):
   6                return sqlite3.Cursor.commit(self)

使用 processing 库及其 Queue 实现, 我们可以做出进程池, 代码雷同。

沈崴

显示详细信息 12月8日 (4天前)

回复

上面那个 ThreadPool 做点修改可以变成 ProcessPool:

from processing import Process, Queue

class ProcessPool:
       def __init__(self, n=32):
               self.queue = Queue()

               for i in xrange(n):
                       p = Process(target=self.pipe, args=
(self.queue, ))
                       p.start()

       ...

这个就是传说中的进程池, 用到了 pyprocessing 库 (multiprocessing 在 python2.6 中已经是标准库 了), 我们知道 pyprocessing 库不仅可以创建多进程, 而且支持和远程计算机进行进程间通信。

换句话说, 这就是老张想要实现的东西。processpool(func)(*args, **kw), 使用进程池调用 func, 可以实现网络 运算。 一个 func 函数可以在任意一个 cpu 或核上执行, 甚至可以是在一台远程计算机上。


反馈

创建 by -- ZoomQuiet [2008-12-08 06:40:20]

MiscItems/2008-12-08 (last edited 2009-12-25 07:09:03 by localhost)