Attachment 'Casing.py'
Download 1 # Programmer: limodou
2 # E-mail: [email protected]
3 #
4 # Copyleft 2005 limodou
5 #
6 # Distributed under the terms of the GPL (GNU Public License)
7 #
8 # Casing is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 #
22 # $Id$
23 # Version = 0.1
24 # Update:
25 #
26
27 import threading
28 import time
29 import Queue
30
31 class DUMP_CLASS:pass
32 class AbortException(Exception):pass
33
34 class SyncVar(object):
35 def __init__(self):
36 self.flag = False
37 self.lock = threading.Lock()
38
39 def set(self, flag=True):
40 self.lock.acquire()
41 self.flag = flag
42 self.lock.release()
43
44 def isset(self):
45 return self.flag
46
47 def get(self):
48 return self.flag
49
50 def clear(self):
51 self.lock.acquire()
52 self.flag = False
53 self.lock.release()
54
55 def __ne__(self, other):
56 return self.flag != other
57
58 def __eq__(self, other):
59 return self.flag == other
60
61 def __nonzero__(self):
62 return bool(self.flag)
63
64 class FuncThread(threading.Thread):
65 def __init__(self, casing, syncvar, sync=False, kws={}):
66 threading.Thread.__init__(self)
67 self.casing = casing
68 self.syncvar = syncvar
69 self.sync = sync
70 self.kws = kws
71
72 def run(self):
73 if self.sync:
74 self.casing.sync_start(self.syncvar, kws=self.kws)
75 else:
76 self.casing.start(**self.kws)
77 self.syncvar.clear()
78
79 class ProcessThread(threading.Thread):
80 def __init__(self, casing, syncvar, sync=False):
81 threading.Thread.__init__(self)
82 self.casing = casing
83 self.syncvar = syncvar
84 self.sync = sync
85
86 def run(self):
87 func, args, kwargs = self.casing.on_process
88 if kwargs.has_key('timestep'):
89 timestep = kwargs['timestep']
90 del kwargs['timestep']
91 else:
92 timestep = 0.5
93 while 1:
94 if self.syncvar:
95 if self.sync:
96 kwargs['syncvar'] = self.syncvar
97 func(*args, **kwargs)
98 time.sleep(timestep)
99 else:
100 break
101
102 class MultiCasing(object):
103 def __init__(self, size=10, need_manual_stop=False, timestep=1):
104 self.on_finish = None
105 self.on_process = None
106 self.on_abort = None
107 self.size = size
108 self.need_manual_stop = need_manual_stop
109 self.queue = Queue.Queue()
110 self.active = []
111 self.event = threading.Event()
112 self.event.set()
113 self._exit_flag = False
114 self.thread_d = None
115 self.timestep = timestep
116
117 def start_thread(self):
118 self.thread_d = d = Casing(self._start)
119 if self.on_process:
120 d.onprocess(self.on_process[0], *self.on_process[1], **self.on_process[2])
121 d.start_thread()
122
123 def start_sync_thread(self):
124 self.thread_d = d = Casing(self._start, sync=True)
125 if self.on_process:
126 d.onprocess(self.on_process[0], *self.on_process[1], **self.on_process[2])
127 d.start_sync_thread()
128
129 def append(self, casing_obj):
130 self.queue.put(casing_obj, block=True, timeout=1)
131
132 def stop_thread(self):
133 for obj in self.active:
134 obj.stop_thread()
135 self._exit_flag = True
136
137 def _start(self, syncvar=None, sync=False):
138 self._exit_flag = False
139 self.running = True
140 while not self._exit_flag:
141 self.event.wait(self.timestep)
142 if not self.queue.empty() and len(self.active) < self.size:
143 casing = self.queue.get()
144 self.active.append(casing)
145 casing.onsync(self._on_sync, casing)
146 if not sync:
147 casing.start_thread()
148 else:
149 casing.start_sync_thread()
150 elif self.queue.empty() and not self.active: #not more thread obj need to run
151 if not self.need_manual_stop:
152 break
153 elif len(self.active) == self.size:
154 self.event.clear()
155 self.running = False
156 if not self.active and self.queue.empty() and self.on_finish:
157 self._run(self.on_finish)
158 elif self.on_abort:
159 self._run(self.on_abort)
160
161 def _on_sync(self, obj):
162 self.active.remove(obj)
163 self.event.set()
164
165 def onfinish(self, func, *args, **kwargs):
166 self.on_finish = func, args, kwargs
167
168 def onprocess(self, func, *args, **kwargs):
169 self.on_process = func, args, kwargs
170
171 def onabort(self, func, *args, **kwargs):
172 self.on_abort = func, args, kwargs
173
174 def _run(self, func):
175 f, args, kwargs = func
176 return f(*args, **kwargs)
177
178 def isactive(self):
179 return self.thread_d.isactive()
180
181 def running_count(self):
182 return len(self.active)
183
184 def remaining_count(self):
185 return self.queue.qsize()
186
187 class Casing(object):
188 def __init__(self, func=None, *args, **kwargs):
189 self.funcs = []
190 if func:
191 self.funcs.append((func, args, kwargs))
192 self.on_success = None
193 self.on_exception = None
194 self.on_abort = None
195 self.on_process = None
196 self.on_sync = None #used internal
197
198 self.syncvar = None
199 self.t_func = None
200 self.p_func = None
201
202 def __add__(self, obj):
203 assert isinstance(obj, Casing)
204 self.funcs.extend(obj.funcs)
205 return self
206
207 def __radd__(self, obj):
208 assert isinstance(obj, Casing)
209 self.funcs.extend(obj.funcs)
210 return self
211
212 def copy(self):
213 obj = Casing()
214 for name, value in vars(self).items():
215 setattr(obj, name, self._deepcopy(value))
216 return obj
217
218 def _deepcopy(self, obj):
219 if isinstance(obj, tuple):
220 s = []
221 for i in range(len(obj)):
222 s.append(self._deepcopy(obj[i]))
223 return tuple(s)
224 elif isinstance(obj, list):
225 s = []
226 for i in range(len(obj)):
227 s.append(self._deepcopy(obj[i]))
228 return s
229 elif isinstance(obj, dict):
230 s = {}
231 for k, v in obj.items():
232 s[k] = self._deepcopy(v)
233 return s
234 else:
235 return obj
236
237 def _update(self, src, kdict):
238 for k, v in src.items():
239 if kdict.has_key(k):
240 src[k] = kdict[k]
241
242 def append(self, func, *args, **kwargs):
243 self.funcs.append((func, args, kwargs))
244
245 def onsuccess(self, func, *args, **kwargs):
246 self.on_success = func, args, kwargs
247
248 def onexception(self, func, *args, **kwargs):
249 self.on_exception = func, args, kwargs
250
251 def onabort(self, func, *args, **kwargs):
252 self.on_abort = func, args, kwargs
253
254 def onprocess(self, func, *args, **kwargs):
255 self.on_process = func, args, kwargs
256
257 def onsync(self, func, *args, **kwargs):
258 self.on_sync = func, args, kwargs
259
260 def start(self, **kws):
261 try:
262 for func, args, kwargs in self.funcs:
263 self._update(kwargs, kws)
264 ret = self._run((func, args, kwargs))
265 if self.on_success:
266 self._run(self.on_success)
267 if self.on_sync:
268 self._run(self.on_sync)
269 except AbortException:
270 if self.on_abort:
271 self._run(self.on_abort)
272 else:
273 print 'Abort'
274 return
275 except:
276 if self.on_exception:
277 self._run(self.on_exception)
278 else:
279 import traceback
280 traceback.print_exc()
281
282 def start_thread(self, **kws):
283 self.syncvar = syncvar = SyncVar()
284 self.syncvar.set()
285 self.t_func = t = FuncThread(self, syncvar, kws=kws)
286 self.p_func = None
287 t.setDaemon(True)
288 t.start()
289 if self.on_process:
290 self.p_func = t1 = ProcessThread(self, syncvar)
291 t1.setDaemon(True)
292 t1.start()
293
294 def sync_start(self, syncvar, kws={}):
295 try:
296 for func, args, kwargs in self.funcs:
297 self._update(kwargs, kws)
298 kwargs['syncvar'] = syncvar
299 if not syncvar:
300 return
301 self._run((func, args, kwargs))
302 syncvar.clear()
303 if self.on_success:
304 self._run(self.on_success)
305 if self.on_sync:
306 self._run(self.on_sync)
307 except AbortException:
308 syncvar.clear()
309 if self.on_abort:
310 self._run(self.on_abort)
311 else:
312 print 'Abort'
313 return
314 except:
315 syncvar.clear()
316 if self.on_exception:
317 self._run(self.on_exception)
318 else:
319 import traceback
320 traceback.print_exc()
321
322 def start_sync_thread(self, **kws):
323 self.syncvar = syncvar = SyncVar()
324 self.syncvar.set()
325 self.t_func = t = FuncThread(self, syncvar, sync=True, kws=kws)
326 self.p_func = None
327 t.setDaemon(True)
328 t.start()
329 if self.on_process:
330 self.p_func = t1 = ProcessThread(self, syncvar, sync=True)
331 t1.setDaemon(True)
332 t1.start()
333
334 def stop_thread(self):
335 if self.syncvar:
336 self.syncvar.clear()
337
338 def _run(self, func):
339 f, args, kwargs = func
340 return f(*args, **kwargs)
341
342 def isactive(self):
343 return self.t_func.isAlive()
344
345 def new_obj():
346 return DUMP_CLASS()
347
348 if __name__ == '__main__':
349 def test(n, syncvar):
350 for i in range(1, n):
351 if syncvar:
352 syncvar.set(i)
353 print "=",i
354 time.sleep(1)
355 else:
356 break
357
358 def process(syncvar):
359 print 'process...', syncvar.get()
360
361 d = Casing(test, 10) + Casing(test, 20)
362 d.onprocess(process, timestep=2)
363 d.start_sync_thread()
364 time.sleep(10)
365 print 'stop'
366 d.stop_thread()
367
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.