目录

Kubeedge源码阅读系列--cloudcore.edgecontroller模块

前言

代码基于 kubeedge1.3


模块入口

edgecontroller 与上次看的 devicecontroller 源码很像, 入口分别创建了 upstream 和 downstream, 不同的是这里的 upstream 和 downstream 之间没有依赖关系

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Start creates the upstream controller and the downstream controller, in
// that order, and starts each one right after it has been created.
//
// Any failure to create or start a controller is logged as an error and the
// process exits with status 1.
func (ec *EdgeController) Start() {
  upstream, err := controller.NewUpstreamController()
  if err != nil {
    klog.Errorf("new upstream controller failed with error: %s", err)
    os.Exit(1)
  }
  // Start returns an error; do not drop it silently.
  if err := upstream.Start(); err != nil {
    klog.Errorf("start upstream controller failed with error: %s", err)
    os.Exit(1)
  }

  downstream, err := controller.NewDownstreamController()
  if err != nil {
    // Fatal for the process, so log at the same severity as the upstream path.
    klog.Errorf("new downstream controller failed with error: %s", err)
    os.Exit(1)
  }
  if err := downstream.Start(); err != nil {
    klog.Errorf("start downstream controller failed with error: %s", err)
    os.Exit(1)
  }
}

upstream

与 devicecontroller 一样, edgecontroller 的 upstream 也有 Start() 函数

Start()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Start prepares and launches the upstream controller.
//
// It allocates one buffered channel per message kind (capacities come from
// config.Config.Buffer), starts the dispatchMessage goroutine, and then starts
// the configured number of worker goroutines (counts come from
// config.Config.Load) for each message kind. It always returns nil.
func (uc *UpstreamController) Start() error {
  klog.Info("start upstream controller")

  buffer := config.Config.Buffer
  newChan := func(size int32) chan model.Message {
    return make(chan model.Message, size)
  }
  uc.nodeStatusChan = newChan(buffer.UpdateNodeStatus)
  uc.podStatusChan = newChan(buffer.UpdatePodStatus)
  uc.configMapChan = newChan(buffer.QueryConfigMap)
  uc.secretChan = newChan(buffer.QuerySecret)
  uc.serviceChan = newChan(buffer.QueryService)
  uc.endpointsChan = newChan(buffer.QueryEndpoints)
  uc.persistentVolumeChan = newChan(buffer.QueryPersistentVolume)
  uc.persistentVolumeClaimChan = newChan(buffer.QueryPersistentVolumeClaim)
  uc.volumeAttachmentChan = newChan(buffer.QueryVolumeAttachment)
  uc.queryNodeChan = newChan(buffer.QueryNode)
  uc.updateNodeChan = newChan(buffer.UpdateNode)
  uc.podDeleteChan = newChan(buffer.DeletePod)

  go uc.dispatchMessage()

  // spawn starts count goroutines that all run the same worker function.
  spawn := func(count int, worker func()) {
    for started := 0; started < count; started++ {
      go worker()
    }
  }

  load := config.Config.Load
  spawn(int(load.UpdateNodeStatusWorkers), uc.updateNodeStatus)
  spawn(int(load.UpdatePodStatusWorkers), uc.updatePodStatus)
  spawn(int(load.QueryConfigMapWorkers), uc.queryConfigMap)
  spawn(int(load.QuerySecretWorkers), uc.querySecret)
  spawn(int(load.QueryServiceWorkers), uc.queryService)
  spawn(int(load.QueryEndpointsWorkers), uc.queryEndpoints)
  spawn(int(load.QueryPersistentVolumeWorkers), uc.queryPersistentVolume)
  spawn(int(load.QueryPersistentVolumeClaimWorkers), uc.queryPersistentVolumeClaim)
  spawn(int(load.QueryVolumeAttachmentWorkers), uc.queryVolumeAttachment)
  spawn(int(load.QueryNodeWorkers), uc.queryNode)
  spawn(int(load.UpdateNodeWorkers), uc.updateNode)
  spawn(int(load.DeletePodWorkers), uc.deletePod)

  return nil
}

upstream.Start() 首先初始化了它的所有成员 channel, 然后执行 dispatchMessage() 用于分发收到的数据, 之后启动其他函数用于处理各成员 channel 里面的数据, 可以以 updateNodeStatus() 为例

uc.dispatchMessage():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// dispatchMessage receives messages from the message layer in a loop and
// forwards each one to the channel that matches its resource type.
//
// Node messages are further split by operation (query / update); pod messages
// are only forwarded when the operation is delete. Messages that cannot be
// received or classified are logged and skipped. The loop returns once
// beehiveContext.Done() is closed; that check happens at the top of every
// iteration, before the next Receive call.
func (uc *UpstreamController) dispatchMessage() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Info("stop dispatchMessage")
      return
    default:
    }
    msg, err := uc.messageLayer.Receive()
    if err != nil {
      klog.Warningf("receive message failed, %s", err)
      continue
    }

    klog.Infof("dispatch message ID: %s", msg.GetID())
    klog.V(5).Infof("dispatch message content: %++v", msg)

    resourceType, err := messagelayer.GetResourceType(msg)
    if err != nil {
      klog.Warningf("parse message: %s resource type with error: %s", msg.GetID(), err)
      continue
    }
    klog.Infof("message: %s, resource type is: %s", msg.GetID(), resourceType)
    operationType := msg.GetOperation()
    klog.Infof("message: %s, operation type is: %s", msg.GetID(), operationType)

    switch resourceType {
    case model.ResourceTypeNodeStatus:
      uc.nodeStatusChan <- msg
    case model.ResourceTypePodStatus:
      uc.podStatusChan <- msg
    case model.ResourceTypeConfigmap:
      uc.configMapChan <- msg
    case model.ResourceTypeSecret:
      uc.secretChan <- msg
    case common.ResourceTypeService:
      uc.serviceChan <- msg
    case common.ResourceTypeEndpoints:
      uc.endpointsChan <- msg
    case common.ResourceTypePersistentVolume:
      uc.persistentVolumeChan <- msg
    case common.ResourceTypePersistentVolumeClaim:
      uc.persistentVolumeClaimChan <- msg
    case common.ResourceTypeVolumeAttachment:
      uc.volumeAttachmentChan <- msg
    case model.ResourceTypeNode:
      switch operationType {
      case model.QueryOperation:
        uc.queryNodeChan <- msg
      case model.UpdateOperation:
        uc.updateNodeChan <- msg
      default:
        klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), operationType)
      }
    case model.ResourceTypePod:
      // Only pod deletions are forwarded; report every other operation
      // instead of dropping the message without a trace.
      if operationType == model.DeleteOperation {
        uc.podDeleteChan <- msg
      } else {
        klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), operationType)
      }
    default:
      klog.Errorf("message: %s, resource type: %s unsupported", msg.GetID(), resourceType)
    }
  }
}

这里根据收到的数据的resourceType选择将数据送到对应的 channel 中.

updateNodeStatus():

函数很长, 结构为一个 for 循环嵌套 select, 从 channel nodeStatusChan 中接收数据到 msg 中 (msg := <-uc.nodeStatusChan), 依次 GetContent, GetNamespace, GetResourceName, GetOperation, 再根据数据的 Operation 做出相应的操作, 一般是上传到 apiserver

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// updateNodeStatus is a worker loop that consumes messages from
// uc.nodeStatusChan until beehiveContext.Done() is closed.
//
// For every message it obtains the content as JSON bytes, resolves the
// namespace and resource name, and then acts on the operation:
//   - insert: registers the node through uc.createNode unless a node with that
//     name already exists, and replies through uc.nodeMsgResponse ("OK" on
//     success or when the node already exists, "" on failure).
//   - update: writes the reported status (and extended resources, stored as
//     node annotations) to the existing node via UpdateStatus and sends an
//     "OK" response carrying the new resource version.
//   - anything else: logged as unsupported.
//
// Every failure is logged and the message is skipped; the loop itself never
// stops on a processing error.
func (uc *UpstreamController) updateNodeStatus() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Warning("stop updateNodeStatus")
      return
    case msg := <-uc.nodeStatusChan:
      klog.Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())

      // Content may already be raw bytes; otherwise serialize it so both
      // operations below can unmarshal into their own types.
      var data []byte
      switch msg.Content.(type) {
      case []byte:
        data = msg.GetContent().([]byte)
      default:
        var err error
        data, err = json.Marshal(msg.GetContent())
        if err != nil {
          klog.Warningf("message: %s process failure, marshal message content with error: %s", msg.GetID(), err)
          continue
        }
      }

      namespace, err := messagelayer.GetNamespace(msg)
      if err != nil {
        klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
        continue
      }
      name, err := messagelayer.GetResourceName(msg)
      if err != nil {
        klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
        continue
      }

      switch msg.GetOperation() {
      case model.InsertOperation:
        _, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
        if err == nil {
          klog.Infof("node: %s already exists, do nothing", name)
          uc.nodeMsgResponse(name, namespace, "OK", msg)
          continue
        }

        // Any error other than "not found" means the lookup itself failed.
        if !errors.IsNotFound(err) {
          klog.Errorf("get node %s info error: %v , register node failed", name, err)
          uc.nodeMsgResponse(name, namespace, "", msg)
          continue
        }

        node := &v1.Node{}
        err = json.Unmarshal(data, node)
        if err != nil {
          klog.Errorf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
          uc.nodeMsgResponse(name, namespace, "", msg)
          continue
        }

        if _, err = uc.createNode(name, node); err != nil {
          klog.Errorf("create node %s error: %v , register node failed", name, err)
          uc.nodeMsgResponse(name, namespace, "", msg)
          continue
        }

        uc.nodeMsgResponse(name, namespace, "OK", msg)

      case model.UpdateOperation:
        nodeStatusRequest := &edgeapi.NodeStatusRequest{}
        err := json.Unmarshal(data, nodeStatusRequest)
        if err != nil {
          klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
          continue
        }

        getNode, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
        if errors.IsNotFound(err) {
          klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
          continue
        }

        if err != nil {
          klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
          continue
        }

        // TODO: comment below for test failure. Needs to decide whether to keep post troubleshoot
        // In case the status stored at metadata service is outdated, update the heartbeat automatically
        if !config.Config.EdgeSiteEnable {
          maxAge := time.Duration(config.Config.NodeUpdateFrequency) * time.Second
          for i := range nodeStatusRequest.Status.Conditions {
            if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > maxAge {
              nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
            }

            if time.Since(nodeStatusRequest.Status.Conditions[i].LastTransitionTime.Time) > maxAge {
              nodeStatusRequest.Status.Conditions[i].LastTransitionTime = metaV1.NewTime(time.Now())
            }
          }
        }

        if getNode.Annotations == nil {
          klog.Warningf("node annotations is nil map, new a map for it. namespace: %s, name: %s", getNode.Namespace, getNode.Name)
          getNode.Annotations = make(map[string]string)
        }
        // Each extended resource list is stored as a JSON annotation keyed by
        // the resource name. Distinct local names are used here so the outer
        // name/data are not shadowed.
        for resourceName, resources := range nodeStatusRequest.ExtendResources {
          if resourceName == constants.NvidiaGPUScalarResourceName {
            var gpuStatus []types.NvidiaGPUStatus
            for _, er := range resources {
              gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
            }
            if len(gpuStatus) > 0 {
              gpuData, err := json.Marshal(gpuStatus)
              if err != nil {
                // Do not write an empty annotation when marshaling fails.
                klog.Warningf("message: %s process failure, gpu status marshal with error: %s", msg.GetID(), err)
              } else {
                getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(gpuData)
              }
            }
          }
          resourceData, err := json.Marshal(resources)
          if err != nil {
            klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
            continue
          }
          getNode.Annotations[string(resourceName)] = string(resourceData)
        }

        // Keep the same "VolumesAttached" attribute with upstream,
        // since this value is maintained by kube-controller-manager.
        nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached

        getNode.Status = nodeStatusRequest.Status
        node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(getNode)
        if err != nil {
          klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
          continue
        }

        // Build the response from the updated node and send it back through
        // the message layer.
        resMsg := model.NewMessage(msg.GetID())
        resMsg.SetResourceVersion(node.ResourceVersion)
        resMsg.Content = "OK"
        nodeID, err := messagelayer.GetNodeID(msg)
        if err != nil {
          klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
          continue
        }
        resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
        if err != nil {
          klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
          continue
        }
        resMsg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
        if err = uc.messageLayer.Response(*resMsg); err != nil {
          klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
          continue
        }

        klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)

      default:
        klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
        // Skip the success log below: this message was not processed.
        continue
      }
      klog.V(4).Infof("message: %s process successfully", msg.GetID())
    }
  }
}

downstream

NewDownstreamController:

这是 downstream 的初始化函数, 它创建了 podManager, configMapManager, secretManager, nodesManager, serviceManager, endpointsManager, 以及 lc := &manager.LocationCache{} 和 cli, err := utils.KubeClient(), 最后执行 dc.initLocating() (initLocating to know configmap and secret should send to which nodes)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// NewDownstreamController assembles a DownstreamController from the global
// configuration.
//
// It creates a kube client, then the pod, configmap, secret, node, service and
// endpoints managers (all across v1.NamespaceAll), wires them together with a
// fresh LocationCache and a context message layer, and finally runs
// initLocating. When config.Config.EdgeSiteEnable is set, config.Config.NodeName
// must be non-empty and is passed to the pod manager; otherwise an empty node
// name is passed.
//
// The first error encountered is returned together with a nil controller.
func NewDownstreamController() (*DownstreamController, error) {
  kubeClient, err := utils.KubeClient()
  if err != nil {
    klog.Warningf("create kube client failed with error: %s", err)
    return nil, err
  }

  // Stays empty unless edgesite mode is enabled.
  var nodeName string
  if config.Config.EdgeSiteEnable {
    nodeName = config.Config.NodeName
    if nodeName == "" {
      return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
    }
  }

  pods, err := manager.NewPodManager(kubeClient, v1.NamespaceAll, nodeName)
  if err != nil {
    klog.Warningf("create pod manager failed with error: %s", err)
    return nil, err
  }

  configMaps, err := manager.NewConfigMapManager(kubeClient, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("create configmap manager failed with error: %s", err)
    return nil, err
  }

  secrets, err := manager.NewSecretManager(kubeClient, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("create secret manager failed with error: %s", err)
    return nil, err
  }

  nodes, err := manager.NewNodesManager(kubeClient, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("Create nodes manager failed with error: %s", err)
    return nil, err
  }

  services, err := manager.NewServiceManager(kubeClient, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("Create service manager failed with error: %s", err)
    return nil, err
  }

  endpoints, err := manager.NewEndpointsManager(kubeClient, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("Create endpoints manager failed with error: %s", err)
    return nil, err
  }

  controller := &DownstreamController{
    kubeClient:       kubeClient,
    podManager:       pods,
    configmapManager: configMaps,
    secretManager:    secrets,
    nodeManager:      nodes,
    serviceManager:   services,
    endpointsManager: endpoints,
    messageLayer:     messagelayer.NewContextMessageLayer(),
    lc:               &manager.LocationCache{},
  }

  // initLocating to know configmap and secret should send to which nodes
  if err = controller.initLocating(); err != nil {
    return nil, err
  }
  return controller, nil
}

Start 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Start launches one goroutine per resource sync function (pod, configmap,
// secret, edge nodes, service, endpoints) and returns immediately.
// It always returns nil.
func (dc *DownstreamController) Start() error {
  klog.Info("start downstream controller")

  // Launched in this order, each in its own goroutine.
  syncers := []func(){
    dc.syncPod,       // pod
    dc.syncConfigMap, // configmap
    dc.syncSecret,    // secret
    dc.syncEdgeNodes, // nodes
    dc.syncService,   // service
    dc.syncEndpoints, // endpoints
  }
  for _, syncer := range syncers {
    go syncer()
  }

  return nil
}

dc.syncPod()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// syncPod forwards pod events from the pod manager to the message layer until
// beehiveContext.Done() is closed.
//
// Events whose object is not a *v1.Pod, or whose pod is not scheduled on an
// edge node according to the location cache, are skipped. For the remaining
// events a message carrying the pod is built and routed with an insert, delete
// or update operation matching the watch event type; added and modified pods
// are also recorded in the location cache. Events of any other type are logged
// and skipped without sending a message.
func (dc *DownstreamController) syncPod() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Warning("Stop edgecontroller downstream syncPod loop")
      return
    case e := <-dc.podManager.Events():
      pod, ok := e.Object.(*v1.Pod)
      if !ok {
        // Report the type that was actually received; pod is a nil *v1.Pod
        // here and would always print the same type.
        klog.Warningf("object type: %T unsupported", e.Object)
        continue
      }
      if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {
        continue
      }
      msg := model.NewMessage("")
      msg.SetResourceVersion(pod.ResourceVersion)
      resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)
      if err != nil {
        klog.Warningf("built message resource failed with error: %s", err)
        continue
      }
      msg.Content = pod
      switch e.Type {
      case watch.Added:
        msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
        dc.lc.AddOrUpdatePod(*pod)
      case watch.Deleted:
        msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
      case watch.Modified:
        msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
        dc.lc.AddOrUpdatePod(*pod)
      default:
        klog.Warningf("pod event type: %s unsupported", e.Type)
        // No router was built for this message, so do not send it.
        continue
      }
      if err := dc.messageLayer.Send(*msg); err != nil {
        klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
      } else {
        klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
      }
    }
  }
}

获取 podManager 中收到的 events

podmanager:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Events returns the manager's mergedEvents channel, from which callers
// receive pod events.
func (pm *PodManager) Events() chan watch.Event {
  merged := pm.mergedEvents
  return merged
}

// NewPodManager builds a PodManager that watches pods in namespace.
//
// When nodeName is empty all pods are watched; otherwise the watch is limited
// with a "spec.nodeName" field selector. A shared informer feeds raw events
// into realEvents, and the manager's merge goroutine republishes them on
// mergedEvents; both channels are sized by config.Config.Buffer.PodEvent.
// The informer runs on a stop channel that this function never closes.
// The returned error is always nil.
func NewPodManager(kubeClient *kubernetes.Clientset, namespace, nodeName string) (*PodManager, error) {
  // Pick the selector first so the ListWatch is built in a single place.
  selector := fields.Everything()
  if nodeName != "" {
    selector = fields.OneTermEqualSelector("spec.nodeName", nodeName)
  }
  listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)

  pm := &PodManager{
    realEvents:   make(chan watch.Event, config.Config.Buffer.PodEvent),
    mergedEvents: make(chan watch.Event, config.Config.Buffer.PodEvent),
  }

  informer := cache.NewSharedInformer(listWatch, &v1.Pod{}, 0)
  informer.AddEventHandler(NewCommonResourceEventHandler(pm.realEvents))

  stopNever := make(chan struct{})
  go informer.Run(stopNever)
  go pm.merge()

  return pm, nil
}

上一篇分析说过, cache.NewListWatchFromClient 位于 vendor/k8s.io/client-go/tools/cache/listwatch.go 中, 是对 apiserver list/watch 请求方法的包装. 此处的 realEvents, mergedEvents 分别对应着从 apiserver 收到的真实事件和合并后的事件, 合并函数为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// merge drains pm.realEvents and republishes events on pm.mergedEvents while
// keeping pm.pods (keyed by pod UID) in sync. It returns when realEvents is
// closed.
//
//   - Added: the pod is cached; the event is forwarded, rewritten to Modified
//     when the pod already carries a DeletionTimestamp.
//   - Deleted: the pod is removed from the cache and the event is forwarded.
//   - Modified: the cache is refreshed; the event is forwarded unless a cached
//     copy existed and isPodUpdated reports false for it.
//
// Events whose object is not a *v1.Pod, or whose type is not one of the above,
// are logged and dropped.
func (pm *PodManager) merge() {
  for re := range pm.realEvents {
    // Checked assertion: an unexpected object must not panic and take the
    // whole merge goroutine down.
    pod, ok := re.Object.(*v1.Pod)
    if !ok {
      klog.Warningf("object type: %T unsupported", re.Object)
      continue
    }

    switch re.Type {
    case watch.Added:
      pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
      if pod.DeletionTimestamp != nil {
        // A pod that is already being deleted is reported as a modification.
        re.Type = watch.Modified
      }
      pm.mergedEvents <- re
    case watch.Deleted:
      pm.pods.Delete(pod.UID)
      pm.mergedEvents <- re
    case watch.Modified:
      // Load the previous copy before overwriting it with the new one.
      value, ok := pm.pods.Load(pod.UID)
      pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
      if ok {
        cachedPod := value.(*CachePod)
        if pm.isPodUpdated(cachedPod, pod) {
          pm.mergedEvents <- re
        }
      } else {
        pm.mergedEvents <- re
      }
    default:
      klog.Warningf("event type: %s unsupported", re.Type)
    }
  }
}

此处细节暂不深究, 大概能猜出是用来减少重复请求的. 回到 dc.syncPod() ,函数根据e.Type分发不同的路由, 最后执行dc.messageLayer.Send(*msg) 把数据发到边缘.