Attachment 'Casing.py'
Download 1 # Programmer: limodou
2 # E-mail: [email protected]
3 #
4 # Copyleft 2005 limodou
5 #
6 # Distributed under the terms of the GPL (GNU Public License)
7 #
8 # NewEdit is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 #
22 # $Id$
23
24 import threading
25 import time
26
class DUMP_CLASS:
    """Empty class that defines no attributes or methods of its own."""
class AbortException(Exception):
    """Exception type that Casing treats as an abort rather than an error.

    Casing.start() and Casing.sync_start() catch it separately from other
    exceptions and run the on_abort handler (or print 'Abort' if none is set).
    """
29
class SyncVar(object):
    """Flag/value holder whose writes are guarded by a lock.

    The stored value starts as False.  set() and clear() assign it while
    holding self.lock; isset(), get(), the comparison hooks and the truth
    hook read self.flag directly without taking the lock.
    """

    def __init__(self):
        self.flag = False
        self.lock = threading.Lock()

    def set(self, flag=True):
        """Store *flag* (default True) as the current value."""
        with self.lock:
            self.flag = flag

    def isset(self):
        """Return the stored value (identical to get())."""
        return self.flag

    def get(self):
        """Return the stored value."""
        return self.flag

    def clear(self):
        """Reset the stored value to False."""
        with self.lock:
            self.flag = False

    def __ne__(self, other):
        """Compare the stored value, not the SyncVar object, with *other*."""
        return self.flag != other

    def __eq__(self, other):
        """Compare the stored value, not the SyncVar object, with *other*."""
        return self.flag == other

    # Python 3 sets __hash__ to None on a class that defines __eq__.  Keep the
    # identity-based hash that Python 2 new-style classes retain, so instances
    # stay hashable on both versions.
    __hash__ = object.__hash__

    def __nonzero__(self):
        """Truth value of the stored value (Python 2 hook)."""
        return bool(self.flag)

    # Python 3 looks up __bool__ instead of __nonzero__; without this alias
    # every SyncVar would be truthy and "while syncvar:" loops would never end.
    __bool__ = __nonzero__
59
class FuncThread(threading.Thread):
    """Thread that runs a casing's call chain and then clears the shared flag.

    casing  -- object providing start() and sync_start(syncvar)
    syncvar -- flag object; its clear() is called when the run is over
    sync    -- if true, run casing.sync_start(syncvar); otherwise casing.start()
    """

    def __init__(self, casing, syncvar, sync=False):
        threading.Thread.__init__(self)
        self.casing = casing
        self.syncvar = syncvar
        self.sync = sync

    def run(self):
        """Run the chain in the selected mode, then clear the flag."""
        try:
            if self.sync:
                self.casing.sync_start(self.syncvar)
            else:
                self.casing.start()
        finally:
            # Clear the flag even if the run raised (for example from inside
            # an error handler); otherwise anything polling the flag, such as
            # ProcessThread, would keep looping forever.
            self.syncvar.clear()
73
class ProcessThread(threading.Thread):
    """Thread that calls the casing's on_process handler while the flag is set.

    casing  -- object whose on_process attribute is a (func, args, kwargs) tuple
    syncvar -- flag object; the loop runs for as long as it is truthy
    """

    def __init__(self, casing, syncvar):
        threading.Thread.__init__(self)
        self.casing = casing
        self.syncvar = syncvar

    def run(self):
        """Call the handler, sleep, and repeat until the flag becomes false.

        A 'timestep' entry in the handler's kwargs sets the pause in seconds
        between calls (default 0.5) and is not forwarded to the handler.  The
        handler always receives the flag object as keyword 'syncvar'.
        """
        func, args, kwargs = self.casing.on_process
        # Work on a private copy: removing 'timestep' from (or adding
        # 'syncvar' to) the dict stored on the casing would make a later run
        # lose the configured timestep and fall back to the default.
        kwargs = dict(kwargs)
        timestep = kwargs.pop('timestep', 0.5)
        kwargs['syncvar'] = self.syncvar
        while self.syncvar:
            func(*args, **kwargs)
            time.sleep(timestep)
94
class Casing(object):
    """Ordered chain of calls with optional handlers and threaded execution.

    Each call is stored in self.funcs as a (func, args, kwargs) tuple.  The
    optional handlers are stored in the same tuple form:

      on_success   -- run after every call in the chain has returned
      on_abort     -- run when AbortException is raised during the run
      on_exception -- run when any other Exception is raised during the run
      on_process   -- called repeatedly by ProcessThread during a threaded run
    """

    def __init__(self, func=None, *args, **kwargs):
        self.funcs = []
        if func:
            self.funcs.append((func, args, kwargs))
        self.on_success = None
        self.on_exception = None
        self.on_abort = None
        self.on_process = None

        self.syncvar = None  # SyncVar of the most recent threaded run
        self.t_func = None   # FuncThread of the most recent threaded run
        self.p_func = None   # ProcessThread of that run, if on_process is set

    def __add__(self, obj):
        """Append obj's calls to this chain in place and return self."""
        # NotImplemented (instead of assert) keeps the check active under -O
        # and makes an unsupported operand raise the conventional TypeError.
        if not isinstance(obj, Casing):
            return NotImplemented
        self.funcs.extend(obj.funcs)
        return self

    def __radd__(self, obj):
        """Reflected form of +: append obj's calls in place and return self."""
        if not isinstance(obj, Casing):
            return NotImplemented
        self.funcs.extend(obj.funcs)
        return self

    def append(self, func, *args, **kwargs):
        """Add one call to the end of the chain."""
        self.funcs.append((func, args, kwargs))

    def onsuccess(self, func, *args, **kwargs):
        """Set the handler run after the whole chain has completed."""
        self.on_success = func, args, kwargs

    def onexception(self, func, *args, **kwargs):
        """Set the handler run when the chain raises a non-abort exception."""
        self.on_exception = func, args, kwargs

    def onabort(self, func, *args, **kwargs):
        """Set the handler run when the chain raises AbortException."""
        self.on_abort = func, args, kwargs

    def onprocess(self, func, *args, **kwargs):
        """Set the handler that ProcessThread calls during a threaded run."""
        self.on_process = func, args, kwargs

    def start(self):
        """Run the chain in the calling thread, without any syncvar."""
        self._execute(None, False)

    def start_thread(self):
        """Run the chain via start() on a daemon thread."""
        self._launch(False)

    def sync_start(self, syncvar):
        """Run the chain, passing *syncvar* to every call as keyword 'syncvar'.

        *syncvar* is tested before each call; once it is false the run stops
        immediately and the on_success handler is not invoked.
        """
        self._execute(syncvar, True)

    def start_sync_thread(self):
        """Run the chain via sync_start() on a daemon thread."""
        self._launch(True)

    def stop_thread(self):
        """Clear the flag of the current threaded run, if it is still set."""
        if self.syncvar:
            self.syncvar.clear()

    def _launch(self, sync):
        """Create a fresh flag and start the worker (and progress) threads."""
        self.syncvar = syncvar = SyncVar()
        syncvar.set()
        self.t_func = worker = FuncThread(self, syncvar, sync=sync)
        self.p_func = None
        worker.daemon = True
        worker.start()
        if self.on_process:
            self.p_func = reporter = ProcessThread(self, syncvar)
            reporter.daemon = True
            reporter.start()

    def _execute(self, syncvar, sync):
        """Run every call in order and dispatch to the matching handler."""
        try:
            for func, args, kwargs in self.funcs:
                if sync:
                    if not syncvar:
                        return
                    # Pass the flag through a copy: writing it into the stored
                    # dict would leak a stale syncvar into later start() runs.
                    kwargs = dict(kwargs, syncvar=syncvar)
                self._run((func, args, kwargs))
            if self.on_success:
                self._run(self.on_success)
        except AbortException:
            if self.on_abort:
                self._run(self.on_abort)
            else:
                print('Abort')
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed by the chain.
            if self.on_exception:
                self._run(self.on_exception)
            else:
                import traceback
                traceback.print_exc()

    def _run(self, func):
        """Call a stored (func, args, kwargs) tuple and return its result."""
        f, args, kwargs = func
        return f(*args, **kwargs)
206
def new_obj():
    """Create and return a new DUMP_CLASS instance."""
    instance = DUMP_CLASS()
    return instance
209
if __name__ == '__main__':
    # Demo: chain two counting jobs, report progress every 2 seconds from the
    # progress thread, and request a stop after 10 seconds.
    def test(n, syncvar):
        """Count from 1 to n-1, one step per second, while syncvar is truthy."""
        for i in range(1, n):
            if not syncvar:
                break
            # The counter is stored in the flag so process() can report it.
            syncvar.set(i)
            # Single-argument print(...) emits the same text on Python 2 and 3.
            print("= %s" % i)
            time.sleep(1)

    def process(syncvar):
        """Progress callback: print the value currently held by syncvar."""
        print('process... %s' % (syncvar.get(),))

    d = Casing(test, 10) + Casing(test, 20)
    d.onprocess(process, timestep=2)
    d.start_sync_thread()
    time.sleep(10)
    print('stop')
    d.stop_thread()
229
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily. You are not allowed to attach a file to this page.