源码版本信息
Project: kubernetes 
Branch: master 
Last commit id: d25d741c 
Date: 2021-09-26 
 
 
我们在《Kubernetes client-go 源码分析 - 概述》 里提到了自定义控制器涉及到的 client-go 组件整体工作流程,大致如下图:
         client-go 
    
今天我们来详细研究下 workqueue 相关代码。client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列,延时队列,限速队列,后一个队列以前一个队列的实现为基础,层层添加新功能,我们按照 Queue、DelayingQueue、RateLimitingQueue 的顺序层层拨开来看限速队列是如何实现的。
先看接口定义:
k8s.io/client-go/util/workqueue/queue.go:26  
1
 2
 3
 4
 5
 6
 7
 8
  
type  Interface  interface  { 
   Add ( item  interface {})   // 添加一个元素
 Len ()  int               // 元素个数
 Get ()  ( item  interface {},  shutdown  bool )  // 获取一个元素,第二个返回值和 channel 类似,标记队列是否关闭了
 Done ( item  interface {})  // 标记一个元素已经处理完
 ShutDown ()              // 关闭队列
 ShuttingDown ()  bool     // 是否正在关闭
 } 
 
 
这个基础的队列接口定义很清晰,我们继续来看其实现的类型:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
  
type  Type  struct  { 
   queue  [] t             // 定义元素的处理顺序,里面所有元素都应该在 dirty set 中有,而不能出现在 processing set 中
 dirty  set             // 标记所有需要被处理的元素
 processing  set        // 当前正在被处理的元素,当处理完后需要检查该元素是否在 dirty set 中,如果有则添加到 queue 里
 cond  * sync . Cond       // 条件锁
 shuttingDown  bool     // 是否正在关闭
 metrics  queueMetrics 
   unfinishedWorkUpdatePeriod  time . Duration 
   clock                       clock . Clock 
} 
 
 
这个 Queue 的工作逻辑大致是这样,里面的三个属性 queue、dirty、processing 都保存 items,但是含义有所不同:
queue :这是一个 []t 类型,也就是一个切片,因为其有序,所以这里当作一个列表来存储 item 的处理顺序。dirty :这是一个 set 类型,也就是一个集合,这个集合存储的是所有需要处理的 item,这些 item 也会保存在 queue 中,但是 set 里是无序的,set 的特性是里面元素具有唯一性。processing :这也是一个 set,存放的是当前正在处理的 item,也就是说这个 item 来自 queue 出队的元素,同时这个元素会被从 dirty 中删除。 
下面分别介绍 set 类型和 Queue 接口的集合核心方法的实现。
上面提到的 dirty 和 processing 字段都是 set 类型,set 相关定义如下:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
  
type  empty  struct {} 
type  t  interface {} 
type  set  map [ t ] empty 
func  ( s  set )  has ( item  t )  bool  { 
   _ ,  exists  :=  s [ item ] 
   return  exists 
} 
func  ( s  set )  insert ( item  t )  { 
   s [ item ]  =  empty {} 
} 
func  ( s  set )  delete ( item  t )  { 
   delete ( s ,  item ) 
} 
 
 
	可以看到 set 是一个空接口到空结构体的 map,也就是实现了一个集合的功能,集合元素是 interface{} 类型,也就是可以存储任意类型。而 map 的 value 是 struct{} 类型,也就是空。这里利用 map 的 key 唯一的特性实现了一个集合类型,附带三个方法 has()、insert()、delete() 来实现集合相关操作。
Add() 方法用于标记一个 item 需要被处理,代码如下:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
  
func  ( q  * Type )  Add ( item  interface {})  { 
   q . cond . L . Lock () 
   defer  q . cond . L . Unlock () 
   if  q . shuttingDown  {  // 如果 queue 正在被关闭,则返回
 return 
   } 
   if  q . dirty . has ( item )  {  // 如果 dirty set 中已经有了该 item,则返回
 return 
   } 
   q . metrics . add ( item ) 
   q . dirty . insert ( item )  // 添加到 dirty set 中
 if  q . processing . has ( item )  {  // 如果正在被处理,则返回
 return 
   } 
   q . queue  =  append ( q . queue ,  item )  // 如果没有正在处理,则加到 q.queue 中
 q . cond . Signal ()  // 通知 getter 有新 item 到来
 } 
 
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
  
func  ( q  * Type )  Get ()  ( item  interface {},  shutdown  bool )  { 
   q . cond . L . Lock () 
   defer  q . cond . L . Unlock () 
   for  len ( q . queue )  ==  0  &&  ! q . shuttingDown  {  // 如果 q.queue 为空,并且没有正在关闭,则等待下一个 item 的到来
 q . cond . Wait () 
   } 
   if  len ( q . queue )  ==  0  {  // 这时候如果 q.queue 长度还是 0,说明 q.shuttingDown 为 true,所以直接返回
 return  nil ,  true 
   } 
   item ,  q . queue  =  q . queue [ 0 ],  q . queue [ 1 :]  // 获取 q.queue 第一个元素,同时更新 q.queue
 q . metrics . get ( item ) 
   q . processing . insert ( item )  // 刚才获取到的 q.queue 第一个元素放到 processing set 中
 q . dirty . delete ( item )  // dirty set 中删除该元素
 return  item ,  false  // 返回 item
 } 
 
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
  
func  ( q  * Type )  Done ( item  interface {})  { 
   q . cond . L . Lock () 
   defer  q . cond . L . Unlock () 
   q . metrics . done ( item ) 
   q . processing . delete ( item )  // processing set 中删除该 item
 if  q . dirty . has ( item )  {  // 如果 dirty 中还有,说明还需要再次处理,放到 q.queue 中
 q . queue  =  append ( q . queue ,  item ) 
      q . cond . Signal ()  // 通知 getter 有新的 item
 } 
} 
 
 
还是先看接口定义:
k8s.io/client-go/util/workqueue/delaying_queue.go:30  
1
 2
 3
 4
 5
  
type  DelayingInterface  interface  { 
   Interface 
   // AddAfter adds an item to the workqueue after the indicated duration has passed
 AddAfter ( item  interface {},  duration  time . Duration ) 
} 
 
 
相比 Queue 这里只是多了一个 AddAfter(item interface{}, duration time.Duration) 方法,望文生义,也就是延时添加 item。
结构体定义:
1
 2
 3
 4
 5
 6
 7
 8
 9
  
type  delayingType  struct  { 
   Interface                // 用来嵌套普通 Queue
 clock  clock . Clock        // 计时器
 stopCh  chan  struct {} 
   stopOnce  sync . Once       // 用来确保 ShutDown() 方法只执行一次
 heartbeat  clock . Ticker   // 默认10s的心跳,后面用在一个大循环里,避免没有新元素时一直阻塞
 waitingForAddCh  chan  * waitFor   // 传递 waitFor 的 channel,默认大小 1000
 metrics  retryMetrics 
} 
 
 
对于延时队列,我们关注的入口方法肯定就是新增的 AddAfter() 了,看这个方法的具体的逻辑前我们先看下上面 delayingType 中涉及到的 waitFor 类型。
先看下 waitFor 结构定义,代码如下:
1
 2
 3
 4
 5
  
type  waitFor  struct  { 
   data     t           // 准备添加到队列中的数据
 readyAt  time . Time   // 应该被加入队列的时间
 index  int           // 在 heap 中的索引
 } 
 
 
然后可以注意到有这样一行代码:
1
  
type  waitForPriorityQueue  [] * waitFor 
 
 
这里定义了一个 waitFor 的优先级队列,用最小堆的方式来实现,这个类型实现了 heap.Interface 接口,我们具体看下源码:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
  
// 添加一个 item 到队列中
 func  ( pq  * waitForPriorityQueue )  Push ( x  interface {})  { 
   n  :=  len ( * pq ) 
   item  :=  x .( * waitFor ) 
   item . index  =  n 
   * pq  =  append ( * pq ,  item )  // 添加到队列的尾巴
 } 
// 从队列尾巴移除一个 item
 func  ( pq  * waitForPriorityQueue )  Pop ()  interface {}  { 
   n  :=  len ( * pq ) 
   item  :=  ( * pq )[ n - 1 ] 
   item . index  =  - 1 
   * pq  =  ( * pq )[ 0 :( n  -  1 )] 
   return  item 
} 
// 获取队列第一个 item
 func  ( pq  waitForPriorityQueue )  Peek ()  interface {}  { 
   return  pq [ 0 ] 
} 
 
 
接着看一下 DelayingQueue 相关的几个 New 函数,理解了这里的逻辑,才能继续往后面分析 AddAfter() 方法。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
  
// 这里可以传递一个名字
 func  NewNamedDelayingQueue ( name  string )  DelayingInterface  { 
   return  NewDelayingQueueWithCustomClock ( clock . RealClock {},  name ) 
} 
// 上面一个函数只是调用当前函数,附带一个名字,这里加了一个指定 clock 的能力
 func  NewDelayingQueueWithCustomClock ( clock  clock . Clock ,  name  string )  DelayingInterface  { 
  return  newDelayingQueue ( clock ,  NewNamed ( name ),  name )  // 注意这里的 NewNamed() 函数
 } 
func  newDelayingQueue ( clock  clock . Clock ,  q  Interface ,  name  string )  * delayingType  { 
   ret  :=  & delayingType { 
      Interface :        q , 
      clock :            clock , 
      heartbeat :        clock . NewTicker ( maxWait ),  // 10s 一次心跳
 stopCh :           make ( chan  struct {}), 
      waitingForAddCh :  make ( chan  * waitFor ,  1000 ), 
      metrics :          newRetryMetrics ( name ), 
   } 
   go  ret . waitingLoop ()  // 留意这里的函数调用
 return  ret 
} 
 
 
上面涉及到两个细节:
NewNamed(name)go ret.waitingLoop() 
NewNamed() 函数用于创建一个前面提到的 Queue 的对应类型 Type 对象,这个值被传递给了 newDelayingQueue() 函数,进而赋值给了 delayingType{} 对象的 Interface 字段,于是后面 delayingType 类型才能直接调用 Type 类型实现的方法。
1
 2
 3
 4
 5
 6
 7
 8
  
func  NewNamed ( name  string )  * Type  { 
   rc  :=  clock . RealClock {} 
   return  newQueue ( 
      rc , 
      globalMetricsFactory . newQueueMetrics ( name ,  rc ), 
      defaultUnfinishedWorkUpdatePeriod , 
   ) 
} 
 
 
waitingLoop() 方法逻辑不少,我们单独放到下面一个小节
这个方法是实现延时队列的核心逻辑所在,
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
  
func  ( q  * delayingType )  waitingLoop ()  { 
   defer  utilruntime . HandleCrash () 
   // 队列里没有 item 时实现等待用的
 never  :=  make ( <- chan  time . Time ) 
   var  nextReadyAtTimer  clock . Timer 
   // 构造一个优先级队列
 waitingForQueue  :=  & waitForPriorityQueue {} 
   heap . Init ( waitingForQueue )  // 这一行其实是多余的,功能上没有啥作用,不过在可读性上有点帮助。
 // 这个 map 用来处理重复添加逻辑的,下面会讲到
 waitingEntryByData  :=  map [ t ] * waitFor {} 
   // 无限循环
 for  { 
     // 这个地方 Interface 从语法上来看可有可无,不过放在这里能够强调是调用了内部 Queue 的 ShuttingDown() 方法
 if  q . Interface . ShuttingDown ()  { 
         return 
      } 
      now  :=  q . clock . Now () 
      // 队列里有 item 就开始循环
 for  waitingForQueue . Len ()  >  0  { 
         // 获取第一个 item
 entry  :=  waitingForQueue . Peek ().( * waitFor ) 
         // 时间还没到,先不处理
 if  entry . readyAt . After ( now )  { 
            break 
         } 
        // 时间到了,pop 出第一个元素;注意 waitingForQueue.Pop() 是最后一个 item,heap.Pop() 是第一个元素
 entry  =  heap . Pop ( waitingForQueue ).( * waitFor ) 
         // 将数据加到延时队列里
 q . Add ( entry . data ) 
         // map 里删除已经加到延时队列的 item
 delete ( waitingEntryByData ,  entry . data ) 
      } 
      // 如果队列中有 item,就用第一个 item 的等待时间初始化计时器,如果为空则一直等待
 nextReadyAt  :=  never 
      if  waitingForQueue . Len ()  >  0  { 
         if  nextReadyAtTimer  !=  nil  { 
            nextReadyAtTimer . Stop () 
         } 
         entry  :=  waitingForQueue . Peek ().( * waitFor ) 
         nextReadyAtTimer  =  q . clock . NewTimer ( entry . readyAt . Sub ( now )) 
         nextReadyAt  =  nextReadyAtTimer . C () 
      } 
      select  { 
      case  <- q . stopCh : 
         return 
      case  <- q . heartbeat . C ():  // 心跳时间是 10s,到了就继续下一轮循环
 case  <- nextReadyAt :  // 第一个 item 的等到时间到了,继续下一轮循环
 case  waitEntry  :=  <- q . waitingForAddCh :  // waitingForAddCh 收到新的 item
 // 如果时间没到,就加到优先级队列里,如果时间到了,就直接加到延时队列里
 if  waitEntry . readyAt . After ( q . clock . Now ())  { 
            insert ( waitingForQueue ,  waitingEntryByData ,  waitEntry ) 
         }  else  { 
            q . Add ( waitEntry . data ) 
         } 
         // 下面的逻辑就是将 waitingForAddCh 中的数据处理完
 drained  :=  false 
         for  ! drained  { 
            select  { 
            case  waitEntry  :=  <- q . waitingForAddCh : 
               if  waitEntry . readyAt . After ( q . clock . Now ())  { 
                  insert ( waitingForQueue ,  waitingEntryByData ,  waitEntry ) 
               }  else  { 
                  q . Add ( waitEntry . data ) 
               } 
            default : 
               drained  =  true 
            } 
         } 
      } 
   } 
} 
 
 
这个方法还有一个 insert() 调用,我们再来看一下这个插入逻辑:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
  
func  insert ( q  * waitForPriorityQueue ,  knownEntries  map [ t ] * waitFor ,  entry  * waitFor )  { 
   // 这里的主要逻辑是看一个 entry 是否存在,如果已经存在,新的 entry 的 ready 时间更短,就更新时间
 existing ,  exists  :=  knownEntries [ entry . data ] 
   if  exists  { 
      if  existing . readyAt . After ( entry . readyAt )  { 
         existing . readyAt  =  entry . readyAt  // 如果存在就只更新时间
 heap . Fix ( q ,  existing . index ) 
      } 
      return 
   } 
   // 如果不存在就丢到 q 里,同时在 map 里记录一下,用于查重
 heap . Push ( q ,  entry ) 
   knownEntries [ entry . data ]  =  entry 
} 
 
 
这个方法的作用是在指定的延时到达之后,在 work queue 中添加一个元素,源码如下:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
  
func  ( q  * delayingType )  AddAfter ( item  interface {},  duration  time . Duration )  { 
   if  q . ShuttingDown ()  {  // 已经在关闭中就直接返回
 return 
   } 
   q . metrics . retry () 
   if  duration  <=  0  {  // 如果时间到了,就直接添加
 q . Add ( item ) 
      return 
   } 
   select  { 
   case  <- q . stopCh : 
     // 构造 waitFor{},丢到 waitingForAddCh
 case  q . waitingForAddCh  <-  & waitFor { data :  item ,  readyAt :  q . clock . Now (). Add ( duration )}: 
   } 
} 
 
 
最后一个 workqueue 就是限速队列,我们继续来看。
先看接口定义
k8s.io/client-go/util/workqueue/rate_limiting_queue.go:20  
1
 2
 3
 4
 5
 6
  
type  RateLimitingInterface  interface  { 
   DelayingInterface                    // 延时队列里内嵌了普通队列,限速队列里内嵌了延时队列
 AddRateLimited ( item  interface {})     // 限速方式往队列里加入一个元素
 Forget ( item  interface {})             // 标识一个元素结束重试
 NumRequeues ( item  interface {})  int    // 标识这个元素被处理里多少次了
 } 
 
 
然后看下两个 New 函数
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
  
func  NewRateLimitingQueue ( rateLimiter  RateLimiter )  RateLimitingInterface  { 
   return  & rateLimitingType { 
      DelayingInterface :  NewDelayingQueue (), 
      rateLimiter :        rateLimiter , 
   } 
} 
func  NewNamedRateLimitingQueue ( rateLimiter  RateLimiter ,  name  string )  RateLimitingInterface  { 
   return  & rateLimitingType { 
      DelayingInterface :  NewNamedDelayingQueue ( name ), 
      rateLimiter :        rateLimiter , 
   } 
} 
 
 
这里的区别就是里面的延时队列有没有指定的名字。注意到这里有一个 RateLimiter 类型,后面要详细讲,另外 rateLimitingType 就是上面接口的具体实现类型了。
RateLimiter 表示一个限速器,我们看下限速器是什么意思。先看接口定义:
k8s.io/client-go/util/workqueue/default_rate_limiters.go:27 
1
 2
 3
 4
 5
  
type  RateLimiter  interface  { 
   When ( item  interface {})  time . Duration  // 返回一个 item 需要等待的时长
 Forget ( item  interface {})              // 标识一个元素结束重试
 NumRequeues ( item  interface {})  int     // 标识这个元素被处理里多少次了
 } 
 
 
这个接口有五个实现,分别叫做:
BucketRateLimiter ItemExponentialFailureRateLimiter ItemFastSlowRateLimiter MaxOfRateLimiter WithMaxWaitRateLimiter  
下面分别来看
这个限速器可说的不多,用了 golang 标准库的 golang.org/x/time/rate.Limiter 实现。BucketRateLimiter 实例化的时候比如传递一个 rate.NewLimiter(rate.Limit(10), 100) 进去,表示令牌桶里最多有 100 个令牌,每秒发放 10 个令牌。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
  
type  BucketRateLimiter  struct  { 
   * rate . Limiter 
} 
var  _  RateLimiter  =  & BucketRateLimiter {} 
func  ( r  * BucketRateLimiter )  When ( item  interface {})  time . Duration  { 
   return  r . Limiter . Reserve (). Delay ()  // 过多久后给当前 item 发放一个令牌
 } 
func  ( r  * BucketRateLimiter )  NumRequeues ( item  interface {})  int  { 
   return  0 
} 
func  ( r  * BucketRateLimiter )  Forget ( item  interface {})  { 
} 
 
 
Exponential 是指数的意思,从这个限速器的名字大概能猜到是失败次数越多,限速越长而且是指数级增长的一种限速器。
结构体定义如下,属性含义基本可以望文生义
1
 2
 3
 4
 5
 6
 7
  
type  ItemExponentialFailureRateLimiter  struct  { 
   failuresLock  sync . Mutex 
   failures      map [ interface {}] int 
   baseDelay  time . Duration 
   maxDelay   time . Duration 
} 
 
 
主要逻辑是 When() 函数是如何实现的
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
  
func  ( r  * ItemExponentialFailureRateLimiter )  When ( item  interface {})  time . Duration  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   exp  :=  r . failures [ item ] 
   r . failures [ item ]  =  r . failures [ item ]  +  1  // 失败次数加一
 // 每调用一次,exp 也就加了1,对应到这里时 2^n 指数爆炸
 backoff  :=  float64 ( r . baseDelay . Nanoseconds ())  *  math . Pow ( 2 ,  float64 ( exp )) 
   if  backoff  >  math . MaxInt64  {  // 如果超过了最大整型,就返回最大延时,不然后面时间转换溢出了
 return  r . maxDelay 
   } 
   calculated  :=  time . Duration ( backoff ) 
   if  calculated  >  r . maxDelay  {  // 如果超过最大延时,则返回最大延时
 return  r . maxDelay 
   } 
   return  calculated 
} 
 
 
另外两个函数太简单了:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
  
func  ( r  * ItemExponentialFailureRateLimiter )  NumRequeues ( item  interface {})  int  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   return  r . failures [ item ] 
} 
func  ( r  * ItemExponentialFailureRateLimiter )  Forget ( item  interface {})  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   delete ( r . failures ,  item ) 
} 
 
 
快慢限速器,也就是先快后慢,定义一个阈值,超过了就慢慢重试。先看类型定义:
1
 2
 3
 4
 5
 6
 7
 8
  
type  ItemFastSlowRateLimiter  struct  { 
   failuresLock  sync . Mutex 
   failures      map [ interface {}] int 
   maxFastAttempts  int             // 快速重试的次数
 fastDelay        time . Duration   // 快重试间隔
 slowDelay        time . Duration   // 慢重试间隔
 } 
 
 
同样继续来看具体的方法实现
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
  
func  ( r  * ItemFastSlowRateLimiter )  When ( item  interface {})  time . Duration  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   r . failures [ item ]  =  r . failures [ item ]  +  1  // 标识重试次数 + 1
 if  r . failures [ item ]  <=  r . maxFastAttempts  {  // 如果快重试次数没有用完,则返回 fastDelay
 return  r . fastDelay 
   } 
   return  r . slowDelay  // 反之返回 slowDelay
 } 
func  ( r  * ItemFastSlowRateLimiter )  NumRequeues ( item  interface {})  int  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   return  r . failures [ item ] 
} 
func  ( r  * ItemFastSlowRateLimiter )  Forget ( item  interface {})  { 
   r . failuresLock . Lock () 
   defer  r . failuresLock . Unlock () 
   delete ( r . failures ,  item ) 
} 
 
 
这个限速器看着有点乐呵人,内部放多个限速器,然后返回限速最狠的一个延时:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
  
type  MaxOfRateLimiter  struct  { 
   limiters  [] RateLimiter 
} 
func  ( r  * MaxOfRateLimiter )  When ( item  interface {})  time . Duration  { 
   ret  :=  time . Duration ( 0 ) 
   for  _ ,  limiter  :=  range  r . limiters  { 
      curr  :=  limiter . When ( item ) 
      if  curr  >  ret  { 
         ret  =  curr 
      } 
   } 
   return  ret 
} 
 
 
这个限速器也很简单,就是在其他限速器上包装一个最大延迟的属性,如果到了最大延时,则直接返回:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
  
type  WithMaxWaitRateLimiter  struct  { 
   limiter   RateLimiter    // 其他限速器
 maxDelay  time . Duration  // 最大延时
 } 
func  NewWithMaxWaitRateLimiter ( limiter  RateLimiter ,  maxDelay  time . Duration )  RateLimiter  { 
   return  & WithMaxWaitRateLimiter { limiter :  limiter ,  maxDelay :  maxDelay } 
} 
func  ( w  WithMaxWaitRateLimiter )  When ( item  interface {})  time . Duration  { 
   delay  :=  w . limiter . When ( item ) 
   if  delay  >  w . maxDelay  { 
      return  w . maxDelay  // 已经超过了最大延时,直接返回最大延时
 } 
   return  delay 
} 
 
 
看完了上面的限速器的概念,限速队列的实现就很简单了:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
  
func  ( q  * rateLimitingType )  AddRateLimited ( item  interface {})  { 
   // 内部存了一个延时队列,通过限速器计算出一个等待时间,然后传给延时队列
 q . DelayingInterface . AddAfter ( item ,  q . rateLimiter . When ( item )) 
} 
func  ( q  * rateLimitingType )  NumRequeues ( item  interface {})  int  { 
   return  q . rateLimiter . NumRequeues ( item ) 
} 
func  ( q  * rateLimitingType )  Forget ( item  interface {})  { 
   q . rateLimiter . Forget ( item ) 
} 
 
 
在自定义控制器开发场景下,我们用到的 workqueue 其实是用的这里的延时队列实现,一个延时队列也就是实现了 item 延时入队效果,内部是一个“优先级队列”,用了“最小堆”(有序完全二叉树),从而我们在 requeueAfter 中指定一个调谐过程 1 分钟后重试,实现原理也就明白了。
转载请注明本文来自胡涛的个人网站  - https://www.danielhu.cn