目录

Kubeedge源码阅读系列--edgecore.edgehub模块

前言

代码基于 KubeEdge 1.3 版本


模块入口

先看EdgeHub的原型

1
2
3
4
5
6
7
8
// EdgeHub defines the edgehub object structure: the state held by the
// edge-side module that talks to the cloud hub.
type EdgeHub struct {
  // chClient is the cloud hub connection adapter; it is (re)assigned by
  // initial() on every pass of the Start() loop.
  chClient      clients.Adapter
  // reconnectChan carries "connection broken" signals: routeToEdge and
  // routeToCloud send on it after an error, and Start() receives from it
  // to tear down the connection and reconnect.
  reconnectChan chan struct{}
  // syncKeeper maps a string key to a message channel.
  // NOTE(review): presumably keyed by message ID for synchronous replies —
  // not used in the code visible here, confirm against the rest of the module.
  syncKeeper    map[string]chan model.Message
  // keeperLock presumably guards syncKeeper — TODO confirm at its use sites.
  keeperLock    sync.RWMutex
  // enable is presumably the module on/off switch — not read in the visible code.
  enable        bool
}

Start()函数开始

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Start makes sure TLS certificates are available and then runs the
// connect / serve / reconnect loop until the beehive context is done.
//
// Each pass of the loop builds a fresh cloud hub client, connects it, starts
// the routing and keepalive goroutines, and then blocks until one of them
// signals reconnectChan. The connection is then torn down and, after a pause,
// the loop starts over.
func (eh *EdgeHub) Start() {
  // Apply for certificates when any of the configured files is missing, or
  // when the configured cert/key pair cannot be loaded.
  certFilesPresent := validation.FileIsExist(config.Config.TLSCAFile) &&
    validation.FileIsExist(config.Config.TLSCertFile) &&
    validation.FileIsExist(config.Config.TLSPrivateKeyFile)
  mustApply := !certFilesPresent
  if certFilesPresent {
    if _, loadErr := tls.LoadX509KeyPair(config.Config.TLSCertFile, config.Config.TLSPrivateKeyFile); loadErr != nil {
      mustApply = true
    }
  }
  if mustApply {
    if applyErr := eh.applyCerts(); applyErr != nil {
      klog.Fatalf("Error: %v", applyErr)
      return
    }
  }

  // Publish that the certificates are in place, then close the channel.
  HasTLSTunnelCerts <- true
  close(HasTLSTunnelCerts)

  for {
    // Non-blocking shutdown check at the top of every reconnect cycle.
    select {
    case <-beehiveContext.Done():
      klog.Warning("EdgeHub stop")
      return
    default:
    }

    if initErr := eh.initial(); initErr != nil {
      klog.Fatalf("failed to init controller: %v", initErr)
      return
    }
    if connErr := eh.chClient.Init(); connErr != nil {
      klog.Errorf("connection error, try again after 60s: %v", connErr)
      time.Sleep(waitConnectionPeriod)
      continue
    }

    // Connected: run the connect hook and start the per-connection workers.
    eh.pubConnectInfo(true)
    go eh.routeToEdge()
    go eh.routeToCloud()
    go eh.keepalive()

    // Block until a worker reports a broken connection, then close it.
    <-eh.reconnectChan
    eh.chClient.Uninit()

    // Run the disconnect hook.
    eh.pubConnectInfo(false)

    // Wait two heartbeat periods before trying to reach the cloud hub again.
    retryDelay := time.Duration(config.Config.Heartbeat) * time.Second * 2
    time.Sleep(retryDelay)

    // Drain reconnect signals that piled up meanwhile so they do not
    // immediately tear down the next connection.
    for drained := false; !drained; {
      select {
      case <-eh.reconnectChan:
      default:
        drained = true
      }
    }
  }
}

Start()函数先是设置证书, 然后进入模块内部循环, 首先是eh.initial()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// initial obtains a cloud hub client from the clients package and stores it
// in eh.chClient. On failure the error is returned and eh.chClient is left
// untouched.
func (eh *EdgeHub) initial() (err error) {
  hubClient, err := clients.GetClient()
  if err == nil {
    eh.chClient = hubClient
  }
  return err
}

这个函数初始化了eh.chClient

eh.chClient类型为clients.Adapter, 即

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Adapter is a web socket client interface. EdgeHub holds one as chClient
// and drives the cloud hub connection through it.
type Adapter interface {
  // Init establishes the connection; EdgeHub.Start retries later when it
  // returns an error.
  Init() error
  // Uninit tears the connection down; EdgeHub.Start calls it once a
  // reconnect has been signalled.
  Uninit()
  // async mode
  // Send transmits one message over the connection.
  Send(message model.Message) error
  // Receive returns the next incoming message; routeToEdge treats an error
  // as a broken connection.
  Receive() (model.Message, error)

  // notify auth info
  Notify(authInfo map[string]string)
}

此处的 clients 实现为 wsclient (即 WebSocketClient), 其 Init() 函数的实现为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Init initializes the websocket client: it builds the TLS configuration
// and connects to the cloud, retrying up to retryCount times.
//
// The client certificate and key are loaded from wsc.config, and the CA
// bundle used to verify the server is read from config.Config.TLSCAFile.
// On success the connection is stored in wsc.connection and nil is returned.
// An error is returned when the certificate material cannot be loaded or
// parsed, or when every connection attempt fails.
func (wsc *WebSocketClient) Init() error {
  klog.Infof("Websocket start to connect Access")
  cert, err := tls.LoadX509KeyPair(wsc.config.CertFilePath, wsc.config.KeyFilePath)
  if err != nil {
    klog.Errorf("Failed to load x509 key pair: %v", err)
    return fmt.Errorf("failed to load x509 key pair, error: %v", err)
  }

  caCert, err := ioutil.ReadFile(config.Config.TLSCAFile)
  if err != nil {
    return err
  }

  pool := x509.NewCertPool()
  if ok := pool.AppendCertsFromPEM(caCert); !ok {
    return fmt.Errorf("cannot parse the certificates")
  }

  // Mutual TLS: present our certificate and verify the server against the
  // configured CA pool.
  tlsConfig := &tls.Config{
    RootCAs:            pool,
    Certificates:       []tls.Certificate{cert},
    InsecureSkipVerify: false,
  }

  option := wsclient.Options{
    HandshakeTimeout: wsc.config.HandshakeTimeout,
    TLSConfig:        tlsConfig,
    Type:             api.ProtocolTypeWS,
    Addr:             wsc.config.URL,
    AutoRoute:        false,
    ConnUse:          api.UseTypeMessage,
  }
  // The node and project identifiers travel as headers on the handshake.
  exOpts := api.WSClientOption{Header: make(http.Header)}
  exOpts.Header.Set("node_id", wsc.config.NodeID)
  exOpts.Header.Set("project_id", wsc.config.ProjectID)
  client := &wsclient.Client{Options: option, ExOpts: exOpts}

  for i := 0; i < retryCount; i++ {
    // Pause only between attempts: sleeping after the last failure would
    // just delay the error return.
    if i > 0 {
      time.Sleep(cloudAccessSleep)
    }
    connection, err := client.Connect()
    if err != nil {
      klog.Errorf("Init websocket connection failed %s", err.Error())
      continue
    }
    wsc.connection = connection
    klog.Infof("Websocket connect to cloud access successful")
    return nil
  }
  return errors.New("max retry count reached when connecting to cloud")
}

Start()函数接着执行了eh.chClient.Init(), 也就是上面的 Init, 初始化过程执行了 websocket 的连接.

连接成功后启动了三个goroutine: eh.routeToEdge(), eh.routeToCloud(), eh.keepalive()

1
2
3
4
5
// Excerpt from Start(): runs right after eh.chClient.Init() succeeds.
// Execute the connect hook (called with false again after a disconnect).
eh.pubConnectInfo(true)
// Start the per-connection workers, each in its own goroutine.
go eh.routeToEdge()
go eh.routeToCloud()
go eh.keepalive()

routeToEdge(): 通过 eh.chClient.Receive() 接收来自 cloudhub 的消息, 并调用 eh.dispatch(message) 分发到边缘侧; 如果读取出错, 则向 reconnectChan 发送信号触发重连并退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// routeToEdge reads messages from the cloud hub connection and hands each
// one to eh.dispatch. It returns when the beehive context is done, or after
// a read error, in which case it first signals reconnectChan. A dispatch
// failure is logged and the message is dropped.
func (eh *EdgeHub) routeToEdge() {
  for {
    // Non-blocking shutdown check before every read.
    select {
    case <-beehiveContext.Done():
      klog.Warning("EdgeHub RouteToEdge stop")
      return
    default:
    }

    msg, recvErr := eh.chClient.Receive()
    if recvErr != nil {
      klog.Errorf("websocket read error: %v", recvErr)
      // Ask Start() to tear the connection down and reconnect.
      eh.reconnectChan <- struct{}{}
      return
    }

    klog.Infof("received msg from cloud-hub:%+v", msg)
    if dispatchErr := eh.dispatch(msg); dispatchErr != nil {
      klog.Errorf("failed to dispatch message, discard: %v", dispatchErr)
    }
  }
}

routeToCloud(): 接收来自边缘的信息beehiveContext.Receive(ModuleNameEdgeHub), 并发到 cloudhub eh.sendToCloud(message)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// routeToCloud takes messages addressed to the EdgeHub module from the
// beehive context and forwards them to the cloud hub. A receive error is
// logged and retried after one second. A send error signals reconnectChan
// and ends the goroutine. It also returns once the beehive context is done.
func (eh *EdgeHub) routeToCloud() {
  for {
    // Non-blocking shutdown check before every receive.
    select {
    case <-beehiveContext.Done():
      klog.Warning("EdgeHub RouteToCloud stop")
      return
    default:
    }

    msg, recvErr := beehiveContext.Receive(ModuleNameEdgeHub)
    if recvErr != nil {
      klog.Errorf("failed to receive message from edge: %v", recvErr)
      time.Sleep(time.Second)
      continue
    }

    // Post the message to the cloud hub; on failure ask Start() to reconnect.
    if sendErr := eh.sendToCloud(msg); sendErr != nil {
      klog.Errorf("failed to send message to cloud: %v", sendErr)
      eh.reconnectChan <- struct{}{}
      return
    }
  }
}

总结

总体来说 edgehub 就是边缘端与云端 cloudhub 建立连接, 并在两者之间双向转发消息用的.