源码阅读——robfig/cron

本文最后更新于:2024年4月20日 下午

cron一个用于管理定时任务的库,用 Go 实现 Linux 中crontab这个命令的效果。这个包提供的功能相对比较底层,许多分布式任务管理系统中在定时部分使用了这个cron。

我最近也用这个包比较多,今天看了一下整体的源码。

源码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cron                      
├─ chain.go
├─ chain_test.go
├─ constantdelay.go
├─ constantdelay_test.go
├─ cron.go
├─ cron_test.go
├─ doc.go
├─ go.mod
├─ LICENSE
├─ logger.go
├─ option.go
├─ option_test.go
├─ parser.go
├─ parser_test.go
├─ README.md
├─ spec.go
└─ spec_test.go

Entry(定时记录)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Entry consists of a schedule and the func to execute on that schedule.
// 记录:包含了一个定时计划,以及这个计划要执行的函数
type Entry struct {
// ID is the cron-assigned ID of this entry, which may be used to look up a
// snapshot or remove it.
// 记录id,方便后续用来对项目的定时、执行内容、状态控制
ID EntryID

// Schedule on which this job should be run.
// 计划接口
Schedule Schedule

// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
//下次运行时间
Next time.Time

// Prev is the last time this job was run, or the zero time if never.
//上一次运行时间
Prev time.Time

// WrappedJob is the thing to run when the Schedule is activated.
// 类似于装饰器一样的任务触发装置
WrappedJob Job

// Job is the thing that was submitted to cron.
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
// 真正要执行的任务
Job Job
}

Cron (调度器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Cron struct {
entries []*Entry // 保存了所有加入到 Cron 的作业
chain Chain
stop chan struct{} // 接收 Stop() 信号的 chan
add chan *Entry // Cron 运行过程中接收 AddJob() 信号的 chan
remove chan EntryID // 接收移除 Job 信号的 chan
snapshot chan chan []Entry // 快照信号
running bool // 标志 Cron 是否在运行中
logger Logger
runningMu sync.Mutex // Cron 运行前需要抢占该锁,保证并发安全
location *time.Location
parser ScheduleParser // cron 表达式的解析器
nextID EntryID // 即将加入的 Job 对应的 Entry 的 ID
jobWaiter sync.WaitGroup
}

核心流程

Entries() 和 Entry()

这两个方法被用来返回 Cron entries 的一组快照,Entries() 返回所有作业的快照,Entry(id EntryID) 根据 ID 返回特定作业的快照,其实就是遍历了一遍 Entries() 的返回值:

1
2
3
4
5
6
7
8
func (c *Cron) Entry(id EntryID) Entry {
for _, entry := range c.Entries() {
if id == entry.ID {
return entry
}
}
return Entry{}
}

关键在于 Entries() 的实现上:

1
2
3
4
5
6
7
8
9
10
func (c *Cron) Entries() []Entry {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
replyChan := make(chan []Entry, 1)
c.snapshot <- replyChan
return <-replyChan
}
return c.entrySnapshot()
}

获取快照时,根据 Cron 是否在运行有不同的处理逻辑,为了避免获取快照过程中 Cron 开始运行,需要竞争 runningMutex;

如果 Cron 没在运行,直接调用 entrySnapshot() 返回快照:

1
2
3
4
5
6
7
func (c *Cron) entrySnapshot() []Entry {
var entries = make([]Entry, len(c.entries))
for i, e := range c.entries {
entries[i] = *e
}
return entries
}

这种情况很简单,如果 Cron 已经在运行中了,会向 c.snapshot 发送一个信号,在 cron.run() 中会处理这个信号:

1
2
3
4
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue

这有点向一个钩子,Entries() 中创建了一个新的 chan replyChan, 并将其发送给了 c.snapshot, run() 中通过多路复用监听到这个信号后,调用了 c.entrySnapshot() ,并将结果发送到了 replyChan 中,Entries() 阻塞等待结果并返回。

既然最终调用的都是 c.entrySnapshot() 为什么要分两种情况呢?后面再说。

Remove()

Remove() 用于删除一个作业,实现逻辑和 Entries() 类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *Cron) Remove(id EntryID) {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.remove <- id
} else {
c.removeEntry(id)
}
}

func (c *Cron) removeEntry(id EntryID) {
var entries []*Entry
for _, e := range c.entries {
if e.ID != id {
entries = append(entries, e)
}
}
c.entries = entries
}

run() 中处理 c.remove 信号:

1
2
3
4
5
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)

Stop()

Stop() 用来停止 Cron 的运行,但已经在执行中的作业是不会被打断的,也就是从执行 Stop() 之后,不会再有新的作业被调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *Cron) Stop() context.Context {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.stop <- struct{}{}
c.running = false
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
// 等待所有已经在执行的作业执行完毕
c.jobWaiter.Wait()
// 会发出一个 cancelCtx.Done() 信号
cancel()
}()
return ctx
}

大体逻辑和上面的一样,比较巧妙地是 Stop() 返回了一个 Context, 具体来说是一个 cancelCtx, 用户可以监听 cancelCtx.Done() 得知什么时候 Cron 真的停止了.

Start()

Start() 用于开始执行 Cron

1
2
3
4
5
6
7
8
9
10
11


func (c *Cron) Start() {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
return
}
c.running = true
go c.run()
}

这个函数干了三件事:

  • 获取锁
  • 将 c.running 置为 true 表示 cron 已经在运行中了
  • 开启一个 goroutine 执行 c.run(), run 中会一直轮循 c.entries 中的 entry, 如果一个 entry 允许执行了,就会开启单独的 goroutine 去执行这个作业

run是整个 cron 的一个核心,它负责处理 cron 开始执行后的大部分事情,包括添加作业,删除作业,执行作业等,这是一个近一百行的大函数,其结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Run the scheduler. this is private just due to the need to synchronize
// access to the 'running' state variable.
// 执行定时任务
// 执行调度器.
// 该任务的执行是否私有完全由'running'变量是否具有同步权限决定
func (c *Cron) run() {
// Figure out the next activation times for each entry.
// 获取所有任务的下一次执行时间
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}

for {
// Determine the next entry to run.
// 根据时间顺序决定任务的执行顺序, 时间越大的越靠前
sort.Sort(byTime(c.entries))

var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
// 若任务入口slice为空, 则调取器休眠, 在休眠期间, 它仍然可以处理新的任务入口slice和任务停止请求
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}

for {
select {
case now = <-timer.C: // 执行当前任务
now = now.In(c.location)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
go c.runWithRecovery(e.Job)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}

case newEntry := <-c.add: // 指向新的任务
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)

case <-c.snapshot: // 获取快照
c.snapshot <- c.entrySnapshot()
continue

case <-c.stop: // 停止任务
timer.Stop()
return
}

break
}
}
}

大概包含下面这几部分:

第一部分:遍历了 c.entries 列表,通过 schedule.Next() 计算出这个作业下一次执行的时间,并赋值给了 entry.Next 字段。

第二部分是一个死循环,这一部分又可以分为三个部分:

  • 调用了 sort 的快排,其实是对 entries 中的元素按 Next 字段的时间线后顺序排序。

  • 这一部分是对定时器的一个初始化操作:如果没有可以执行的作业,定时器被设置为十万小时后触发(其实就是休眠),否则定时器会在第一个作业允许被执行时触发,定时器触发后, 2.3 部分会去做剩下的事。

  • 这又是整个 run 的核心,其主体是一个死循环(其实它会退出,不算是死循环),这个循环里面的核心又是一个 select 多路复用,这个多路复用里监听了五种信号,这五种信号是怎样发出的我们在上面其实已经说过了,他们分别是定时器触发信号 timer.C, 运行过程中添加作业的信号 c.add, 快照信号 c.snapshot, cron 停止的信号 c.stop, 移除作业的信号 c.remove。


源码阅读——robfig/cron
https://yance.wiki/cron/
作者
Yance Huang
发布于
2021年9月17日
许可协议