概述

源码版本信息

  • Project: kubernetes
  • Branch: master
  • Last commit id: d25d741c
  • Date: 2021-09-26

在 Operator 开发过程中难免会用到 Event 对象,所以很有必要了解 Event 相关细节,可以避免很多 bug 的产生。client-go 在处理 Event 的时候,有这样一些特性:

  1. 如果 apiserver 失联,会重试发送 12 次,第一次间隔是 [0,10),剩余每次间隔 10s,合计110-120 s 左右如果还连不上 apiserver 就会放弃本次事件的发送;
  2. client-go 在发送 event 之前会先进行一系列预处理流程,如果相似 event 的聚合,效果就是新发送一个关于相同资源对象的 Reason 和 Message 都相同的 event,这时候新 event 的 count 就是这类事件发生的次数,LastTimestamp 是事件产生时间,FirstTimestamp 是第一次观察到这类事件的时间;并且快速发送多个一样的 event 满足一定条件时会被聚合成一个;
  3. client-go 中发送 event 的 burst 是 25,qps 是 1/300,意味着令牌桶大小是 25,5分钟产生一块新令牌,换言之快速发送 25 个 event 之后,5分钟内发送的 event 会被丢弃;
  4. 消息广播器的缓冲区大小是 1000,如果产生事件的速度太快,当 EventWatcher 来不及处理时,新产生的 event 也会被直接丢弃;

event

以 job 控制器中 event 的用法为例,大致步骤如下

1
2
3
4
5
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

从这里我们可以得到几个关键信息,首先是涉及到的几个主要对象:

  • EventBroadcaster
  • EventSink
  • EventRecorder

从名字直接猜的话,也许是这样工作的:EventRecorder 产生 events,EventBroadcaster 广播 events,EventSink,不好猜,字面意思是个事件槽,大概就是事件的一个中转站,最后通过这个 Sink 事件会流转到其他地方,比如 logger 或者 apiserver?到这里为止纯属YY…… 下面具体来看。

EventBroadcaster

EventBroadcaster 用来接收 events,然后发送到一个 EventSink、watcher 或者 log 中;先看下接口定义:

  • client-go/tools/record/event.go:113
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type EventBroadcaster interface {
   // 将从 EventBroadcaster 接收到的 events 丢给 eventHandler
   StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface 
   // 将从 EventBroadcaster 接收到的 events 丢给 EventSink
   StartRecordingToSink(sink EventSink) watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的日志函数
   StartLogging(logf func(format string, args ...interface{})) watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的结构化日志函数
   StartStructuredLogging(verbosity klog.Level) watch.Interface
   // 用于获取 EventRecorder,EventRecorder 可以发送 events 给 EventBroadcaster
   NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
   Shutdown()
}

这里注意不要找到 tools/events 包里去,两个包有很相似的代码,看起来是 tools/events 包已经过期了,目前引用的都是 tools/record 包。

EventBroadcaster 对应的实现是 eventBroadcasterImpl

  • client-go/tools/record/event.go:181
1
2
3
4
5
type eventBroadcasterImpl struct {
	 *watch.Broadcaster
	 sleepDuration time.Duration
	 options       CorrelatorOptions
}

前面提到第一步就是 eventBroadcaster := record.NewBroadcaster() 调用,我们下面看看这里的 New 过程

NewBroadcaster()

实例化一个 EventBroadcaster 的过程中会直接开启一个 goroutine 来从 Broadcaster.imcoming 接收 Event,然后分发给所有的 Broadcaster.watchers

  • client-go/tools/record/event.go:159
1
2
3
4
5
6
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), // maxQueuedEvents == 1000
		sleepDuration: defaultSleepDuration, // 10s
	}
}

这里通过调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) 来创建 Broadcaster,继续看下逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
   m := &Broadcaster{
      watchers:            map[int64]*broadcasterWatcher{},
      incoming:            make(chan Event, queueLength),
      stopped:             make(chan struct{}),
      watchQueueLength:    queueLength,
      fullChannelBehavior: fullChannelBehavior,
   }
   m.distributing.Add(1) // wg.Add(1)
   go m.loop()
   return m
}

这里实例化了一个 Broadcaster,先看下 Broadcaster 的结构:

  • apimachinery/pkg/watch/mux.go:42
1
2
3
4
5
6
7
8
9
type Broadcaster struct {
   watchers     map[int64]*broadcasterWatcher
   nextWatcher  int64
   distributing sync.WaitGroup
   incoming chan Event
   stopped  chan struct{}
   watchQueueLength int
   fullChannelBehavior FullChannelBehavior
}

这里的 Event 是这个结构:

1
2
3
4
5
6
7
type Event struct {
   Type EventType // "ADDED" / "MODIFIED" / "DELETED" / "BOOKMARK" / "ERROR"
   // 如果 EventType 是 "ADDED"/"MODIFIED",Object 是对象的最新状态;
   // 如果 EventType 是 "DELETED",Object 是对象删除前的状态;
   // 如果 EventType 是 "BOOKMARK",Object 里只有 ResourceVersion 字段被设置,客户端会保证不会收到重复的 event 或者丢失任何一个 event
   Object runtime.Object
}

回过来看调用的 m.loop(),loop() 方法的逻辑是从 m.incoming 接受消息,然后分发给所有的 watchers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (m *Broadcaster) loop() {
   for event := range m.incoming { // chan Event 类型,这里的 Event apimachinery/pkg/watch.Event
      if event.Type == internalRunFunctionMarker { // 如果是 fake 类型的 Event,就直接调用其内部 Obj 的方法,下面有具体 internalRunFunctionMarker 逻辑的用途分析,在 blockQueue 方法里会讲到。
         event.Object.(functionFakeRuntimeObject)()
         continue
      }
      m.distribute(event) // 分发逻辑
   }
   m.closeAll()
   m.distributing.Done() // wg.Done()
}

继续看 distribute 逻辑,这里主要是将一个 Event 分发给所有的 watcher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (m *Broadcaster) distribute(event Event) {
   if m.fullChannelBehavior == DropIfChannelFull { // 默认行为,如果 channel 满了不阻塞,直接丢弃消息
      for _, w := range m.watchers { // map[int64]*broadcasterWatcher
         select {
         case w.result <- event: // 将 event 丢给 broadcasterWatcher.result
         case <-w.stopped:
         default: // 当 w.result 满了写不进去时直接继续下一轮循环,区别于 else 里的阻塞行为
         }
      }
   } else {
      for _, w := range m.watchers {
         select {
         case w.result <- event: // result 满了会阻塞
         case <-w.stopped:
         }
      }
   }
}

StartEventWatcher

开头提到的 job 控制器里 events 相关代码里接着两步是这样的:

1
2
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

这两个函数调用代码都不长,主要逻辑在内部的 StartEventWatcher 中。简单看下两个方法的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// client-go/tools/record/event.go:190
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
   return e.StartEventWatcher(
      func(e *v1.Event) { // 收到一个 Event,直接打印到日志里
         klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
      })
}
// ......
// client-go/tools/record/event.go:291
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

我们来看 StartEventWatcher 逻辑,然后回过来聊这两个方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
   watcher := e.Watch() // Watch 过程下面来看
   go func() {
      defer utilruntime.HandleCrash()
      for watchEvent := range watcher.ResultChan() { // 前面 Broadcaster 就是往这里丢的 Event
         event, ok := watchEvent.Object.(*v1.Event) // Event.Object 就是具体的 corev1.Event
         if !ok {
            // This is all local, so there's no reason this should
            // ever happen.
            continue
         }
         eventHandler(event) // 丢给 handler 函数处理
      }
   }()
   return watcher
}

StartEventWatcher() 方法的入参是一个能处理 Eventhandler 函数,这里的 Eventcorev1.Event,也就是我们通过 kubectl 命令具体看到的 Event 资源对象。上面有一个 Watch() 方法的调用,我们分析下具体内容。

Watch() 方法会 new 一个 watcher,然后加到 m.watchers map 里,返回这个 watcher,这个 watcher 不会接收到历史 events,而且会阻塞到成功加入 Broadcaster 为止。比如 Broadcaster 到 incoming 队列里已经有很多 Event 了,这时候新启动一个 watcher 直接开始工作会收到老消息,下面通过 blockQueue 逻辑实现了只接收新消息的逻辑,具体看下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (m *Broadcaster) Watch() Interface {
   var w *broadcasterWatcher
   m.blockQueue(func() { // blockQueue() 下面有分析
      id := m.nextWatcher
      m.nextWatcher++
      w = &broadcasterWatcher{
         result:  make(chan Event, m.watchQueueLength),
         stopped: make(chan struct{}),
         id:      id,
         m:       m,
      }
      m.watchers[id] = w
   })
   if w == nil {
      panic("broadcaster already stopped")
   }
   return w
}

blockQueue 用来阻塞 incoming 队列用的,就是往 incoming 里加入一个 fake 的 Event,然后挂起当前 gorouting,直到这个 Event 被处理到为止

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (m *Broadcaster) blockQueue(f func()) {
   select {
   case <-m.stopped:
      return
   default:
   }
   var wg sync.WaitGroup
   wg.Add(1)
   m.incoming <- Event{
      Type: internalRunFunctionMarker, // "internal-do-function"
      Object: functionFakeRuntimeObject(func() {
         defer wg.Done() // 这个 Event 被消费后会调用到 Done()
         f() // 阻塞结束后调用
      }),
   }
   wg.Wait() // 阻塞直到上面加入到 Event 被处理完
}

StartRecordingToSink

讲完了 StartEventWatcher 的逻辑,回过头看一下 StartRecordingToSink 的具体逻辑。StartRecordingToSink 的作用是将从指定 eventBroadcaster 接收到的消息传送到给定的 sink 中去

  • client-go/tools/record/event.go:190
1
2
3
4
5
6
7
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

这里传给 StartEventWatcher() 方法的 handler 函数是 recordToSink(sink, event, eventCorrelator, e.sleepDuration),看下这个 handler 是怎么实现的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
   // 修改前复制一份,因为一个 event 有多个 listener
   eventCopy := *event
   event = &eventCopy
   result, err := eventCorrelator.EventCorrelate(event) // 聚合处理等,下面会提到
   if err != nil {
      utilruntime.HandleError(err)
   }
   if result.Skip {
      return
   }
   tries := 0
   for {
      // 具体执行将 event 写到 sink 的过程,这里是在 if 的条件里,所以直到成功了才会 break
      if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
         break
      }
      tries++
      if tries >= maxTriesPerEvent { // 最多重试 12 次
         klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
         break
      }
      // 第一次 sleep 随机一点,避免 apiserver 失联的时候所有 client 一起失败
      if tries == 1 {
        time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) // 10s * [0.0,1.0)
      } else {
         time.Sleep(sleepDuration) // 10s
      }
   }
}

recordEvent

  • client-go/tools/record/event.go:238
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
   var newEvent *v1.Event
   var err error
   if updateExistingEvent { // 如果是更新已有的 event,则调用 Patch 方法
      newEvent, err = sink.Patch(event, patch)
   }
   // 更新可能失败,因为这个 event 可能已经被删除了
   if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
      // 如果是新建,则需要确保 ResourceVersion 为空
      event.ResourceVersion = ""
      newEvent, err = sink.Create(event)
   }
   if err == nil {
      // 更新 eventCorrelator 状态
      eventCorrelator.UpdateState(newEvent)
      return true
   }

   // 连不上 apiserver 等原因引起的失败
   switch err.(type) {
   case *restclient.RequestConstructionError:
      // 这种情况重试也会失败,所以直接返回 true
      klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
      return true
   case *errors.StatusError: // 服务器端拒绝更新,放弃
      if errors.IsAlreadyExists(err) {
         klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      } else {
         klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      }
      return true
   case *errors.UnexpectedObjectError:
   default: // http 传输问题,比如失联,需要重试
      
   }
   klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
   return false
}

EventSink

接口定义如下

  • client-go/tools/record/event.go:47
1
2
3
4
5
type EventSink interface {
   Create(event *v1.Event) (*v1.Event, error)
   Update(event *v1.Event) (*v1.Event, error)
   Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}

实现是

1
2
3
type EventSinkImpl struct {
   Interface EventInterface
}

回到一开始的用法:eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

可以看到 Interface 引用的是 kubeClient.CoreV1().Events("") ,这里的逻辑就到了 client-go 的 clientset 中了,这里的类型也就是 corev1.EventInterface,所以上面接口的 Create、Update、Patch 也就都是通过 clientset 来实现的。

EventCorrelator

EventCorrelator 的作用是预处理所有 events,聚合频繁产生的相似的 events,将多次接受到的 events 聚合成一个等,从而降低系统压力。

上面提到一个 eventCorrelator.EventCorrelate() 调用,首先看下对象定义:

  • client-go/tools/record/events_cache.go:405
1
2
3
4
5
6
7
8
type EventCorrelator struct {
   // 过滤器
   filterFunc EventFilterFunc
   // 聚合器
   aggregator *EventAggregator
   // 观察器
   logger *eventLogger
}

EventFilterFunc

过滤器主要是限速用的,看一个具体的实现:

  • client-go/tools/record/events_cache.go:129
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
   var record spamRecord
  // eventKey 的结构大概这样:event.Source.(Component+Host)+event.InvolvedObject.(Kind+Namespace+event+Name)+...
   eventKey := f.spamKeyFunc(event)

   f.Lock()
   defer f.Unlock()
   value, found := f.cache.Get(eventKey) // cache 是一个 LRU 缓存
   if found {
      record = value.(spamRecord)
   }

   // 没有限速器就加一个
   if record.rateLimiter == nil {
      // 默认一个 source+object 的 burst 是 25 ,qps 是 1/300(5分钟一个),也就是令牌桶初始容量是 25,然后 5 分钟才会多一个令牌进来
      record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
   }

   // 看速率是否满足要求
   filter := !record.rateLimiter.TryAccept()
   // 更新 cache
   f.cache.Add(eventKey, record)

   return filter
}

EventAggregator

聚合器的作用是将相似的 events 聚合成一个 event

聚合器定义如下

  • client-go/tools/record/events_cache.go:191
1
2
3
4
5
6
7
8
9
type EventAggregator struct {
   sync.RWMutex
   cache *lru.Cache
   keyFunc EventAggregatorKeyFunc
   messageFunc EventAggregatorMessageFunc
   maxEvents uint            // 当相似 event 数量超过这个最大值时就触发聚合操作,默认是 10
   maxIntervalInSeconds uint // 过了这个间隔的两个相似 event 被认为是一个新的 event,默认 10min
   clock clock.PassiveClock
}

对应的 New 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
   maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
   return &EventAggregator{
      cache:                lru.New(lruCacheSize), // 默认 4096
      keyFunc:              keyFunc,
      messageFunc:          messageFunc,
      maxEvents:            uint(maxEvents), // 默认是 10
      maxIntervalInSeconds: uint(maxIntervalInSeconds), // 默认 600s
      clock:                clock,
   }
}

这里的 keyFunc 如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
   return strings.Join([]string{
      event.Source.Component,
      event.Source.Host,
      event.InvolvedObject.Kind,
      event.InvolvedObject.Namespace,
      event.InvolvedObject.Name,
      string(event.InvolvedObject.UID),
      event.InvolvedObject.APIVersion,
      event.Type,
      event.Reason,
      event.ReportingController,
      event.ReportingInstance,
   },
      ""), event.Message
}

messageFunc 如下

1
2
3
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
   return "(combined from similar events): " + event.Message
}

聚合过程在 EventAggregate() 方法中实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
   now := metav1.NewTime(e.clock.Now())
   var record aggregateRecord // 维护了所有接收到过的 event 的 key
   // 计算这个 event 的 key
   eventKey := getEventKey(newEvent)
   // 类似这样 "(combined from similar events): " + event.Message
   aggregateKey, localKey := e.keyFunc(newEvent)

   // 查询 caches 里是否有相似 events 记录
   e.Lock()
   defer e.Unlock()
   value, found := e.cache.Get(aggregateKey)
   if found {
      record = value.(aggregateRecord)
   }

   // 看向下之前的记录是否太旧了,这个事件是 10min,如果太旧了就更新。
   maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
   interval := now.Time.Sub(record.lastTimestamp.Time)
   if interval > maxInterval {
      record = aggregateRecord{localKeys: sets.NewString()}
   }

   // 新 event 写入聚合 record 里,并且放到 cache 中
   record.localKeys.Insert(localKey)
   record.lastTimestamp = now
   e.cache.Add(aggregateKey, record)

   // 如果不重复的 events 数量小于10
   if uint(record.localKeys.Len()) < e.maxEvents {
      return newEvent, eventKey
   }

   // 保证 localKeys 不增长,pop 出来一个
   record.localKeys.PopAny()

   // 返回一个聚合后的 event 和对应的“聚合 key”
   eventCopy := &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
         Namespace: newEvent.Namespace,
      },
      Count:          1,
      FirstTimestamp: now,
      InvolvedObject: newEvent.InvolvedObject,
      LastTimestamp:  now,
      Message:        e.messageFunc(newEvent),
      Type:           newEvent.Type,
      Reason:         newEvent.Reason,
      Source:         newEvent.Source,
   }
   return eventCopy, aggregateKey
}

eventLogger

观察器做的事情是将一个新产生的 Event 和 LRU 缓存里的做对比,如果 key 一致,也就是两个 Event 表示的信息一样,则更新缓存;如果不一样,就加到缓存里。

  • client-go/tools/record/events_cache.go:315
1
2
3
4
5
type eventLogger struct {
   sync.RWMutex
   cache *lru.Cache
   clock clock.PassiveClock
}

观察器有一个 eventObserve() 方法,如果 key 是相同的,这个方法会直接更新已经存在的 event,反之增加一个新的 event

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
   var (
      patch []byte
      err   error
   )
   eventCopy := *newEvent // 复制一份
   event := &eventCopy

   e.Lock()
   defer e.Unlock()

   // 检查缓存里是否有需要更新的 event,这里的 key 就是前面提到的 EventAggregatorByReasonFunc() 计算出来的 key
   lastObservation := e.lastEventObservationFromCache(key)

   // 如果发现了需要更新的 event,也就是新的 event 已经存在已经老的和其各个属性都一样,Reason、Message 等都一样,而且属于同一个对象
   if lastObservation.count > 0 {
      // update the event based on the last observation so patch will work as desired
      event.Name = lastObservation.name
      event.ResourceVersion = lastObservation.resourceVersion
      event.FirstTimestamp = lastObservation.firstTimestamp // Event 构造的时候会设置 firstTimestamp 和 lastTimestamp,这里更新了 firstTimestamp
      event.Count = int32(lastObservation.count) + 1 // 计数器加1

      eventCopy2 := *event
      eventCopy2.Count = 0
      eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
      eventCopy2.Message = ""

      newData, _ := json.Marshal(event)
      oldData, _ := json.Marshal(eventCopy2)
      patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
   }

   // 记录新观察到的 Event
   e.cache.Add(
      key,
      eventLog{
         count:           uint(event.Count),
         firstTimestamp:  event.FirstTimestamp,
         name:            event.Name,
         resourceVersion: event.ResourceVersion,
      },
   )
   return event, patch, err
}

EventCorrelator.EventCorrelate()

回到 EventCorrelate() 方法的实现上

  • client-go/tools/record/events_cache.go:510
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
   if newEvent == nil {
      return nil, fmt.Errorf("event is nil")
   }
   aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) // 聚合器处理
   observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) // 观察器处理
   if c.filterFunc(observedEvent) { // 过滤器处理
      return &EventCorrelateResult{Skip: true}, nil
   }
   return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

EventRecorder

最后两步是:

1
2
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

这里的 NewRecorder 定义如下

1
2
3
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

入参是 Scheme 和 EventSource,EventSource 结构很简单:

1
2
3
4
5
6
type EventSource struct {
   // Event 从哪个组件来的,比如:job-controller
   Component string `json:"component,omitempty" protobuf:"bytes,1,opt,name=component"`
   // Event 从哪个节点来的
   Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}

看下 EventRecorder 的定义和实现,接口长这样:

1
2
3
4
5
6
7
8
type EventRecorder interface {
   // 这里的 object 是这个 event 相关的那个资源对象;eventtype 是 'Normal/Warning' 这类简单的字符串;reason 表示这个 event 产生的原因,message 是一个更详细的可读性好的描述信息
   Event(object runtime.Object, eventtype, reason, message string)
   // 和 Event() 类似,只是用来 Sprintf
   Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
   // Eventf() 基础上加了一个 annotations
   AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

对应的实现是:

1
2
3
4
5
6
type recorderImpl struct {
   scheme *runtime.Scheme
   source v1.EventSource
   *watch.Broadcaster
   clock clock.PassiveClock
}

recorderImpl.Eventf()

我们写代码时使用最多的 Eventf() 对应的就是 recorderImpl 对象的 Eventf() 方法,接下来看下 Eventf() 的具体实现。

Eventf() 方法只是简单地通过 fmt.Sprintf() 格式化字符串后调用 Event()

  • client-go/tools/record/event.go:354
1
2
3
4
5
6
7
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

接着逻辑就到了generateEvent() 方法里

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
   ref, err := ref.GetReference(recorder.scheme, object) // 获取 object 的 ObjectReference
   if err != nil {
      klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
      return
   }

   if !util.ValidateEventType(eventtype) { // 校验 eventtype 是 "Normal"/"Warning"
      klog.Errorf("Unsupported event type: '%v'", eventtype)
      return
   }

   event := recorder.makeEvent(ref, annotations, eventtype, reason, message) // 构建 event
   event.Source = recorder.source
   // events 操作不应该阻塞,如果 event 太多的时候直接丢弃,然后打印一个日志
   if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
      klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
   }
}

这里有两个函数调用:

  • makeEvent()
  • ActionOrDrop()

makeEvent 从名字就能猜到这是构造一个 Event 对象的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
   t := metav1.Time{Time: recorder.clock.Now()}
   namespace := ref.Namespace
   if namespace == "" {
      namespace = metav1.NamespaceDefault
   }
   return &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), // 名字是相关 object 的 Name + 时间戳
         Namespace:   namespace, // 和 ref 在同一个 namespace 下,如果 ref 没有 namespace 就放到 default 下
         Annotations: annotations, // 支持添加 annotations
      },
      InvolvedObject: *ref,
      Reason:         reason,
      Message:        message,
      FirstTimestamp: t,
      LastTimestamp:  t,
      Count:          1,
      Type:           eventtype,
   }
}

ActionOrDrop 的逻辑是尝试往 Broadcaster.incoming channel 中写入 Event,如果失败了就直接 Drop 掉

1
2
3
4
5
6
7
8
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
   select {
   case m.incoming <- Event{action, obj}:
      return true
   default:
      return false
   }
}

(转载请保留本文原始链接)