Java实现JT/T808以及T/GDRTA002-2020车辆协议对接

简介

JT/T808,道路运输车辆卫星定位系统终端通信协议及数据格式,本标准规定了道路运输车辆卫星定位系统车载终端与监管/监控平台之间的通信协议与数据格式 , 包括协议基础、通信连接、消息处理、协议分类与要求及数据格式。
T/GDRTA002-2020,道路运输车辆智能视频监控报警系统通讯协议规范,广东省道路运输协会发布。
目前仅实现了JT/T808-2013、JT/T808-2019以及粤标T/GDRTA002-2020协议对接,通讯方为TCP,UDP未实现,不符合项目需求可参考已实现的代码更改。项目采用SpringBoot框架,整体结构如下图所示

在这里插入图片描述

主体实现

终端消息上报流程
Jtt808Decoder——》Jtt808ServerHandler——》AbstractJtt808Decoder——》IProcessor——》Jtt808Encoder

Jtt808Decoder

拿到任何一份协议时,首先看的是消息的组成结构,根据它的消息组成结构选择对应的解码器,例如固定长度、分隔符以及自定义长度LengthFieldBasedFrameDecoder解码器。从而解决粘包拆包问题,业务层只处理完整的一条消息,无需考虑其它。
JTT808协议是以0x7e开头0x7e结尾,可考虑分隔符解码器;
在这里插入图片描述
T/GDRTA002-2020协议文件数据上传是以0x30 0x31 0x63 0x64开头,可根据数据长度判断当前接收数据是否完整,可考虑使用自定义长度解码器。
在这里插入图片描述
由于需要同时支持以上两种消息,所以需要自己结合分隔符和自定义长度解码器实现对所有消息的区分。

package com.bho.jtt808.protocol;import com.bho.jtt808.constant.MsgConst;
import com.bho.jtt808.enums.MsgTypeEnum;
import com.bho.jtt808.util.Binary;
import com.bho.jtt808.util.Helper;
import com.bho.jtt808.util.MsgHelper;
import com.bho.jtt808.protocol.message.MsgBody;
import com.bho.jtt808.protocol.message.MsgHeader;
import com.bho.jtt808.protocol.message.MsgStructure;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;/*** 808消息解码器* @author wanzh* @date 2022/8/11*/
@Slf4j
public class Jtt808Decoder extends ByteToMessageDecoder {private final ByteBuf[] delimiters;private final ByteOrder byteOrder;private final int maxFrameLength;private final int lengthFieldOffset;private final int lengthFieldLength;private final int lengthFieldEndOffset;private final int lengthAdjustment;private final int initialBytesToStrip;private final boolean failFast;private final boolean stripDelimiter;private boolean discardingTooLongFrame;private long tooLongFrameLength;private long bytesToDiscard;private int frameLengthInt = -1;public Jtt808Decoder() {//针对文件数据上传消息this.byteOrder = ByteOrder.BIG_ENDIAN;this.maxFrameLength = 65598;this.lengthFieldOffset = 58;this.lengthFieldLength = 4;this.lengthAdjustment = 0;this.lengthFieldEndOffset = 62;this.initialBytesToStrip = 0;this.failFast = true;//针对0x7e开头0x7e结尾消息this.stripDelimiter = true;ByteBuf byteBuf = Unpooled.buffer();byteBuf.writeByte(MsgConst.MSG_SIGN);this.delimiters = new ByteBuf[]{byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes())};}@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = null;int len = in.readableBytes();if (len < MsgConst.MSG_V2011_SHORTEST_LENGTH - 1) {return;}Channel channel = ctx.channel();int msgType = Integer.parseInt(String.valueOf(channel.attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).get()));if (msgType == MsgConst.MSG_TYPE_INIT || msgType == MsgConst.MSG_TYPE_FILE) {in.markReaderIndex();byte firstByte = in.readByte();byte secondByte = in.readByte();byte thridByte = in.readByte();byte fourthByte = in.readByte();in.resetReaderIndex();if (firstByte == MsgConst.MSG_SIGN) {decoded = delimiterBaseFrameDecode(ctx, in);} else if (firstByte == MsgConst.ATTACHMENT_BIT_STREAM_FIRST_BYTE&& secondByte == MsgConst.ATTACHMENT_BIT_STREAM_SECOND_BYTE&& thridByte == MsgConst.ATTACHMENT_BIT_STREAM_THIRD_BYTE&& fourthByte == MsgConst.ATTACHMENT_BIT_STREAM_FOURTH_BYTE) {decoded = lengthFieldBaseFrameDecode(ctx, in);}} else {decoded = delimiterBaseFrameDecode(ctx, in);}if (decoded != null) {ByteBuf byteBuf = (ByteBuf) decoded;int readLen = byteBuf.readableBytes();if (readLen == 0) {channel.attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).set(MsgConst.MSG_TYPE_NORAML);return;}byte[] bytes = new byte[readLen];byteBuf.readBytes(bytes);short msgId = Binary.beDBBytesToShort(Arrays.copyOfRange(bytes, 0, 2));if (bytes[0] == MsgConst.ATTACHMENT_BIT_STREAM_FIRST_BYTE&& bytes[1] == MsgConst.ATTACHMENT_BIT_STREAM_SECOND_BYTE&& bytes[2] == MsgConst.ATTACHMENT_BIT_STREAM_THIRD_BYTE&& bytes[3] == MsgConst.ATTACHMENT_BIT_STREAM_FOURTH_BYTE) {channel.attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).set(MsgConst.MSG_TYPE_FILE);MsgStructure msgStructure = MsgStructure.builder().msgHeader(MsgHeader.builder().msgId(MsgTypeEnum.attachment_data_upload.getMsgId()).build()).msgBody(MsgBody.builder().bytes(bytes).build()).build();log.info("{}:{}", MsgTypeEnum.attachment_data_upload.getName(), Helper.bytesToHex(bytes));out.add(msgStructure);return;}MsgTypeEnum msgTypeEnum = MsgTypeEnum.getInstanceByMsgId(msgId);if (msgId == MsgTypeEnum.attachment_begin_upload.getMsgId()|| msgId == MsgTypeEnum.attachment_end_upload.getMsgId()|| msgId == MsgTypeEnum.alarm_attachment.getMsgId()) {channel.attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).set(MsgConst.MSG_TYPE_FILE);} else {channel.attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).set(MsgConst.MSG_TYPE_NORAML);}byte[] tempBytes = new byte[readLen + 2];tempBytes[0] = MsgConst.MSG_SIGN;tempBytes[readLen + 1] = MsgConst.MSG_SIGN;System.arraycopy(bytes, 0, tempBytes, 1, readLen);log.info("{}:{}", msgTypeEnum == null ? "未知的消息类型" : msgTypeEnum.getName(), Helper.bytesToHex(tempBytes));MsgStructure msgStructure = MsgHelper.bytesToMsgStructure(tempBytes);if (msgStructure == null) {return;}out.add(msgStructure);}}protected Object lengthFieldBaseFrameDecode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {long frameLength = 0;if (frameLengthInt == -1) { // new frameif (discardingTooLongFrame) {discardingTooLongFrame(in);}if (in.readableBytes() < lengthFieldEndOffset) {return null;}int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);if (frameLength < 0) {failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);}frameLength += lengthAdjustment + lengthFieldEndOffset;if (frameLength < lengthFieldEndOffset) {failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);}if (frameLength > maxFrameLength) {exceededFrameLength(in, frameLength);return null;}// never overflows because it's less than maxFrameLengthframeLengthInt = (int) frameLength;}if (in.readableBytes() < frameLengthInt) { // frameLengthInt exist , just check bufreturn null;}if (initialBytesToStrip > frameLengthInt) {failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);}in.skipBytes(initialBytesToStrip);// extract frameint readerIndex = in.readerIndex();int actualFrameLength = frameLengthInt - initialBytesToStrip;ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);in.readerIndex(readerIndex + actualFrameLength);frameLengthInt = -1; // start processing the next framereturn frame;}protected Object delimiterBaseFrameDecode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {// Try all delimiters and choose the delimiter which yields the shortest frame.int minFrameLength = Integer.MAX_VALUE;ByteBuf minDelim = null;for (ByteBuf delim: delimiters) {int frameLength = indexOf(buffer, delim);if (frameLength >= 0 && frameLength < minFrameLength) {minFrameLength = frameLength;minDelim = delim;}}if (minDelim != null) {int minDelimLength = minDelim.capacity();ByteBuf frame;if (discardingTooLongFrame) {// We've just finished discarding a very large frame.// Go back to the initial state.discardingTooLongFrame = false;buffer.skipBytes(minFrameLength + minDelimLength);long tooLongFrameLength = this.tooLongFrameLength;this.tooLongFrameLength = 0;if (!failFast) {fail(tooLongFrameLength);}return null;}if (minFrameLength > maxFrameLength) {// Discard read frame.buffer.skipBytes(minFrameLength + minDelimLength);fail(minFrameLength);return null;}if (stripDelimiter) {frame = buffer.readRetainedSlice(minFrameLength);buffer.skipBytes(minDelimLength);} else {frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);}return frame;} else {if (!discardingTooLongFrame) {if (buffer.readableBytes() > maxFrameLength) {// Discard the content of the buffer until a delimiter is found.tooLongFrameLength = buffer.readableBytes();buffer.skipBytes(buffer.readableBytes());discardingTooLongFrame = true;if (failFast) {fail(tooLongFrameLength);}}} else {// Still discarding the buffer since a delimiter is not found.tooLongFrameLength += buffer.readableBytes();buffer.skipBytes(buffer.readableBytes());}return null;}}private void fail(int frameLength) {if (frameLength > 0) {throw new TooLongFrameException("frame length exceeds " + maxFrameLength +": " + frameLength + " - discarded");} else {throw new TooLongFrameException("frame length exceeds " + maxFrameLength +" - discarding");}}private static int indexOf(ByteBuf haystack, ByteBuf needle) {for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {int haystackIndex = i;int needleIndex;for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {break;} else {haystackIndex ++;if (haystackIndex == haystack.writerIndex() &&needleIndex != needle.capacity() - 1) {return -1;}}}if (needleIndex == needle.capacity()) {// Found the needle from the haystack!return i - haystack.readerIndex();}}return -1;}private void discardingTooLongFrame(ByteBuf in) {long bytesToDiscard = this.bytesToDiscard;int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());in.skipBytes(localBytesToDiscard);bytesToDiscard -= localBytesToDiscard;this.bytesToDiscard = bytesToDiscard;failIfNecessary(false);}protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {buf = buf.order(order);long frameLength;switch (length) {case 1:frameLength = buf.getUnsignedByte(offset);break;case 2:frameLength = buf.getUnsignedShort(offset);break;case 3:frameLength = buf.getUnsignedMedium(offset);break;case 4:frameLength = buf.getUnsignedInt(offset);break;case 8:frameLength = buf.getLong(offset);break;default:throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");}return frameLength;}private static void failOnNegativeLengthField(ByteBuf in, long frameLength, int lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);}private static void failOnFrameLengthLessThanLengthFieldEndOffset(ByteBuf in,long frameLength,int lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " +"than lengthFieldEndOffset: " + lengthFieldEndOffset);}private void exceededFrameLength(ByteBuf in, long frameLength) {long discard = frameLength - in.readableBytes();tooLongFrameLength = frameLength;if (discard < 0) {// buffer contains more bytes then the frameLength so we can discard all nowin.skipBytes((int) frameLength);} else {// Enter the discard mode and discard everything received so far.discardingTooLongFrame = true;bytesToDiscard = discard;in.skipBytes(in.readableBytes());}failIfNecessary(true);}private static void failOnFrameLengthLessThanInitialBytesToStrip(ByteBuf in, long frameLength, int initialBytesToStrip) {in.skipBytes((int) frameLength);throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " +"than initialBytesToStrip: " + initialBytesToStrip);}private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {if (bytesToDiscard == 0) {// Reset to the initial state and tell the handlers that// the frame was too large.long tooLongFrameLength = this.tooLongFrameLength;this.tooLongFrameLength = 0;discardingTooLongFrame = false;if (!failFast || firstDetectionOfTooLongFrame) {fail(tooLongFrameLength);}} else {// Keep discarding and notify handlers if necessary.if (failFast && firstDetectionOfTooLongFrame) {fail(tooLongFrameLength);}}}protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {return buffer.retainedSlice(index, length);}private void fail(long frameLength) {if (frameLength > 0) {throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength +": " + frameLength + " - discarded");} else {throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength +" - discarding");}}}

Jtt808Encoder

编码器是下发消息到终端,可不考虑粘包拆包问题,比较简单。

package com.bho.jtt808.protocol;import com.bho.jtt808.enums.MsgTypeEnum;
import com.bho.jtt808.util.Helper;
import com.bho.jtt808.util.MsgHelper;
import com.bho.jtt808.protocol.message.MsgStructure;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;/*** 808消息编码器* @author wanzh* @date 2022/8/13*/
@Slf4j
public class Jtt808Encoder extends MessageToByteEncoder<MsgStructure> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, MsgStructure msgStructure, ByteBuf byteBuf) throws Exception {short msgId = msgStructure.getMsgHeader().getMsgId();MsgTypeEnum msgTypeEnum = MsgTypeEnum.getInstanceByMsgId(msgId);byte[] bytes = MsgHelper.msgStructureToBytes(msgStructure);log.info("{}:{}", msgTypeEnum.getName(), Helper.bytesToHex(bytes));byteBuf.writeBytes(bytes);}}

AbstractJtt808Decoder

各类消息解码器抽象类,因为不需要控制终端下发指令,所以未实现AbstractJtt808Encoder。若需要控制终端,则需要将Channel信息缓存下来,具体可参考另外一篇文章。

package com.bho.jtt808.protocol.codec.decoder;import com.bho.jtt808.protocol.message.reply.BaseReplyMsg;
import com.bho.jtt808.protocol.message.uplink.BaseUplinkMsg;
import com.bho.jtt808.constant.MsgConst;
import com.bho.jtt808.protocol.message.MsgStructure;/*** 消息解码器* @author wanzh* @date 2022/8/13*/
public abstract class AbstractJtt808Decoder<U extends BaseUplinkMsg, R extends BaseReplyMsg> {/*** 消息解码** @param reportMsg 上报消息* @return*/public U decode(MsgStructure reportMsg) {U u = null;if (reportMsg.getMsgHeader().getProtocolVersion() == MsgConst.PROTOCOL_VERSION_2019) {u = decode2019(reportMsg);} else {u = decode2013(reportMsg);}if (u != null) {u.setMobile(reportMsg.getMsgHeader().getMobile());}return u;}/*** 消息解码** @param reportMsg 上报消息* @return*/public abstract U decode2019(MsgStructure reportMsg);/*** 消息解码** @param reportMsg 上报消息* @return*/public abstract U decode2013(MsgStructure reportMsg);/*** 消息回复 需要经过业务逻辑处理才知道消息回复内容* 例如文件上传完成消息 需要判断是否需要补传* @param msgStructure 上报消息* @param r 应答结果* @return*/public abstract MsgStructure reply(MsgStructure msgStructure, R r);/*** 消息回复 不需要经过业务逻辑处理就知道消息回复内容* @param msgStructure 上报消息* @param u 上行消息* @return*/public abstract MsgStructure reply(MsgStructure msgStructure, U u);}

Jtt808ServerHandler

package com.bho.jtt808.protocol;import com.alibaba.fastjson.JSONObject;
import com.bho.jtt808.constant.MsgConst;
import com.bho.jtt808.enums.GeneralReplyResultTypeEnum;
import com.bho.jtt808.enums.MsgTypeEnum;
import com.bho.jtt808.protocol.codec.decoder.AbstractJtt808Decoder;
import com.bho.jtt808.protocol.message.reply.BaseReplyMsg;
import com.bho.jtt808.protocol.message.uplink.BaseUplinkMsg;
import com.bho.jtt808.util.MsgHelper;
import com.bho.jtt808.util.SpringUtil;
import com.bho.jtt808.protocol.message.MsgStructure;
import com.bho.jtt808.protocol.processor.IProcessor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;/*** @author wanzh* @date 2022/8/13*/
@Slf4j
public class Jtt808ServerHandler extends SimpleChannelInboundHandler<MsgStructure> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("新的连接:{}", ctx.channel().remoteAddress());super.channelActive(ctx);ctx.channel().attr(AttributeKey.valueOf(MsgConst.CHANNEL_MSG_TYPE)).set(MsgConst.MSG_TYPE_INIT);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("异常连接:{},错误信息:{}", ctx.channel().remoteAddress(), cause);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("断开的连接:{}", ctx.channel().remoteAddress());super.channelInactive(ctx);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 获取idle事件if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;// 读等待事件if (event.state() == IdleState.READER_IDLE) {if(ctx.channel().isActive() || ctx.channel().isOpen()) {ctx.channel().close();}}} else {super.userEventTriggered(ctx, evt);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MsgStructure curMsg) throws Exception {short msgId = curMsg.getMsgHeader().getMsgId();MsgTypeEnum msgTypeEnum = MsgTypeEnum.getInstanceByMsgId(msgId);if (msgTypeEnum == null) {//未知的消息类型 默认回复通用应答byte[] bodyBytes = MsgHelper.getGeneralReplyMsgBody(curMsg.getMsgHeader(), GeneralReplyResultTypeEnum.sucess.getResult());MsgStructure replyMsgStructure = MsgHelper.getReplyMsgStructure(curMsg.getMsgHeader(), bodyBytes, MsgTypeEnum.platform_reply);ctx.writeAndFlush(replyMsgStructure);return;}if (StringUtils.hasLength(msgTypeEnum.getDecoderBeanId())) {try {//1.获取对应的解码器解析成对应的消息类AbstractJtt808Decoder abstractJtt808Decoder = SpringUtil.getBean(msgTypeEnum.getDecoderBeanId(), AbstractJtt808Decoder.class);BaseUplinkMsg baseUplinkMsg = abstractJtt808Decoder.decode(curMsg);if (baseUplinkMsg != null) {log.info("上行消息: {}", JSONObject.toJSONString(baseUplinkMsg));}//2.不需要经过业务逻辑处理 消息应答MsgStructure replyMsgStructure = abstractJtt808Decoder.reply(curMsg, baseUplinkMsg);if (replyMsgStructure != null) {ctx.writeAndFlush(replyMsgStructure);}//3.业务逻辑处理BaseReplyMsg replyMsg = null;if (StringUtils.hasLength(msgTypeEnum.getProcessorBeanId())) {IProcessor iProcessor = SpringUtil.getBean(msgTypeEnum.getProcessorBeanId(), IProcessor.class);replyMsg = iProcessor.process(baseUplinkMsg);}//4.需要经过业务逻辑处理 消息应答replyMsgStructure = abstractJtt808Decoder.reply(curMsg, replyMsg);if (replyMsgStructure != null) {ctx.writeAndFlush(replyMsgStructure);}} catch (Exception e) {log.error("channelRead0 error {}", e);}}}
}

Jtt808Server

开启一个tcp服务监听终端上报消息

package com.bho.jtt808.protocol;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @author wanzh* @date 2022/8/13*/
@Slf4j
@Component
public class Jtt808Server {@Value("${netty.server.port:22222}")private Integer port;//Netty服务端端口号@Value("${netty.server.readerIdleTime:60}")private Integer readerIdleTime;//读取超时时间(分钟)private ServerBootstrap serverBootstrap;private Channel serverChannel;private EventLoopGroup bossEventLoopGroup;private EventLoopGroup workerEventLoopGroup;public void startNettyServer() throws InterruptedException {serverBootstrap = new ServerBootstrap();bossEventLoopGroup = new NioEventLoopGroup();workerEventLoopGroup = new NioEventLoopGroup();serverBootstrap.group(bossEventLoopGroup,workerEventLoopGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);serverBootstrap.childOption(ChannelOption.SO_LINGER, 0);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();if(readerIdleTime > 0){pipeline.addLast(new IdleStateHandler(readerIdleTime, 0, 0, TimeUnit.MINUTES));}pipeline.addLast(new Jtt808Decoder());pipeline.addLast(new Jtt808Encoder());pipeline.addLast(new Jtt808ServerHandler());}});ChannelFuture future = this.serverBootstrap.bind(this.port).sync();this.serverChannel = future.channel();}public void shutdownNettyServer() throws InterruptedException {serverChannel.close().sync();bossEventLoopGroup.shutdownGracefully().await();workerEventLoopGroup.shutdownGracefully().await();}@PostConstructpublic void init() {log.info("run netty server");try {startNettyServer();} catch (InterruptedException e) {log.error(e.getMessage());Thread.currentThread().interrupt();}}/*** .*/@PreDestroypublic void destory() {log.info("shutdown netty server");try {shutdownNettyServer();} catch (InterruptedException e) {log.error(e.getMessage());Thread.currentThread().interrupt();}}}

解码处理

AbstractJtt808Decoder

package com.bho.jtt808.protocol.codec.decoder;import com.bho.jtt808.protocol.message.reply.BaseReplyMsg;
import com.bho.jtt808.protocol.message.uplink.BaseUplinkMsg;
import com.bho.jtt808.constant.MsgConst;
import com.bho.jtt808.protocol.message.MsgStructure;/*** 消息解码器* @author wanzh* @date 2022/8/13*/
public abstract class AbstractJtt808Decoder<U extends BaseUplinkMsg, R extends BaseReplyMsg> {/*** 消息解码** @param reportMsg 上报消息* @return*/public U decode(MsgStructure reportMsg) {U u = null;if (reportMsg.getMsgHeader().getProtocolVersion() == MsgConst.PROTOCOL_VERSION_2019) {u = decode2019(reportMsg);} else {u = decode2013(reportMsg);}if (u != null) {u.setMobile(reportMsg.getMsgHeader().getMobile());}return u;}/*** 消息解码** @param reportMsg 上报消息* @return*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/518634.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CCF-B推荐会议 Euro-Par‘24延期10天! 3月25日截稿!抓住机会!

会议之眼 快讯 第30届Euro-Par(International European Conference on Parallel and Distributed Computing)即国际欧洲并行和分布式计算会议将于 2024 年 8月26日-30日在西班牙马德里举行&#xff01;Euro-Par是欧洲最主要的会议之一&#xff0c;提供了一个广泛而综合的平台&a…

vue3中的生命周期有哪些和怎么使用?

目录 前言&#xff1a; 正文&#xff1a; 总结: 前言&#xff1a; Vue.js 3是Vue.js框架的最新主要版本&#xff0c;引入了一些重大的改变和增强。在Vue 3中&#xff0c;由于Composition API的引入&#xff0c;生命周期钩子被替换为生命周期函数。 正文&#xff1a; 以下是…

中大型工厂人员定位系统源码,实现人、车、物的实时位置监控

UWB高精度定位系统源码&#xff0c;中大型工厂人员定位系统&#xff0c;实现人、车、物的实时位置监控 UWB高精度定位系统源码&#xff0c;智慧工厂人员定位系统源码&#xff0c;基于VueSpring boot前后端分离架构开发的一套UWB高精度定位系统源码。有演示。 随着经济的高速发展…

三八妇女节智慧花店/自动售花机远程视频智能监控解决方案

一、项目背景 国家统计局发布的2023年中国经济年报显示&#xff0c;全年社会消费品零售总额471495亿元&#xff0c;比上年增长7.2%。我国无人零售整体发展迅速&#xff0c;2014年市场规模约为17亿元。无人零售自助终端设备市场规模超过500亿元&#xff0c;年均复合增长率超50%。…

2023年全国职业院校技能大赛软件测试赛题第8套

2023年全国职业院校技能大赛 软件测试赛题第8套 赛项名称: 软件测试 英文名称: Software Testing 赛项编号: GZ034 归属产业: 电子与信息大类 赛项组别: 高等职业教育 …

应用案例 | Softing echocollect e网关助力汽车零部件制造商构建企业数据库,提升生产效率和质量

为了提高生产质量和效率&#xff0c;某知名汽车零部件制造商采用了Softing echocollect e多协议数据采集网关——从机器和设备中获取相关数据&#xff0c;并直接将数据存储在中央SQL数据库系统中用于分析处理&#xff0c;从而实现了持续监控和生产过程的改进。 一 背景 该企业…

2024年5月软考中级《系统监理师》报名考试全攻略

​2024年软考信息系统监理师考试报名时间节点&#xff1a; 报名时间&#xff1a;上半年3月18日到4月15日&#xff0c;下半年8月19日到9月15日&#xff08;各地区报名时间不同&#xff0c;具体日期见官方通告&#xff09; 准考证打印时间&#xff1a;上半年5月20日起&#xff…

DxO ViewPoint:摄影师的最 佳拍档,记录世界的每一刻精彩 mac/win版

DxO ViewPoint是一款革命性的摄影软件&#xff0c;它以其独特的功能和卓越的性能&#xff0c;重新定义了摄影体验。这款软件不仅提供了丰富的摄影工具&#xff0c;还通过先进的算法和技术&#xff0c;让摄影师能够轻松捕捉、管理和展示他们的作品。 DxO ViewPoint 软件获取 Dx…

网络空间资产安全解决方案

长期以来&#xff0c;我们一直强调要做好网络安全建设&#xff0c;而其中的第一步就是要做好对自身资产的发现和清点&#xff0c;正如大家经常所说的那句话——“你无法保护你看不见的东西”。的确&#xff0c;如果不知道自己拥有什么资产&#xff0c;那么如何去了解与它们相关…

Spring面向切片编程AOP概念及相关术语(一)

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

Linux下清除挖矿病毒

登录用户系统&#xff0c;显示系统被爆破了33万次了&#xff0c;结果用户的服务器密码改的很简单&#xff0c;极大可能是被爆破成功了。 执行top命令&#xff0c;显示 kswapd0 的CPU占用异常。基本100%占用。记下该进程ID 5081 执行查找命令 find / -name kswapd0 显示查找结…

qt练习案例

记录一下qt练习案例&#xff0c;方便学习qt知识点 基本部件 案例1 需求&#xff0c;做一个标签&#xff0c;显示"你好"知识点&#xff0c;QLabel画面 4. 参考&#xff0c;Qt 之 QLabel 案例2 需求&#xff0c;做一个标签&#xff0c;显示图片 知识点&#xff0c;…