任务调度模块APScheduler总结

Posted by FanHao on 2019-06-28

前言

首先学习该模块之前,需要了解进程和操作系统的相关知识。

什么是任务调度?

任务调度是指基于给定的时间点,给定的时间间隔或者给定的执行次数自动的执行任务。任务调度是操作系统的重要组成部分,而对于实时的操作系统,任务调度直接影响着操作系统的实时性能。任务调度涉及到多线程并发、运行时间规则定制及解析、线程池的维护等诸多方面的工作。

APScheduler介绍

APScheduler是一个Python定时任务框架。提供了基于日期、固定时间间隔以及crontab类型的任务。目前最新版本为3.6.0。APScheduler模块的官方文档,请点击这里

基本概念

APScheduler模块包含了四个组件:

  • 触发器(trigger)
  • 作业存储(job store)
  • 执行器(executer)
  • 调度器(scheduler)

触发器包含了调度逻辑。每个任务都有它自己的触发器,触发器确定了任务下次什么时候该执行。除了他们的初始化配置,触发器完全是无状态的。

作业存储存放了预定任务。默认的作业存储只是将作业保存在内存中,但是其他作业存储可以存储在各种不同类型的数据库(sqlite或者MongoDB)中。作业的数据保存到持久性作业存储时会被序列化,并在从其中加载时进行反序列化。作业存储(默认存储除外)不会将作业数据保留在内存中,而是充当中间人,用于在后端保存、更新和搜索作业。绝对不能在调度器之间共享作业存储。

执行器是处理任务的运行。它们通常通过将作业中指定的可调用对象交给线程或进程池来完成此操作。当任务完成后,这个执行器会通知调度器,调度器随后发出相应的事件。

调度器将其余部分绑定在一起。你通常只在应用程序中调用一个调度程序(器)。应用程序开发人员通常不直接处理作业存储,执行器或触发器。相反的,调度程序提供了适当的接口来处理所有这些。配置作业存储和执行器是通过调度程序(器)完成的、添加、修改和删除作业也是如此。

安装
1
pip3 install apscheduler
调度器

APScheduler模块有多个不同的调度器。如何选择调度器取决于你程序运行环境和你使用APScheduler的目的。

  • BlockingScheduler:阻塞调度。当你的程序只运行这个调度器时可以使用。
  • BackgroundScheduler:后台调度。你的应用程序不止运行调度器,且想它在应用程序中后台运行时,可以使用该调度器。
  • AsyncIOScheduler:如果 你应用的程序使用了asyncio模块,需要使用此调度器。
  • GeventScheduler:如果你的应用程序使用了gevent技术,需要使用此调度器。
  • TornadoScheduler:可以用于开发Tornado应用程序。
  • TwistedScheduler:可以用于开发Twisted应用程序。
  • QtScheduler:可以用于开发Qt应用程序。
作业存储

要选择适当的作业存储,需要确定是否需要作业持久性。如果您总是在应用程序启动时重新创建作业,那么您可以使用默认作业存储(MemoryJobStore)。但是,如果您需要将作业保留在调度程序重新启动或应用程序崩溃之上,那么您的选择通常可以归结为编程环境中使用的工具。 如果你可以自由选择,那么由于其强大的数据完整性保护,PostgreSQL后端上的SQLAlchemy JobStore是比较推荐的选择。

执行器

默认的执行器ThreadPoolExecutor应该足以满足大多数用途,最大支持10个线程数。如果你的工作负载涉及CPU密集型操作,则应考虑使用ProcessPoolExecutor来使用多个CPU核心。甚至你可以同时使用两者,将进程池执行程序添加为辅助执行程序。

触发器

APScheduler的触发器总共有三种触发类型。

  1. date:当你想要在确定的时间运行一次作业(任务),可以使用。
  2. interval:当你想要运行在固定的间隔时间内运行作业(任务)。
  3. cron:当你想要周期性的运行该任务。例如每天,每周或者每月,可以使用cron类型

配置和使用

配置调度器

使用默认的MemoryJobStore和ThreadPoolExecutor。先创建调度器,在配置和添加作业。

1
2
3
4
5
6
7
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print('hello world')
# 初始化一个Blocking类型的调度器
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()

不使用默认的作业存储和执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
操作作业

运行调度器,通过调用start()方法运行。

1
sched.start()

添加作业,通过add_job()方法或者使用scheduler_job()装饰器进行添加。

1
2
3
4
5
6
from apscheduler.schedulers.background import BackgroundScheduler
# 装饰器的方法添加作业
scheduler=BackgroundScheduler()
@scheduler.scheduled_job('date',run_date=datetime(2019, 6, 18, 16, 40, 30))
def test():
print('Test')

暂停和恢复作业

1
2
3
4
5
6
#暂停
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
#恢复
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

移除作业。通过remove()或者remove_job()方法

1
2
3
4
5
6
# remove()方法
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# remove_job()方法
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

获取作业(job)列表。通过get_jobs()返回所有job实例或者print_jobs()输出格式化的作业列表

1
2
scheduler.get_jobs()
scheduler.print_jobs()

修改作业。通过modify()和modify_job()方法。可以修改除id外,job的任何属性。

1
job.modify(max_instances=6, name='Alternate name')

重新rescheduler作业。可以使用reschedule()或者reschedule_job()方法。需要重新组件一个触发器,并且重新计算下一次触发时间。

1
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

关闭调度。使用shutdown()方法关闭,不想等待,可以将wait设置为False

1
scheduler.shutdown(wait=False)
触发器参数

date定时,作业只执行一次。

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
1
2
3
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# 2019年7月6号16时30分5s执行
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

interval间隔调度

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval calculation
  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
1
scheduler.add_job(job_function, 'interval', hours=2)

cron调度

  • year (int|str) – 4-digit year
  • month (int|str) – month (1-12)
  • day (int|str) – day of the (1-31)
  • week (int|str) – ISO week (1-53)
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – hour (0-23)
  • minute (int|str) – minute (0-59)
  • second (int|str) – second (0-59)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
1
2
3
4
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

代码实例

实例一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
"""
"""
print('Tick! The time is: %s' % datetime.now())

# 非阻塞,cron类型触发器
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=9, minute=30)

if __name__ == "__main__":
# 运行任务调度
scheduler.start()
print(scheduler.get_jobs())
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
while True:
time.sleep(5)
print('sleep!')
except (KeyboardInterrupt,SystemExit):
# 关闭调度
scheduler.shutdown(wait=False)
print('Exit The Job!')

解决模块使用问题

查看开发者经常提到的问题

请点击这里,查看问题回答。

通过查看日志

如果你的调度程序没有按预期执行,可以按如下设置查看打印日志

1
2
3
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setlevel(logging.DEBUG)

提交模块bug

如果你发现APScheduler模块有bug,可以通过Github方式提交问题

获取帮助

在Gitter的apscheduler room中提问。

StackOverflow中提问并且打上apscheduler的tag标签。