##language:zh #pragma section-numbers off ##含有章节索引导航的 ZPyUG 文章通用模板 <<TableOfContents>> ## 默许导航,请保留 <<Include(ZPyUGnav)>> = cron任务序列实现 = ##startInc == 需求 == [[http://groups.google.com/group/eurasia-users/browse_thread/thread/a31922b2fbdb9d4f|有沒有stackless tasklet 實現的 TaskQueue Server範例呢?]] - eurasia-users | Google 网上论坛 {{{ bawbaw <bawbaw.hu@gmail.com> reply-to eurasia-users@googlegroups.com to eurasia-users <eurasia-users@googlegroups.com> date Thu, Jul 16, 2009 at 19:14 }}} 在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請問大家有沒有不錯的代碼可以供我參考使用呢… 像 GAE 中的 * http://googleappengine.blogspot.com/2009/06/new-task-queue-api-on-google-app-engine.html == tasklet == {{{ 沈崴 <wileishn@gmail.com> reply-to eurasia-users@googlegroups.com to eurasia-users <eurasia-users@googlegroups.com> date Fri, Jul 17, 2009 at 09:28 }}} 好像 tasklet 就是你要的東西, 你看看我理解得對不對: {{{#!python def foo(): for u in users: tasklet(sendmail)(to=u.email, subject='Hello ' + u.name, body='this is a message!') # tasklet 調用是直接返回的, 被調用者會在調度時執行 #... def sendmail(**args): mail.send_mail( 'from_me@example.com', args.get('to'), args.get('subject'), args.get('body')) #... stackless.run() # eurasia.mainloop() }}} ~ 不是這種的,這是馬上執行的,我是想要找一種,可以添加任務,設定每個任務每隔多久執行,以及設定運行一段時間候停止執行該任務有點像是 cron 那種,每隔一段時間重覆運行,但是又可以設定其運行時間,以及用程序方式去添加任務, == stackless::ProcessTask == 我来提供我昨天实作的,当服务重启时,会将之前的task 重新载入运行的 {{{ bawbaw <bawbaw.hu@gmail.com> reply-to eurasia-users@googlegroups.com to eurasia-users <eurasia-users@googlegroups.com> date Sun, Jul 19, 2009 at 19:47 }}} {{{#!python # encoding: utf-8 import os import sqlite3 import sha import pickle import sys from time import time, sleep from random import random from sqlite3 import IntegrityError from stackless import tasklet, schedule, channel def Sleep(sec): c = channel() while True: uid = sha.new('%s' % random()).hexdigest() try: cursor.execute( 'INSERT INTO hypnus VALUES (?, ?)', (uid, time() + sec) ) break except IntegrityError: continue channels[uid] = c c.receive() def tasklets(): while True: now = time() l = cursor.execute( 'SELECT id FROM hypnus WHERE timeout<?', (now, ) ).fetchall() for i in l: uid = i[0] c = channels[uid] del channels[uid] c.send(None) if l: cursor.execute( 'DELETE FROM hypnus WHERE timeout<?', (now, )) schedule() def call_func(func, obj = None, args = []): func_list = func.split('.') if obj is None:obj = sys.modules['__main__'] def _call(obj, func_list): if len(func_list) == 1: getattr(obj, func_list[0])(*args) else: obj = getattr(obj, func_list.pop(0))() _call(obj, func_list) _call(obj, func_list) class ProcessTask: DB = None def __init__(self): self.db_path = 'task.db' def init_db(self): '''init setup Task DB''' if os.path.exists(self.db_path):return cursor = sqlite3.connect(self.db_path).cursor() cursor.execute( """ CREATE TABLE task ( name VARCHAR(255) NOT NULL , func VARCHAR(255) NOT NULL , args MEDIUMTEXT NOT NULL , call_path VARCHAR(255) NOT NULL , start_time FLOAT NOT NULL , sleep_time FLOAT NOT NULL , end_time FLOAT NOT NULL , PRIMARY KEY (name) ) """ ) cursor.execute('CREATE INDEX idx_endtime ON task (end_time)') cursor.close() #cron_clear_timeout_task self.add_task(name = 'cron_clear_timeout_task',sleep_time = 86400, func = 'ProcessTask.clear_timeout_task') def get_db(self): if ProcessTask.DB is None: ProcessTask.DB = True self.conn = sqlite3.connect(self.db_path) self.cursor = self.conn.cursor() def load_task(self):#reload old task for task_rs in self.get_task_list(): tasklet(self.exec_task)(*task_rs) def exec_task(self, *arges): name, func, args, call_path, start_time, sleep_time, end_time= arges now = time() while True: if time() > end_time:break#end task if time() < start_time:Sleep(start_time-time())#notstart ,sleep first if call_path == 'local': call_func(func, args = args) else: pass Sleep(sleep_time) #print '.', time() def get_task(self, name): '''get task by name''' self.get_db() #delete end task self.cursor.execute('delete from task where name = ? and end_time<?', (name, time())) rs = self.cursor.execute('select * from task where name = ?', (name, )).fetchone() if rs is None:return rs rs = list(rs) rs[2] = pickle.loads(str(rs[2])) return rs def get_task_list(self): '''get all active task list''' self.get_db() now = time() rows = self.cursor.execute('select * from task where end_time>?', (now, )).fetchall() for rs in rows: rs = list(rs) rs[2] = pickle.loads(str(rs[2])) yield rs def clear_timeout_task(self): '''clear end task''' try: now = time() conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('delete from task where end_time<?',(now, )) conn.commit() cursor.close() except: pass def add_task(self, **kw): '''add task self.name, self.func, self.args, self.call_path, self.start_time, self.sleep_time ''' self.__dict__.update(kw) self.get_db() if self.get_task(self.name) is not None:return False self.args = pickle.dumps(kw.get('args', [])) self.call_path = kw.get('call_path', 'local') self.start_time = kw.get('start_time', time()) self.sleep_time = kw.get('sleep_time', 3) self.durn_time = kw.get('durn_time', 99999999999999) self.end_time = self.start_time + self.durn_time try: self.cursor.execute('insert into task values(?,?,?,?,?,?,?)', (self.name, self.func, self.args, self.call_path,self.start_time, self.sleep_time, self.end_time) ) self.conn.commit() except IntegrityError: return False except OperationalError: sleep(1) self.add_task(**kw) return (self.name, self.func, kw.get('args', []), self.call_path, self.start_time, self.sleep_time, self.end_time) def close(self): self.cursor.close() cursor = sqlite3.connect(':memory:').cursor() cursor.execute( ( 'CREATE TABLE IF NOT EXISTS hypnus ' '(id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)' ) ) cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)') channels = {}; tasklet(tasklets)() #使用方式 #启动时必须使用的 pt = ProcessTask() pt.init_db() pt.load_task() #添加任务 def add_task(name, func, args, call_path,start_time = None, durn_time= 86400): if start_time is None:start_time = time.time() add_rs = pt.add_task(name = name, func = func, args = args, call_path = call_path,start_time = start_time, durn_time = durn_time) if add_rs:stackless.tasklet(pt.exec_task)(*add_rs) ##endInc }}} ---- '''反馈''' 创建 by -- ZoomQuiet [<<DateTime(2009-07-19T13:06:31Z)>>]