##language:zh
#pragma section-numbers off
##含有章节索引导航的 ZPyUG 文章通用模板
<<TableOfContents>>
## 默许导航,请保留
<<Include(ZPyUGnav)>>


= cron任务序列实现 =


##startInc
== 需求 ==
[[http://groups.google.com/group/eurasia-users/browse_thread/thread/a31922b2fbdb9d4f|有沒有stackless tasklet 實現的 TaskQueue Server範例呢?]] - eurasia-users | Google 网上论坛

{{{
bawbaw <bawbaw.hu@gmail.com>
reply-to	eurasia-users@googlegroups.com
to	eurasia-users <eurasia-users@googlegroups.com>
date	Thu, Jul 16, 2009 at 19:14
}}}	
	
在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請問大家有沒有不錯的代碼可以供我參考使用呢…

像 GAE 中的
 * http://googleappengine.blogspot.com/2009/06/new-task-queue-api-on-google-app-engine.html

== tasklet ==
{{{
沈崴 <wileishn@gmail.com>
reply-to	eurasia-users@googlegroups.com
to	eurasia-users <eurasia-users@googlegroups.com>
date	Fri, Jul 17, 2009 at 09:28
}}}
好像 tasklet 就是你要的東西, 你看看我理解得對不對:


{{{#!python
def foo():
  # A tasklet call returns immediately; the callee only runs later, when the
  # scheduler gets to it.
  for u in users:
     spawn = tasklet(sendmail)
     spawn(to=u.email,
           subject='Hello ' + u.name,
           body='this is a message!')

  #...

def sendmail(**args):
  # Each field is optional: a keyword that was not supplied is passed on as None.
  recipient = args.get('to')
  subject = args.get('subject')
  body = args.get('body')
  mail.send_mail('from_me@example.com', recipient, subject, body)

  #...

stackless.run() # run the scheduled tasklets (original note: eurasia.mainloop() -- presumably the equivalent call under Eurasia; confirm)
}}}

~ 不是這種的,這是馬上執行的。我是想要找一種可以添加任務、設定每個任務每隔多久執行,以及設定運行一段時間後停止執行該任務的機制,有點像是 cron 那種,每隔一段時間重複運行,但是又可以設定其運行時間,以及用程序方式去添加任務。


== stackless::ProcessTask ==
我来提供我昨天实作的,当服务重启时,会将之前的task 重新载入运行的
{{{
bawbaw <bawbaw.hu@gmail.com>
reply-to	eurasia-users@googlegroups.com
to	eurasia-users <eurasia-users@googlegroups.com>
date	Sun, Jul 19, 2009 at 19:47
}}}

{{{#!python
# encoding: utf-8

import os
import sqlite3
import sha
import pickle
import sys
from time import time, sleep
from random import random
from sqlite3 import IntegrityError
from stackless import tasklet, schedule, channel

def Sleep(sec):
    """Block the calling tasklet for roughly ``sec`` seconds.

    A row (random id, wake-up time) is inserted into the ``hypnus`` table and
    the caller then waits on a private channel registered under that id in
    ``channels``; the ``tasklets()`` loop sends on the channel once the
    wake-up time has passed.
    """
    wakeup = channel()
    uid = None
    while uid is None:
        candidate = sha.new('%s' % random()).hexdigest()
        try:
            cursor.execute(
                'INSERT INTO hypnus VALUES (?, ?)',
                (candidate, time() + sec))
        except IntegrityError:
            # The id is the primary key: on a collision draw another one.
            continue
        uid = candidate

    channels[uid] = wakeup
    wakeup.receive()

def tasklets():
    """Scheduler loop: wake every tasklet whose ``Sleep()`` deadline passed.

    Runs forever.  Each pass selects the ``hypnus`` rows whose timeout is
    earlier than the current time, removes exactly those rows, sends ``None``
    on the channel registered for each id in ``channels`` and finally yields
    to the other tasklets with ``schedule()``.
    """
    while True:
        now = time()
        due = cursor.execute(
            'SELECT id FROM hypnus WHERE timeout<?',
            (now, )).fetchall()
        if due:
            # Delete by id, and do it before waking anyone: a woken tasklet
            # may run and call Sleep() again before this loop finishes, and a
            # blanket "timeout < now" delete afterwards could remove its
            # fresh row without ever waking it.
            cursor.executemany('DELETE FROM hypnus WHERE id=?', due)
        for (uid, ) in due:
            waiter = channels.pop(uid, None)
            if waiter is not None:
                # A row without a registered channel is skipped instead of
                # killing the scheduler with a KeyError.
                waiter.send(None)
        schedule()
def call_func(func, obj=None, args=()):
    """Resolve a dotted path on ``obj`` and call the final attribute.

    Every component of ``func`` except the last is looked up with
    ``getattr`` and *called with no arguments*; its return value becomes the
    object for the next lookup (so ``'Cls.method'`` instantiates ``Cls`` and
    then calls ``method`` on the instance).  The last component is called
    with ``*args``.  The result of that call is discarded.

    func -- dotted attribute path, e.g. ``'ProcessTask.clear_timeout_task'``
    obj  -- object to start from; defaults to the ``__main__`` module
    args -- positional arguments for the final call

    Raises AttributeError if a component does not exist.
    """
    if obj is None:
        obj = sys.modules['__main__']
    parts = func.split('.')
    for attr in parts[:-1]:
        obj = getattr(obj, attr)()
    getattr(obj, parts[-1])(*args)
class ProcessTask:
    """SQLite-backed registry and runner for periodic ("cron"-like) tasks.

    Each task is one row of the ``task`` table:
    ``(name, func, args, call_path, start_time, sleep_time, end_time)``,
    where ``args`` is stored pickled.  ``exec_task`` runs a task in a loop
    (intended to be started inside a tasklet) and ``load_task`` restarts
    every stored, still-active task, e.g. after a service restart.
    """

    # Legacy class-wide flag, set to True once any instance has connected.
    # Kept for backward compatibility only; the connection itself is tracked
    # per instance (a shared flag left every second instance without
    # ``conn``/``cursor``).
    DB = None

    def __init__(self):
        self.db_path = 'task.db'
        self.conn = None
        self.cursor = None

    @staticmethod
    def _load_args(raw):
        """Unpickle the stored ``args`` column.

        The column comes back as text on Python 2 and as bytes on Python 3,
        so text is converted to bytes first.

        NOTE: pickle can execute arbitrary code while loading; only use
        database files you trust.
        """
        if not isinstance(raw, bytes):
            raw = raw.encode('latin-1')
        return pickle.loads(raw)

    def _row_to_task(self, row):
        """Turn a raw table row into a list with ``args`` unpickled."""
        task = list(row)
        task[2] = self._load_args(task[2])
        return task

    def init_db(self):
        """Create the task database if the file does not exist yet.

        Also registers the daily ``cron_clear_timeout_task`` housekeeping
        task.  Does nothing when ``db_path`` already exists.
        """
        if os.path.exists(self.db_path):
            return
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
            CREATE TABLE task (
            name VARCHAR(255) NOT NULL ,
            func VARCHAR(255) NOT NULL ,
            args MEDIUMTEXT NOT NULL ,
            call_path VARCHAR(255) NOT NULL ,
            start_time FLOAT NOT NULL ,
            sleep_time FLOAT NOT NULL ,
            end_time FLOAT NOT NULL ,
            PRIMARY KEY (name)
            )
            """)
            conn.execute('CREATE INDEX idx_endtime ON task (end_time)')
            conn.commit()
        finally:
            # The setup connection is not reused, so release it.
            conn.close()
        # Housekeeping task: purge expired tasks once a day (86400 s).
        self.add_task(name='cron_clear_timeout_task', sleep_time=86400,
                      func='ProcessTask.clear_timeout_task')

    def get_db(self):
        """Open this instance's connection and cursor on first use."""
        if getattr(self, 'conn', None) is None:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
            ProcessTask.DB = True

    def load_task(self):
        """Start one tasklet per stored, still-active task."""
        for task_rs in self.get_task_list():
            tasklet(self.exec_task)(*task_rs)

    def exec_task(self, *arges):
        """Run one task until its end time; meant to run inside a tasklet.

        ``arges`` is the 7-item task record
        ``(name, func, args, call_path, start_time, sleep_time, end_time)``.
        Waits for ``start_time`` if needed, then repeatedly calls the task
        and sleeps ``sleep_time`` seconds, stopping once ``end_time`` passed.
        """
        (name, func, args, call_path,
         start_time, sleep_time, end_time) = arges

        while True:
            # Read the clock once per pass so the delay below can never be
            # negative.
            now = time()
            if now > end_time:
                break  # task expired
            if now < start_time:
                Sleep(start_time - now)  # not started yet: wait first
            if call_path == 'local':
                call_func(func, args=args)
            # Any other call_path is not implemented: nothing is called.
            Sleep(sleep_time)

    def get_task(self, name):
        """Return the task called ``name`` as a list, or None.

        A task with that name whose end time has passed is deleted first.
        """
        self.get_db()
        self.cursor.execute(
            'delete from task where name = ? and end_time<?', (name, time()))
        # Commit right away: an open write transaction would keep the
        # database locked for other connections (e.g. the one opened by
        # clear_timeout_task).
        self.conn.commit()
        rs = self.cursor.execute(
            'select * from task where name = ?', (name, )).fetchone()
        if rs is None:
            return None
        return self._row_to_task(rs)

    def get_task_list(self):
        """Yield every task whose end time is still in the future."""
        self.get_db()
        now = time()
        rows = self.cursor.execute(
            'select * from task where end_time>?', (now, )).fetchall()
        for rs in rows:
            yield self._row_to_task(rs)

    def clear_timeout_task(self):
        """Delete all expired tasks (best effort).

        Uses a connection of its own, so it also works on an instance that
        never called ``get_db``.  Database errors are ignored on purpose.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                conn.execute(
                    'delete from task where end_time<?', (time(), ))
                conn.commit()
            finally:
                conn.close()
        except sqlite3.Error:
            # Best effort: the next run will try again.
            pass

    def add_task(self, **kw):
        """Store a new task and return its record, or False.

        Keywords: ``name`` and ``func`` (required), ``args`` (default
        ``[]``), ``call_path`` (default ``'local'``), ``start_time``
        (default: now), ``sleep_time`` (default 3) and ``durn_time``
        (duration in seconds, default effectively unlimited).  The values
        are also stored as attributes on the instance.

        Returns ``(name, func, args, call_path, start_time, sleep_time,
        end_time)`` on success and False when a task with that name already
        exists.  When the database is busy the insert is retried after one
        second.
        """
        self.__dict__.update(kw)
        self.get_db()
        if self.get_task(self.name) is not None:
            return False
        self.args = pickle.dumps(kw.get('args', []))
        self.call_path = kw.get('call_path', 'local')
        self.start_time = kw.get('start_time', time())
        self.sleep_time = kw.get('sleep_time', 3)
        self.durn_time = kw.get('durn_time', 99999999999999)
        self.end_time = self.start_time + self.durn_time

        try:
            self.cursor.execute(
                'insert into task values(?,?,?,?,?,?,?)',
                (self.name, self.func, self.args,
                 self.call_path, self.start_time, self.sleep_time,
                 self.end_time))
            self.conn.commit()
        except IntegrityError:
            self.conn.rollback()
            return False
        except sqlite3.OperationalError:
            # Typically "database is locked": back off, retry, and hand the
            # retry's result to the caller.
            self.conn.rollback()
            sleep(1)
            return self.add_task(**kw)

        return (self.name, self.func, kw.get('args', []),
                self.call_path, self.start_time, self.sleep_time,
                self.end_time)

    def close(self):
        """Close the cursor and connection, if open; safe to call twice."""
        if getattr(self, 'cursor', None) is not None:
            self.cursor.close()
        if getattr(self, 'conn', None) is not None:
            self.conn.close()
        # Reset so that a later get_db() reconnects.
        self.cursor = None
        self.conn = None



# In-memory table of pending wake-ups shared by Sleep() and tasklets():
# one row per sleeping tasklet, keyed by a random id.
cursor = sqlite3.connect(':memory:').cursor()
cursor.execute(
    'CREATE TABLE IF NOT EXISTS hypnus (id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)')
cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)')

# id -> channel on which the sleeping tasklet is waiting
channels = {}

# Start the wake-up loop as its own tasklet.
tasklet(tasklets)()


# Usage
# These calls are required at startup
pt = ProcessTask()
pt.init_db()    # creates the task database if the file does not exist yet
pt.load_task()  # starts a tasklet for every stored task that is still active

# Add a task
def add_task(name, func, args, call_path, start_time=None, durn_time=86400):
    """Register a task with ``pt`` and start running it in a tasklet.

    name       -- unique task name
    func       -- dotted path of the callable to run
    args       -- positional arguments for the callable
    call_path  -- how to invoke it; ``exec_task`` only acts on ``'local'``
    start_time -- timestamp of the first run (default: now)
    durn_time  -- how long the task stays active, in seconds (default: a day)

    Nothing is started when ``pt.add_task`` reports failure, e.g. because a
    task with that name already exists.
    """
    if start_time is None:
        # ``time`` is the function imported via ``from time import time``.
        start_time = time()
    add_rs = pt.add_task(name=name, func=func, args=args,
                         call_path=call_path, start_time=start_time,
                         durn_time=durn_time)
    if add_rs:
        # Only ``tasklet`` is imported from stackless, not the module itself.
        tasklet(pt.exec_task)(*add_rs)
}}}
##endInc

----
'''反馈'''

创建 by -- ZoomQuiet [<<DateTime(2009-07-19T13:06:31Z)>>]