Differences between revisions 1 and 2
Revision 1 as of 2006-11-15 13:57:54
Size: 3482
Editor: HuangYi
Comment: 基于allegra的简单rpc实现
Revision 2 as of 2006-11-16 06:47:59
Size: 3589
Editor: HuangYi
Comment:
Deletions are marked like this. Additions are marked like this.
Line 3: Line 3:
其中有代码是直接从 limodou 为UliPad编写的插件 pairpong 中复制过来的,感谢 limodou

TableOfContents

基于allegra的简单rpc实现

其中有代码是直接从 limodou 为UliPad编写的插件 pairpong 中复制过来的,感谢 limodou

async_rpc.py

   1 import pickle
   2 from allegra import (
   3         async_loop, async_chat, 
   4         async_server, async_client )
   5 
   6 def dump_call(funcname, args, kwargs):
   7     para = (args, kwargs)
   8     para = pickle.dumps( para )
   9     data = funcname + '\t' + para
  10     length = len(data)
  11     return '%s\r\n%s' % ( hex(length), data )
  12 
  13 def parse_call(data):
  14     funcname, para = data.split('\t', 1)
  15     para = pickle.loads(para)
  16     return funcname, para
  17 
  18 class Dispatcher(async_chat.Dispatcher):
  19     ''' the dispatcher for both server and client '''
  20     terminator = '\r\n'
  21     def __init__(self, obj):
  22         super(Dispatcher, self).__init__()
  23         self.buffer = []
  24         self.obj = obj
  25         self.obj.remote = Remote(self.async_chat_push)
  26 
  27     def handle_connect(self):
  28         self.obj.handle_connected()
  29 
  30     def handle_close(self):
  31         self.obj.handle_close()
  32 
  33     def collect_incoming_data(self, data):
  34         self.buffer.append(data)
  35 
  36     def found_terminator(self):
  37         data = ''.join(self.buffer)
  38         self.buffer = []
  39         if not data:
  40             return
  41         if self.terminator == '\r\n':
  42             length = int(data, 16)
  43             self.set_terminator(length)
  44         else:
  45             self.set_terminator('\r\n')
  46             self.call(data)
  47 
  48     def call(self, data):
  49         funcname, para = parse_call(data)
  50         try:
  51             func = getattr(self.obj, funcname)
  52         except AttributeError:
  53             return
  54         else:
  55             try:
  56                 func(*para[0], **para[1])
  57             except TypeError:
  58                 print 'parameter error'
  59 
  60 def dispatcher_wrapper(obj):
  61     def new_obj():
  62         return Dispatcher(obj)
  63     return new_obj
  64 
  65 class Remote(object):
  66     ''' wrapper for remote object '''
  67     def __init__(self, send):
  68         self.send = send
  69     def __getattr__(self, name):
  70         def method(*args, **kw):
  71             self.send( dump_call( name, args, kw ) )
  72         return method
  73 
  74 def serve(obj, ip, port):
  75     ''' start a server '''
  76     return async_server.Listen( dispatcher_wrapper(obj), (ip, port), 6.0, 5 )
  77 
  78 def connect(obj, ip, port):
  79     ''' connect a client '''
  80     d = Dispatcher(obj)
  81     return async_client.connect( d, (ip, port), 3)

test_server.py

   1 from allegra import async_loop
   2 from async_rpc import serve
   3 
   4 class Temp(object):
   5     def __init__(self):
   6         self.remote = None
   7     def handle_connect(self):
   8         print 'connected'
   9     def handle_close(self):
  10         print 'closed'
  11     def say(self, str):
  12         print str
  13         self.remote.test(1)
  14     def test(self, a, b=1):
  15         print a,b
  16 
  17     def __getattr__(self, name):
  18         def func_wrapper(*args, **kw):
  19             print 'not implemented method: %s' % name
  20         return func_wrapper
  21 
  22 t = Temp()
  23 serve(t, 'localhost', 9000)
  24 async_loop.dispatch()

test_client.py

   1 from allegra import async_loop
   2 from async_rpc import connect
   3 
   4 class Temp(object):
   5     def handle_connect(self):
   6         print 'connected'
   7     def handle_close(self):
   8         print 'closed'
   9     def say(self, str):
  10         print str
  11         self.remote.say(str)
  12     def test(self, a, b=1):
  13         print a,b
  14 
  15 t = Temp()
  16 connect(t, 'localhost', 9000)
  17 t.say('hello')
  18 t.remote.test(1,2)
  19 async_loop.dispatch()

huangyi/simple_rpc_with_allegra (last edited 2009-12-25 07:14:17 by localhost)