# 二刷 K8s 源码 - workqueue 的所有细节


- [1. 概述 - 何来此文](#1-概述---何来此文)
- [2. Queue 的实现](#2-queue-的实现)
	- [2.1 Queue.Add(item interface{}) 方法](#21-queueadditem-interface-方法)
	- [2.2 Queue.Get() 方法](#22-queueget-方法)
	- [2.3 Queue.Done(item interface{}) 方法](#23-queuedoneitem-interface-方法)
- [3. DelayingQueue 的实现](#3-delayingqueue-的实现)
- [4. RateLimitingQueue 的实现](#4-ratelimitingqueue-的实现)
- [5. rateLimiter 限速器的实现](#5-ratelimiter-限速器的实现)
- [6. 控制器里用的默认限速器](#6-控制器里用的默认限速器)
- [7. 总结](#7-总结)


## 1. 概述 - 何来此文

有些天没有更新文章了，大约两三周。

为什么今天会再写一次源码分析相关的文章呢？

在几年前我写过一篇：[《Kubernetes client-go 源码分析 - workqueue》](https://www.danielhu.cn/k8s-client-go-workqueue/)，不过过去2年我没有搞 K8s，忘得差不多了……

![](1.png)

这几周我接了个活，写一个 vgpu 调度器，我以为是三天的活，结果搞了三周，笑死。

这个项目涉及到 K8s **自定义控制器**、**调度器拓展**、**webhook** 等，还是非常有趣的。虽然写起来有点费头发，但是成就感满满。可能你的想法和我一开始一样，感觉就是500行代码的事情，三天绰绰有余！哎，都是泪，5000行都写不完……

过去2年时间我在 DevOps 领域干活，K8s 相关的知识点很多已经记不清了。这会 vgpu 调度器也实现完了，忙绿告一段落，同时心里的疑惑也攒满了，各种我知其然（会用）但是不知其所以然（不记得细节，原理）的点，趁着过年期间，补一补。

今天从 workqueue 开始。

> 写完本文，我又回过头瞟了下几年前写的 workqueue 源码分析文章，略有感慨。当年更加关注的是如何快速看懂源码，更加关注“workqueue 的实现”本身；而今天再看 workqueue，我则更加关注“为什么要看 workqueue”，在自定义控制器里用到了 workqueue，这些用法的道理何在，原理何在。
> 
> 同时这次刷 workqueue 源码，我能够深切感受到这段源码的优雅，不止是看懂逻辑，更多是一种享受。就像看《Harry Potter》原著，除了沉浸于作者构造的真切魔法场景，体会看小说的乐趣外，还能被字里行间溢出的那种才华而感动，感慨“原来英文也可以写的文采斐然”，感慨“原来代码还能写的这么漂亮”。

## 2. Queue 的实现

Queue 对应的接口定义如下：

- `util/workqueue/queue.go:26`

```go
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}
```

这里重点关注 `Add`、`Get` 和 `Done` 的实现。Queue 的实现类型如下：

- `util/workqueue/queue.go:115`

```go
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t

	// dirty defines all of the items that need to be processed.
	dirty set

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	cond *sync.Cond
    // ......
}
```

来看这三个字段：queue、dirty 和 processing：

- `queue`：queue 定义了这个队列中 items 的顺序，这是一个 `[]interface{}` 类型的切片，可以保存任意类型的元素。
- `dirty`：dirty 的类型是 set 的底层类型是 `map[interface{}]struct{}`，也就是 `queue` 中的元素集合会存到这个 `dirty set` 中，也就是“待处理的 items”。
- `processing`：processing 的类型也是 set，保存的是正在被处理的 items。

也就是说 `Queue` 会将待处理的 items 全部放到 `queue` 中，这个 `queue` 是有序的；同时为了让 `queue` 中不存在重复的 items，所以加了一个 `dirty set`，毕竟判断 map 中是否存在某个 key 比判断 slice 中是否存在某个 item 要快得多。

另外当一个 item 从 `queue` 中被取出时，也就是出队后，这个 item 会被加到 `processing set` 中，同时被从 `dirty set` 中移除，也就是 `processing set` 保存了目前正在被处理，但是没有处理完的 items。换言之，会存在某个操作来表示“一个 item 已经被处理完成”，也就是 `Done(item interface{})` 方法。

下一步我们具体来看 `Add`、`Get` 和 `Done` 的实现。

### 2.1 Queue.Add(item interface{}) 方法

- `util/workqueue/queue.go:163`

```go
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}
```

这里用到了条件变量 `sync.Cond`（我在前面贴 Type 结构体代码时特地保留了 cond 字段），不难想到当队列为空的时候，和 Add 相对应的 `Get()` 方法会 Wait，等到 Add 完成 item 的添加，调用 `Signal()` 来唤醒。

`Add()` 方法的实现主要就两个逻辑：

1. 如果 item 在 `dirty set` 中不存在，就往 `dirty set` 中插入。这样也就解决了 `queue` 中重复放入某个 item 的问题。这个特性在 `workqueue` 包的 `doc.go` 中被描述为“Stingy”，也就是 Queue 的实现不允许同一个 item 被并发处理。再说的简单一点，就是加入 `queue` 中放进去了几个一模一样的 item，这时候如果多个 goroutine 去并发 `Get()` 后其实会拿到一样的 item（比如是一个 pod），那不就乱套了。
2. 如果 item 在 `processing set` 中不存在，那就加入到 `queue` 中。也就是说如果一个 item “正在被处理”，那么它不会被重复加到 queue 里去排队。显然这也是为了“Stingy”。但是这个 item 会被放到 `dirty set` 里，如果 `processing set` 中也存在这个 item，那么当这个 item “Done”之后，这次加进来的 item 会从 `dirty set` 中被提到 `queue` 里。这个逻辑后面可以看到。

总结一句话：**Add() 会以“吝啬”的方式将 item 入队，这个过程会保证一个 Queue 中同一时刻不存在重复的 item。**

### 2.2 Queue.Get() 方法

- `util/workqueue/queue.go:196`

```go
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}
```

`Get()` 方法尝试从 `queue` 中获取第一个 item，同时将其加入到 `processing set` 中，并且从 `dirty set` 中删除。

### 2.3 Queue.Done(item interface{}) 方法

- `util/workqueue/queue.go:223`

```go
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}
```

`Done()` 方法用来标记一个 item 被处理完成了。调用 `Done()` 方法的时候，这个 item 被从 `processing set` 中删除。另外前面提到 Add 的过程中如果发现 item 在 `processing set` 中存在，那么这个 item 会被暂存到 `dirty set` 中。这里在处理 item 的 Done 逻辑的时候也就顺带把暂存到 `dirty set` 中的 item 取出来，加入到 queue 里去了。这个行为同样是有意义的，在“Stingy”的同时允许一个 item 在处理过程中被重新入队，等待下一次重新处理。

## 3. DelayingQueue 的实现

继续来看 DelayingQueue 的实现。

- `util/workqueue/delaying_queue.go:30`

```go
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}
```

这个 `Interface` 也就是前文提到的 `Queue`，`DelayingQueue` 在 `Queue` 的基础上加了一个 `AddAfter()` 接口，实现了“过一会再加入一个 item”的功能。这样也就使得一个 item 处理失败之后，能够在指定延时之后再重新入队。

行，直接来看 `AddAfter()` 接口是怎么实现的：

- `util/workqueue/delaying_queue.go:205`

```go
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}
```

可以看到当 `duration > 0` 的时候，`AddAfter()` 方法只是简单地构造一个 waitFor 对象，然后将其加入 `chan *waitFor` 类型的 `waitingForAddCh` 中。所以后面应该关注的核心逻辑是如何消费这个 `waitingForAddCh。`

跟一下 `waitingForAddCh` 的实现，可以找到在 `newDelayingQueue()` 函数中初始化了 `waitingForAddCh`，然后调用了一个 delayingType 的 `waitingLoop()` 方法：

- `util/workqueue/delaying_queue.go:103`

```go
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name, provider),
	}

	go ret.waitingLoop()
	return ret
}
```

可以看到 `waitingForAddCh` 的容量是 1000，类型是 `chan *waitFor`，而 `waitFor` 的类型是：

```go
type waitFor struct {
	data    t
	readyAt time.Time
	// index in the priority queue (heap)
	index int
}
```

下一步继续看 `waitingLoop()` 方法的实现：

- `util/workqueue/delaying_queue.go:232`

```go
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer

	// 这是一个优先级队列，用一个最小堆保存了所有 waitFor（waitForPriorityQueue 的类型是 []*waitFor）
	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{}

	// 后面的所有逻辑都在这个 for 循环里
	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		// 一直判断 waitingForQueue 这个堆里有没有元素，如果有，就 Peek 出来第一个元素看是不是 ready，如果 ready
		// 那就通过 q.Add() 方法将其数据放入队列（也就是前文那个 Queue 实现的 Add 逻辑）
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break
			}

			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		// 代码到这里也就是说前面一个 for 里的 break 被执行了，换言之最小堆中的最小元素，也就是最快 ready 的一个都还没有 ready；
		// 这里将 nextReadyAtTimer 计时器的时间设置为（最快 ready 的第一个元素的 ready 时间 - 当前时间），
		// 这样当 nextReadyAt 这个 channel 有数据的时候，最小堆里的第一个元素也就 ready 了。
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		// 刚才设置好了一个合适的 nextReadyAt，现在开始 select 等待某个 channel 有反应
		select {
		case <-q.stopCh:
			return
		// 心跳时间是10s
		case <-q.heartbeat.C():
			// continue the loop, which will add ready items
		// 执行到这里也就是第一个 item ready 了
		case <-nextReadyAt:
			// continue the loop, which will add ready items

		// 当有元素被通过 AddAfter() 方法加进来时，waitingForAddCh 就会有内容，这时候会被取出来；
		// 如果没有 ready，那就调用 insert（大概率主要就是插入最小堆的过程）；如果 ready 了那就直接 Add 到 Queue 里。
		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}
			// 通过一个循环将 waitingForAddCh 中的所有元素都消费掉，根据 ready 情况要么插入最小堆（优先级队列），要么直接入队。
			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}
```

我继续看了一眼 `insert()` 方法的实现，里面就2个逻辑：如果元素不存在，就插入 waitingForQueue 这个优先级队列；如果存在，就更新其 readAt 时间；代码就不具体贴了。

这个 `waitingLoop()` 写的还是很漂亮，滴水不漏，逻辑严谨，相当优雅。waitingLoop 通过优先级队列实现元素按照时间顺序从近到远排序，这样就能最高效地获取到最快 ready 的元素。然后又根据优先级队列中最快能 ready 的元素的 ready 剩余时间来构造等待计时器，等待的过程中又监测着 waitingForAddCh 这个 channel 中是否有新元素…… 这个过程很完美地体现了 Golang 特色的 Channel 机制的优雅与强大。

## 4. RateLimitingQueue 的实现

看完了延时队列，继续来看限速队列。

先贴接口：

- `util/workqueue/rate_limiting_queue.go:22`

```go
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}
```

层层递进，限速队列建立在延时队列之上，RateLimitingInterface 接口包含了 DelayingInterface 接口的所有方法。显然这里我们主要得关注 `AddRateLimited(item interface{})` 方法和 `Forget(item interface{})` 方法的意图与实现。`Forget` 方法在我们开发自定义控制器的时候其实会用到，故需知其然，能知其所以然则更佳。

接着找到 `AddRateLimited(item interface{})` 方法的实现：

```go
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
```

是不是很酷，通过一个 rateLimiter 确定一个 item 的 ready 时间，然后通过延时队列完成延时入队的逻辑。这里就只剩下 rateLimiter 这个限速器的实现需要研究了。

如果看 `Forget(item interface{})` 方法：

```go
func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}
```

同样，内容都在 rateLimiter 里。行，接着来看 rateLimiter 的实现。

## 5. rateLimiter 限速器的实现

rateLimiter 对应的接口定义如下：

- `util/workqueue/default_rate_limiters.go:27`

```go
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}
```

实现可就有意思了，变着花样限速：

1. BucketRateLimiter
2. ItemExponentialFailureRateLimiter
3. ItemFastSlowRateLimiter
4. MaxOfRateLimiter
5. StepRateLimiter
6. WithMaxWaitRateLimiter

六种实现，对应不同的限速思路，体现算法之美的地方。

## 6. 控制器里用的默认限速器

在 sample-controller 里有这样一行代码：

`queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())`

我们先看下默认 workqueue 里用的限速器是哪一个。

- `util/workqueue/default_rate_limiters.go:39`

```go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
```

这里调用了 `NewMaxOfRateLimiter()` 函数，这个函数会返回一个 `MaxOfRateLimiter`：

```go
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}
```

`MaxOfRateLimiter` 内部保存多个其他限速器的实现，然后返回一个“限速最久”的结果：

```go
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		// 拿到最久的限速时长
		if curr > ret {
			ret = curr
		}
	}

	return ret
}
```

你们前面贴的这个 `DefaultControllerRateLimiter()` 函数的实现就很容易理解了：

```go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
```

这里接收两个限速器，然后看谁凶就用谁的限速结果。这两个限速器分别是：

- ItemExponentialFailureRateLimiter
- BucketRateLimiter

`ItemExponentialFailureRateLimiter` 限速器会指数级增加限速时长，也就是先延迟5毫秒，如果失败就变成10毫秒、20毫秒、40毫秒…… 最大不超过 1000秒。

`BucketRateLimiter` 是一个比较基础的令牌桶实现，在这里的2个参数10和100的含义是令牌桶里最多有100个令牌，每秒发放10个。

到这里控制器用的限速器逻辑也就完全能理解了。每个限速器的具体实现代码就不具体贴了，到这里我想要掌握的信息已经足够了。不过在早几年的我的一篇博客[《Kubernetes client-go 源码分析 - workqueue》](https://www.danielhu.cn/k8s-client-go-workqueue/)里有具体的每个限速器的实现逻辑分析，如果你感兴趣也可以跳转过去阅读。

## 7. 总结

在编写自定义控制器的时候我们会用到这样的代码：

`queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())`

这里实例化的队列是一个限速队列。

在 `ResourceEventHandlerFuncs` 里我们会写 `queue.Add(key)` 这样的代码，将一个 key（item/obj）加入到 workqueue 里。

控制器里还会写到这样的代码：

- `obj, shutdown := c.workqueue.Get()`
- `c.workqueue.Done(obj)`
- `c.workqueue.Forget(obj)`
- `c.workqueue.AddRateLimited(obj)`

这些方法也就对应着一个 obj 从 workqueue 中取出来（`Get()`），如果处理完成，就调用 `Done(obj)`，从而从队列中彻底移除，同时调用 `Forget(obj)` 告诉记速器可以忽略这个 obj 了（可别下次同名 obj 来的时候，误判了，让人家白等半天）。最后如果处理失败，遇到点啥临时异常情况，那就放回到 workqueue 里去，用 `AddRateLimited(obj)` 方法。

至此，workqueue 相关的用法我们就知其然，亦知其所以然了。

