##language:zh #pragma section-numbers off ##含有章节索引导航的 ZPyUG 文章通用模板 <> ## 默许导航,请保留 <> ##startInc = Eurasia:使用线程池解决数据库阻塞问题 = {{{ 沈崴 reply-to eurasia-users@googlegroups.com to eurasia-users date Mon, Dec 8, 2008 at 14:24 subject [eurasia-users] 使用线程池解决数据库阻塞问题 }}} * 不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的 程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会造成阻塞, 这都是需要解决的。 * 对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗粒度的更具通用性的基于线程池的解决方案。 {{{#!python from Queue import Queue from thread import start_new_thread from stackless import channel, getcurrent class ThreadPool: def __init__(self, n=32): self.queue = Queue() for i in xrange(n): start_new_thread(self.pipe, ()) def __call__(self, func): def wrapper(*args, **kw): rst = channel() self.queue.put((getcurrent(), rst, func, args, kw)) return rst.receive() return wrapper def pipe(self): while True: curr, rst, func, args, kw = self.queue.get() try: result = func(*args, **kw) except Exception, e: curr.raise_exception(e) else: rst.send(result) }}} 下面假定对某 SQL 客户端的 Cursor 类, 进行线程池包装, 使数据库的 commit 操作不会阻塞掉整个程序。 {{{#!python nonblock = ThreadPool() class Cursor(sqlite3.Cursor): @nonblock def commit(self): return sqlite3.Cursor.commit(self) }}} 使用 processing 库及其 Queue 实现, 我们可以做出进程池, 代码雷同。 沈崴 发送至 eurasia-users 显示详细信息 12月8日 (4天前) 回复 上面那个 ThreadPool 做点修改可以变成 ProcessPool: {{{ from processing import Process, Queue class ProcessPool: def __init__(self, n=32): self.queue = Queue() for i in xrange(n): p = Process(target=self.pipe, args= (self.queue, )) p.start() ... }}} 这个就是传说中的进程池, 用到了 pyprocessing 库 (multiprocessing 在 python2.6 中已经是标准库 了), 我们知道 pyprocessing 库不仅可以创建多进程, 而且支持和远程计算机进行进程间通信。 换句话说, 这就是老张想要实现的东西。processpool(func)(*args, **kw), 使用进程池调用 func, 可以实现网络 运算。 一个 func 函数可以在任意一个 cpu 或核上执行, 甚至可以是在一台远程计算机上。 ##endInc ---- '''反馈''' 创建 by -- ZoomQuiet [<>]