Python源码学习Schedule

简介: Python源码学习Schedule上一篇《一个简单的Python调度器》介绍了一个简单的Python调度器的使用,后来我翻阅了一下它的源码,惊奇的发现核心库才一个文件,代码量短短700行不到。这是绝佳的学习材料。

Python源码学习Schedule
上一篇《一个简单的Python调度器》介绍了一个简单的Python调度器的使用,后来我翻阅了一下它的源码,惊奇的发现核心库才一个文件,代码量短短700行不到。这是绝佳的学习材料。
让我喜出望外的是这个库的作者竟然就是我最近阅读的一本书《Python Tricks》的作者!现在就让我们看看大神的实现思路。

0x00 准备
项目地址

https://github.com/dbader/schedule

将代码checkout到本地

环境

PyCharm+venv+Python3

0x01 用法
这个在上一篇也介绍过了,非常简单

import schedule

定义需要执行的方法

def job():

print("a simple scheduler in python.")
AI 代码解读

设置调度的参数,这里是每2秒执行一次

schedule.every(2).seconds.do(job)

if name == '__main__':

while True:
    schedule.run_pending()
AI 代码解读

执行结果

a simple scheduler in python.
a simple scheduler in python.
a simple scheduler in python.
...
这个库的文档也很详细,可以浏览 https://schedule.readthedocs.io/ 了解库的大概用法

0x02 项目结构
(venv) schedule git:(master) tree -L 2
.
...
├── requirements-dev.txt
├── schedule
│ └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.ini
└── venv

├── bin
├── include
├── lib
├── pip-selfcheck.json
└── pyvenv.cfg
AI 代码解读

8 directories, 18 files
schedule目录下就一个__init__.py文件,这是我们需要重点学习的地方。
setup.py文件是发布项目的配置文件
test_schedule.py是单元测试文件,一开始除了看文档外,也可以从单元测试中入手,了解这个库的使用
requirements-dev.txt 开发环境的依赖库文件,如果核心的库是不需要第三方的依赖的,但是单元测试需要
venv是我checkout后创建的,原本的项目是没有的
0x03 schedule
我们知道__init__.py是定义Python包必需的文件。在这个文件中定义方法、类都可以在使用import命令时导入到工程项目中,然后使用。

schedule 源码
以下是schedule会用到的模块,都是Python内部的模块。

import collections
import datetime
import functools
import logging
import random
import re
import time

logger = logging.getLogger('schedule')
然后定义了一个日志打印工具实例

接着是定义了该模块的3个异常类的结构体系,是由Exception派生出来的,分别是ScheduleError、ScheduleValueError和IntervalError

class ScheduleError(Exception):

"""Base schedule exception"""
pass
AI 代码解读

class ScheduleValueError(ScheduleError):

"""Base schedule value error"""
pass
AI 代码解读

class IntervalError(ScheduleValueError):

"""An improper interval was used"""
pass
AI 代码解读

还定义了一个CancelJob的类,用于取消调度器的继续执行

class CancelJob(object):

"""
Can be returned from a job to unschedule itself.
"""
pass
AI 代码解读

例如在自定义的需要被调度方法中返回这个CancelJob类就可以实现一次性的任务

定义需要执行的方法

def job():

print("a simple scheduler in python.")
# 返回CancelJob可以停止调度器的后续执行
return schedule.CancelJob
AI 代码解读

接着就是这个库的两个核心类Scheduler和Job。

class Scheduler(object):

"""
Objects instantiated by the :class:`Scheduler <Scheduler>` are
factories to create jobs, keep record of scheduled jobs and
handle their execution.
"""
AI 代码解读

class Job(object):

"""
A periodic job as used by :class:`Scheduler`.

:param interval: A quantity of a certain time unit
:param scheduler: The :class:`Scheduler <Scheduler>` instance that
                  this job will register itself with once it has
                  been fully configured in :meth:`Job.do()`.

Every job runs at a given fixed time interval that is defined by:

* a :meth:`time unit <Job.second>`
* a quantity of `time units` defined by `interval`

A job is usually created and returned by :meth:`Scheduler.every`
method, which also defines its `interval`.
"""
AI 代码解读

Scheduler是调度器的实现类,它负责调度任务(job)的创建和执行。

Job则是对需要执行任务的抽象。

这两个类是这个库的核心,后面我们还会看到详细的分析。
接下来就是默认调度器default_scheduler和任务列表jobs的创建。

The following methods are shortcuts for not having to

create a Scheduler instance:

: Default :class:Scheduler <Scheduler> object

default_scheduler = Scheduler()

: Default :class:Jobs <Job> list

jobs = default_scheduler.jobs # todo: should this be a copy, e.g. jobs()?
在执行import schedule后,就默认创建了default_scheduler。而Scheduler的构造方法为

def __init__(self):

self.jobs = []
AI 代码解读

在执行初始化时,调度器就创建了一个空的任务列表。

在文件的最后定义了一些链式调用的方法,使用起来也是非常人性化的,值得学习。
这里的方法都定义在模块下,而且都是封装了default_scheduler实例的调用。

def every(interval=1):

"""Calls :meth:`every <Scheduler.every>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.every(interval)
AI 代码解读

def run_pending():

"""Calls :meth:`run_pending <Scheduler.run_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_pending()
AI 代码解读

def run_all(delay_seconds=0):

"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_all(delay_seconds=delay_seconds)
AI 代码解读

def clear(tag=None):

"""Calls :meth:`clear <Scheduler.clear>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.clear(tag)
AI 代码解读

def cancel_job(job):

"""Calls :meth:`cancel_job <Scheduler.cancel_job>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.cancel_job(job)
AI 代码解读

def next_run():

"""Calls :meth:`next_run <Scheduler.next_run>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.next_run
AI 代码解读

def idle_seconds():

"""Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.idle_seconds
AI 代码解读

我们看下入口方法run_pending(),从本文一开头的Demo可以知道这个是启动调度器的方法。这里它执行了default_scheduler中的方法。

default_scheduler.run_pending()
所以我们就把目光定位到Scheduler类的相应方法

def run_pending(self):

"""
Run all jobs that are scheduled to run.

Please note that it is *intended behavior that run_pending()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you only call run_pending()
in one hour increments then your job won't be run 60 times in
between but only once.
"""
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
    self._run_job(job)
AI 代码解读

这个方法中首先从jobs列表将需要执行的任务过滤后放在runnable_jobs列表,然后将其排序后顺序执行内部的_run_job(job)方法

def _run_job(self, job):

ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
    self.cancel_job(job)
AI 代码解读

在_run_job方法中就调用了job类中的run方法,并根据返回值判断是否需要取消任务。

这时候我们要看下Job类的实现逻辑。

首先我们要看下Job是什么时候创建的。还是从Demo中的代码入手

schedule.every(2).seconds.do(job)
这里先执行了schedule.every()方法

def every(interval=1):

"""Calls :meth:`every <Scheduler.every>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.every(interval)
AI 代码解读

这个方法就是scheduler类中的every方法

def every(self, interval=1):

"""
Schedule a new periodic job.

:param interval: A quantity of a certain time unit
:return: An unconfigured :class:`Job <Job>`
"""
job = Job(interval, self)
return job
AI 代码解读

在这里创建了一个任务job,并将参数interval和scheduler实例传入到构造方法中,最后返回job实例用于实现链式调用。

跳转到Job的构造方法

def __init__(self, interval, scheduler=None):

self.interval = interval  # pause interval * unit between runs
self.latest = None  # upper limit to the interval
self.job_func = None  # the job job_func to run
self.unit = None  # time units, e.g. 'minutes', 'hours', ...
self.at_time = None  # optional time at which this job runs
self.last_run = None  # datetime of the last run
self.next_run = None  # datetime of the next run
self.period = None  # timedelta between runs, only valid for
self.start_day = None  # Specific day of the week to start on
self.tags = set()  # unique set of tags for the job
self.scheduler = scheduler  # scheduler to register with
AI 代码解读

主要初始化了间隔时间配置、需要执行的方法、调度器各种时间单位等。

执行every方法之后又调用了seconds这个属性方法

@property
def seconds(self):

self.unit = 'seconds'
return self
AI 代码解读

设置了时间单位,这个设置秒,当然还有其它类似的属性方法minutes、hours、days等等。

最后就是执行了do方法

def do(self, job_func, args, *kwargs):

"""
Specifies the job_func that should be called every time the
job runs.

Any additional arguments are passed on to job_func when
the job runs.

:param job_func: The function to be scheduled
:return: The invoked job instance
"""
self.job_func = functools.partial(job_func, *args, **kwargs)
try:
    functools.update_wrapper(self.job_func, job_func)
except AttributeError:
    # job_funcs already wrapped by functools.partial won't have
    # __name__, __module__ or __doc__ and the update_wrapper()
    # call will fail.
    pass
self._schedule_next_run()
self.scheduler.jobs.append(self)
return self
AI 代码解读

在这里使用functools工具的中的偏函数partial将我们自定义的方法封装成可调用的对象

然后就调用_schedule_next_run方法,它主要是对时间的解析,按照时间对job排序,我觉得这个方法是本项目中的技术点,逻辑也是稍微复杂一丢丢,仔细阅读就可以看懂,主要是对时间datetime的使用。由于篇幅,这里就不再贴出代码。

这里就完成了任务job的添加。然后在调用run_pending方法中就可以让任务执行。

0x04 总结一下
schedule库定义两个核心类Scheduler和Job。在导入包时就默认创建一个Scheduler对象,并初始化任务列表。
schedule模块提供了链式调用的接口,在配置schedule参数时,就会创建任务对象job,并会将job添加到任务列表中,最后在执行run_pending方法时,就会调用我们自定义的方法。
这个库的核心思想是使用面向对象方法,对事物能够准确地抽象,它总体的逻辑并不复杂,是学习源码很不错的范例。

0x05 学习资料
https://github.com/dbader/schedule
https://schedule.readthedocs.io
关于我
一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。
Github:https://github.com/hylinux1024
微信公众号:终身开发者(angrycode)
原文地址https://www.cnblogs.com/angrycode/p/11433283.html

相关文章
堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能
本文深入探讨了堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能。文章详细介绍了堆叠的实现步骤,包括数据准备、基础模型训练、新训练集构建及元学习器训练,并讨论了其优缺点。
89 3
学习Python Web开发的安全测试需要具备哪些知识?
学习Python Web开发的安全测试需要具备哪些知识?
40 4
pytorch学习一:Anaconda下载、安装、配置环境变量。anaconda创建多版本python环境。安装 pytorch。
这篇文章是关于如何使用Anaconda进行Python环境管理,包括下载、安装、配置环境变量、创建多版本Python环境、安装PyTorch以及使用Jupyter Notebook的详细指南。
411 1
pytorch学习一:Anaconda下载、安装、配置环境变量。anaconda创建多版本python环境。安装 pytorch。
Python学习的自我理解和想法(9)
这是我在B站跟随千锋教育学习Python的第9天,主要学习了赋值、浅拷贝和深拷贝的概念及其底层逻辑。由于开学时间紧张,内容较为简略,但希望能帮助理解这些重要概念。赋值是创建引用,浅拷贝创建新容器但元素仍引用原对象,深拷贝则创建完全独立的新对象。希望对大家有所帮助,欢迎讨论。
Python学习的自我理解和想法(10)
这是我在千锋教育B站课程学习Python的第10天笔记,主要学习了函数的相关知识。内容包括函数的定义、组成、命名、参数分类(必须参数、关键字参数、默认参数、不定长参数)及调用注意事项。由于开学时间有限,记录较为简略,望谅解。通过学习,我理解了函数可以封装常用功能,简化代码并便于维护。若有不当之处,欢迎指正。
1.1 学习Python操作Excel的必要性
学习Python操作Excel在当今数据驱动的商业环境中至关重要。Python能处理大规模数据集,突破Excel行数限制;提供丰富的库实现复杂数据分析和自动化任务,显著提高效率。掌握这项技能不仅能提升个人能力,还能为企业带来价值,减少人为错误,提高决策效率。推荐从基础语法、Excel操作库开始学习,逐步进阶到数据可视化和自动化报表系统。通过实际项目巩固知识,关注新技术,为职业发展奠定坚实基础。
Python学习的自我理解和想法(6)
这是我在B站千锋教育学习Python的第6天笔记,主要学习了字典的使用方法,包括字典的基本概念、访问、修改、添加、删除元素,以及获取字典信息、遍历字典和合并字典等内容。开学后时间有限,内容较为简略,敬请谅解。
Python学习的自我理解和想法(2)
今日学习Python第二天,重点掌握字符串操作。内容涵盖字符串介绍、切片、长度统计、子串计数、大小写转换及查找位置等。通过B站黑马程序员课程跟随老师实践,非原创代码,旨在巩固基础知识与技能。
Python学习的自我理解和想法(3)
这是学习Python第三天的内容总结,主要围绕字符串操作展开,包括字符串的提取、分割、合并、替换、判断、编码及格式化输出等,通过B站黑马程序员课程跟随老师实践,非原创代码。
Python学习的自我理解和想法(7)
学的是b站的课程(千锋教育),跟老师写程序,不是自创的代码! 今天是学Python的第七天,学的内容是集合。开学了,时间不多,写得不多,见谅。