判断设备是否在线,一般通过发送心跳包来实现:如果超过设定的时间设备仍未回复,则判定设备离线。

Nginx Gateway

设备访问后台时,默认 URL 以 /device_connection_service 开头,Nginx 会将请求转发到后台的 device-connection-service,该服务使用 gRPC 协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    # TLS + HTTP/2 entry point for devices. Requests whose URI starts with
    # /device_connection_service are forwarded over gRPC to device-connection-service.
    server {
        # Request rate limiting. The "reqlimit" zone is not declared in this block;
        # it must be defined elsewhere with limit_req_zone.
        limit_req zone=reqlimit burst=100;

        # so_keepalive=keepidle:keepintvl:keepcnt enables TCP keepalive probes on
        # the listening sockets (IPv4 and IPv6).
        listen 8082 ssl http2 so_keepalive=5:3:1;
        listen [::]:8082 ssl http2 so_keepalive=5:3:1;
        server_name localhost;
        ssl_certificate /etc/nginx/ssl/nginx.crt;
        ssl_certificate_key /etc/nginx/ssl/nginx.key;

        # DNS resolver used for the variable upstream below; answers are cached for 60s.
        # NOTE(review): 127.0.0.11 is presumably Docker's embedded DNS - confirm.
        resolver 127.0.0.11 valid=60s;
        resolver_timeout 3s;

        location /device_connection_service {
            # Passing the upstream through a variable makes nginx resolve the host
            # name at request time via the resolver above instead of only at startup.
            set $forward_device_connection grpc://device-connection-service:50051;
            grpc_pass $forward_device_connection;

            # Long-lived device streams: allow up to one day between reads/writes.
            # Only the grpc_* timeouts apply to grpc_pass; the former proxy_* timeouts
            # configured the proxy module (proxy_pass) and had no effect here.
            # NOTE(review): the nginx documentation states that the connect timeout
            # usually cannot exceed 75 seconds, so 1d is nominal for grpc_connect_timeout.
            grpc_connect_timeout 1d;
            grpc_send_timeout 1d;
            grpc_read_timeout 1d;

            # Enable TCP keepalive on the connection to the gRPC upstream.
            grpc_socket_keepalive on;
        }
    }

GRPC协议定义

Proto

与设备端通信接口

1
// Bidirectional streaming RPC: the caller sends a stream of ActionStreamRequest
// messages and receives a stream of Action messages in return.
rpc GetActionStream(stream ActionStreamRequest) returns (stream Action);

与后台调用通信接口

1
2
// For IAM3D: checks whether a device is online.
rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse);

Device 心跳服务

GetActionStream主要逻辑

检测逻辑:接收设备发送的消息。由于采用 stream 方式,通过 srv.Recv 接收设备消息。

通过定时器,判断是否超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Receive loop. Each iteration starts a goroutine that calls srv.Recv() and
// reports its error on done, then waits for whichever happens first: the
// receive timeout, a disconnect signal, or the Recv result.
// NOTE(review): a new goroutine is started on every iteration, including after
// a timeout when the previous Recv has not returned yet, so several Recv calls
// may be pending on the same stream at once - confirm this is safe/intended.
for {
go func() {
_, err := srv.Recv() // Recv() will block this goroutine
done <- err
}()

// Use timer to detect recv timeout
// NOTE(review): the timer is not stopped in this fragment when another case wins.
timer := time.NewTimer(GET_ACTION_STREAM_RECV_TIMEOUT_IN_SECS * time.Second)

select {

// Timer fired: nothing was received within the timeout window.
case <-timer.C:
timeoutCount += 1
// Leave the loop once the connection has been marked as disconnected.
case <-isDisconnected:
return
case err := <-done:
if err == io.EOF {
eofCount += 1 // count EOFs; per the original note, more than 18 is treated as a disconnect (the check is outside this fragment)
} else {
// Any non-EOF result (a nil error or any other error) resets both counters.
eofCount = 0
timeoutCount = 0
}
}

由于设定的 stream 是双向的,后台也可以通过 srv.Send 主动与设备通信,来检测设备是否正常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Register a buffered (capacity 1) action channel for this device in
// actionChanMap; actions placed on it are forwarded to the device below.
channel := make(chan *deviceConnectionPb.Action, 1)
actionChanMap.Store(deviceId, channel)

// Send loop: forward queued actions, ping the device when the timer fires,
// and tear down when the connection is reported as failed.
for {

select {
case action := <-channel:
log.Info(fmt.Sprintf("send to %s, action %s", deviceId, action.Type))
// NOTE(review): the error returned by srv.Send is discarded here.
srv.Send(action)

case <-connectionFailed:
// The server detected an abnormal condition/timeout and disconnects actively.
log.Info("Device " + deviceId + " max EOF or timeout count exceeded, force disconnect from server.")
// Unregister the device, then signal the disconnect on isDisconnected.
actionChanMap.Delete(deviceId)
closeStreamChanMap.Delete(deviceId)
isDisconnected <- true
return nil

// Timer, 10s per the original note; the timer itself is created outside this fragment.
// NOTE(review): a time.Timer fires only once unless it is reset - confirm that
// a Ticker or a Reset call is used so the ping repeats.
case <-timer.C:
// NOTE(review): the error returned by srv.Send is discarded here as well.
srv.Send(&deviceConnectionPb.Action{
Type: deviceConnectionPb.ActionType_PING,
})
}

Backend status获取

后台请求 Device connection 服务,通过 CheckStatus 接口获取设备的在线状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Unary RPC used by the backend to query whether a device is online.
rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse);
// Request for CheckStatus.
message CheckStatusRequest {
// ID of the device to look up.
string device_id = 1;
// IP address of the device. NOTE(review): how this field is used is not shown here - confirm.
string device_ip = 2;
}

// Response for CheckStatus: a status plus the result payload.
message CheckStatusResponse {
// ResponseStatus is declared elsewhere.
ResponseStatus status = 1;
CheckStatusResponseData data = 2;
}

// Result payload of CheckStatus.
message CheckStatusResponseData {
// Whether the device is online.
bool is_online = 1;
}

后台判断是否在线

1
2
3
// The device counts as online when an entry for its ID is present in actionChanMap.
if _, registered := actionChanMap.Load(req.GetDeviceId()); registered {
	isOnline = true
}