APScheduler 计划任务

实现一个类似定时执行脚本的功能,最简单的方式是使用操作系统提供的计划任务,比如Linux下的Crontab服务。然而如果计划任务较多,任务之间的关系复杂,对任务的启停和状态记录也有要求,很显然使用这种方式会非常难以维护。APScheduler是Python中的一个轻量级定时任务调度框架,相比Crontab,使用APScheduler框架我们能更容易的实现健壮的计划任务调度逻辑。

https://apscheduler.readthedocs.io/en/stable/

安装APScheduler

执行以下pip命令安装APScheduler框架。

pip install apscheduler

基本概念

具体使用APScheduler前,我们需要了解任务调度框架中的几个基本概念,如果你用过Java的Quartz等框架会对这些概念非常熟悉,任务调度框架的设计都是类似的。

触发器(Trigger):触发器用于根据调度规则触发计划任务,APScheduler中支持间隔触发、按日期时间触发、cron表达式触发

任务存储器(Job Store):任务存储器是储存任务执行情况的组件,APScheduler中支持储存到内存、数据库、Redis等,默认为内存存储(不会持久化)

执行器(Executor):执行器是具体安排任务到线程池或进程池等执行环境的组件

调度器(Scheduler):调度器负责根据执行计划协调上述组件执行任务

对于APScheduler框架使用还是比较简单的,我们直接实例化一个调度器然后在其中配置好任务,启动调度器后调度器就能自动按照我们的计划执行了。

APScheduler实现简单定时任务

我们先看一个最简单的定时任务例子。下面代码中,我们每隔1秒打印信息,信息中包含当前的线程ID。

import logging
import threading

from apscheduler.schedulers.blocking import BlockingScheduler

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] [%(threadName)s] %(name)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logger = logging.getLogger(__name__)


def foo():
    logger.info('计划任务执行 线程ID %d' % threading.current_thread().ident)


if __name__ == '__main__':
    logger.info('计划任务开始 主线程ID %d' % threading.current_thread().ident)
    # 创建调度器
    scheduler = BlockingScheduler()
    # 添加任务
    scheduler.add_job(foo, 'interval', seconds=1)
    # 开启计划任务
    scheduler.start()

这里任务存储器和执行器我们没有明确指定,它们都使用了默认值。默认的存储器是内存,这也意味着当程序关闭时所有执行信息都会丢失,而默认的执行器是线程池,默认大小为10

调度器使用的是BlockingScheduler,它会阻塞主线程阻止程序返回。对于BlockingScheduler,除了它其实还有一个BackgroundScheduler,它们之间唯一的区别就是是否阻塞当前线程,一般来说对于控制台程序我们可能需要阻塞主线程,避免主线程执行完程序直接结束,此时使用BlockingScheduler;而集成到Django等框架中时,我们不需要阻塞当前线程,此时需要使用BackgroundScheduler

后面代码中我们还调用了scheduler.add_job()方法,其中第1个参数是要调用的函数,后两个参数都是调度规则,第2个参数是触发方式,这里我们采用间隔触发,第3个参数指定了间隔1秒触发。

任务触发方式

APScheduler实际上支持三种方式触发任务执行,间隔触发、按日期时间触发和cron表达式触发,我们这里直接看一些例子。

interval间隔触发例子如下,该触发方式需要设置secondsminuteshoursweeks等关键字参数,设置秒级、分钟级、小时级、周级等时间间隔。

scheduler.add_job(foo, 'interval', seconds=1)

date按日期时间触发例子如下,该触发方式需要设置run_date关键字参数,指定触发日期。

scheduler.add_job(foo, 'date', run_date='2020-01-01 12:00:00')

cron即按Linux风格的cron表达式触发,这种方式需要CronTrigger类型的参数,它使用Linux风格的cron表达式字符串构建。

scheduler.add_job(foo, CronTrigger.from_crontab('* * * * *'))

三种方式都比较简单,我们选择一种最适合的就可以了。

“重叠”任务

假设任务执行可能需要1-3秒(偶尔会变慢),但触发器采用了间隔2秒调度,当下一次触发任务调度时,上一轮还没执行完成,此时会发生什么呢?对于默认配置,你会看到一条类似下面的警告信息,日志告诉我们这次触发被跳过了。

2020-05-30 19:55:33 [WARNING] [MainThread] apscheduler.scheduler - Execution of job "foo (trigger: interval[0:00:02], next run at: 2020-05-30 19:55:33 CST)" skipped: maximum number of running instances reached (1)

APScheduler默认采用直接跳过的策略处理这种“重叠”的任务触发,即当上一轮任务还没执行完成时,下一次触发直接跳过。但假如我们需要下次触发时并发执行新的一轮任务逻辑,APScheduler也提供了相关的配置方式。下面例子中,我们在添加任务时指定了coalescemax_instances参数。

scheduler.add_job(foo, 'interval', seconds=2, coalesce=True, max_instances=3)
  • coalesce:是否开启合并执行,默认值为True,这表示如果作业错过了触发时机(例如调度器关闭了一段时间或是上一轮没能执行完且实例数不允许并发执行),当调度器恢复时,只会执行一次最新的作业,即合并错过的所有执行时机;如果设置为False,调度器会试图将所有错过的触发机会“弥补”上,你可能会看到任务被持续的触发,直到所有触发机会“弥补”完成。
  • max_instances:一个任务可以并发执行的最大实例数量,默认值为1,即不允许并发执行。如果设置为大于1,则出现触发时上一轮任务还未执行完成时,下一轮会并发的开启执行,而最大并发执行数就是这个参数指定的大小。

在调度器上我们也可以统一设置job_defaults关键字参数,它设置了接下来要添加的任务的默认行为。

scheduler = BlockingScheduler(job_defaults={
    'coalesce': True,
    'max_instances': 3
})

线程池执行器和进程池执行器

前面我们说过,APScheduler默认使用线程池执行器,且线程池大小为10。考虑这样的场景,假如我们要调度11个任务,每个任务都是每秒执行一次,且每个任务都需要耗时1秒才能执行完成,我们的这11个任务还能全部及时调度吗?答案是不能,由于我们的线程池大小为10,显然每轮触发都会至少有一个任务无法及时调度。

这里假设我们的这些任务都是IO密集型任务,此时解决这个问题的最简单方式就是调大线程池,这需要对执行器进行配置。下面例子中,我们配置了线程池大小为20的执行器来初始化调度器。

executors = {
    'default': ThreadPoolExecutor(20)
}
scheduler = BlockingScheduler(executors=executors)

配置中我们使用了default这个调度器名字,它意味着我们注册的任务如果没有特殊指定,默认就会使用这个执行器。如果我们配置了多个执行器,可以在add_job()方法中使用executor命名参数指定这个任务需要注册到的执行器名字。

scheduler.add_job(foo, 'interval', seconds=1, executor='demo_executor')

除了线程池执行器,对于计算密集型任务,我们还可以使用进程池执行器。由于Python语言的GIL限制,单进程无法充分利用多核处理器的计算性能,APScheduler则非常贴心的实现了进程池,使得我们通过简单的配置就可以调度多进程任务。假设我们的CPU核心是4,我们这里创建一个大小为4的进程池执行器。

executors = {
    'default': ProcessPoolExecutor(4)
}
scheduler = BlockingScheduler(executors=executors)

当然,进程池执行器也有一些限制:

  1. 任务函数必须在模块顶层定义,不可嵌套
  2. 函数参数必须可Pickle序列化
  3. 无法共享全局变量,因为进程间不共享内存

任务存储器配置

任务存储器用于持久化任务的调度信息,APScheduler支持以下几种任存储器组件:

  • 内存
  • MongoDB
  • Redis
  • ZooKeeper
  • 关系型数据库(基于SQLAlchemy)

默认情况下任务是存储在内存中的,我们的任务在每次程序启动时都需要通过add_job方法重新添加任务。简单的场景可能不需要持久化调度信息,此时我们使用内存作为任务存储器即可,但有时我们的需求不止于此,比如需要将任务保存在文件或数据库中,通过程序对其进行增删改查、配置、启动或停止,这种情况存储在内存中就不合适了。

下面例子配置使用Redis作为任务存储数据库,这需要额外安装redis-py这个库。

pip install redis

安装完成后,我们设置默认的任务存储器为RedisJobStore

jobstores = {
    'default': RedisJobStore(db=0, host='127.0.0.1', port=6379)
}
scheduler = BlockingScheduler(jobstores=jobstores)

此时我们的调度信息就会存储在Redis中,即使程序重启调度信息也不会直接丢失了。

任务管理

之前我们的大部分例子都是使用了内存的任务存储器并在每次启动程序时一次性的添加任务,这里我们基于持久化的任务存储器介绍任务的添加、删除、查询等操作。

添加任务

scheduler.add_job(foo, 'interval', seconds=1, id='testjob', replace_existing=True)

这里我们指定了两个新参数:

  • id:任务标识ID,需要保证唯一,后续删除等操作需要用到
  • replace_existing:覆盖同ID的已存在任务,这里建议指定上,ID的唯一性需要由我们程序逻辑保证

删除任务

scheduler.remove_job('testjob')

删除任务比较简单,直接使用任务ID即可删除。

列出任务

jobs = scheduler.get_jobs()

scheduler.get_jobs()返回当前存储器中已经部署的任务列表。

暂停任务

scheduler.pause_job('testjob')

恢复任务

scheduler.resume_job('testjob')

关于分布式调度

APScheduler是一个轻量级的单机任务调度框架,它并不直接原生支持分布式任务调度。在比较简单的场景下,我们可以通过分布式锁等逻辑使其具有一定的分布式逻辑,但实际开发中,对于复杂的分布式场景,我们可能还是需要考虑Celery等更健壮的分布式方案。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。