第 5 篇 : 多节点Netty服务端(可扩展)

说明

前面消息互发以及广播都是单机就可以完成测试, 但实际场景中客户端的连接数量很大, 那就需要有一定数量的服务端去支撑, 所以准备虚拟机测试。

1. 虚拟机准备

1.1 准备1个1核1G的虚拟机(160), 配置java环境, 安装redis和minio

1.2 准备6个1核1G的空虚拟机(161到166), 只需要java环境即可

2. 服务端改造

2.1 修改 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hahashou.netty</groupId><artifactId>server</artifactId><version>1.0-SNAPSHOT</version><name>server</name><description>Netty Server Project For Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.100.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.security</groupId><artifactId>spring-security-crypto</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

2.2 修改 application.yml (每个服务端的id是不一样的)

server:port: 32000spring:redis:host: 192.168.109.160port: 6379password: rootlogging:level:com.hahashou.netty: infonetty:server:# 唯一标识(与hosts文件里对应)id : netty-server-1# 客户端需要连接的端口port: 35000

2.3 config包下增加 NettyStatic类

package com.hahashou.netty.server.config;import io.netty.channel.Channel;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 静态常量* @author: 哼唧兽* @date: 9999/9/21**/
public class NettyStatic {/** key: 用户code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}

2.4 config包下增加 RedisConfig类

package com.hahashou.netty.server.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @description: Redis配置* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());// 使用StringRedisSerializer来序列化和反序列化redis的keyredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());// 开启事务:redisTemplate.setEnableTransactionSupport(true); 我觉得一般用不到(该操作是为了执行一组命令而设置的)redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}@Beanpublic ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForValue();}public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}

2.5 修改 EventLoopGroupConfig类

package com.hahashou.netty.server.config;import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @description: Netty线程组* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class EventLoopGroupConfig {private int bossNum = 1;private int workerNum = 4;private int businessNum = 1;private int maxPending = 100000;/** ------------------------------ 服务端 ------------------------------ */@Bean("bossGroup")public NioEventLoopGroup bossGroup() {return new NioEventLoopGroup(bossNum);}@Bean("workerGroup")public NioEventLoopGroup workerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("businessGroup")public EventExecutorGroup businessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());}/** ------------------------------ 客户端 ------------------------------ */@Bean("clientWorkerGroup")public NioEventLoopGroup clientWorkerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("clientBusinessGroup")public EventExecutorGroup clientBusinessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(), maxPending, RejectedExecutionHandlers.reject());}static class BusinessThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;BusinessThreadFactory() {SecurityManager securityManager = System.getSecurityManager();group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();namePrefix = "netty-server-";}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}
}

2.6 config包下增加 SpringBean类

package com.hahashou.netty.server.config;import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;/*** @description: Spring Bean管理* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class SpringBean {@Beanpublic PasswordEncoder passwordEncoder() {return new BCryptPasswordEncoder();}/*** 最多能new64个, private static final int INSTANCE_COUNT_LIMIT = 64;* @return*/@Beanpublic HashedWheelTimer hashedWheelTimer() {// 默认tick间隔100毫秒, 轮子大小为512return new HashedWheelTimer();}
}

2.7 server包下增加 ApplicationInitial类

package com.hahashou.netty.server;import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** @description: 应用初始化* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {@Resourceprivate HashedWheelTimer hashedWheelTimer;@Resourceprivate NettyServer nettyServer;@Overridepublic void run(ApplicationArguments args) {hashedWheelTimer.newTimeout(nettyServer, 1 , TimeUnit.SECONDS);}
}

2.8 修改 Message类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Data
public class Message {/** 广播秘钥 */private String secretKey;/** 发送者用户code */private String userCode;/** 中转的服务端Id */private String serverId;/** 接收者用户code */private String friendUserCode;/** 连接时专用 */private String channelId;/** 消息类型 */private Integer type;public enum TypeEnum {TEXT(0, "文字", "", new ArrayList<>()),IMAGE(1, "图片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),VOICE(2, "语音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),VIDEO(3, "视频", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),;@Getterprivate Integer key;@Getterprivate String describe;@Getterprivate String bucketName;@Getterprivate List<String> formatList;TypeEnum(int key, String describe, String bucketName, List<String> formatList) {this.key = key;this.describe = describe;this.bucketName = bucketName;this.formatList = formatList;}public static TypeEnum select(String format) {TypeEnum result = null;for (TypeEnum typeEnum : TypeEnum.values()) {if (typeEnum.getFormatList().contains(format)) {result = typeEnum;break;}}return result;}}/** 文字或文件的全路径名称 */private String text;public static ByteBuf transfer(Message message) {return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);}/*** 生成指定长度的随机字符串* @param length* @return*/public static String randomString (int length) {if (length > 64) {length = 64;}List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add(i + "");}for (char i = 'A'; i <= 'Z'; i++) {list.add(String.valueOf(i));}for (char i = 'a'; i <= 'z'; i++) {list.add(String.valueOf(i));}list.add("α");list.add("ω");Collections.shuffle(list);String string = list.toString();return string.replace("[", "").replace("]", "").replace(", ", "").substring(0, length);}
}

2.9 config包下增加 NettyClientHandler类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Getter@Setterprivate String userCode;@Getter@Setterprivate String hostName;@Getter@Setterprivate int port;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("{}, 作为客户端, 与其他服务端连接", LocalDateTime.now());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);NettyClient nettyClient = NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);nettyClient = null;nettyClientHandler = null;System.gc();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String channelId = message.getChannelId(),text = message.getText();if (StringUtils.hasText(channelId)) {Channel channel = ctx.channel();message.setUserCode(userCode);NettyStatic.USER_CHANNEL.put(hostName, channelId);NettyStatic.CHANNEL.put(channelId, channel);channel.writeAndFlush(Message.transfer(message));} else if (StringUtils.hasText(text)) {String friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(message.getServerId())) {String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}// 此时, 已不需要serverIdmessage.setServerId(null);channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}}}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {}
}

2.10 config包下增加 NettyClient类

package com.hahashou.netty.server.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;/*** @description: Netty-客户端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClient {@Getter@Setterprivate NioEventLoopGroup clientWorkerGroup;@Getter@Setterprivate EventExecutorGroup clientBusinessGroup;public void createClient(NettyClientHandler nettyClientHandler) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorkerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(clientBusinessGroup, nettyClientHandler);}});try {InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());bootstrap.connect(socketAddress).sync().channel();} catch (UnknownHostException exception) {log.error("请检查hosts文件是否配置正确 : {}", exception.getMessage());} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {clientWorkerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}

2.11 修改 NettyServer类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @description: Netty-服务端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyServer implements TimerTask {@Value("${netty.server.id}")private String serverId;@Value("${netty.server.port}")private int port;@Resourceprivate NioEventLoopGroup bossGroup;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyServerHandler nettyServerHandler;@Resourceprivate NioEventLoopGroup clientWorkerGroup;@Resourceprivate EventExecutorGroup clientBusinessGroup;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate HashedWheelTimer hashedWheelTimer;@Overridepublic void run(Timeout timeout) {Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);if (nettyServerLock != null) {hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);return;}try {redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);//String hostAddress = InetAddress.getLocalHost().getHostAddress();ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyServerHandler);}})// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)// 此处有个大坑, 详见文章脱坑指南.bind(port).sync();if (channelFuture.isSuccess()) {log.info("{} 启动成功", serverId);redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}thisNodeHandle(port);channelFuture.channel().closeFuture().sync();} catch (InterruptedException exception) {log.error("{} 启动失败: {}", serverId, exception.getMessage());} finally {redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}}private void thisNodeHandle(int port) {Set<String> nodeList = new HashSet<>();Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);if (nettyServerList != null) {nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));for (String hostAndPort : nodeList) {String[] split = hostAndPort.split("@");String connectHost = split[0];int connectPort = Integer.parseInt(split[1]);NettyClient nettyClient = new NettyClient();nettyClient.setClientWorkerGroup(clientWorkerGroup);nettyClient.setClientBusinessGroup(clientBusinessGroup);NettyClientHandler nettyClientHandler = new NettyClientHandler();nettyClientHandler.setUserCode(serverId);nettyClientHandler.setHostName(connectHost);nettyClientHandler.setPort(connectPort);nettyClient.createClient(nettyClientHandler);NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);}}nodeList.add(serverId + "@" + port);redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));}public void stop() {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();log.info("TCP服务关闭成功");Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);hostList.remove(serverId + "@" + port);if (CollectionUtils.isEmpty(hostList)) {redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);} else {redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));}}@PreDestroypublic void destroy() {stop();}
}

2.12 修改 NettyServerHandler类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Value("${netty.server.id}")private String serverId;public static String SERVER_PREFIX = "netty-server-";@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客户端连接, channelId : {}", channelId);NettyStatic.CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {String channelId = ctx.channel().id().asLongText();log.info("有客户端断开连接, channelId : {}", channelId);NettyStatic.CHANNEL.remove(channelId);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {if (entry.getValue().equals(channelId)) {redisTemplate.delete(entry.getKey());break;}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId(),friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}} else {offlineMessage(friendUserCode, message);}}}}/*** 建立连接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("{} 连接", userCode);NettyStatic.USER_CHANNEL.put(userCode, channelId);if (!userCode.startsWith(SERVER_PREFIX)) {redisOperation.set(userCode, serverId);}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {// 1条message在redis中大概是100B, 1万条算1M, redis.conf的maxmemory设置的是256MList<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}/*** 发送给服务端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());}
}

2.13 新建service包, 并新增 ServerService接口

package com.hahashou.netty.server.service;import com.hahashou.netty.server.config.Message;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
public interface ServerService {/*** 发送消息* @param dto*/void send(Message dto);/*** 停止服务(为后续断线重连做准备)*/void stop();
}

2.14 service包下新建impl包, 并新增 ServerServiceImpl类

package com.hahashou.netty.server.service.impl;import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {@Value("${netty.server.id}")private String serverId;@Resourceprivate PasswordEncoder passwordEncoder;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate NettyServer nettyServer;@Overridepublic void send(Message dto) {String friendUserCode = dto.getFriendUserCode();if (StringUtils.hasText(friendUserCode)) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(dto);}} else {offlineMessage(friendUserCode, dto);}} else {// 全体广播, 需要校验秘钥(inputSecretKey应该是一个动态值, 通过手机+验证码每次广播时获取, 自行实现)String inputSecretKey = dto.getSecretKey();// encodedPassword生成见main方法String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {dto.setSecretKey(null);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {String key = entry.getKey();if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {// 这里可以用http调用其他服务端, 自行补充(信息redis都有)continue;}// 只处理连接本端的客户端String value = entry.getValue();Channel channel = NettyStatic.CHANNEL.get(value);if (channel == null) {offlineMessage(friendUserCode, dto);return;}channel.writeAndFlush(Message.transfer(dto));}}}}public static void main(String[] args) {String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();String encode = passwordEncoder.encode(text);log.info(encode);if (passwordEncoder.matches(text, encode)) {log.info("秘钥正确");}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void stop() {nettyServer.stop();}
}

2.15 修改 ServerController类

package com.hahashou.netty.server.controller;import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {@Resourceprivate ServerService serverService;/*** 秘钥记录: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU* @param dto* @return*/@PostMapping("/send")public String send(@RequestBody Message dto) {serverService.send(dto);return "success";}@GetMapping("/stop")public String stop() {serverService.stop();return "stop netty success";}
}

3. 脱坑指南, 针对 NettyServer类

工具

yum -y install net-tools
netstat -tunlp

防火墙打开时, 当使用 bind(String inetHost, int inetPort) 方法时, 因为inetHost是127.0.0.1, 所以只有本机可以访问35000, 要想让其他机器可以连接到, 需使用 bind(int inetPort) 方法, 下图是前后两次端口占用情况
端口占用情况
结论
当使用bind(String inetHost, int inetPort)方法时, 无论防火墙关闭以及启动, 虚拟机均有问题; 但当机器有公网IP, 且防火墙关闭或端口开放时, 通过DNS解析映射是没有问题的, 建议还是用bind(int inetPort)方法

4. 服务端准备

4.1 打包3个服务端的jar包, id分别为netty-server-1、netty-server-2、netty-server-3, 分别放在161到163上

4.2 161、162、163端口开放

firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload

4.3 161、162、163修改hosts

vi /etc/hosts

追加内容

192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3

4.4 依次启动161、162、163

java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar

161
服务端1启动
162
服务端2启动
163
服务端3启动
redis中记录的服务列表
redis中记录的服务列表

5. 客户端改造

5.1 修改 application.yml

server:port: 32001logging:level:com.hahashou.netty: infospring:servlet:multipart:max-file-size: 128MBmax-request-size: 256MBuserCode: Aa
host: 192.168.109.161minio:endpoint: http://192.168.109.160:9000accessKey: rootsecretKey: root123456

5.2 修改 NettyClient类

package com.hahashou.netty.client.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;/*** @description: Netty-TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {@Value("${host}")private String host;public static int PORT = 35000;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyClientHandler nettyClientHandler;public static Channel CHANNEL;@SneakyThrows@Overridepublic void onApplicationEvent(ApplicationStartedEvent event) {createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);}public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,NettyClientHandler nettyClientHandler, String host, int port) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyClientHandler);}});try {CHANNEL = bootstrap.connect(host, port).sync().channel();} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {workerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}

6. 客户端准备

6.1 准备6个jar包, 修改application.yml, 并根据下述规则放到对应机器上

Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上

userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163

6.2 161到166端口开放

firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload

6.3 启动所有客户端

AB连接
CD连接
EF连接

7. 测试

请求参数

7.1 两个客户端连同一服务端, 不会出现转发

Aa向Bb发送消息, 且Bb收到后回复Aa
Aa向Bb
Bb向Aa

7.2 两个客户端连不同服务端

Aa向Cc发送消息(通过服务端1转发到服务端2), 且Cc收到后回复Aa(通过服务端2转发到服务端1)
A到C的转发
Aa向CcC到A的转发
Cc向Aa
Aa向Ee发送消息, 且Ee收到后回复Aa
Aa向Ee
Ee向Aa

7.3 广播

广播请求参数
收到广播

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/700318.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

现在闪侠惠递寄快递有福利了,千万不要因没把握住而后悔呀!

闪侠惠递平台寄快递现在真的是太便宜了&#xff0c;优惠价格把握不住&#xff0c;后悔都来不及&#xff01;大家可以在闪侠惠递上面寄快递&#xff0c;价格真的非常优惠呢&#xff0c;比咱们平常寄快递的价格都优惠呢&#xff0c;真的&#xff0c;小编都亲自替大家尝试过了呢。…

【Java基础】我不允许还有人搞不懂lambda表达式!!!

λ希腊字母表中排序第十一位的字母避免匿名内部类定义过多&#xff0c;使得代码更加简洁其实质属于函数式编程的概念 (params)->expression[表达式] (params)->statement[语句] (params)->{statements}lambda表达式推导过程&#xff1a; 创建一个类&#xff0c;重写接…

【大数据·hadoop】在hdfs上运行shell基本常用命令

一、准备工作 1.1格式化并启动Hadoop服务 参见Hadoop在ubuntu虚拟机上的伪分布式部署|保姆级教程的4.7节 二、HDFS常用命令 接着&#xff0c;就愉快地在刚刚的命令行里敲命令啦 1.显示hdfs目录结构 hadoop fs -ls -R /hadoop fs: 这是Hadoop文件系统命令行的一部分&#x…

36. 有效的数独 - 力扣(LeetCode)

基础知识要求&#xff1a; Java&#xff1a;方法、for循环、if判断、数组 Python&#xff1a; 方法、for循环、if判断、列表、集合 题目&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一…

SystemC学习使用记录

一、概述 对于复杂的片上系统&#xff0c;在进行RTL编码前&#xff0c;需进行深入的系统级仿真&#xff0c;以确认设计的体系结构是否恰当、总线是否能满足吞吐量和实现性要求以及存储器是否浪费&#xff0c;所进行的这些仿真要求在芯片的仿真模型上运行大量的软件&#xff0c…

LVS负载均衡超详细入门介绍

LVS 一、LVS入门介绍 1.1.LVS负载均衡简介 1.2.负载均衡的工作模式 1.2.1.地址转换NAT&#xff08;Network Address Translation&#xff09; 1.2.2.IP隧道TUN&#xff08;IP Tunneling&#xff09; 1.2.3.直接路由DR&#xff08;Direct Routing&#xff09; 1.3.…

版本控制:软件开发的基石(一文读懂版本控制)

未经允许&#xff0c;禁止转载&#xff01; 在现代软件开发中&#xff0c;版本控制是不可或缺的工具。它帮助开发者跟踪和管理代码的变化&#xff0c;协作完成项目&#xff0c;并确保代码的完整性和安全性。本文将基于Git官网的视频“什么是版本控制”来深入探讨版本控制的基本…

macOS Sonoma 14.5(23F79)发布

系统介绍 黑果魏叔5 月 14 日快报&#xff0c;苹果今日向 Mac 电脑用户推送了 macOS 14.5 正式版更新&#xff08;内部版本号&#xff1a;23F79 同 RC&#xff09;。这是去年 9 月发布的 macOS Sonoma 操作系统的第五次更新&#xff0c;距离上一次的 macOS Sonoma 14.4 更新已…

在React中利用Postman测试代码获取数据

文章目录 概要名词解释1、Postman2、axios 使用Postman测试API在React中获取并展示数据小结 概要 在Web开发中&#xff0c;通过API获取数据是一项常见任务。Postman是一个功能强大的工具&#xff0c;可以帮助开发者测试API&#xff0c;并查看API的响应数据。在本篇博客中&…

Windows snmp++获取本地主机信息

编译snmp的包 调用snmp.lib实现信息获取_哔哩哔哩_bilibili 代码&#xff1a; #include <iostream> #include <libsnmp.h> #include <vector> #include <fstream> #include <string> #include "snmp_pp/snmp_pp.h" //#define _NO_L…

C语言错题本之<结构体>

以下叙述中正确的是________. A)预处理命令行必须位于源文件的开头 B)在源文件的一行上可以有多条预处理命令 C)宏名必须用大写字母表示 D)宏替换不占用程序的运行时间 答案&#xff1a;D 解析&#xff1a; A&#xff1a;在C、C等编程语言中&#xff0c;预处理指令&#xff08;…

两小时看完花书(深度学习入门篇)

1.深度学习花书前言 机器学习早期的时候十分依赖于已有的知识库和人为的逻辑规则&#xff0c;需要人们花大量的时间去制定合理的逻辑判定&#xff0c;可以说是有多少人工&#xff0c;就有多少智能。后来逐渐发展出一些简单的机器学习方法例如logistic regression、naive bayes等…