cron任务序列实现

需求

有沒有stackless tasklet 實現的 TaskQueue Server範例呢? - eurasia-users | Google 网上论坛

bawbaw <[email protected]>
reply-to        [email protected]
to      eurasia-users <[email protected]>
date    Thu, Jul 16, 2009 at 19:14

在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請問大家有沒有不錯的代碼可以供我參考使用呢…

像 GAE 中的

tasklet

沈崴 <[email protected]>
reply-to        [email protected]
to      eurasia-users <[email protected]>
date    Fri, Jul 17, 2009 at 09:28

好像 tasklet 就是你要的東西, 你看看我理解得對不對:

   1 def foo():
   2   for u in users:
   3      tasklet(sendmail)(to=u.email, subject='Hello ' + u.name,
   4      body='this is a message!') # tasklet 調用是直接返回的, 被調用者會在調度時執行
   5 
   6   #...
   7 
   8 def sendmail(**args):
   9   mail.send_mail(
  10      '[email protected]',
  11      args.get('to'),
  12      args.get('subject'),
  13      args.get('body'))
  14 
  15   #...
  16 
  17 stackless.run() # eurasia.mainloop()

~ 不是這種的,這是馬上執行的。我想要找的是一種可以添加任務、設定每個任務每隔多久執行一次,以及設定運行一段時間後停止執行該任務的機制;有點像 cron 那樣每隔一段時間重覆運行,但又可以設定其運行時間,並且能用程序方式去添加任務。

stackless::ProcessTask

我来提供我昨天实作的版本:当服务重启时,会将之前的 task 重新载入并继续运行。

bawbaw <[email protected]>
reply-to        [email protected]
to      eurasia-users <[email protected]>
date    Sun, Jul 19, 2009 at 19:47

   1 # encoding: utf-8
   2 
   3 import os
   4 import sqlite3
   5 import sha
   6 import pickle
   7 import sys
   8 from time import time, sleep
   9 from random import random
  10 from sqlite3 import IntegrityError
  11 from stackless import tasklet, schedule, channel
  12 
  13 def Sleep(sec):
  14        c = channel()
  15        while True:
  16                uid = sha.new('%s' % random()).hexdigest()
  17 
  18                try:
  19                        cursor.execute(
  20                                'INSERT INTO hypnus VALUES (?, ?)',
  21                                (uid, time() + sec) )
  22                        break
  23                except IntegrityError:
  24                        continue
  25 
  26        channels[uid] = c
  27        c.receive()
  28 
  29 def tasklets():
  30        while True:
  31                now = time()
  32                l = cursor.execute(
  33                        'SELECT id FROM hypnus WHERE timeout<?',
  34                        (now, ) ).fetchall()
  35                for i in l:
  36                        uid = i[0]
  37                        c = channels[uid]
  38                        del channels[uid]
  39 
  40                        c.send(None)
  41                if l:
  42                        cursor.execute(
  43                                'DELETE FROM hypnus WHERE timeout<?',
  44                                (now, ))
  45                schedule()
  46 def call_func(func, obj = None, args = []):
  47        func_list = func.split('.')
  48        if obj is None:obj = sys.modules['__main__']
  49        def _call(obj, func_list):
  50                if len(func_list) == 1:
  51                        getattr(obj, func_list[0])(*args)
  52                else:
  53                        obj = getattr(obj, func_list.pop(0))()
  54                        _call(obj, func_list)
  55        _call(obj, func_list)
  56 class ProcessTask:
  57    DB = None
  58    def __init__(self):
  59        self.db_path = 'task.db'
  60 
  61    def init_db(self):
  62        '''init setup Task DB'''
  63        if os.path.exists(self.db_path):return
  64        cursor = sqlite3.connect(self.db_path).cursor()
  65        cursor.execute( """
  66        CREATE TABLE task (
  67        name VARCHAR(255) NOT NULL ,
  68        func VARCHAR(255) NOT NULL ,
  69        args MEDIUMTEXT NOT NULL ,
  70        call_path VARCHAR(255) NOT NULL ,
  71        start_time FLOAT NOT NULL ,
  72        sleep_time FLOAT NOT NULL ,
  73        end_time FLOAT NOT NULL ,
  74        PRIMARY KEY (name)
  75        )
  76        """ )
  77        cursor.execute('CREATE INDEX idx_endtime ON task (end_time)')
  78        cursor.close()
  79        #cron_clear_timeout_task
  80        self.add_task(name = 'cron_clear_timeout_task',sleep_time =
  81 86400, func = 'ProcessTask.clear_timeout_task')
  82    def get_db(self):
  83        if ProcessTask.DB is None:
  84            ProcessTask.DB = True
  85            self.conn = sqlite3.connect(self.db_path)
  86            self.cursor = self.conn.cursor()
  87    def load_task(self):#reload old task
  88        for task_rs in self.get_task_list():
  89            tasklet(self.exec_task)(*task_rs)
  90    def exec_task(self, *arges):
  91        name, func, args, call_path, start_time, sleep_time, end_time= arges
  92        now = time()
  93 
  94        while True:
  95            if time() > end_time:break#end task
  96            if time() < start_time:Sleep(start_time-time())#notstart ,sleep first
  97            if call_path == 'local':
  98                call_func(func, args = args)
  99            else:
 100                pass
 101            Sleep(sleep_time)
 102            #print '.', time()
 103    def get_task(self, name):
 104        '''get task by name'''
 105        self.get_db()
 106        #delete end task
 107        self.cursor.execute('delete from task where name = ? and end_time<?', (name, time()))
 108        rs = self.cursor.execute('select * from task where name = ?', (name, )).fetchone()
 109        if rs is None:return rs
 110        rs = list(rs)
 111        rs[2] = pickle.loads(str(rs[2]))
 112        return rs
 113    def get_task_list(self):
 114        '''get all active task list'''
 115        self.get_db()
 116        now = time()
 117        rows = self.cursor.execute('select * from task where end_time>?', (now, )).fetchall()
 118        for rs in rows:
 119            rs = list(rs)
 120            rs[2] = pickle.loads(str(rs[2]))
 121            yield rs
 122    def clear_timeout_task(self):
 123        '''clear end task'''
 124        try:
 125            now = time()
 126            conn = sqlite3.connect(self.db_path)
 127            cursor = conn.cursor()
 128            cursor.execute('delete from task where end_time<?',(now, ))
 129            conn.commit()
 130            cursor.close()
 131        except:
 132            pass
 133    def add_task(self, **kw):
 134        '''add task self.name, self.func, self.args, self.call_path,
 135        self.start_time, self.sleep_time
 136        '''
 137        self.__dict__.update(kw)
 138        self.get_db()
 139        if self.get_task(self.name) is not None:return False
 140        self.args = pickle.dumps(kw.get('args', []))
 141        self.call_path = kw.get('call_path', 'local')
 142        self.start_time = kw.get('start_time', time())
 143        self.sleep_time = kw.get('sleep_time', 3)
 144        self.durn_time = kw.get('durn_time', 99999999999999)
 145        self.end_time = self.start_time + self.durn_time
 146 
 147        try:
 148            self.cursor.execute('insert into task values(?,?,?,?,?,?,?)',
 149            (self.name, self.func, self.args, 
 150             self.call_path,self.start_time, self.sleep_time, self.end_time)
 151            )
 152            self.conn.commit()
 153 
 154        except IntegrityError:
 155            return False
 156        except OperationalError:
 157            sleep(1)
 158            self.add_task(**kw)
 159 
 160        return (self.name, self.func, kw.get('args', []),
 161         self.call_path, self.start_time, self.sleep_time, self.end_time)
 162    def close(self):
 163        self.cursor.close()
 164 
 165 
 166 
 167 cursor = sqlite3.connect(':memory:').cursor()
 168 cursor.execute( ( 'CREATE TABLE IF NOT EXISTS hypnus '
 169        '(id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)' ) )
 170 cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)')
 171 channels = {}; tasklet(tasklets)()
 172 
 173 
 174 #使用方式
 175 #启动时必须使用的
 176 pt = ProcessTask()
 177 pt.init_db()
 178 pt.load_task()
 179 
 180 #添加任务
 181 def add_task(name, func, args, call_path,start_time = None, durn_time= 86400):
 182    if start_time is None:start_time = time.time()
 183    add_rs = pt.add_task(name = name, func = func, args = args,
 184 call_path = call_path,start_time = start_time, durn_time = durn_time)
 185    if add_rs:stackless.tasklet(pt.exec_task)(*add_rs)
 186 ##endInc


反馈

创建 by -- ZoomQuiet [2009-07-19 13:06:31]

MiscItems/2009-07-19 (last edited 2009-12-25 07:16:04 by localhost)