解读Twisted的reactor(2)
解读Twisted的reactor(2)
Twisted框架不只是想到了跨操作系统、跨进程/线程,还考虑到了跨语言的Jython和Cython。这一点会让所有python开发人员受益。先来看看在Cython下,Twisted的reactor的初始化:
from twisted.internet import default
不只是import了一个module,它所做的事比我们想像的远要多的多呢。我们来看看import时它所做的事:
1 from bisect import insort
2 from time import time, sleep
3 import os
4 import socket
5 import sys
6 import warnings
7
8 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorUNIX, IReactorUNIXDatagram
9 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary
10 from twisted.internet.interfaces import IReactorProcess, IReactorFDSet, IReactorMulticast
11 from twisted.internet import main, error, protocol, interfaces
12 from twisted.internet import tcp, udp, defer
13
14 from twisted.python import log, threadable, failure
15 from twisted.persisted import styles
16 from twisted.python.runtime import platformType, platform
17
18 from twisted.internet.base import ReactorBase
引用了所有的所需要使用的module
导入ssl和unix两个module。
根据操作系统的不同引入不同的线程、进程的操作库。注意,win32process等这些特别的平台的module,都需要到site-packages下去找。
识别不同的操作系统,来统一一个Socket操作方法。注意,就是在这里,我们统一使用一个_Waker,但是可以使用不同操作系统的socket操作。 这是win32的初始化:
1 def __init__(self, reactor):
2 """Initialize.
3 """
4 log.msg("starting waker")
5 self.reactor = reactor
6 # Following select_trigger (from asyncore)'s example;
7 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
8 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
9 client.setsockopt(socket.IPPROTO_TCP, 1, 1)
10 server.bind(('127.0.0.1', 0))
11 server.listen(1)
12 client.connect(server.getsockname())
13 reader, clientaddr = server.accept()
14 client.setblocking(1)
15 reader.setblocking(0)
16 self.r = reader
17 self.w = client
18 self.fileno = self.r.fileno
而unix的操作初始化显然简单的多:
原因很简单,我们完全可以针对平台的不同,使用不同的优化方法来进行socket操作。
1 if platform.getType() == "win32":
2 _select = win32select
3 else:
4 _select = select.select
5
6 # Exceptions that doSelect might return frequently
7 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
8 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
使用select进行socket操作显然是最经济的,它可以让你启动最少的线程或进程就进行大量的并发连接。如果你一定要构建一个超容量的服务器,这会是最经济运行的基础。当然,由于操作系统不一样,你的select的处理也可能完全不同的。
下面再来看看reactor的最后一行
default.install()
做了点什么。
它初始化了一个SelectReactor类。初始化会先执行ReactorBase的初始化方法:
1 def __init__(self):
2 self._eventTriggers = {}
3 self._pendingTimedCalls = []
4 self.running = 0
5 self.waker = None
6 self.resolver = None
7 self.usingThreads = 0
8 self.addSystemEventTrigger('during', 'shutdown', self.crash)
9 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
10 threadable.whenThreaded(self.initThreads)
然后运行PosixReactorBase的初始化方法:
我们可以看到,这部分完全是应平台的不同在初始化select时所使用的各个变量及线程或是信号处理方法。
main.installReactor(reactor)
是最重要的,它将我们初始华好的reactor正名为twisted.internet.reactor让所有的人都能使用它:
1 def installReactor(reactor):
2 global addReader, addWriter, removeReader, removeWriter
3 global iterate, addTimeout, wakeUp
4 # this stuff should be common to all reactors.
5 import twisted.internet
6 import sys
7 assert not sys.modules.has_key('twisted.internet.reactor'), \
8 "reactor already installed"
9 twisted.internet.reactor = reactor
10 sys.modules['twisted.internet.reactor'] = reactor
11
12 # install stuff for backwards compatability
13
14 # IReactorCore
15 if implements(reactor, IReactorCore):
16 iterate = reactor.iterate
17
18 # IReactorFDSet
19 if implements(reactor, IReactorFDSet):
20 addReader = reactor.addReader
21 addWriter = reactor.addWriter
22 removeWriter = reactor.removeWriter
23 removeReader = reactor.removeReader
24
25 # IReactorTime
26 if implements(reactor, IReactorTime):
27 def addTimeout(m, t, f=reactor.callLater):
28 warnings.warn("main.addTimeout is deprecated, use reactor.callLater instead.")
29 f(t, m)
30
31 # ???
32 if hasattr(reactor, "wakeUp"):
33 wakeUp = reactor.wakeUp
仔细看看这两句:
它将我们初始化好的reactor放到了原来del的位置。