衷于栖
  • 衷于栖
  • 首页
  • 归档
  • 关于

Image
Profile Picture

衷于栖

自由开发者

分类目录

三维技术4 介绍2 应用1 异常1 技术笔记17 游戏2 源码解读3 管理5 读书笔记3 车联网3 转载11 随笔3

热门标签

  • GIT
  • 工作流指南
  • docker
  • SCRUM
  • JT808
  • 百度地图
  • 狼人杀
  • 模型数据结构
  • 敏捷
  • 扩展
  • 学习WEBGL系列
  • 可维护
  • GlTF
  • CentOS
  • 高德地图
  • 集中式
  • 郭麒麟
  • 郭德纲
  • 进阶
  • 路由节点编辑器

微信订阅

Image

友情链接

王海达博客 Steve Yegge Debug 客栈 Codelei's Blog 笛卡尔积 Java九点半课堂 薛定喵君

【JT808】Spring Boot Stater Jt808 简单源码解读

2020-05-30     车联网


新开源 Spring Boot Starter Jt808 已经有一段时间了,新版本已经支持了 2011、2013、2019 版本的协议解析,并完善了自定义消息的功能,还是值得期待的。

本文主要说明 Spring Boot Starter Jt808 的启动流程,配置项以及一些细节,读懂本文需要先了解字节码和 SpringBoot 等技术知识。

启动服务

项目依赖 SpringBoot,所以启动按照 Spring 的逻辑需要有一个启动注解。于是我们先从 EnableJt808Server 这个注解开始。

1
2
3
// ...
@Import({Jt808Starter.class, Jt808Config.class, SimpleBeanConfig.class})
public @interface EnableJt808Server {}

EnableJt808Server 通过 Import 注解引入了 Jt808Starter、Jt808Config、SimpleBeanConfig 三个 Bean,我们依次看一下这三个 Bean 都是什么作用。

Jt808Config

顾名思义,Jt808Config 是用来承载 jt808 服务的配置的,关于配置项的详细信息可以在下文中找到。这里只说明这个类的作用。

配置类使用直接赋值的方式为配置项添加默认值。

SimpleBeanConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ...
@Bean
@ConditionalOnMissingBean(DataService.class)
public DataService dataService() {
log.info("use default data service Bean.");
return new SimpleDataServiceAdapter(byteArrHelper);
}

@Bean
@ConditionalOnMissingBean(CacheService.class)
public CacheService sessionService(){
log.info("use default session service Bean.");
return new HashMapCacheService();
}

在 1.2.2 版本里为了简化用户入门难度增加了默认 Bean 配置,所以用户如果不定义缓存 Bean CacheService 的话应该会有默认的 Bean 顶替,但是持久化服务还是需要用户提供的,默认提供的Bean DataService 是不能接收消息的,只是能够保证启动成功。同时用户的主要业务逻辑也是在 DataService 实现类里。

  • 系统里默认提供了基于 ConcurrentHashMap 的缓存。
  • 数据层提供的类仅仅是添加了日志记录而已,并且不能通过任何客户端的鉴权。

Jt808Starter

这里定义了启动逻辑,首先定义了处理持久化需要的线程池,线程命名使用 jt808-mina-thread-pool-db 前缀,方便调试。

然后是加载 com.zhoyq.server.jt808.starter 包下的所有 Bean。

再次是程序启动事件 ApplicationStartedEvent 程序启动成功后会运行 onApplicationEvent 方法中的逻辑。

  • 检查是否启用 Jt808 服务。
  • 获取数据库中的历史数据,并存储到缓存中。
  • 获取用户配置使用的底层服务实现(Mina或者Netty),并启动。

这里需要注意的是,在启动完成后 PackHandlerManagement 也处理了一部分启动逻辑,这里把消息包处理器全部收集起来并存储到缓存中待用。

下面就是 Mina 或者 Netty 的启动了。

Mina 启动的时候,加载的关于编码解码的工厂类是 Jt808CodecFactory,这里定义了处理消息包边界的方法逻辑。Netty 启动的时候,加载的解码器和编码器是 Jt808NettyDecoder、Jt808NettyEncoder,同样是处理消息边界的方法和逻辑。

这里由于编码解码没有进一步封装,会有部分源码重复,会在后期优化的时候处理掉。

下面是核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 有数据进入 开始读取数据
if (in.remaining() > 0) {
// 标记数据位置 待还原
in.mark();
// 读取第一个字节
byte byteBuf = in.get();
// 判断第一个字节是否等于 MSG_BROKER = 0x7E
if( byteBuf == MSG_BROKER ){
// 等于 则 还原读取情况
in.reset();
// 读取前五个字节
byte[] bodyProp = new byte[5];
// 标记 待还原读取情况
in.mark();
// 获取消息体属性数据
in.get(bodyProp, 0, 5);
byte[] body = new byte[]{bodyProp[3],bodyProp[4]};
// 获取消息体属性中的消息长度信息
int sizeBuf = jt808Helper.getMsgBodyLength(body);
// 获取消息体中的分包信息
boolean b = jt808Helper.hasPackage(body);
int size;
if(b){
size = sizeBuf + MEG_MIN_LEN_WITH_PKG;
}else{
size = sizeBuf + MSG_MIN_LEN;
}
// 还原读取情况
in.reset();
log.trace("the real pkg length is " + size);

// 通过 size 判断消息是否够一包 如果消息内容不够,则重置,相当于不读取
if (size > in.remaining() || size < MSG_MIN_LEN ) {
// 标记
in.mark();
// 读取以显示
byte[] bytes = new byte[in.remaining()];
in.get(bytes, 0, in.remaining());
// 还原
in.reset();
log.trace("short data length "+in.remaining()+" data "+ byteArrHelper.toHexString(bytes) +" go to reread " + session.getRemoteAddress());
// 数据不全 最后返回 false 继续接收数据
} else {
// 消息内容够一包 则 继续读取
byte[] bytes = new byte[size];
in.get(bytes, 0, size);
// 验证得到的数据是否正确
if( bytes[bytes.length-1] == MSG_BROKER ){
log.trace("origin data " + byteArrHelper.toHexString(bytes) + " " + session.getRemoteAddress());
// 这里转义还原
bytes = jt808Helper.retrans(bytes);
// 在这里验证校验码
if(jt808Helper.verify(bytes)){
// 把字节转换为Java对象的工具类
out.write(bytes);
}
}else{
log.trace("wrong data to drop " + byteArrHelper.toHexString(bytes) + " " + session.getRemoteAddress());
}
// 不管是否完成本包数据读取 这一包数据都放弃继续解析
// 如果还有剩余 则 继续解析数据 没有剩余 则 继续接收数据
return in.remaining() > 0;
}
} else {
// 不等于 则 删除前置错误数据 并继续接收数据
log.trace("wrong data structure " + session.getRemoteAddress());
for(int i = 0;i<in.remaining();){
// 读取到一包数据的 结束位置 返回
if( in.get() == MSG_BROKER ){
// 到此本包数据全部废弃 继续读取下一包
return true;
}
}
}
}
// 没有可读取的数据 返回继续接收数据
return false;

解码器处理的简单逻辑:

  • 当接收到的消息刚好时,截断,继续接收下一批内容。
  • 当接收到的消息不够时,等下一批数据拼接在当前数据之后继续读取。
  • 当接收到的消息太多时,读取一个消息包完成后,截断,进行下一次读取。

到此底层服务也配置启动了。

配置选项

下面是目前可用的配置注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private String use = "mina";             // 使用框架 默认 mina 可选 netty
private String protocol = "tcp"; // 使用协议 默认 tcp 可选 udp
private Integer port = 10001; // 服务监听端口

private Integer processCount = 2; // 一般是CPU数量 + 1
private Integer corePoolSize = 1; // 解析线程数量
private Integer maximumPoolSize = 10; // 解析线程最大数量
private Integer keepAliveTime = 1000; // 解析线程保持时间 单位 毫秒
private Integer idleTime = 10; // 解析线程空闲时间 单位 秒
private Integer idleCount = 6; // 解析线程空闲时间次数
private Integer readBufferSize = 2048; // mina 读取缓存
private Integer packageLength = 1024; // 处理包最大长度(超长会分包)

private Integer masterSize = 1; // netty master线程数量
private Integer slaveSize = 10; // netty slave线程数量

private Boolean tcpNoDelay = true; // 配置TCP延迟
private Boolean keepAlive = true; // 配置是否长连接
private Integer rsaHandleUnit = 117; // rsa 超长数据处理单元 默认最长117 暂时不可用 解析包里还没有处理 RSA相关功能
private Boolean enabled = true; // 是否启用服务器

private Integer threadCorePoolSize = 1; // 持久化线程数量
private Integer threadMaximumPoolSize = 10; // 持久化线程最大数量
private Integer threadKeepAliveTime = 1000; // 持久化线程持续时间

消息处理

下面是消息在下方到用户处理逻辑前的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// 按照协议解析相应数据位的字节码
byte[] msgId, msgBodyProp, protocolVersion, phoneNum, streamNum, pkgCount = null, pkgNum = null, res = null;
// 偏移标识
int offset = 0;
msgId = new byte[]{originData[offset++],originData[offset++]};
msgBodyProp = new byte[]{originData[offset++],originData[offset++]};
// 通过消息体属性中的版本标识位 判断是否是 2019版本协议 并增加相关解析
boolean isVersion2019 = jt808Helper.isVersion2019(msgBodyProp);
if (isVersion2019) {
protocolVersion = new byte[]{originData[offset++]};
phoneNum = new byte[]{
originData[offset++],originData[offset++],originData[offset++],originData[offset++],originData[offset++],
originData[offset++],originData[offset++],originData[offset++],originData[offset++],originData[offset++]
};
} else {
phoneNum = new byte[]{
originData[offset++],originData[offset++],originData[offset++],
originData[offset++],originData[offset++],originData[offset++]
};
}
streamNum = new byte[]{originData[offset++],originData[offset++]};
boolean hasPackage = jt808Helper.hasPackage(msgBodyProp);
if( hasPackage ){
pkgCount = new byte[]{originData[offset++],originData[offset++]};
pkgNum = new byte[]{originData[offset++],originData[offset]};
}

// 将电话字节码转化成字符串
String phone = byteArrHelper.toHexString(phoneNum);

// 相同身份的终端建立链接 原链接需要断开 也就是加入之前需要判断是否存在终端 存在关闭后在加入
if(sessionManagement.contains(phone)){
IoSession preSession = (IoSession) sessionManagement.get(phone);
if (preSession.getId() != session.getId()) {
preSession.closeNow();
}
}

// session 加入会话缓存
sessionManagement.set(phone, session);

// 分包处理
if( hasPackage ){
// 获取 整型 参数
int totalPkgNum = byteArrHelper.twobyte2int(pkgCount);
int currentPkgNum = byteArrHelper.twobyte2int(pkgNum);
// 序号必须小于等于总包数 条件达成之后进行分包处理 否则不处理分包且不处理数据
if(totalPkgNum >= currentPkgNum){
if(!cacheService.containsPackages(phone)){
ConcurrentHashMap<Integer,byte[]> buf = new ConcurrentHashMap<>(totalPkgNum);
cacheService.setPackages(phone, buf);
}
Map<Integer,byte[]> pkgBuf = cacheService.getPackages(phone);
pkgBuf.put(currentPkgNum, originData);
}
// 分包结束时需要对分包数据进行解析处理并返回应答 通过总包数和序号对比 判断是不是最后一包
if( totalPkgNum == currentPkgNum ){
// 如果是最后一包
if(jt808Helper.pkgAllReceived(phone, totalPkgNum)){
// 合并所有包 并解析
res = data(jt808Helper.allPkg(phone, totalPkgNum));
}else{
// 没有全部收到 需要补传 最初一包的流水号
byte[] originStreamNum = null;
// 补传id列表
byte[] idList = new byte[]{};
// 补传数量
byte num = 0;
Map<Integer,byte[]> map = cacheService.getPackages(phone);
for(int i = 1;i<=totalPkgNum;i++){
// 这里需要判断版本 并获取 流水号
if(originStreamNum == null){
if (isVersion2019) {
originStreamNum = byteArrHelper.subByte(map.get(1), 15, 17);
} else {
originStreamNum = byteArrHelper.subByte(map.get(1), 10, 12);
}
}
if(!map.containsKey(i)){
num++;
if (isVersion2019) {
idList = byteArrHelper.union(idList, byteArrHelper.subByte(map.get(i), 19, 21));
} else {
idList = byteArrHelper.union(idList, byteArrHelper.subByte(map.get(i), 14, 16));
}
}
}
// 最后下发 消息包重传指令
if(originStreamNum != null) {
res = resHelper.getPkgReq(phoneNum, originStreamNum, num, idList);
}
}
}
} else {
// 未分包 直接解析
res = data(originData);
}
if( res == null ){
// 没有解析应答 则 直接返回平台通用应答 成功
res = resHelper.getPlatAnswer(phoneNum, streamNum, msgId, (byte) 0x00);
}
// 返回消息前判断 是否需要分包
int msgLen = jt808Config.getPackageLength();
if( res.length > msgLen ){
// 分包发送
jt808Helper.sentByPkg(res, session);
}else{
// 直接返回
session.write(res);
}
  • 消息先查看是否分包,分包则先不解析,待接收完整(接收到最后一包数据之后,会检查数据完整性,如果不完整会下发重传指令,如果完整)再解析;消息未分包则直接解析。
  • 消息解析时,交由对应消息ID的消息包处理器进行处理。
  • 消息解析时,会检查鉴权,如果没有鉴权仅能访问终端注册和终端鉴权两个消息包。
  • 消息包处理的逻辑都在 com.zhoyq.server.jt808.starter.pack 包下,希望了解更多的,可以直接查看源代码。
  • 默认处理方式中,所有查询应答类消息都会通过持久化接口的 terminalAnswer 方法存储,待进一步查询。

2019 兼容性处理逻辑

v1.2.2 版本增加了 2019 版本协议的解析内容,关于兼容性处理有以下几点:

  • 消息体属性中的版本标识位,为2019版本独有。
  • 2019 版本电话号码长度增加了,大部分兼容性可以通过电话号码位数判断。
  • 部分消息ID是2019独有的所以可以单独处理。
  • 终端注册和驾驶员身份信息上报可以根据消息长度可以判断是2013还是2011版本协议。

帮助类

帮助类处理了大部分底层逻辑,有兴趣可以看相关源码,帮助类可以使用 @Autowired 注解直接引入。

  • ByteArrHelper(二进制处理帮助类)
  • Jt808Helper(协议逻辑相关帮助类)
  • ResHeler(应答消息类)

关于开源

推荐邮件(feedback#zhoyq.com)或者使用 ISSUE(包括GITHUB和GITEE) 这两种方式报告 BUG,我会第一时间修复。

  • JT808二次开发包 - 采用MIT协议开源,开源地址,重构过程。视频里重构完成后的版本是 1.0.0,文章发布时二次开发包版本是 1.2.2。
  • JT808二次开发包文档 - 开发入门视频,开发入门文档,以及本系列文章。
  • JT808模拟测试终端 - 基于electron的版本正在制作中,计划以MIT协议开源。
  • 完整可用于生产的808服务 - 计划内。
  • 完整可用于生产的809服务 - 计划外。
  • 车联网平台 - 计划内(主要技术:SpringBoot、Vue)。
  • 相关文档会慢慢完善 - 暂时不约定期限。
#JT808

Copyright © 2021 zhoyq.com. All rights reserved.

京ICP备 17068495号-1