1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
| func (uc *UpstreamController) updateNodeStatus() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("stop updateNodeStatus")
return
case msg := <-uc.nodeStatusChan:
klog.Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
var data []byte
switch msg.Content.(type) {
case []byte:
data = msg.GetContent().([]byte)
default:
var err error
data, err = json.Marshal(msg.GetContent())
if err != nil {
klog.Warningf("message: %s process failure, marshal message content with error: %s", msg.GetID(), err)
continue
}
}
namespace, err := messagelayer.GetNamespace(msg)
if err != nil {
klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
continue
}
name, err := messagelayer.GetResourceName(msg)
if err != nil {
klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
continue
}
switch msg.GetOperation() {
case model.InsertOperation:
_, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
if err == nil {
klog.Infof("node: %s already exists, do nothing", name)
uc.nodeMsgResponse(name, namespace, "OK", msg)
continue
}
if !errors.IsNotFound(err) {
klog.Errorf("get node %s info error: %v , register node failed", name, err)
uc.nodeMsgResponse(name, namespace, "", msg)
continue
}
node := &v1.Node{}
err = json.Unmarshal(data, node)
if err != nil {
klog.Errorf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
uc.nodeMsgResponse(name, namespace, "", msg)
continue
}
if _, err = uc.createNode(name, node); err != nil {
klog.Errorf("create node %s error: %v , register node failed", name, err)
uc.nodeMsgResponse(name, namespace, "", msg)
continue
}
uc.nodeMsgResponse(name, namespace, "OK", msg)
case model.UpdateOperation:
nodeStatusRequest := &edgeapi.NodeStatusRequest{}
err := json.Unmarshal(data, nodeStatusRequest)
if err != nil {
klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
continue
}
getNode, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
if errors.IsNotFound(err) {
klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
continue
}
if err != nil {
klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
continue
}
// TODO: comment below for test failure. Needs to decide whether to keep post troubleshoot
// In case the status stored at metadata service is outdated, update the heartbeat automatically
if !config.Config.EdgeSiteEnable {
for i := range nodeStatusRequest.Status.Conditions {
if time.Now().Sub(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(config.Config.NodeUpdateFrequency)*time.Second {
nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
}
if time.Now().Sub(nodeStatusRequest.Status.Conditions[i].LastTransitionTime.Time) > time.Duration(config.Config.NodeUpdateFrequency)*time.Second {
nodeStatusRequest.Status.Conditions[i].LastTransitionTime = metaV1.NewTime(time.Now())
}
}
}
if getNode.Annotations == nil {
klog.Warningf("node annotations is nil map, new a map for it. namespace: %s, name: %s", getNode.Namespace, getNode.Name)
getNode.Annotations = make(map[string]string)
}
for name, v := range nodeStatusRequest.ExtendResources {
if name == constants.NvidiaGPUScalarResourceName {
var gpuStatus []types.NvidiaGPUStatus
for _, er := range v {
gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
}
if len(gpuStatus) > 0 {
data, _ := json.Marshal(gpuStatus)
getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
}
}
data, err := json.Marshal(v)
if err != nil {
klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
continue
}
getNode.Annotations[string(name)] = string(data)
}
// Keep the same "VolumesAttached" attribute with upstream,
// since this value is maintained by kube-controller-manager.
nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
getNode.Status = nodeStatusRequest.Status
node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(getNode)
if err != nil {
klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
continue
}
resMsg := model.NewMessage(msg.GetID())
resMsg.SetResourceVersion(node.ResourceVersion)
resMsg.Content = "OK"
nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
continue
}
resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
if err != nil {
klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
continue
}
resMsg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
if err = uc.messageLayer.Response(*resMsg); err != nil {
klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
continue
}
klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)
default:
klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
}
klog.V(4).Infof("message: %s process successfully", msg.GetID())
}
}
}
|