目录

Kubeedge源码阅读系列--cloudcore.devicecontroller模块

前言

代码基于 kubeedge1.3


模块入口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Start creates the downstream and upstream controllers and starts them in
// that order. The upstream controller is handed the downstream controller at
// construction time. If either controller cannot be created or started, the
// error is logged and the process exits with status 1.
func (dc *DeviceController) Start() {
  downstream, err := controller.NewDownstreamController()
  if err != nil {
    klog.Errorf("New downstream controller failed with error: %s", err)
    os.Exit(1)
  }
  upstream, err := controller.NewUpstreamController(downstream)
  if err != nil {
    klog.Errorf("new upstream controller failed with error: %s", err)
    os.Exit(1)
  }

  // Start() returns an error on both controllers; do not drop it silently.
  if err := downstream.Start(); err != nil {
    klog.Errorf("start downstream controller failed with error: %s", err)
    os.Exit(1)
  }
  // wait for downstream controller to start and load deviceModels and devices
  // TODO think about sync
  time.Sleep(1 * time.Second)
  if err := upstream.Start(); err != nil {
    klog.Errorf("start upstream controller failed with error: %s", err)
    os.Exit(1)
  }
}

函数使用 NewDownstreamController 和 NewUpstreamController 分别建立了 downstream 和 upstream, 同时 upstream 依赖于 downstream

downstream

备注: downstream 一般描述云端向边缘端下发数据

初始化 downstream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// NewDownstreamController builds a DownstreamController.
//
// It obtains a kube client and kube config from utils, creates a CRD client
// from that config, and uses the CRD client to create a device manager and a
// device-model manager, both for v1.NamespaceAll. A message layer and a
// config-map manager are created as well.
//
// Any failure is logged as a warning and returned; the returned controller is
// nil in that case.
func NewDownstreamController() (*DownstreamController, error) {
  cli, err := utils.KubeClient()
  if err != nil {
    klog.Warningf("Create kube client failed with error: %s", err)
    return nil, err
  }

  config, err := utils.KubeConfig()
  if err != nil {
    klog.Warningf("Get kubeConfig error: %v", err)
    return nil, err
  }

  crdcli, err := utils.NewCRDClient(config)
  if err != nil {
    klog.Warningf("Failed to create crd client: %s", err)
    return nil, err
  }
  deviceManager, err := manager.NewDeviceManager(crdcli, v1.NamespaceAll)
  if err != nil {
    klog.Warningf("Create device manager failed with error: %s", err)
    return nil, err
  }

  deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)
  if err != nil {
    // Name the component that actually failed, so this is distinguishable
    // from a device manager failure in the logs.
    klog.Warningf("Create device model manager failed with error: %s", err)
    return nil, err
  }

  dc := &DownstreamController{
    kubeClient:         cli,
    deviceManager:      deviceManager,
    deviceModelManager: deviceModelManager,
    messageLayer:       messagelayer.NewContextMessageLayer(),
    configMapManager:   manager.NewConfigMapManager(),
  }
  return dc, nil
}

NewDownstreamController 创建了 kubeClient, deviceManager, deviceModelManager, messageLayer, configMapManager, 将它们赋值给 dc 并返回.

downstream.Start()

1
downstream, err := controller.NewDownstreamController()

downstream 的 Start() 方法为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Start launches the two event-sync goroutines of the downstream controller:
// syncDeviceModel first, then syncDevice after a fixed one-second pause.
// It always returns nil.
func (dc *DownstreamController) Start() error {
  // Pause between the two goroutines so device models can be added first.
  // TODO need to think about sync
  const modelWarmup = 1 * time.Second

  klog.Info("Start downstream devicecontroller")

  go dc.syncDeviceModel()
  time.Sleep(modelWarmup)
  go dc.syncDevice()

  return nil
}

其中执行了dc.syncDeviceModel(), dc.syncDevice()两个函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// syncDeviceModel consumes events from the device-model manager's event
// channel and dispatches each one by its watch type to deviceModelAdded,
// deviceModelDeleted or deviceModelUpdated. Events whose object is not a
// *v1alpha1.DeviceModel, or whose type is none of Added/Deleted/Modified, are
// logged and skipped. The loop returns once beehiveContext.Done() fires.
func (dc *DownstreamController) syncDeviceModel() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Info("stop syncDeviceModel")
      return
    case e := <-dc.deviceModelManager.Events():
      deviceModel, ok := e.Object.(*v1alpha1.DeviceModel)
      if !ok {
        // Report the type that actually arrived. deviceModel is the nil
        // result of the failed assertion, so %T on it would always print
        // *v1alpha1.DeviceModel.
        klog.Warningf("object type: %T unsupported", e.Object)
        continue
      }
      switch e.Type {
      case watch.Added:
        dc.deviceModelAdded(deviceModel)
      case watch.Deleted:
        dc.deviceModelDeleted(deviceModel)
      case watch.Modified:
        dc.deviceModelUpdated(deviceModel)
      default:
        klog.Warningf("deviceModel event type: %s unsupported", e.Type)
      }
    }
  }
}

其中执行了 dc.deviceModelManager.Events() 即从 deviceModelManager 中获取Event, 并对事件进行分析, 那么事件Event是哪来的呢

来看deviceModelManager的来源:

1
2
// 执行 NewDownstreamController() 时创建了 deviceModelManager
deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)

manager/devicemodel.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DeviceModelManager watches DeviceModel resources and exposes the resulting
// change events, together with a cache of the known device models.
type DeviceModelManager struct {
  // events carries watch events originating from the Kubernetes API server;
  // it is handed out to consumers by Events().
  events chan watch.Event

  // DeviceModel caches device models: the key is DeviceModel.Name and the
  // value is a *v1alpha1.DeviceModel.
  DeviceModel sync.Map
}

// Events returns the manager's event channel (dmm.events). Callers receive
// DeviceModel watch events from it; the same channel is returned on every call.
func (dmm *DeviceModelManager) Events() chan watch.Event {
  return dmm.events
}

// NewDeviceModelManager builds a DeviceModelManager for the "devicemodels"
// resource in the given namespace.
//
// It allocates the event channel (capacity taken from
// config.Config.Buffer.DeviceModelEvent), registers a common resource event
// handler built on that channel with a shared informer, and starts the
// informer in its own goroutine. The returned error is always nil.
func NewDeviceModelManager(crdClient *rest.RESTClient, namespace string) (*DeviceModelManager, error) {
  eventCh := make(chan watch.Event, config.Config.Buffer.DeviceModelEvent)
  dmm := &DeviceModelManager{events: eventCh}

  informer := cache.NewSharedInformer(
    cache.NewListWatchFromClient(crdClient, "devicemodels", namespace, fields.Everything()),
    &v1alpha1.DeviceModel{},
    0,
  )
  informer.AddEventHandler(NewCommonResourceEventHandler(eventCh))

  // The stop channel is never closed, so nothing in this function ever
  // signals the informer to stop.
  go informer.Run(make(chan struct{}))

  return dmm, nil
}

此处可以看出 dc.deviceModelManager.Events() 返回的是 deviceModelManager 的 events 字段, events 类型为 chan watch.Event, 可以理解为 deviceModel 相关的事件到来后会传到通道 events 中.

cache.NewListWatchFromClient 位于 vendor/k8s.io/client-go/tools/cache/listwatch.go 中, 是对 apiserver 的 list 和 watch 请求方法的包装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NewListWatchFromClient returns a ListWatch for the given client, resource
// and namespace whose list and watch requests are restricted by fieldSelector.
// It delegates to NewFilteredListWatchFromClient with an option modifier that
// sets ListOptions.FieldSelector to the selector's string form.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
  return NewFilteredListWatchFromClient(c, resource, namespace, func(opts *metav1.ListOptions) {
    opts.FieldSelector = fieldSelector.String()
  })
}

// NewFilteredListWatchFromClient returns a ListWatch for the given client,
// resource and namespace. optionsModifier is invoked on the ListOptions of
// every list and watch request before the request is built, so callers can
// apply a field selector, a label selector, or any other option.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
  lw := &ListWatch{}

  // List: apply the modifier, then issue the GET and decode the result.
  lw.ListFunc = func(opts metav1.ListOptions) (runtime.Object, error) {
    optionsModifier(&opts)
    req := c.Get().Namespace(namespace).Resource(resource).VersionedParams(&opts, metav1.ParameterCodec)
    return req.Do().Get()
  }

  // Watch: Watch is set before the modifier runs, so the modifier sees (and
  // could change) that flag.
  lw.WatchFunc = func(opts metav1.ListOptions) (watch.Interface, error) {
    opts.Watch = true
    optionsModifier(&opts)
    req := c.Get().Namespace(namespace).Resource(resource).VersionedParams(&opts, metav1.ParameterCodec)
    return req.Watch()
  }

  return lw
}

回到dc.syncDeviceModel()

执行完e := <-dc.deviceModelManager.Events() 之后, syncDeviceModel根据e.Type的类型执行不同的操作,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
switch e.Type {
case watch.Added:
  dc.deviceModelAdded(deviceModel)
case watch.Deleted:
  dc.deviceModelDeleted(deviceModel)
case watch.Modified:
  dc.deviceModelUpdated(deviceModel)
default:
  klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}

以 dc.deviceModelAdded(deviceModel) 为例

1
2
3
4
5
// deviceModelAdded handles the addition of a deviceModel: it only records the
// model in the device-model manager's DeviceModel map, keyed by the model's
// Name. Nothing is sent to the edge here.
func (dc *DownstreamController) deviceModelAdded(deviceModel *v1alpha1.DeviceModel) {
  // nothing to do when deviceModel added, only add in map
  dc.deviceModelManager.DeviceModel.Store(deviceModel.Name, deviceModel)
}

dc.deviceModelAdded(deviceModel) 只是简单地将 deviceModel 存到 map DeviceModel 中

dc.syncDeviceModel() 是在独立的 goroutine 中持续运行的循环, 并不会"执行完"; Start() 在启动它之后等待 1 秒, 再以另一个 goroutine 启动 dc.syncDevice()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// syncDevice consumes events from the device manager's event channel and
// dispatches each one by its watch type to deviceAdded, deviceDeleted or
// deviceUpdated. Events whose object is not a *v1alpha1.Device, or whose type
// is none of Added/Deleted/Modified, are logged and skipped. The loop returns
// once beehiveContext.Done() fires.
func (dc *DownstreamController) syncDevice() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Info("Stop syncDevice")
      return
    case e := <-dc.deviceManager.Events():
      device, ok := e.Object.(*v1alpha1.Device)
      if !ok {
        // Report the type that actually arrived. device is the nil result of
        // the failed assertion, so %T on it would always print
        // *v1alpha1.Device.
        klog.Warningf("Object type: %T unsupported", e.Object)
        continue
      }
      switch e.Type {
      case watch.Added:
        dc.deviceAdded(device)
      case watch.Deleted:
        dc.deviceDeleted(device)
      case watch.Modified:
        dc.deviceUpdated(device)
      default:
        klog.Warningf("Device event type: %s unsupported", e.Type)
      }
    }
  }
}

dc.syncDevice() 的结构与 dc.syncDeviceModel() 几乎一样, 都是先通过 Events() 获取 events, 然后根据 events 的类型执行相应的处理. deviceManager 与 deviceModelManager 也几乎一样, 这里就不细看了.

以 dc.deviceAdded(device) 为例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// deviceAdded records the device in the device manager's Device map (keyed by
// device.Name). If the device's node selector has at least one term whose
// first match expression has at least one value, it also adds the device to
// the config map and sends a membership update message listing the device;
// the message resource is built from that first value.
func (dc *DownstreamController) deviceAdded(device *v1alpha1.Device) {
  dc.deviceManager.Device.Store(device.Name, device)

  // Without a usable node selector value there is nothing to send.
  terms := device.Spec.NodeSelector.NodeSelectorTerms
  if len(terms) == 0 || len(terms[0].MatchExpressions) == 0 || len(terms[0].MatchExpressions[0].Values) == 0 {
    return
  }

  dc.addToConfigMap(device)
  edgeDevice := createDevice(device)
  msg := model.NewMessage("")

  target := device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0]
  resource, err := messagelayer.BuildResource(target, "membership", "")
  if err != nil {
    klog.Warningf("Built message resource failed with error: %s", err)
    return
  }
  msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)

  content := types.MembershipUpdate{AddDevices: []types.Device{edgeDevice}}
  content.EventID = uuid.NewV4().String()
  // Nanoseconds divided by 1e6, i.e. a millisecond timestamp.
  content.Timestamp = time.Now().UnixNano() / 1e6
  msg.Content = content

  if err := dc.messageLayer.Send(*msg); err != nil {
    klog.Errorf("Failed to send device addition message %v due to error %v", msg, err)
  }
}

可以看到 deviceAdded 首先把 device 存到 dc.deviceManager.Device 中; 如果 device 配置了 NodeSelector, 则执行 dc.addToConfigMap(device) 和 createDevice(device), 接着执行 messagelayer.BuildResource, msg.BuildRouter 等函数来构建 msg, 最后通过 dc.messageLayer.Send(*msg) 将 device 数据发了出去.

Send函数为beehiveContext.Send()的简单包装, 关于beehive也有很多细节, 这里暂时先不研究.

1
2
3
4
5
// Send passes message to beehiveContext.Send, addressed to the module named
// by cml.SendModuleName. It always returns nil.
func (cml *ContextMessageLayer) Send(message model.Message) error {
  beehiveContext.Send(cml.SendModuleName, message)
  return nil
}

upstream

备注: upstream 一般描述边缘端向云端上传数据

初始化 upstream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// NewUpstreamController builds an UpstreamController.
//
// It obtains the kube config from utils, creates a CRD client from it, and
// stores that client together with a new context message layer and the given
// downstream controller dc. Any failure is logged as a warning and returned;
// the returned controller is nil in that case.
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
  config, err := utils.KubeConfig()
  if err != nil {
    // This step fetches the kube config; no kube client is created here.
    klog.Warningf("Failed to get kube config: %s", err)
    return nil, err
  }

  crdcli, err := utils.NewCRDClient(config)
  if err != nil {
    klog.Warningf("Failed to create crd client: %s", err)
    return nil, err
  }

  uc := &UpstreamController{
    crdClient:    crdcli,
    messageLayer: messagelayer.NewContextMessageLayer(),
    dc:           dc,
  }
  return uc, nil
}

NewUpstreamController通过utils.KubeConfig()得到config, 通过utils.NewCRDClient(config)创建了crdcli, 并将其赋值给UpstreamController的成员, 除此之外, UpstreamController 中包含了一个downstream dc.

与 downstream 一样, devicecontroller 初始化时执行了 upstream 的初始化

1
2
upstream, err := controller.NewUpstreamController(downstream)
upstream.Start()

upstream.Start():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Start allocates the device-status channel, launches the dispatchMessage
// goroutine, and launches the updateDeviceStatus worker goroutines. It always
// returns nil.
func (uc *UpstreamController) Start() error {
  klog.Info("Start upstream devicecontroller")

  // One setting, config.Config.Buffer.UpdateDeviceStatus, is used both as the
  // channel capacity and as the number of workers.
  uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
  go uc.dispatchMessage()

  workers := int(config.Config.Buffer.UpdateDeviceStatus)
  for remaining := workers; remaining > 0; remaining-- {
    go uc.updateDeviceStatus()
  }
  return nil
}

UpstreamController 的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// UpstreamController subscribes to messages from the edge and syncs them to
// the k8s api server.
type UpstreamController struct {
  // crdClient is the REST client used to patch Device resources.
  crdClient    *rest.RESTClient
  // messageLayer is the source from which dispatchMessage receives messages.
  messageLayer messagelayer.MessageLayer
  // deviceStatusChan queues device status messages: dispatchMessage writes
  // to it and the updateDeviceStatus workers read from it.
  deviceStatusChan chan model.Message

  // dc is the downstream controller whose device cache is read and updated
  // when a device status message is processed.
  dc *DownstreamController
}

Start() 函数主要执行了 uc.dispatchMessage() 和 uc.updateDeviceStatus()

其中 dispatchMessage():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// dispatchMessage receives messages from the message layer in a loop and
// forwards those whose resource type is constants.ResourceTypeTwinEdgeUpdated
// to uc.deviceStatusChan. Messages that cannot be received or parsed, or that
// have any other resource type, are logged and dropped. The loop checks
// beehiveContext.Done() before each receive and returns once it has fired.
func (uc *UpstreamController) dispatchMessage() {
  for {
    // Non-blocking shutdown check.
    select {
    case <-beehiveContext.Done():
      klog.Info("Stop dispatchMessage")
      return
    default:
    }

    msg, err := uc.messageLayer.Receive()
    if err != nil {
      klog.Warningf("Receive message failed, %s", err)
      continue
    }
    klog.Infof("Dispatch message: %s", msg.GetID())

    kind, err := messagelayer.GetResourceType(msg.GetResource())
    if err != nil {
      klog.Warningf("Parse message: %s resource type with error: %s", msg.GetID(), err)
      continue
    }
    klog.Infof("Message: %s, resource type is: %s", msg.GetID(), kind)

    if kind != constants.ResourceTypeTwinEdgeUpdated {
      klog.Warningf("Message: %s, with resource type: %s not intended for device controller", msg.GetID(), kind)
      continue
    }
    uc.deviceStatusChan <- msg
  }
}

核心代码为: uc.messageLayer.Receive() 收取数据, 之后将数据放入uc.deviceStatusChan

uc.updateDeviceStatus()是循环执行的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// updateDeviceStatus is a worker loop that processes device status messages
// from uc.deviceStatusChan until beehiveContext.Done() fires.
//
// For each message it:
//  1. unmarshals the twin update carried by the message;
//  2. extracts the device ID from the message resource and looks the device up
//     in the downstream controller's device cache;
//  3. for every reported twin that matches a cached twin by property name and
//     carries an actual value, writes that value (plus "timestamp" and "type"
//     metadata when present) into the cached twin's Reported field;
//  4. stores the device back into the cache and patches the device status on
//     the API server with a merge patch.
//
// Any failing step is logged, with the underlying error where one exists, and
// the message is dropped.
func (uc *UpstreamController) updateDeviceStatus() {
  for {
    select {
    case <-beehiveContext.Done():
      klog.Info("Stop updateDeviceStatus")
      return
    case msg := <-uc.deviceStatusChan:
      klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
      msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
      if err != nil {
        klog.Warningf("Unmarshall failed due to error %v", err)
        continue
      }
      deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
      if err != nil {
        klog.Warningf("Failed to get device id from resource %s: %v", msg.GetResource(), err)
        continue
      }
      device, ok := uc.dc.deviceManager.Device.Load(deviceID)
      if !ok {
        klog.Warningf("Device %s does not exist in downstream controller", deviceID)
        continue
      }
      cacheDevice, ok := device.(*v1alpha1.Device)
      if !ok {
        klog.Warning("Failed to assert to CacheDevice type")
        continue
      }
      // NOTE(review): deviceStatus.Status is copied from the cached device by
      // assignment; if Twins is a slice, the writes below also reach the
      // cached device's twins directly — confirm whether concurrent workers
      // can touch the same device.
      deviceStatus := &DeviceStatus{Status: cacheDevice.Status}
      for twinName, twin := range msgTwin.Twin {
        for i, cacheTwin := range deviceStatus.Status.Twins {
          if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil {
            reported := v1alpha1.TwinProperty{}
            reported.Value = *twin.Actual.Value
            reported.Metadata = make(map[string]string)
            if twin.Actual.Metadata != nil {
              reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10)
            }
            if twin.Metadata != nil {
              reported.Metadata["type"] = twin.Metadata.Type
            }
            deviceStatus.Status.Twins[i].Reported = reported
            // Only the first cached twin with this property name is updated.
            break
          }
        }
      }

      // Store the status in cache so that when update is received by informer, it is not processed by downstream controller
      cacheDevice.Status = deviceStatus.Status
      uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)

      body, err := json.Marshal(deviceStatus)
      if err != nil {
        klog.Errorf("Failed to marshal device status %v: %v", deviceStatus, err)
        continue
      }
      result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do()
      if err := result.Error(); err != nil {
        klog.Errorf("Failed to patch device status %v of device %v in namespace %v: %v", deviceStatus, deviceID, cacheDevice.Namespace, err)
        continue
      }
      klog.Infof("Message: %s process successfully", msg.GetID())
    }
  }
}

updateDeviceStatus 函数比较长, 下面是函数主要过程的分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

  --> msg := <-uc.deviceStatusChan:
  // 将chan deviceStatusChan 的数据传到 msg
  --> msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
  // 将msg反序列化得到msgTwin

  --> deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
      device, ok := uc.dc.deviceManager.Device.Load(deviceID)
      cacheDevice, ok := device.(*v1alpha1.Device)
      deviceStatus := &DeviceStatus{Status: cacheDevice.Status}
  // 获取 deviceID, device, cacheDevice, deviceStatus,

  --> for twinName, twin := range msgTwin.Twin {
        for i, cacheTwin := range deviceStatus.Status.Twins {
          if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil {
            reported := v1alpha1.TwinProperty{}
            reported.Value = *twin.Actual.Value
            reported.Metadata = make(map[string]string)
            if twin.Actual.Metadata != nil {
              reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10)
            }
            if twin.Metadata != nil {
              reported.Metadata["type"] = twin.Metadata.Type
            }
            deviceStatus.Status.Twins[i].Reported = reported
            break
          }
        }
      }
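  // 遍历边缘端上报的每个 twin, 在缓存的 deviceStatus.Status.Twins 中按 PropertyName 查找同名属性;
  // 若该 twin 带有实际值 (Actual 及 Actual.Value 非空), 则用这个值以及 timestamp, type 元数据构造一个 TwinProperty,
  // 写入对应缓存 twin 的 Reported 字段, 找到第一个匹配项后即 break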

  --> // Store the status in cache so that when update is received by informer, it is not processed by downstream controller
      cacheDevice.Status = deviceStatus.Status
      uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)

  --> body, err := json.Marshal(deviceStatus)
      result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do()
  // 打包deviceStatus上传到apiserver

总结

至此 devicecontroller 模块的代码分析完毕, 代码结构如下

  • DeviceController.Start()

    • downstream.Start()

      • syncDeviceModel()

        • deviceModelAdded(deviceModel)
        • deviceModelDeleted(deviceModel)
        • deviceModelUpdated(deviceModel)
      • syncDevice()

        • deviceAdded(device)
        • deviceDeleted(device)
        • deviceUpdated(device)
    • upstream.Start()

      • dispatchMessage()
      • updateDeviceStatus()