[           ]
宙飒天下网		1. 定时任务
我们当然可以使用crontab或者airflow来执行定时任务,但如果我们希望不依赖其他工具只用python实现定时任务,那就需要使用第三方库来实现了,目前我知道的比较好用的定时任务库有两个:
- apscheduler支持最多的底层实现方式,因此比较重,同时定时语法可读性不高
- schedule只支持线程模型,但定时语法可读性比较高,接口也较为简单
我个人比较建议使用线程模型就用schedule,使用协程模型就用apscheduler
1.1. 使用schedule做基于线程的定时任务
schedule这个库.
这个库有两种用法:
- 起一个进行做定时任务
- 结合其他服务,挂在后台做定时任务
一个基本的定时任务结构如下:
import schedule import time  def job():     print("I'm working...")  schedule.every(10).seconds.do(job)  count=0# while True:     if count >= 15:#         break#     schedule.run_pending()     time.sleep(1)     count+=1 I'm working... I'm working... I'm working... 打注释的部分是不用的,通常我们需要维持主线程是一个死循环.
上面的代码中实际执行的部分在job函数中.而定义日程表则使用的schedule对象
如果要让主进程不用堵塞(通常用在服务上),我们可以为其打个补丁
from types import MethodType import schedule import time import threading  def run_continuously(self, interval=1):     """Continuously run, while executing pending jobs at each elapsed     time interval.     @return cease_continuous_run: threading.Event which can be set to     cease continuous run.     Please note that it is *intended behavior that run_continuously()     does not run missed jobs*. For example, if you've registered a job     that should run every minute and you set a continuous run interval     of one hour then your job won't be run 60 times at each interval but     only once.     """     cease_continuous_run = threading.Event()      class ScheduleThread(threading.Thread):         @classmethod         def run(cls):             while not cease_continuous_run.is_set():                 self.run_pending()                 time.sleep(interval)      continuous_thread = ScheduleThread()     continuous_thread.start()     return cease_continuous_run  schedule.default_scheduler.run_continuously = MethodType(run_continuously, schedule.default_scheduler)  def run_continuously(interval=1):     """Continuously run, while executing pending jobs at each elapsed     time interval.     @return cease_continuous_run: threading.Event which can be set to     cease continuous run.     Please note that it is *intended behavior that run_continuously()     does not run missed jobs*. For example, if you've registered a job     that should run every minute and you set a continuous run interval     of one hour then your job won't be run 60 times at each interval but     only once.     """     return schedule.default_scheduler.run_continuously(interval)   schedule.run_continuously = run_continuously def job():     print("I'm working...") schedule.every(10).seconds.do(job) schedule.run_continuously() I'm working...      <threading.Event at 0x108eff358>    I'm working... I'm working... I'm working... I'm working... I'm working... I'm working... 1.2. 使用apscheduler做基于协程的定时任务
apscheduler的使用方式也简单,将job封装成协程函数,放入scheduler中,设置好间隔时间就行
from apscheduler.schedulers.asyncio import AsyncIOScheduler scheduler = AsyncIOScheduler()  async def job():     print("I'm working...")   scheduler.add_job(job, 'interval', seconds=3) scheduler.start() I'm working... I'm working... scheduler.pause() 


 
		 
		 
		 
		

还没有评论,来说两句吧...