Netty的序列化之MessagePack

目录

引入MessagePack依赖

实体类

服务端代码

客户端代码

执行结果


引入MessagePack依赖

        <dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency>

实体类

@Message//MessagePack提供的注解,表明这是一个需要序列化的实体类
public class User {private String id;private String userName;private int age;private UserContact userContact;public User(String userName, int age, String id) {this.userName = userName;this.age = age;this.id = id;}public User() {}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public String getId() {return id;}public void setId(String id) {this.id = id;}public UserContact getUserContact() {return userContact;}public void setUserContact(UserContact userContact) {this.userContact = userContact;}@Overridepublic String toString() {return "User{" +"userName='" + userName + '\'' +", age=" + age +", id='" + id + '\'' +", userContact=" + userContact +'}';}
}
@Message//MessagePack提供的注解,表明这是一个需要序列化的实体类
public class UserContact {private String mail;private String phone;public UserContact() {}public UserContact(String mail, String phone) {this.mail = mail;this.phone = phone;}public String getMail() {return mail;}public void setMail(String mail) {this.mail = mail;}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}@Overridepublic String toString() {return "UserContact{" +"mail='" + mail + '\'' +", phone='" + phone + '\'' +'}';}
}

服务端代码

public class ServerMsgPackEcho {public static final int PORT = 9995;public static void main(String[] args) throws InterruptedException {ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();System.out.println("服务器即将启动");serverMsgPackEcho.start();}public void start() throws InterruptedException {final MsgPackServerHandler serverHandler = new MsgPackServerHandler();EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/b.group(group)/*将线程组传入*/.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*//*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,所以下面这段代码的作用就是为这个子channel增加handle*/.childHandler(new ChannelInitializerImp());ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/System.out.println("服务器启动完成,等待客户端的连接和数据.....");f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/} finally {group.shutdownGracefully().sync();/*优雅关闭线程组*/}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));ch.pipeline().addLast(new MsgPackDecoder());ch.pipeline().addLast(new MsgPackServerHandler());}}
}/*基于MessagePack的解码器,反序列化*/
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {final int length = msg.readableBytes();final byte[] array = new byte[length];msg.getBytes(msg.readerIndex(),array,0,length);MessagePack messagePack = new MessagePack();out.add(messagePack.read(array,User.class));}
}@ChannelHandler.Sharable
public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);/*** 服务端读取到网络数据后的处理*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//将上一个handler生成的数据强制转型User user = (User)msg;System.out.println("Server Accept["+user+"] and the counter is:"+counter.incrementAndGet());//服务器的应答String resp = "I process user :"+user.getUserName()+ System.getProperty("line.separator");ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));ctx.fireChannelRead(user);}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

客户端代码

public class ClientMsgPackEcho {private final String host;public ClientMsgPackEcho(String host) {this.host = host;}public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {final Bootstrap b = new Bootstrap();;/*客户端启动必须*/b.group(group)/*将线程组传入*/.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*//*配置要连接服务器的ip地址和端口*/.remoteAddress(new InetSocketAddress(host, ServerMsgPackEcho.PORT)).handler(new ChannelInitializerImp());ChannelFuture f = b.connect().sync();System.out.println("已连接到服务器.....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {/*告诉netty,计算一下报文的长度,然后作为报文头加在前面*/ch.pipeline().addLast(new LengthFieldPrepender(2));/*对服务器的应答也要解码,解决粘包半包*/ch.pipeline().addLast(new LineBasedFrameDecoder(1024));/*对我们要发送的数据做编码-序列化*/ch.pipeline().addLast(new MsgPackEncode());ch.pipeline().addLast(new MsgPackClientHandler(5));}}public static void main(String[] args) throws InterruptedException {new ClientMsgPackEcho("127.0.0.1").start();}
}/*基于MessagePack的编码器,序列化*/
public class MsgPackEncode extends MessageToByteEncoder<User> {@Overrideprotected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out)throws Exception {MessagePack messagePack = new MessagePack();byte[] raw = messagePack.write(msg);out.writeBytes(raw);}
}public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private final int sendNumber;public MsgPackClientHandler(int sendNumber) {this.sendNumber = sendNumber;}private AtomicInteger counter = new AtomicInteger(0);/*** 客户端读取到网络数据后的处理*/protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)+"] and the counter is:"+counter.incrementAndGet());}/*** 客户端被通知channel活跃后,做事*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {User[] users = makeUsers();//发送数据for(User user:users){System.out.println("Send user:"+user);ctx.write(user);}ctx.flush();}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/*生成用户实体类的数组,以供发送*/private User[] makeUsers(){User[] users=new User[sendNumber];User user =null;for(int i=0;i<sendNumber;i++){user=new User();user.setAge(i);String userName = "ABC--->"+i;user.setUserName(userName);user.setId("No:"+(sendNumber-i));user.setUserContact(new UserContact(userName+"@9527.com","133333333"));users[i]=user;}return users;}
}

核心就是通过MessagePack提供的处理器来处理数据。

执行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/460308.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【开源】JAVA+Vue.js实现高校实验室管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…

notepad++成功安装后默认显示英文怎么设置中文界面?

前几天使用电脑华为管家清理电脑后&#xff0c;发现一直使用的notepad软件变回了英文界面&#xff0c;跟刚成功安装的时候一样&#xff0c;那么应该怎么设置为中文界面呢&#xff1f;具体操作如下&#xff1a; 1、打开notepad软件&#xff0c;点击菜单栏“Settings – Prefere…

如何开始深度学习,从实践开始

将“如何开始深度学习”这个问题喂给ChatGPT和文心一言&#xff0c;会给出很有专业水准的答案&#xff0c;比如&#xff1a; 要开始深度学习&#xff0c;你可以遵循以下步骤&#xff1a; 学习Python编程语言的基础知识&#xff0c;因为它在深度学习框架中经常被使用。 熟悉线性…

利用Intersection Observer实现图片懒加载性能优化

ntersection Observer是浏览器所提供的一个 Javascript API&#xff0c;用于异步的检测目标元素以及祖先或者是顶级的文档视窗的交叉状态 这句话的意思就是&#xff1a; 我们可以看的图片当中&#xff0c;绿色的 target element&#xff08;目标元素&#xff09;&#xff0c;…

百度员工:纠结!31岁,我年薪78w,老婆50w,两人在深圳同一家公司。武汉有房,现在有机会回去,但会降薪15%左右。...

上一篇&#xff1a;《繁花》里宝总成功的秘诀&#xff0c;句句真理&#xff01; 深圳的一名31岁的百度员工&#xff0c;拥有高薪和舒适的生活&#xff0c;与妻子过着看似完美的日子。当他们准备迎接家庭的新阶段时&#xff0c;面临一个艰难的抉择&#xff1a;是在深圳继续追求事…

性能篇:如何解决高并发下 I/O 瓶颈?

我们可以有效地解决高并发下I/O瓶颈的问题&#xff0c;提升系统的性能。当然&#xff0c;实际场景中的优化可能涉及到更多的细节和技术&#xff0c;但希望这篇文章能为大家提供一些思路和方法。​ 引言 大家好&#xff0c;我是小米&#xff01;今天我们来聊一个在高并发场景…

2021年通信工程师初级 实务 真题

文章目录 一、第1章 现代通信网概述&#xff0c;通信网的定义。第10章 通信业务&#xff0c;普遍服务原则10.2.4 通信行业的发展趋势&#xff08;六化&#xff09; 二、第2章 传输网SDH帧结构SDH线路保护倒换&#xff0c;“11 保护”和“1:1保护”波长值λc/f&#xff0c;中心频…

Docker-Learn(三)创建镜像Docker(换源)

根据之前的内容基础&#xff0c;本小点的内容主要涉及到的内容是比较重要的文本Dockerfile 1. 编辑Dockerfile 启动命令行终端&#xff08;在自己的工作空间当中&#xff09;,创建和编辑Dockerfile。 vim Dockerfile然后写入以下内容 # 使用一个基础镜像 FROM ubuntu:late…

Verilog刷题笔记24

题目&#xff1a; Verilog has a ternary conditional operator ( ? : ) much like C: (condition ? if_true : if_false) This can be used to choose one of two values based on condition (a mux!) on one line, without using an if-then inside a combinational alwa…

vulnhub中Beelzebub靶机

渗透思路 一.信息收集1.网段探测2.端口探测3.常见漏洞扫描4.目录扫描5.web页面分析 二.渗透继续目录扫描ssh连接提权提权&#xff0c;flag 一.信息收集 1.网段探测 ┌──(root㉿kali)-[~] └─# nmap -Pn 192.168.0.0/24 --min-rate 10000 Starting …

【MySQL】学习和总结DCL的权限控制

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-Bl9kYeLf8GfpdQgL {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

『运维备忘录』之 Ansible 自动化运维工具

一、简介 Ansible是基于Python开发&#xff0c;集合了众多运维工具&#xff08;puppet、cfengine、chef、func、fabric&#xff09;的优点&#xff0c;实现了批量系统配置、批量程序部署、批量运行命令等功能的自动化运维工具&#xff0c;广泛用于配置管理、应用部署以及任务协…