Contents
cron任务序列实现
需求
有沒有stackless tasklet 實現的 TaskQueue Server範例呢? - eurasia-users | Google 网上论坛
bawbaw <[email protected]> reply-to [email protected] to eurasia-users <[email protected]> date Thu, Jul 16, 2009 at 19:14
在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請問大家有沒有不錯的代碼可以供我參考使用呢…
像 GAE 中的
tasklet
沈崴 <[email protected]> reply-to [email protected] to eurasia-users <[email protected]> date Fri, Jul 17, 2009 at 09:28
好像 tasklet 就是你要的東西, 你看看我理解得對不對:
1 def foo():
2 for u in users:
3 tasklet(sendmail)(to=u.email, subject='Hello ' + u.name,
4 body='this is a message!') # tasklet 調用是直接返回的, 被調用者會在調度時執行
5
6 #...
7
8 def sendmail(**args):
9 mail.send_mail(
10 '[email protected]',
11 args.get('to'),
12 args.get('subject'),
13 args.get('body'))
14
15 #...
16
17 stackless.run() # eurasia.mainloop()
~ 不是這種的,這是馬上執行的,我是想要找一種,可以添加任務,設定每個任務每隔多久執行,以及設定運行一段時間候停止執行該任務有點像是 cron 那種,每隔一段時間重覆運行,但是又可以設定其運行時間,以及用程序方式去添加任務,
stackless::ProcessTask
我来提供我昨天实作的,当服务重启时,会将之前的task 重新载入运行的
bawbaw <[email protected]> reply-to [email protected] to eurasia-users <[email protected]> date Sun, Jul 19, 2009 at 19:47
1 # encoding: utf-8
2
3 import os
4 import sqlite3
5 import sha
6 import pickle
7 import sys
8 from time import time, sleep
9 from random import random
10 from sqlite3 import IntegrityError
11 from stackless import tasklet, schedule, channel
12
13 def Sleep(sec):
14 c = channel()
15 while True:
16 uid = sha.new('%s' % random()).hexdigest()
17
18 try:
19 cursor.execute(
20 'INSERT INTO hypnus VALUES (?, ?)',
21 (uid, time() + sec) )
22 break
23 except IntegrityError:
24 continue
25
26 channels[uid] = c
27 c.receive()
28
29 def tasklets():
30 while True:
31 now = time()
32 l = cursor.execute(
33 'SELECT id FROM hypnus WHERE timeout<?',
34 (now, ) ).fetchall()
35 for i in l:
36 uid = i[0]
37 c = channels[uid]
38 del channels[uid]
39
40 c.send(None)
41 if l:
42 cursor.execute(
43 'DELETE FROM hypnus WHERE timeout<?',
44 (now, ))
45 schedule()
46 def call_func(func, obj = None, args = []):
47 func_list = func.split('.')
48 if obj is None:obj = sys.modules['__main__']
49 def _call(obj, func_list):
50 if len(func_list) == 1:
51 getattr(obj, func_list[0])(*args)
52 else:
53 obj = getattr(obj, func_list.pop(0))()
54 _call(obj, func_list)
55 _call(obj, func_list)
56 class ProcessTask:
57 DB = None
58 def __init__(self):
59 self.db_path = 'task.db'
60
61 def init_db(self):
62 '''init setup Task DB'''
63 if os.path.exists(self.db_path):return
64 cursor = sqlite3.connect(self.db_path).cursor()
65 cursor.execute( """
66 CREATE TABLE task (
67 name VARCHAR(255) NOT NULL ,
68 func VARCHAR(255) NOT NULL ,
69 args MEDIUMTEXT NOT NULL ,
70 call_path VARCHAR(255) NOT NULL ,
71 start_time FLOAT NOT NULL ,
72 sleep_time FLOAT NOT NULL ,
73 end_time FLOAT NOT NULL ,
74 PRIMARY KEY (name)
75 )
76 """ )
77 cursor.execute('CREATE INDEX idx_endtime ON task (end_time)')
78 cursor.close()
79 #cron_clear_timeout_task
80 self.add_task(name = 'cron_clear_timeout_task',sleep_time =
81 86400, func = 'ProcessTask.clear_timeout_task')
82 def get_db(self):
83 if ProcessTask.DB is None:
84 ProcessTask.DB = True
85 self.conn = sqlite3.connect(self.db_path)
86 self.cursor = self.conn.cursor()
87 def load_task(self):#reload old task
88 for task_rs in self.get_task_list():
89 tasklet(self.exec_task)(*task_rs)
90 def exec_task(self, *arges):
91 name, func, args, call_path, start_time, sleep_time, end_time= arges
92 now = time()
93
94 while True:
95 if time() > end_time:break#end task
96 if time() < start_time:Sleep(start_time-time())#notstart ,sleep first
97 if call_path == 'local':
98 call_func(func, args = args)
99 else:
100 pass
101 Sleep(sleep_time)
102 #print '.', time()
103 def get_task(self, name):
104 '''get task by name'''
105 self.get_db()
106 #delete end task
107 self.cursor.execute('delete from task where name = ? and end_time<?', (name, time()))
108 rs = self.cursor.execute('select * from task where name = ?', (name, )).fetchone()
109 if rs is None:return rs
110 rs = list(rs)
111 rs[2] = pickle.loads(str(rs[2]))
112 return rs
113 def get_task_list(self):
114 '''get all active task list'''
115 self.get_db()
116 now = time()
117 rows = self.cursor.execute('select * from task where end_time>?', (now, )).fetchall()
118 for rs in rows:
119 rs = list(rs)
120 rs[2] = pickle.loads(str(rs[2]))
121 yield rs
122 def clear_timeout_task(self):
123 '''clear end task'''
124 try:
125 now = time()
126 conn = sqlite3.connect(self.db_path)
127 cursor = conn.cursor()
128 cursor.execute('delete from task where end_time<?',(now, ))
129 conn.commit()
130 cursor.close()
131 except:
132 pass
133 def add_task(self, **kw):
134 '''add task self.name, self.func, self.args, self.call_path,
135 self.start_time, self.sleep_time
136 '''
137 self.__dict__.update(kw)
138 self.get_db()
139 if self.get_task(self.name) is not None:return False
140 self.args = pickle.dumps(kw.get('args', []))
141 self.call_path = kw.get('call_path', 'local')
142 self.start_time = kw.get('start_time', time())
143 self.sleep_time = kw.get('sleep_time', 3)
144 self.durn_time = kw.get('durn_time', 99999999999999)
145 self.end_time = self.start_time + self.durn_time
146
147 try:
148 self.cursor.execute('insert into task values(?,?,?,?,?,?,?)',
149 (self.name, self.func, self.args,
150 self.call_path,self.start_time, self.sleep_time, self.end_time)
151 )
152 self.conn.commit()
153
154 except IntegrityError:
155 return False
156 except OperationalError:
157 sleep(1)
158 self.add_task(**kw)
159
160 return (self.name, self.func, kw.get('args', []),
161 self.call_path, self.start_time, self.sleep_time, self.end_time)
162 def close(self):
163 self.cursor.close()
164
165
166
167 cursor = sqlite3.connect(':memory:').cursor()
168 cursor.execute( ( 'CREATE TABLE IF NOT EXISTS hypnus '
169 '(id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)' ) )
170 cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)')
171 channels = {}; tasklet(tasklets)()
172
173
174 #使用方式
175 #启动时必须使用的
176 pt = ProcessTask()
177 pt.init_db()
178 pt.load_task()
179
180 #添加任务
181 def add_task(name, func, args, call_path,start_time = None, durn_time= 86400):
182 if start_time is None:start_time = time.time()
183 add_rs = pt.add_task(name = name, func = func, args = args,
184 call_path = call_path,start_time = start_time, durn_time = durn_time)
185 if add_rs:stackless.tasklet(pt.exec_task)(*add_rs)
186 ##endInc
反馈
创建 by -- ZoomQuiet [2009-07-19 13:06:31]