前言
代码基于 kubeedge1.3
模块入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // Start controller
func (dc *DeviceController) Start() {
downstream, err := controller.NewDownstreamController()
if err != nil {
klog.Errorf("New downstream controller failed with error: %s", err)
os.Exit(1)
}
upstream, err := controller.NewUpstreamController(downstream)
if err != nil {
klog.Errorf("new upstream controller failed with error: %s", err)
os.Exit(1)
}
downstream.Start()
// wait for downstream controller to start and load deviceModels and devices
// TODO think about sync
time.Sleep(1 * time.Second)
upstream.Start()
}
|
函数使用NewDownstreamController
和NewUpstreamController
分别建立了downstream
和upstream
, 同时upstream
依赖于downstream
downstream
备注: downstream
一般描述云端向边缘端下发数据
初始化 downstream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| // NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
cli, err := utils.KubeClient()
if err != nil {
klog.Warningf("Create kube client failed with error: %s", err)
return nil, err
}
config, err := utils.KubeConfig()
if err != nil {
klog.Warningf("Get kubeConfig error: %v", err)
return nil, err
}
crdcli, err := utils.NewCRDClient(config)
if err != nil {
klog.Warningf("Failed to create crd client: %s", err)
return nil, err
}
deviceManager, err := manager.NewDeviceManager(crdcli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}
deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}
dc := &DownstreamController{
kubeClient: cli,
deviceManager: deviceManager,
deviceModelManager: deviceModelManager,
messageLayer: messagelayer.NewContextMessageLayer(),
configMapManager: manager.NewConfigMapManager(),
}
return dc, nil
}
|
NewDownstreamController
创建了 kubeClient
, deviceManager
, deviceModelManager
, messageLayer
, configMapManager
赋值给了dc
并返回,
downstream.Start()
1
| downstream, err := controller.NewDownstreamController()
|
downstream
的Start()
方法为
1
2
3
4
5
6
7
8
9
10
11
12
13
| // Start DownstreamController
func (dc *DownstreamController) Start() error {
klog.Info("Start downstream devicecontroller")
go dc.syncDeviceModel()
// Wait for adding all device model
// TODO need to think about sync
time.Sleep(1 * time.Second)
go dc.syncDevice()
return nil
}
|
其中执行了dc.syncDeviceModel()
, dc.syncDevice()
两个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // syncDeviceModel is used to get events from informer
func (dc *DownstreamController) syncDeviceModel() {
for {
select {
case <-beehiveContext.Done():
klog.Info("stop syncDeviceModel")
return
case e := <-dc.deviceModelManager.Events():
deviceModel, ok := e.Object.(*v1alpha1.DeviceModel)
if !ok {
klog.Warningf("object type: %T unsupported", deviceModel)
continue
}
switch e.Type {
case watch.Added:
dc.deviceModelAdded(deviceModel)
case watch.Deleted:
dc.deviceModelDeleted(deviceModel)
case watch.Modified:
dc.deviceModelUpdated(deviceModel)
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
}
}
}
|
其中执行了 dc.deviceModelManager.Events()
即从 deviceModelManager
中获取Event
, 并对事件进行分析, 那么事件Event
是哪来的呢
来看deviceModelManager
的来源:
1
2
| // 执行 NewDownstreamController() 时创建了 deviceModelManager
deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)
|
manager/devicemodel.go
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| // DeviceModelManager is a manager watch DeviceModel change event
type DeviceModelManager struct {
// events from watch kubernetes api server
events chan watch.Event
// DeviceModel, key is DeviceModel.Name, value is *v1alpha1.DeviceModel{}
DeviceModel sync.Map
}
// Events return a channel, can receive all DeviceModel event
func (dmm *DeviceModelManager) Events() chan watch.Event {
return dmm.events
}
// NewDeviceModelManager create DeviceModelManager from config
func NewDeviceModelManager(crdClient *rest.RESTClient, namespace string) (*DeviceModelManager, error) {
lw := cache.NewListWatchFromClient(crdClient, "devicemodels", namespace, fields.Everything())
events := make(chan watch.Event, config.Config.Buffer.DeviceModelEvent)
rh := NewCommonResourceEventHandler(events)
si := cache.NewSharedInformer(lw, &v1alpha1.DeviceModel{}, 0)
si.AddEventHandler(rh)
pm := &DeviceModelManager{events: events}
stopNever := make(chan struct{})
go si.Run(stopNever)
return pm, nil
}
|
此处可以看出dc.deviceModelManager.Events()
是 get deviceModelManager
的 events
, events
类型为chan watch.Event
, 可以理解为deviceModel
相关的事件到来后会传到通道events
中.
cache.NewListWatchFromClient
在vendor/k8s.io/client-go/tools/cache/listwatch.go
中, 是对 apiserver
的 listwatch
请求方法的包装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
|
回到dc.syncDeviceModel()
执行完e := <-dc.deviceModelManager.Events()
之后, syncDeviceModel
根据e.Type
的类型执行不同的操作,
1
2
3
4
5
6
7
8
9
10
| switch e.Type {
case watch.Added:
dc.deviceModelAdded(deviceModel)
case watch.Deleted:
dc.deviceModelDeleted(deviceModel)
case watch.Modified:
dc.deviceModelUpdated(deviceModel)
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
|
以dc.deviceModelAdded(deviceModel)
为例
1
2
3
4
5
| // deviceModelAdded is function to process addition of new deviceModel in apiserver
func (dc *DownstreamController) deviceModelAdded(deviceModel *v1alpha1.DeviceModel) {
// nothing to do when deviceModel added, only add in map
dc.deviceModelManager.DeviceModel.Store(deviceModel.Name, deviceModel)
}
|
dc.deviceModelAdded(deviceModel)
只是简单地将deviceModel
存到 map DeviceModel
中
dc.syncDeviceModel()
执行完了, 开始执行dc.syncDevice()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // syncDevice is used to get device events from informer
func (dc *DownstreamController) syncDevice() {
for {
select {
case <-beehiveContext.Done():
klog.Info("Stop syncDevice")
return
case e := <-dc.deviceManager.Events():
device, ok := e.Object.(*v1alpha1.Device)
if !ok {
klog.Warningf("Object type: %T unsupported", device)
continue
}
switch e.Type {
case watch.Added:
dc.deviceAdded(device)
case watch.Deleted:
dc.deviceDeleted(device)
case watch.Modified:
dc.deviceUpdated(device)
default:
klog.Warningf("Device event type: %s unsupported", e.Type)
}
}
}
}
|
dc.syncDevice()
的结构与dc.syncDeviceModel()
几乎一样, 都是先通过Events()
获取events
,然后根据events
的类型执行相应的处理. deviceManager
与deviceModelManager
也几乎一样, 这里就不细看了.
以dc.deviceAdded(device)
为例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| // deviceAdded creates a device, adds in deviceManagers map, send a message to edge node if node selector is present.
func (dc *DownstreamController) deviceAdded(device *v1alpha1.Device) {
dc.deviceManager.Device.Store(device.Name, device)
if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 {
dc.addToConfigMap(device)
edgeDevice := createDevice(device)
msg := model.NewMessage("")
resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "")
if err != nil {
klog.Warningf("Built message resource failed with error: %s", err)
return
}
msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
content := types.MembershipUpdate{AddDevices: []types.Device{
edgeDevice,
}}
content.EventID = uuid.NewV4().String()
content.Timestamp = time.Now().UnixNano() / 1e6
msg.Content = content
err = dc.messageLayer.Send(*msg)
if err != nil {
klog.Errorf("Failed to send device addition message %v due to error %v", msg, err)
}
}
}
|
可以看到deviceAdded
首先把device
存到dc.deviceManager.Device
中, 然后执行dc.addToConfigMap(device)
和 createDevice(device)
, 接着执行messagelayer.BuildResource
, msg.BuildRouter
等函数来构建msg
, 最后通过dc.messageLayer.Send(*msg)
将 device 数据发了出去.
Send
函数为beehiveContext.Send()
的简单包装, 关于beehive
也有很多细节, 这里暂时先不研究.
1
2
3
4
5
| // Send message
func (cml *ContextMessageLayer) Send(message model.Message) error {
beehiveContext.Send(cml.SendModuleName, message)
return nil
}
|
upstream
备注: upstream
一般描述边缘端向云端上传数据
初始化 upstream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
config, err := utils.KubeConfig()
if err != nil {
klog.Warningf("Failed to create kube client: %s", err)
return nil, err
}
crdcli, err := utils.NewCRDClient(config)
if err != nil {
klog.Warningf("Failed to create crd client: %s", err)
return nil, err
}
uc := &UpstreamController{
crdClient: crdcli,
messageLayer: messagelayer.NewContextMessageLayer(),
dc: dc,
}
return uc, nil
}
|
NewUpstreamController
通过utils.KubeConfig()
得到config
, 通过utils.NewCRDClient(config)
创建了crdcli
, 并将其赋值给UpstreamController
的成员, 除此之外, UpstreamController 中包含了一个downstream
dc.
和downstream
一样, devicecontroller
初始化时执行了upstream
的初始化
1
2
| upstream, err := controller.NewUpstreamController(downstream)
upstream.Start()
|
upstream.Start()
:
1
2
3
4
5
6
7
8
9
10
11
12
| // Start UpstreamController
func (uc *UpstreamController) Start() error {
klog.Info("Start upstream devicecontroller")
uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
go uc.dispatchMessage()
for i := 0; i < int(config.Config.Buffer.UpdateDeviceStatus); i++ {
go uc.updateDeviceStatus()
}
return nil
}
|
UpstreamController
的定义:
1
2
3
4
5
6
7
8
9
10
| // UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
crdClient *rest.RESTClient
messageLayer messagelayer.MessageLayer
// message channel
deviceStatusChan chan model.Message
// downstream controller to update device status in cache
dc *DownstreamController
}
|
Start()
函数主要执行了uc.dispatchMessage()
和 uc.updateDeviceStatus()
其中 dispatchMessage()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| func (uc *UpstreamController) dispatchMessage() {
for {
select {
case <-beehiveContext.Done():
klog.Info("Stop dispatchMessage")
return
default:
}
msg, err := uc.messageLayer.Receive()
if err != nil {
klog.Warningf("Receive message failed, %s", err)
continue
}
klog.Infof("Dispatch message: %s", msg.GetID())
resourceType, err := messagelayer.GetResourceType(msg.GetResource())
if err != nil {
klog.Warningf("Parse message: %s resource type with error: %s", msg.GetID(), err)
continue
}
klog.Infof("Message: %s, resource type is: %s", msg.GetID(), resourceType)
switch resourceType {
case constants.ResourceTypeTwinEdgeUpdated:
uc.deviceStatusChan <- msg
default:
klog.Warningf("Message: %s, with resource type: %s not intended for device controller", msg.GetID(), resourceType)
}
}
}
|
核心代码为: uc.messageLayer.Receive()
收取数据, 之后将数据放入uc.deviceStatusChan
uc.updateDeviceStatus()
是循环执行的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
| func (uc *UpstreamController) updateDeviceStatus() {
for {
select {
case <-beehiveContext.Done():
klog.Info("Stop updateDeviceStatus")
return
case msg := <-uc.deviceStatusChan:
klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
if err != nil {
klog.Warningf("Unmarshall failed due to error %v", err)
continue
}
deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
if err != nil {
klog.Warning("Failed to get device id")
continue
}
device, ok := uc.dc.deviceManager.Device.Load(deviceID)
if !ok {
klog.Warningf("Device %s does not exist in downstream controller", deviceID)
continue
}
cacheDevice, ok := device.(*v1alpha1.Device)
if !ok {
klog.Warning("Failed to assert to CacheDevice type")
continue
}
deviceStatus := &DeviceStatus{Status: cacheDevice.Status}
for twinName, twin := range msgTwin.Twin {
for i, cacheTwin := range deviceStatus.Status.Twins {
if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil {
reported := v1alpha1.TwinProperty{}
reported.Value = *twin.Actual.Value
reported.Metadata = make(map[string]string)
if twin.Actual.Metadata != nil {
reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10)
}
if twin.Metadata != nil {
reported.Metadata["type"] = twin.Metadata.Type
}
deviceStatus.Status.Twins[i].Reported = reported
break
}
}
}
// Store the status in cache so that when update is received by informer, it is not processed by downstream controller
cacheDevice.Status = deviceStatus.Status
uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)
body, err := json.Marshal(deviceStatus)
if err != nil {
klog.Errorf("Failed to marshal device status %v", deviceStatus)
continue
}
result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do()
if result.Error() != nil {
klog.Errorf("Failed to patch device status %v of device %v in namespace %v", deviceStatus, deviceID, cacheDevice.Namespace)
continue
}
klog.Infof("Message: %s process successfully", msg.GetID())
}
}
}
|
updateDeviceStatus
函数比较长, 下面是函数主要过程的分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
--> msg := <-uc.deviceStatusChan:
// 将chan deviceStatusChan 的数据传到 msg
--> msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
// 将msg反序列化得到msgTwin
--> deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
device, ok := uc.dc.deviceManager.Device.Load(deviceID)
cacheDevice, ok := device.(*v1alpha1.Device)
deviceStatus := &DeviceStatus{Status: cacheDevice.Status}
// 获取 deviceID, device, cacheDevice, deviceStatus,
--> for twinName, twin := range msgTwin.Twin {
for i, cacheTwin := range deviceStatus.Status.Twins {
if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil {
reported := v1alpha1.TwinProperty{}
reported.Value = *twin.Actual.Value
reported.Metadata = make(map[string]string)
if twin.Actual.Metadata != nil {
reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10)
}
if twin.Metadata != nil {
reported.Metadata["type"] = twin.Metadata.Type
}
deviceStatus.Status.Twins[i].Reported = reported
break
}
}
}
// 暂时没看懂
--> // Store the status in cache so that when update is received by informer, it is not processed by downstream controller
cacheDevice.Status = deviceStatus.Status
uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)
--> body, err := json.Marshal(deviceStatus)
result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do()
// 打包deviceStatus上传到apiserver
|
总结
至此 devicecontroller
模块的代码分析完毕, 代码结构如下
DeviceController.Start()
downstream.Start()
syncDeviceModel()
- deviceModelAdded(deviceModel)
- deviceModelDeleted(deviceModel)
- deviceModelUpdated(deviceModel)
syncDevice()
- deviceAdded(device)
- deviceDeleted(device)
- deviceUpdated(device)
upstream.Start()
- dispatchMessage()
- updateDeviceStatus()