基于Netty构建Websocket服务端

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
在这里插入图片描述
引入pom依赖:

 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.69.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>

编写SocketServer:

package com.lzq.websocket.config;import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.TimeUnit;@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {private static final Integer PORT = 8888;@Overridepublic void run(String... args) throws Exception {new WebSocketConfig().start();}public void start() {// 创建EventLoopGroupEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec());// 最大数据长度pipeline.addLast(new HttpObjectAggregator(65536));// 添加接收websocket请求的url匹配路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 10秒内收不到消息强制断开连接// pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));pipeline.addLast(new WebSocketHandler());}});ChannelFuture future = serverBootstrap.bind(PORT).sync();log.info("websocket server started, port={}", PORT);// 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭// 阻塞future.channel().closeFuture().sync();} catch (Exception e) {log.error("websocket server exception", e);throw new RuntimeException(e);} finally {log.info("websocket server close");// 关闭EventLoopGroupbossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

编写WebSocketHandler:

package com.lzq.websocket.handlers;import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {private WebSocketServerHandshaker webSocketServerHandshaker;private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 创建连接时执行NettyConfig.group.add(ctx.channel());log.info("client channel active, id={}", ctx.channel().id().toString());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 关闭连接时执行NettyConfig.group.remove(ctx.channel());log.info("client channel disconnected, id={}", ctx.channel().id().toString());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 服务端接收客户端发送过来的数据结束之后调用ctx.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {// 处理客户端http握手请求handlerHttpRequest(ctx, (FullHttpRequest) msg);} else if (msg instanceof WebSocketFrame) {// 处理websocket连接业务handlerWebSocketFrame(ctx, (WebSocketFrame) msg);}}/*** 处理websocket连接业务** @param ctx* @param frame*/private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());// 判断是否是关闭websocket的指令if (frame instanceof CloseWebSocketFrame) {webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());return;}// 判断是否是ping消息if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}if (!(frame instanceof TextWebSocketFrame)) {throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());}String text = ((TextWebSocketFrame) frame).text();if ("ping".equals(text)) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}log.info("WebSocket message received: {}", text);/*** 可通过客户传输的text,设计处理策略:* 如:text={"type": "messageHandler", "userId": "111"}* 服务端根据type,采用策略模式,自行派发处理** 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),* Handler已经是线程处理,每个用户的请求是线程隔离的*/// 返回WebSocket响应ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));/*// 群发TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()+ ctx.channel().id()+ " : "+ text);NettyConfig.group.writeAndFlush(twsf);*/}/*** 处理客户端http握手请求** @param ctx* @param request*/private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());// 判断是否采用WebSocket协议if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);webSocketServerHandshaker = wsFactory.newHandshaker(request);if (webSocketServerHandshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {webSocketServerHandshaker.handshake(ctx.channel(), request);}}private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {if (response.getStatus().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);response.content().writeBytes(buf);buf.release();}// 服务端向客户端发送数据ChannelFuture f = ctx.channel().writeAndFlush(response);if (response.getStatus().code() != 200) {f.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 非正常断开时调用log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);ctx.close();}
}

NettyConfig:

package com.lzq.websocket.config;import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;public class NettyConfig {/*** 存储接入的客户端的channel对象*/public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

使用Apifox测试:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/295204.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MT3608 高效率1.2MHz2A升压转换器和MT3608L 高效率1.2MHz 2.5A升压转换器 MT3608L和MT3608的区别

MT3608是一个恒定的频率&#xff0c;6引脚SOT23电流模式升压转换器的小&#xff0c;低功耗应用的目的。该MT3608开关在1.2MHz&#xff0c;并允许微小的&#xff0c;低成本的电容器和电感器使用2毫米或更小的高度内部软启动浪涌电流的结果&#xff0c;并延长电池寿命。 …

云渲染怎么批量效果图、影视动画?云渲染在效果图、影视的作用

在设计和建筑领域&#xff0c;设计师往往需要制作出精细逼真的效果图以向客户展示他们的设计思路。然而&#xff0c;在这些行业中&#xff0c;大量生成效果图需求非常费时费力。幸运的是&#xff0c;日期到了云渲染的这个时代&#xff0c;设计师们可以通过云渲染服务以一种更加…

CSM4054 500mA线性锂离子电池充电管理 适用消费类的电子产品

CSM4054 是一款完整的单节鲤离子电池采用恒定电流/恒定电压线性充电器。其 SOT23-5 封装与较少的外部元件数量使得 CSM4054 成为便携式应用的理想选择。 CSM4054 可以适合 USB 电源和适配器电源工作。由于采用了内部 PMOSFET 架构&#xff0c;加上防倒充电路&#xff0…

技术交底二维码的应用

二维码技术交底可以逐级落实、责任到人、有据可查、是目前最方便、实用的交底方式&#xff0c;下面我们讲解技术交底二维码的应用。 1、生成对应的技术交底二维码&#xff0c;将施工方案、技术资料、安全教育资料等内容上传到二维码里。打印出来现场粘贴&#xff0c;便于作业班…

计算机组成原理——指令系统41-60

41、下列哪种指令不属于程序控制指令&#xff08;C&#xff09;。 A、 无条件转移指令 B、 条件转移指令 C、 中断隐指令 D、 循环指令 42、下列关于一地址运算类指令的叙述中&#xff0c;正确的是&#xff08;B&#xff09;。 A、 仅有一个操作数&#xff0c;其地址由指令…

IP技术在网络安全防护中的重要意义

随着互联网的普及&#xff0c;网络安全问题日益凸显。作为网络通信中的重要标识&#xff0c;IP地址在网络安全防护中扮演着关键角色。近日&#xff0c;一则关于IP技术在网络安全防护措施的新闻引起了广泛关注。 据报道&#xff0c;IP技术已成为网络安全防护的重要手段之一。通过…

【星海出品】Keepalived 基础及概念 (一)

今天遇到一个用户&#xff0c;必须使用keepalived去实现高可用。为了让客户满意&#xff0c;所以就从各方面进行一个全方面的解析研究。 开源 keepalived 前世今生 VRRP,全称 Virtual Router Redundancy Protocol,中文名为虚拟路由冗余协议&#xff0c;VRRP的出现是为了解决…

HTML---浮动

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 一.常见的网页布局 二.标准文档流 标准文档流常见标签 标准文档流的组成 块级元素<div…

C++ String的模拟实现

一.基本框架 1.成员变量 string类的成员变量分别是存储字符串的一段空间_str&#xff0c;表示字符串的有效字符个数_size和表示存储有效字符空间的_capacity。 private:char *_str;size_t _size;// 有效字符的个数size_t _capacity;// 存储有效字符的空间还有一个string类的特…

【数据结构】线段树算法总结(单点修改)

知识概览 用作单点修改的线段树有4个操作&#xff1a; pushup&#xff1a;由子节点的信息计算父节点的信息build&#xff1a;初始化一棵树modify&#xff1a;修改一个区间query&#xff1a;查询一个区间 线段树用一维数组来存储&#xff1a; 编号是x的节点&#xff0c;它的父节…

LM358 双运算放大器 用于误差放大器和电压跟随器等电路

LM358 内部包括有两个独立的、高增益、内部频率补偿的双运算放大器&#xff0c;适合于电源电压范围很宽的单电源审用&#xff0c;也适用于双电源工作模式&#xff0c;在推荐的工作条件下&#xff0c;由源电流与电源电压无关。它的使用范围句括传感放大器、直流增益模组&#xf…

D40|单词拆分+多重背包

139.单词拆分 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。请你判断是否可以利用字典中出现的单词拼接出 s 。 注意&#xff1a;不要求字典中出现的单词全部都使用&#xff0c;并且字典中的单词可以重复使用 输入: s "leetcode", wordDict ["le…