##language:zh [[self:解读_PyTwisted|返回目录]] ------------- ''' 解读Twisted的reactor(2) ''' = 解读Twisted的reactor(2) = Twisted框架不只是想到了跨操作系统、跨进程/线程,还考虑到了跨语言的Jython和Cython。这一点会让所有python开发人员受益。先来看看在Cython下,Twisted的reactor的初始化: {{{ from twisted.internet import default }}} 不只是import了一个module,它所做的事比我们想像的远要多的多呢。我们来看看import时它所做的事: {{{ #!python from bisect import insort from time import time, sleep import os import socket import sys import warnings from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorUNIX, IReactorUNIXDatagram from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary from twisted.internet.interfaces import IReactorProcess, IReactorFDSet, IReactorMulticast from twisted.internet import main, error, protocol, interfaces from twisted.internet import tcp, udp, defer from twisted.python import log, threadable, failure from twisted.persisted import styles from twisted.python.runtime import platformType, platform from twisted.internet.base import ReactorBase }}} 引用了所有的所需要使用的module {{{ #!python try: from twisted.internet import ssl sslEnabled = True except ImportError: sslEnabled = False try: from twisted.internet import unix unixEnabled = True except ImportError: unixEnabled = False }}} 导入ssl和unix两个module。 {{{ #!python from main import CONNECTION_LOST if platformType != 'java': import select from errno import EINTR, EBADF if platformType == 'posix': import process if platformType == "win32": try: import win32process except ImportError: win32process = None }}} 根据操作系统的不同引入不同的线程、进程的操作库。注意,win32process等这些特别的平台的module,都需要到site-packages下去找。 {{{ #!python if platform.getType() == 'posix': _Waker = _UnixWaker elif platform.getType() == 'win32': _Waker = _Win32Waker # global state for selector reads = {} writes = {} }}} 识别不同的操作系统,来统一一个Socket操作方法。注意,就是在这里,我们统一使用一个_Waker,但是可以使用不同操作系统的socket操作。 这是win32的初始化: {{{ #!python def __init__(self, reactor): """Initialize. """ log.msg("starting waker") self.reactor = reactor # Following select_trigger (from asyncore)'s example; server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setsockopt(socket.IPPROTO_TCP, 1, 1) server.bind(('127.0.0.1', 0)) server.listen(1) client.connect(server.getsockname()) reader, clientaddr = server.accept() client.setblocking(1) reader.setblocking(0) self.r = reader self.w = client self.fileno = self.r.fileno }}} 而unix的操作初始化显然简单的多: {{{ #!python def __init__(self, reactor): """Initialize. """ self.reactor = reactor i, o = os.pipe() self.i = os.fdopen(i,'r') self.o = os.fdopen(o,'w') self.fileno = self.i.fileno }}} 原因很简单,我们完全可以针对平台的不同,使用不同的优化方法来进行socket操作。 {{{ #!python if platform.getType() == "win32": _select = win32select else: _select = select.select # Exceptions that doSelect might return frequently _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method') _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') }}} 使用select进行socket操作显然是最经济的,它可以让你启动最少的线程或进程就进行大量的并发连接。如果你一定要构建一个超容量的服务器,这会是最经济运行的基础。当然,由于操作系统不一样,你的select的处理也可能完全不同的。 下面再来看看reactor的最后一行 {{{ default.install() }}} 做了点什么。 {{{ #!python def install(): """Configure the twisted mainloop to be run using the select() reactor. """ reactor = SelectReactor() main.installReactor(reactor) }}} 它初始化了一个{{{SelectReactor}}}类。初始化会先执行{{{ReactorBase}}}的初始化方法: {{{ #!python def __init__(self): self._eventTriggers = {} self._pendingTimedCalls = [] self.running = 0 self.waker = None self.resolver = None self.usingThreads = 0 self.addSystemEventTrigger('during', 'shutdown', self.crash) self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) threadable.whenThreaded(self.initThreads) }}} 然后运行{{{PosixReactorBase}}}的初始化方法: {{{ #!python def __init__(self): ReactorBase.__init__(self) if self.usingThreads or platformType == "posix": self.installWaker() }}} 我们可以看到,这部分完全是应平台的不同在初始化select时所使用的各个变量及线程或是信号处理方法。 {{{ main.installReactor(reactor) }}} 是最重要的,它将我们初始华好的reactor正名为twisted.internet.reactor让所有的人都能使用它: {{{ #!python def installReactor(reactor): global addReader, addWriter, removeReader, removeWriter global iterate, addTimeout, wakeUp # this stuff should be common to all reactors. import twisted.internet import sys assert not sys.modules.has_key('twisted.internet.reactor'), \ "reactor already installed" twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor # install stuff for backwards compatability # IReactorCore if implements(reactor, IReactorCore): iterate = reactor.iterate # IReactorFDSet if implements(reactor, IReactorFDSet): addReader = reactor.addReader addWriter = reactor.addWriter removeWriter = reactor.removeWriter removeReader = reactor.removeReader # IReactorTime if implements(reactor, IReactorTime): def addTimeout(m, t, f=reactor.callLater): warnings.warn("main.addTimeout is deprecated, use reactor.callLater instead.") f(t, m) # ??? if hasattr(reactor, "wakeUp"): wakeUp = reactor.wakeUp }}} 仔细看看这两句: {{{ #!python twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor }}} 它将我们初始化好的reactor放到了原来del的位置。 ----------------- [[self:解读_PyTwisted|返回目录]]