前言
本文分析的代码基于 KubeEdge 1.3 版本
模块入口
先看EdgeHub
的原型
1
2
3
4
5
6
7
8
| //EdgeHub defines edgehub object structure
type EdgeHub struct {
// chClient is the cloud hub client; initial() assigns it from
// clients.GetClient() and Start calls Init/Uninit on it.
chClient clients.Adapter
// reconnectChan carries reconnect requests: routeToEdge and routeToCloud
// send on it after an I/O error, and Start blocks on it before tearing the
// client down.
reconnectChan chan struct{}
// syncKeeper maps a string key to a message channel.
// NOTE(review): presumably used to match synchronous replies by message ID;
// its use is not visible in this excerpt, confirm against the rest of the file.
syncKeeper map[string]chan model.Message
// keeperLock is presumably the lock guarding syncKeeper (TODO confirm).
keeperLock sync.RWMutex
// enable is presumably the module on/off switch (TODO confirm; not used in
// the code shown here).
enable bool
}
|
从 Start()
函数开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| //Start sets context and starts the controller
func (eh *EdgeHub) Start() {
// if there is no manual certificate setting or the setting has problems, then the edge applies for the certificate
if validation.FileIsExist(config.Config.TLSCAFile) && validation.FileIsExist(config.Config.TLSCertFile) && validation.FileIsExist(config.Config.TLSPrivateKeyFile) {
_, err := tls.LoadX509KeyPair(config.Config.TLSCertFile, config.Config.TLSPrivateKeyFile)
if err != nil {
if err := eh.applyCerts(); err != nil {
klog.Fatalf("Error: %v", err)
return
}
}
} else {
if err := eh.applyCerts(); err != nil {
klog.Fatalf("Error: %v", err)
return
}
}
HasTLSTunnelCerts <- true
close(HasTLSTunnelCerts)
for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub stop")
return
default:
}
err := eh.initial()
if err != nil {
klog.Fatalf("failed to init controller: %v", err)
return
}
err = eh.chClient.Init()
if err != nil {
klog.Errorf("connection error, try again after 60s: %v", err)
time.Sleep(waitConnectionPeriod)
continue
}
// execute hook func after connect
eh.pubConnectInfo(true)
go eh.routeToEdge()
go eh.routeToCloud()
go eh.keepalive()
// wait the stop singal
// stop authinfo manager/websocket connection
<-eh.reconnectChan
eh.chClient.Uninit()
// execute hook fun after disconnect
eh.pubConnectInfo(false)
// sleep one period of heartbeat, then try to connect cloud hub again
time.Sleep(time.Duration(config.Config.Heartbeat) * time.Second * 2)
// clean channel
clean:
for {
select {
case <-eh.reconnectChan:
default:
break clean
}
}
}
}
|
Start()
函数先是设置证书, 然后进入模块内部循环, 首先是eh.initial()
1
2
3
4
5
6
7
8
9
10
11
| func (eh *EdgeHub) initial() (err error) {
cloudHubClient, err := clients.GetClient()
if err != nil {
return err
}
eh.chClient = cloudHubClient
return nil
}
|
这个函数初始化了eh.chClient
eh.chClient
类型为clients.Adapter
, 即
1
2
3
4
5
6
7
8
9
10
11
| //Adapter is a web socket client interface
type Adapter interface {
// Init prepares the client for use and reports any failure.
Init() error
// Uninit releases the client; it returns nothing, so failures are not reported.
Uninit()

// Asynchronous message exchange.
// Send transmits a single message.
Send(message model.Message) error
// Receive returns the next incoming message or an error.
Receive() (model.Message, error)

// Notify hands authentication information (string key/value pairs) to the client.
Notify(authInfo map[string]string)
}
|
此处的clients
为 wsclient
, Init()
函数的实现为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| // Init initializes websocket client
func (wsc *WebSocketClient) Init() error {
klog.Infof("Websocket start to connect Access")
cert, err := tls.LoadX509KeyPair(wsc.config.CertFilePath, wsc.config.KeyFilePath)
if err != nil {
klog.Errorf("Failed to load x509 key pair: %v", err)
return fmt.Errorf("failed to load x509 key pair, error: %v", err)
}
caCert, err := ioutil.ReadFile(config.Config.TLSCAFile)
if err != nil {
return err
}
pool := x509.NewCertPool()
if ok := pool.AppendCertsFromPEM(caCert); !ok {
return fmt.Errorf("cannot parse the certificates")
}
tlsConfig := &tls.Config{
RootCAs: pool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: false,
}
option := wsclient.Options{
HandshakeTimeout: wsc.config.HandshakeTimeout,
TLSConfig: tlsConfig,
Type: api.ProtocolTypeWS,
Addr: wsc.config.URL,
AutoRoute: false,
ConnUse: api.UseTypeMessage,
}
exOpts := api.WSClientOption{Header: make(http.Header)}
exOpts.Header.Set("node_id", wsc.config.NodeID)
exOpts.Header.Set("project_id", wsc.config.ProjectID)
client := &wsclient.Client{Options: option, ExOpts: exOpts}
for i := 0; i < retryCount; i++ {
connection, err := client.Connect()
if err != nil {
klog.Errorf("Init websocket connection failed %s", err.Error())
} else {
wsc.connection = connection
klog.Infof("Websocket connect to cloud access successful")
return nil
}
time.Sleep(cloudAccessSleep)
}
return errors.New("max retry count reached when connecting to cloud")
}
|
Start()
函数接着执行了eh.chClient.Init()
, 也就是上面的 Init, 初始化过程执行了 websocket 的连接.
连接成功后启动了三个goroutine
: eh.routeToEdge()
, eh.routeToCloud()
, eh.keepalive()
1
2
3
4
5
| // execute hook func after connect
// Excerpt from Start: runs right after the cloud hub client connected.
eh.pubConnectInfo(true)
// Receives messages from the cloud hub client and dispatches them.
go eh.routeToEdge()
// Receives messages addressed to EdgeHub and sends them to the cloud.
go eh.routeToCloud()
// NOTE(review): presumably sends periodic heartbeats; keepalive is not shown here.
go eh.keepalive()
|
routeToEdge()
: 通过 eh.chClient.Receive() 接收来自 cloudhub 的消息,
再调用 eh.dispatch(message)
将消息分发到边缘侧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| func (eh *EdgeHub) routeToEdge() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub RouteToEdge stop")
return
default:
}
message, err := eh.chClient.Receive()
if err != nil {
klog.Errorf("websocket read error: %v", err)
eh.reconnectChan <- struct{}{}
return
}
klog.Infof("received msg from cloud-hub:%+v", message)
err = eh.dispatch(message)
if err != nil {
klog.Errorf("failed to dispatch message, discard: %v", err)
}
}
}
|
routeToCloud()
: 接收来自边缘的信息beehiveContext.Receive(ModuleNameEdgeHub)
, 并发到 cloudhub eh.sendToCloud(message)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| func (eh *EdgeHub) routeToCloud() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub RouteToCloud stop")
return
default:
}
message, err := beehiveContext.Receive(ModuleNameEdgeHub)
if err != nil {
klog.Errorf("failed to receive message from edge: %v", err)
time.Sleep(time.Second)
continue
}
// post message to cloud hub
err = eh.sendToCloud(message)
if err != nil {
klog.Errorf("failed to send message to cloud: %v", err)
eh.reconnectChan <- struct{}{}
return
}
}
}
|
总结
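总体来说, edgehub 负责与云端的 cloudhub 建立 websocket 连接, 并在边缘侧与 cloudhub 之间双向转发消息: routeToEdge() 把 cloudhub 发来的消息分发到边缘侧, routeToCloud() 把边缘侧的消息发送到 cloudhub; 当读写出错时通过 reconnectChan 触发重连.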