第 3 篇 : Netty离线消息处理(可跳过)

说明

仅是个人的不成熟想法, 未深入研究验证

1. 修改 NettyServerHandler类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/** key: 用户code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);/** 最好是单独写个单例(注意: 最多只能new 64个此类对象) */public static HashedWheelTimer TIMER;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客户端连接, channelId : {}", channelId);CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("有客户端断开连接, channelId : {}", ctx.channel().id().asLongText());CHANNEL.remove(ctx.channel().id().asLongText());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {if (StringUtils.hasText(message.getFriendUserCode())) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}}}}/*** 建立连接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("客户端 {} 连接", userCode);USER_CHANNEL.put(userCode, channelId);if (TIMER == null) {// 默认的时间轮是100毫秒的tick间隔, 0.1秒的误差TIMER = new HashedWheelTimer();}TIMER.newTimeout(new OfflineMessage(userCode), 1, TimeUnit.SECONDS);}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode();String queryChannelId = USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = OfflineMessage.USER_MESSAGE.get(friendUserCode);if (CollectionUtils.isEmpty(messageList)) {messageList = new ArrayList<>();}messageList.add(message);OfflineMessage.USER_MESSAGE.put(friendUserCode, messageList);}/*** 发送给服务端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());}
}

2. config包下增加 OfflineMessage类

package com.hahashou.netty.server.config;import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 离线消息* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class OfflineMessage implements TimerTask {public static Map<String, List<Message>> USER_MESSAGE = new ConcurrentHashMap<>(32);private String userCode;public OfflineMessage(String userCode) {this.userCode = userCode;}@Overridepublic void run(Timeout timeout) {List<Message> messageList = USER_MESSAGE.get(userCode);if (CollectionUtils.isEmpty(messageList)) {return;}log.info("向 {} 推送离线消息", userCode);Channel channel = NettyServerHandler.CHANNEL.get(NettyServerHandler.USER_CHANNEL.get(userCode));for (Message offlineMessage : messageList) {channel.writeAndFlush(Message.transfer(offlineMessage));}}
}

3. 启动服务端以及客户端A, 发送几条离线消息

A客户端发送离线消息
之后, 启动客户端B接收离线消息。启动/停止了4次, 得到如下4个结果
1
结果1
2
结果2
3
结果3
4
结果4
可以看出, 得到的离线消息并不可靠, 虽然有2次结果一致。而且在这之前, 有一次启动时, 根本就是1条消息都没有, 我都一度怀疑我写的有问题

4. 个人猜想

因为是异步的, netty发送消息时, 轮询策略应该有个时间轮管理着, 且时间轮是有tick间隔的。java中for循环的执行效率大概是10个循环耗时1毫秒, 0.001秒, 如果在for循环中增加线程sleep, 或许就都能执行到, 所以我在OfflineMessage类中for循环中增加50毫秒的slepp, 5次测试结果一致, 后将50改成10, 5次测试结果一致。虽然测试没有问题, 但毕竟测试量太少, 且我觉得离线消息应该是能通过接口一次性就获取到, 所以这种通过netty获取离线消息的方式, 我不赞同

for (Message offlineMessage : messageList) {// 异常向上抛出或捕获Thread.sleep(50);channel.writeAndFlush(Message.transfer(offlineMessage));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/651501.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CRM客户管理系统盘点2024:16款顶级系统PK赛,寻找最佳利器

客户关系管理系统&#xff08;CRM&#xff09;在企业数字化转型的过程中扮演着至关重要的角色。选择一个高效、功能丰富的CRM客户管理系统&#xff0c;对于确保企业未来健康、稳定的发展至关重要。当前市场上存在着众多的CRM客户管理系统件&#xff0c;每个软件都有其独特的功能…

C#开发的全套成熟的LIS系统源码JavaScript+SQLserver 2012区域云LIS系统源码

C#开发的全套成熟的LIS系统源码JavaScriptSQLserver 2012区域云LIS系统源码 医院云LIS系统是一套成熟的实验室信息管理系统&#xff0c;目前已在多家三级级医院应用&#xff0c;并不断更新。云LIS系统是为病人为中心、以业务处理为基础、以提高检验科室管理水平和工作效率为目标…

攻防世界 easyphp

本题主要利用的知识点是php绕过 一、PHP代码分析 首先先看一下代码 我们需要利用get方式上传3个参数a,b,c&#xff0c;这3个分别需要满足不同的条件: a&#xff1a;设置a值&#xff1b;值大于6000000&#xff1b;长度不超过3&#xff1b; b&#xff1a;设置b值&#xff1b;MD…

MT8788智能模块简介_MTK联发科安卓核心板方案厂商

MT8788安卓核心板是一款具备超高性能和低功耗的4G全网通安卓智能模块。该模块采用联发科AIOT芯片平台&#xff0c;供货周期长。 MT8788核心板搭载了12nm制程的四个Cortex-A73处理器核心和四个Cortex-A53处理器核心&#xff0c;最高主频可达2.0GHz。板载内存容量可选为4GB64GB(也…

Linux2.6内核进程调度队列

目录 运行队列runqueue 活跃队列&过期队列 queue[140]&优先级&队列数组下标 bitmap[5]&O(1)调度算法 nr_active active指针和expired指针 O(1)调度算法之调度过程 本篇是Linux进程概念篇的最后一篇&#xff0c;Linux2.6内核是一个具体的/可行的/实际的存…

【计算机系统基础读书笔记】1.1.2 冯诺依曼机基本结构

1.1.2 冯诺依曼机基本结构 冯诺依曼机基本结构如图所示&#xff1a; 模型机中主要包括&#xff1a; 主存储器&#xff1a;用来存放指令和数据&#xff0c;简称主存或内存&#xff1b; 算数逻辑部件&#xff08;Arithmetic Logic Unit&#xff0c;简称ALU&#xff09;&#x…

SignalR中的重连机制和心跳监测机制详解

一. 重连机制 声明&#xff1a;   本节仅介绍重连机制和心跳监测机制&#xff0c;基于Core 3.1框架&#xff0c;至于SignalR其它的一些基本使用&#xff0c;包括引入、Hub、配置等常规操作&#xff0c;在本节中不介绍&#xff0c;后续写Core下的SignalR 说明   默认是没有重…

Log4j日志框架多种日志级别

Log4j日志框架定义了多种日志级别&#xff0c;这些级别按照优先级从高到低排列如下&#xff1a; OFF&#xff1a;这是最高等级的日志级别&#xff0c;用于关闭所有日志记录。FATAL&#xff1a;指出每个严重的错误事件将会导致应用程序的退出。ERROR&#xff1a;表明发生错误事…

C++之STL-list+模拟实现

目录 一、list的介绍和基本使用的方法 1.1 list的介绍 1.2 list的基本使用方法 1.2.1 构造方法 1.2.2 迭代器 1.2.3 容量相关的接口 1.2.4 增删查改的相关接口 1.3 关于list迭代器失效的问题 二、模拟实现list 2.1 节点类 2.2 迭代器类 2.3 主类list类 2.3.1 成员变…

JAVA12

JAVA12 1 概述2 语法层次的变化1_swich表达式(预览) 3 API层次的变化1_支持数字压缩格式化2_String新方法3_Files新增mismatch方法 4 关于GC方面的新特性1_Shenandoah GC&#xff1a;低停顿时间的GC&#xff08;预览&#xff09;2_可中断的 G1 Mixed GC3_ 增强G1 5 其他新特性简…

Git如何配合Github使用

1.安装Git https://git-scm.com/ ##2.配置 Git 安装完成后&#xff0c;你需要设置 Git 的用户名和邮箱地址&#xff0c;这样在提交代码时就能知道是谁提交的。你可以在命令行中输入以下命令来配置&#xff1a; git config --global user.name "Your Name" git con…

Orange3数据可视化(箱线图-离散属性分布)

箱线图(Box Plot) 又称为盒须图、盒式图、盒状图或箱线图&#xff0c;是一种用作显示一组数据分散情况资料的统计图。因图形如箱子&#xff0c;且在上下四分位数之外常有线条像胡须延伸出去而得名 箱线图可以显示属性值的分布,快速发现异常,例如重复的值,离群值等,挖掘数据的…