判断设备是否在线,一般情况是通过发送心跳包来实现,超过设定的时间段而设备未回复,判断为设备离线
Nginx Gatway
设备访问后台,默认url以device_connection_servic
开头, 会转发后台device-connection-service,该服务使用GRPC协议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| server { limit_req zone=reqlimit burst=100; listen 8082 ssl http2 so_keepalive=5:3:1; listen [::]:8082 ssl http2 so_keepalive=5:3:1; server_name localhost; ssl_certificate /etc/nginx/ssl/nginx.crt; ssl_certificate_key /etc/nginx/ssl/nginx.key;
resolver 127.0.0.11 valid=60s; resolver_timeout 3s;
location /device_connection_service { set $forward_device_connection grpc://device-connection-service:50051; grpc_pass $forward_device_connection; proxy_connect_timeout 1d; proxy_send_timeout 1d; proxy_read_timeout 1d; grpc_connect_timeout 1d; grpc_send_timeout 1d; grpc_read_timeout 1d; grpc_socket_keepalive on; } }
|
GRPC协议定义
Proto
与设备端通信接口
1
| rpc GetActionStream(stream ActionStreamRequest) returns (stream Action);
|
与后台调用通信接口
1 2
| rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse);
|
Device 心跳服务
GetActionStream
主要逻辑
检测逻辑 接收设备发送信息。如果stream方式,接收srv.Recv
设备消息
通过定时器,判断是否超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| for { go func() { _, err := srv.Recv() done <- err }()
timer := time.NewTimer(GET_ACTION_STREAM_RECV_TIMEOUT_IN_SECS * time.Second)
select {
case <-timer.C: timeoutCount += 1 case <-isDisconnected: return case err := <-done: if err == io.EOF { eofCount += 1 } else { eofCount = 0 timeoutCount = 0 } }
|
由于设定的stream为双向的,后台也可以主动与设备通信srv.Send
,来检测设备是否正常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| channel := make(chan *deviceConnectionPb.Action, 1) actionChanMap.Store(deviceId, channel)
for {
select { case action := <-channel: log.Info(fmt.Sprintf("send to %s, action %s", deviceId, action.Type)) srv.Send(action) case <-connectionFailed: log.Info("Device " + deviceId + " max EOF or timeout count exceeded, force disconnect from server.") actionChanMap.Delete(deviceId) closeStreamChanMap.Delete(deviceId) isDisconnected <- true return nil case <-timer.C: srv.Send(&deviceConnectionPb.Action{ Type: deviceConnectionPb.ActionType_PING, }) }
|
Backend status获取
后台请求Device conntion,通过CheckStatus
接口,获取在线状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse); message CheckStatusRequest { string device_id = 1; string device_ip = 2; }
message CheckStatusResponse { ResponseStatus status = 1; CheckStatusResponseData data = 2; }
message CheckStatusResponseData { bool is_online = 1; }
|
后台判断是否在线
1 2 3
| if _, exist := actionChanMap.Load(req.GetDeviceId()); exist { isOnline = true }
|