Contents
Eurasia:使用线程池解决数据库阻塞问题
沈崴 <[email protected]> reply-to [email protected] to eurasia-users <[email protected]> date Mon, Dec 8, 2008 at 14:24 subject [eurasia-users] 使用线程池解决数据库阻塞问题
- 不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的
程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会造成阻塞, 这都是需要解决的。
- 对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗粒度的更具通用性的基于线程池的解决方案。
1 from Queue import Queue
2 from thread import start_new_thread
3 from stackless import channel, getcurrent
4
5 class ThreadPool:
6 def __init__(self, n=32):
7 self.queue = Queue()
8
9 for i in xrange(n):
10 start_new_thread(self.pipe, ())
11
12 def __call__(self, func):
13 def wrapper(*args, **kw):
14 rst = channel()
15 self.queue.put((getcurrent(), rst, func, args, kw))
16
17 return rst.receive()
18
19 return wrapper
20
21 def pipe(self):
22 while True:
23 curr, rst, func, args, kw = self.queue.get()
24 try:
25 result = func(*args, **kw)
26
27 except Exception, e:
28 curr.raise_exception(e)
29 else:
30 rst.send(result)
下面假定对某 SQL 客户端的 Cursor 类, 进行线程池包装, 使数据库的 commit 操作不会阻塞掉整个程序。
使用 processing 库及其 Queue 实现, 我们可以做出进程池, 代码雷同。
沈崴
- 发送至 eurasia-users
显示详细信息 12月8日 (4天前)
回复
上面那个 ThreadPool 做点修改可以变成 ProcessPool:
from processing import Process, Queue class ProcessPool: def __init__(self, n=32): self.queue = Queue() for i in xrange(n): p = Process(target=self.pipe, args= (self.queue, )) p.start() ...
这个就是传说中的进程池, 用到了 pyprocessing 库 (multiprocessing 在 python2.6 中已经是标准库 了), 我们知道 pyprocessing 库不仅可以创建多进程, 而且支持和远程计算机进行进程间通信。
换句话说, 这就是老张想要实现的东西。processpool(func)(*args, **kw), 使用进程池调用 func, 可以实现网络 运算。 一个 func 函数可以在任意一个 cpu 或核上执行, 甚至可以是在一台远程计算机上。
反馈
创建 by -- ZoomQuiet [2008-12-08 06:40:20]