文章目录
- 一. VectorNettyApplication启动类配置
- 二.WebSocketServerBoot初始化服务端Netty
- 三. WebsocketServerChannelInitializer初始化服务端Netty读写处理器
- 四.initParamHandler处理器-去参websocket识别
- 五.MessageHandler核心业务处理类-采用工厂策略模式
- 5.1 策略上下文
- 六.统一响应
- 七.统一输出处理器
一. VectorNettyApplication启动类配置
初始化SpringBoot线程同时初始化Netty线程
/*** @description: 通知启动类* @Title: VectorNotification* @Package com.vector.notification* @Author YuanJie* @Date 2023/3/2 12:57*/
@EnableDiscoveryClient // 开启服务注册与发现
@SpringBootApplication(scanBasePackages = {"com.vector"},exclude = {DataSourceAutoConfiguration.class}) // 开启组件扫描和自动配置
public class VectorNettyApplication implements CommandLineRunner {@Value("${netty.host}")private String host;@Value("${netty.port}")private Integer port;@Resourceprivate WebSocketServerBoot webSocketServerBoot;public static void main(String[] args) {SpringApplication.run(VectorNettyApplication.class, args);}// springboot启动后执行netty服务端启动@Overridepublic void run(String... args) throws Exception {ChannelFuture channelFuture = webSocketServerBoot.bind(host, port);// 优雅关闭, jvm关闭时将netty服务端关闭Runtime.getRuntime().addShutdownHook(new Thread(() -> webSocketServerBoot.destroy()));// 阻塞 直到channel关闭channelFuture.channel().closeFuture().syncUninterruptibly();}
}
二.WebSocketServerBoot初始化服务端Netty
主要进行netty的基本配置
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.accept* @className com.vector.netty.accept.ServerBootstrap* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/9 18:34*/
@Component
@Slf4j
public class WebSocketServerBoot {private final EventLoopGroup parentGroup = new NioEventLoopGroup();private final EventLoopGroup childGroup = new NioEventLoopGroup(2);private Channel channel;@Resourceprivate WebsocketServerChannelInitializer websocketServerChannelInitializer;/*** 初始化服务端* sync():等待Future直到其完成,如果这个Future失败,则抛出失败原因;* syncUninterruptibly():不会被中断的sync();*/public ChannelFuture bind(String host, Integer port) {ChannelFuture channelFuture = null;try {channelFuture = new ServerBootstrap().group(parentGroup, childGroup) // 指定线程模型 一个用于接收客户端连接,一个用于处理客户端读写操作.channel(NioServerSocketChannel.class) // 指定服务端的IO模型.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接 tcp底层心跳机制.childHandler(websocketServerChannelInitializer) // 指定处理新连接数据的读写处理逻辑.bind(host, port).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {if (future.isSuccess()) {log.info("服务端启动成功,监听端口:{}", port);} else {log.error("服务端启动失败,监听端口:{}", port);bind(host, port + 1);}}}).syncUninterruptibly();// 绑定端口channel = channelFuture.channel(); // 获取channel} finally {if (null == channelFuture) {channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}return channelFuture;}/*** 销毁*/public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}/*** 获取通道** @return*/public Channel getChannel() {return channel;}
}
三. WebsocketServerChannelInitializer初始化服务端Netty读写处理器
主要规划netty的读写处理器
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.config* @className com.vector.netty.server.ServerChannelInitializer* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/9 19:13*/
@Component
public class WebsocketServerChannelInitializer extends ChannelInitializer<SocketChannel> {// @Sharableprivate final LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);public final static String WEBSOCKET_PATH = "/ws";@Resourceprivate InitParamHandler initParamHandler;@Resourceprivate MessageHandler messageHandler;@Resourceprivate OutBoundHandler outBoundHandler;@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();// 日志打印pipeline.addLast(loggingHandler);// http报文解析器 线程不安全不能被共享pipeline.addLast(new HttpServerCodec());
// // 添加对大数据流的支持pipeline.addLast(new ChunkedWriteHandler());
// // 消息聚合器 8192 8Mpipeline.addLast(new HttpObjectAggregator(1 << 13));// 进行设置心跳检测pipeline.addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));// ================= 上述是用于支持http协议的 ==============//websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址// 处理uri参数 WebSocketServerProtocolHandler不允许带参数 顺序不可调换pipeline.addLast(initParamHandler);pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,null, true, 1<<16,true,true,5000));pipeline.addLast(messageHandler);// 自定义出栈处理器pipeline.addLast(outBoundHandler);}
}
四.initParamHandler处理器-去参websocket识别
主要为了去参,WebSocketServerProtocolHandler不允许带参数,同时初始化一些信道用户数据
/*** URL参数处理程序,这时候连接还是个http请求,没有升级成webSocket协议,此处SimpleChannelInboundHandler泛型使用FullHttpRequest** @author YuanJie* @date 2023/5/7 15:07*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class InitParamHandler extends SimpleChannelInboundHandler<FullHttpRequest> {/*** 存储已经登录用户的channel对象*/public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 存储用户id和用户的channelId绑定*/public static final Map<Long, ChannelId> userMap = new ConcurrentHashMap<>();/*** 用于存储群聊房间号和群聊成员的channel信息*/public static final Map<Long, ChannelGroup> groupMap = new ConcurrentHashMap<>();@DubboReferenceprivate MemberRemote memberRemote;/*** 此处进行url参数提取,重定向URL,访问webSocket的url不支持带参数的,带参数会抛异常,这里先提取参数,将参数放入通道中传递下去,重新设置一个不带参数的url** @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}* belongs to* @param request the message to handle* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {if (!this.acceptInboundMessage(request)) {ctx.fireChannelRead(request.retain());}String uri = request.uri();log.info("NettyWebSocketParamHandler.channelRead0 --> : 格式化URL... {}", uri);Map<CharSequence, CharSequence> queryMap = UrlBuilder.ofHttp(uri).getQuery().getQueryMap();//将参数放入通道中传递下去String senderId = "senderId";if (StringUtils.isBlank(queryMap.get(senderId))) {log.info("NettyWebSocketParamHandler.channelRead0 --> : 参数缺失 senderId");ctx.close();}// 验证token
// verifyToken(ctx,senderId);// 初始化数据
// initData(ctx, Long.valueOf(queryMap.get(senderId).toString()));// 获取?之前的路径request.setUri(WebsocketServerChannelInitializer.WEBSOCKET_PATH);ctx.fireChannelRead(request.retain());}@Overridepublic void channelActive(ChannelHandlerContext ctx) {//添加到channelGroup通道组channelGroup.add(ctx.channel());ctx.channel().id();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {// 移除channelGroup 通道组channelGroup.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();log.error("NettyWebSocketParamHandler.exceptionCaught --> cause: ", cause);ctx.close();}private void verifyToken(ChannelHandlerContext ctx, Long senderId) {String userKey = CacheConstants.LOGIN_TOKEN_KEY + senderId;RedissonCache redissonCache = SpringContextUtil.getBean(RedissonCache.class);Boolean hasKey = redissonCache.hasKey(userKey);if (!hasKey) {log.info("NettyWebSocketParamHandler.channelRead0 --> : 用户未登录... {}", senderId);ctx.close();}// token续期redissonCache.expire(userKey, SystemConstants.TOKEN_EXPIRE_TIME, TimeUnit.MILLISECONDS);}/*** 加入聊天室** @param ctx* @param senderId* @throws ExecutionException* @throws InterruptedException*/private void joinGroup(ChannelHandlerContext ctx, Long senderId) {R r = null;try {CompletableFuture<R> result = memberRemote.getGroupListById(senderId);r = result.get(3, TimeUnit.SECONDS);} catch (Exception e) {log.error("messageHandler.joinGroup查询群聊列表失败 ===> {}", e.getMessage());ctx.channel().write(WSMessageDTO.error("查询群聊列表失败"));return;}if (r == null || r.getCode() != 200) {log.error("查询群聊列表失败 ====> {}", r.getMsg());ctx.channel().write(WSMessageDTO.error("查询群聊列表失败"));return;}//查询成功//获取群聊列表String json = JacksonInstance.toJson(r.getData());List<Long> groupIds = JacksonInstance.toObjectList(json, new TypeReference<List<Long>>() {});ChannelGroup group;for (Long groupId : groupIds) {group = groupMap.get(groupId);if (group == null) {//如果群聊信道不存在,则创建一个群聊信道group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);groupMap.put(groupId, group);}//将当前用户加入到群聊信道中group.add(ctx.channel());}}/*** 加入聊天信道*/private void joinChat(ChannelHandlerContext ctx, Long senderId) {//将当前用户的channelId放入map中userMap.put(senderId, ctx.channel().id());}private void initData(ChannelHandlerContext ctx, Long senderId) {joinChat(ctx, senderId);joinGroup(ctx, senderId);}
}
五.MessageHandler核心业务处理类-采用工厂策略模式
使得业务和通信协议无关,无感知。具体业务可以增加策略
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.handler* @className com.vector.netty.handler.MessageTypeHandler* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/15 16:23*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class MessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Resourceprivate MessageStrategyContext messageStrategyContext;@Overridepublic void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {// 获取客户端发送的数据WSMessageDTO wsMessageDTO = JacksonInstance.toObject(msg.text(), new TypeReference<WSMessageDTO>() {});wsMessageDTO.setMessageId(SnowFlakeUtil.getNextId());log.info("客户端收到服务器数据:{}", wsMessageDTO.getMessage());verifyParams(ctx, wsMessageDTO);// 根据消息类型获取对应的处理器 核心处理方法messageStrategyContext.messageType(ctx, wsMessageDTO);}private void verifyParams(ChannelHandlerContext ctx, WSMessageDTO wsMessageDTO) {StringBuilder sb = new StringBuilder();if (wsMessageDTO.getSenderId() == null) {sb.append("senderId不能为空");}if (!EnumBusiness.containsBusiness(wsMessageDTO.getBusinessType())) {sb.append("businessType不能为空");}if (!EnumMessage.containsMessage(wsMessageDTO.getMessageType())) {sb.append("messageType不能为空");}if (wsMessageDTO.getMessage() == null) {sb.append("message不能为空");}if (sb.length() > 0) {log.error("参数校验失败:{}", sb.toString());ctx.channel().write(WSMessageDTO.error("参数校验失败:" + sb.toString()));ctx.close();}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}
5.1 策略上下文
具体工厂策略可以详看我的策略模式文章
枚举维护前端参数和bean对象名
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.enums* @className com.vector.netty.enums.BusinessEnums* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/14 16:13*/
public enum EnumBusiness {/*** 单聊*/chatMessage("chat", ChatMessageStrategy.class.getSimpleName()),/*** 群聊*/groupMessage("group", GroupMessageStrategy.class.getSimpleName()),/*** 在线人数*/onlineCount("onlineCount", OnlineCountStrategy.class.getSimpleName()),TEST("test",TestStrategy.class.getSimpleName());private final String businessType;private final String beanName;EnumBusiness(String businessType, String beanName) {this.businessType = businessType;this.beanName = StringUtils.isNotEmpty(beanName)?beanName.toLowerCase():null;}/*** 根据code获取对应的枚举对象*/public static EnumBusiness getEnum(String businessType) {EnumBusiness[] values = EnumBusiness.values(); // 获取枚举列表if (null != businessType && values.length > 0) {for (EnumBusiness value : values) {if (value.businessType.equals(businessType)) {return value; // 返回枚举对象}}}return null;}/*** 该code在枚举列表code属性是否存在*/public static boolean containsBusiness(String businessType) {EnumBusiness anEnum = getEnum(businessType); // 获取枚举对象return anEnum != null;}/*** 判断code与枚举中的code是否相同*/public static boolean equals(String businessType, EnumBusiness calendarSourceEnum) {return calendarSourceEnum.businessType.equals(businessType);}public String getBusinessType() {return businessType;}public String getBeanName() {return beanName;}
}
策略根据bean名获取实例对象
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.service* @className com.vector.netty.service.MessageContext* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/14 17:02*/
@Component
@Slf4j
public class MessageStrategyContext {/** 策略实例集合 */private final ConcurrentHashMap<String, MessageStrategy> strategyConcurrentHashMap =new ConcurrentHashMap<>(20);/*** 注入策略实例* 如果使用的是构造器注入,可能会有多个参数注入进来。** 如果使用的是field反射注入** 如果使用的是setter方法注入,那么你将不能将属性设置为final。** @param strategyMap* 注意注入类型要是Map基础类型* 注入接口,spring会自动注入他的所有被spring托管的实现类*/@Autowiredpublic MessageStrategyContext(Map<String, MessageStrategy> strategyMap) {//清空集合数据this.strategyConcurrentHashMap.clear();if (!CollectionUtils.isEmpty(strategyMap)) {strategyMap.forEach((beanName, messageStrategy) -> {if (StringUtils.isEmpty(beanName) || messageStrategy == null) {return;}this.strategyConcurrentHashMap.put(beanName.toLowerCase(), messageStrategy);});}}/*** 选择业务方式* 单聊,群聊,统计在线人数...** @param msg 信息*/public void messageType(ChannelHandlerContext ctx, WSMessageDTO msg){EnumBusiness enumerateInstances = EnumBusiness.getEnum(msg.getBusinessType());if (CollectionUtils.isEmpty(strategyConcurrentHashMap)) {log.info("策略实例集合初始化失败,请检查是否正确注入!");}MessageStrategy messageStrategy = strategyConcurrentHashMap.get(enumerateInstances.getBeanName());messageStrategy.messageType(ctx, msg);}}
六.统一响应
注意使用该统一响应对象,所有入栈处理器必须使用即调用方必须是SimpleChannelInboundHandler,详细原因在下文 七.统一输出处理器中
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.entity* @className com.vector.netty.entity.SocketMessage* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/6/14 19:35*/
@Data
public class WSMessageDTO {/*** 消息发送者*/private Long senderId;/*** 消息接收者/群聊id*/private Long chatId;/*** 消息类型 0文本 1图片 2文件 3视频 4语音 5位置 6名片 7链接 8系统消息* @see com.vector.netty.enums.EnumMessage*/private byte messageType;/*** 业务类型 chat单聊 group群聊 onlineCount在线人数* @see com.vector.netty.enums.EnumBusiness*/private String businessType;/*** 记录每条消息的id*/private Long messageId;/*** 消息内容*/private String message;/*** 消息发送时间*/private LocalDateTime sendTime;/*** 消息接收时间*/private LocalDateTime receiveTime;/*** 最后一条消息内容*/private String lastMessage;/*** 消息状态 0失败 1成功*/private byte code;/*** 封装统一返回格式* @return*/public static TextWebSocketFrame ok(){WSMessageDTO data = new WSMessageDTO();data.setCode((byte) 1);return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();}public static TextWebSocketFrame ok(WSMessageDTO data){data.setCode((byte) 1);return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();}public static TextWebSocketFrame error(String message){WSMessageDTO data = new WSMessageDTO();data.setCode((byte) 0);data.setMessage(message);return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();}
}
七.统一输出处理器
- 若调用WSMessageDTO方法,必须注意内存泄露
- 即调用方必须是SimpleChannelInboundHandler<>
- 严禁使用ChannelInboundHandlerAdapter, 否则将造成严重内存泄露
- 相应地,必须使用此处的写出@param msg ,释放@param msg 引用
/*** @author YuanJie* @projectName vector-server* @package com.vector.netty.handler* @className com.vector.netty.handler.OutBoundHandler* @copyright Copyright 2020 vector, Inc All rights reserved.* @date 2023/7/24 22:38*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class OutBoundHandler extends ChannelOutboundHandlerAdapter {/*** 若调用WSMessageDTO方法,必须注意内存泄露* 即调用方必须是SimpleChannelInboundHandler<>* 严禁使用ChannelInboundHandlerAdapter, 否则将造成严重内存泄露* 相应地,必须使用此处的写出@param msg ,释放@param msg 引用* @param ctx the {@link ChannelHandlerContext} for which the write operation is made* @param msg the message to write* @param promise the {@link ChannelPromise} to notify once the operation completes* @throws Exception*/@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if (msg instanceof FullHttpMessage){log.info("webSocket协议升级成功");// 出栈必须得这样写,不能自定义通信消息,可能把websocket反馈的消息覆盖了。 也不能在最后处理器调ctx.fireChannelRead()ctx.writeAndFlush(msg,promise);return;} else if (msg instanceof TextWebSocketFrame) {log.info("我要给客户端发送消息了。。。。");ctx.writeAndFlush(msg, promise);return;}log.error("OutBoundHandler.write: 消息类型错误");ctx.writeAndFlush(WSMessageDTO.error("服务器内部错误: OutBoundHandler.write()"),promise);ctx.close();}
}