说明
仅是个人的不成熟想法, 未深入研究验证
1. 修改 NettyServerHandler类
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/** key: 用户code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);/** 最好是单独写个单例(注意: 最多只能new 64个此类对象) */public static HashedWheelTimer TIMER;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客户端连接, channelId : {}", channelId);CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("有客户端断开连接, channelId : {}", ctx.channel().id().asLongText());CHANNEL.remove(ctx.channel().id().asLongText());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {if (StringUtils.hasText(message.getFriendUserCode())) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}}}}/*** 建立连接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("客户端 {} 连接", userCode);USER_CHANNEL.put(userCode, channelId);if (TIMER == null) {// 默认的时间轮是100毫秒的tick间隔, 0.1秒的误差TIMER = new HashedWheelTimer();}TIMER.newTimeout(new OfflineMessage(userCode), 1, TimeUnit.SECONDS);}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode();String queryChannelId = USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = OfflineMessage.USER_MESSAGE.get(friendUserCode);if (CollectionUtils.isEmpty(messageList)) {messageList = new ArrayList<>();}messageList.add(message);OfflineMessage.USER_MESSAGE.put(friendUserCode, messageList);}/*** 发送给服务端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());}
}
2. config包下增加 OfflineMessage类
package com.hahashou.netty.server.config;import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 离线消息* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class OfflineMessage implements TimerTask {public static Map<String, List<Message>> USER_MESSAGE = new ConcurrentHashMap<>(32);private String userCode;public OfflineMessage(String userCode) {this.userCode = userCode;}@Overridepublic void run(Timeout timeout) {List<Message> messageList = USER_MESSAGE.get(userCode);if (CollectionUtils.isEmpty(messageList)) {return;}log.info("向 {} 推送离线消息", userCode);Channel channel = NettyServerHandler.CHANNEL.get(NettyServerHandler.USER_CHANNEL.get(userCode));for (Message offlineMessage : messageList) {channel.writeAndFlush(Message.transfer(offlineMessage));}}
}
3. 启动服务端以及客户端A, 发送几条离线消息
之后, 启动客户端B接收离线消息。启动/停止了4次, 得到如下4个结果
1
2
3
4
可以看出, 得到的离线消息并不可靠, 虽然有2次结果一致。而且在这之前, 有一次启动时, 根本就是1条消息都没有, 我都一度怀疑我写的有问题
4. 个人猜想
因为是异步的, netty发送消息时, 轮询策略应该有个时间轮管理着, 且时间轮是有tick间隔的。java中for循环的执行效率大概是10个循环耗时1毫秒, 0.001秒, 如果在for循环中增加线程sleep, 或许就都能执行到, 所以我在OfflineMessage类中for循环中增加50毫秒的slepp, 5次测试结果一致, 后将50改成10, 5次测试结果一致。虽然测试没有问题, 但毕竟测试量太少, 且我觉得离线消息应该是能通过接口一次性就获取到, 所以这种通过netty获取离线消息的方式, 我不赞同
for (Message offlineMessage : messageList) {// 异常向上抛出或捕获Thread.sleep(50);channel.writeAndFlush(Message.transfer(offlineMessage));
}