前言
这篇随笔属实没想到一个好名字,起因是在项目中遇到了一个springboot服务会发出多个socket服务的场景,而且我们使用的是socketIO服务,为了减少调试工作和重复的开发工作,让开发在项目中专注于业务编写,因此封装了一个在启动springboot服务时,自动创建socketIONamespace的逻辑
依赖
在使用此依赖时,我的项目版本为:
因为是要跟公司其他团队的架构保持一致,所以我们的socketIo的版本偏低,使用的是
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.19</version>
</dependency>
<dependency> <groupId>io.socket</groupId> <artifactId>socket.io-client</artifactId> <version>1.0.0</version>
</dependency>
逻辑
首先需要添加一个socketIo的配置类
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class SocketIoConfig {@Value("${socketio.port}") private Integer port; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Bean public SocketIOServer socketIoServer() { SocketConfig socketConfig = new SocketConfig(); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0);// 因为使用了Spring的com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setSocketConfig(socketConfig);
// 设置授权监听器
config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
// 在这里添加你的授权逻辑
// 例如,检查握手数据中的 token
String token = data.getSingleUrlParam("token");
return "valid_token".equals(token);
}
});config.setPort(port);config.setBossThreads(bossCount);config.setWorkerThreads(workCount);config.setAllowCustomRequests(allowCustomRequests);config.setUpgradeTimeout(upgradeTimeout);config.setPingTimeout(pingTimeout);config.setPingInterval(pingInterval);return new SocketIOServer(config);}
}
这些类的配置,我是使用的 application.yml
进行管理,在 application.yml
中添加了相应的配置.
对于授权监听器这块的逻辑,我一直没有真正的使用起来。我对于token的处理是放在了建立socket链接后进行处理
需要添加一个策略接口 SocketIoStrategy
和一个初始化加载的类 SocketInitHandle
public interface SocketIoStrategy { /** * 定义命名空间的Url * * @return 命名空间url */String defineNamespaceUrl(); /** * 链接后方法 * * @param client socket客户端信息 */ void connected(SocketIOClient client);/** * 自定义监听器 * * @param socketIoNamespace socketIo命名空间 */ void customListener(SocketIONamespace socketIoNamespace);/** * 链接后方法 * * @param client socket客户端信息 */ void disconnect(SocketIOClient client);}
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service; import javax.annotation.PreDestroy;
import javax.annotation.Resource;@Slf4j
@Service
public class SocketInitHandle implements ApplicationListener<ContextRefreshedEvent> { @Resourceprivate SocketIOServer socketIoServer;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) { ApplicationContext applicationContext = event.getApplicationContext(); // 获取继承了ISocketIoService的beanapplicationContext.getBeansOfType(SocketIoStrategy.class).forEach((beanName, socketService) -> {log.info("{} socket io namespace:{}", beanName, socketService.defineNamespaceUrl());String namespaceUrl = socketService.defineNamespaceUrl();SocketIONamespace socketIoNamespace = socketIoServer.addNamespace(namespaceUrl);socketIoNamespace.addConnectListener(socketService::connected);socketIoNamespace.addDisconnectListener(socketService::disconnect);socketService.customListener(socketIoNamespace);});// 启动服务socketIoServer.start();}/** * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题 */ @PreDestroy private void autoStop() { stop();}public void stop() {if (socketIoServer != null) {socketIoServer.stop();socketIoServer = null;}}
}
这样,在项目启动时,会自动识别到实现了SocketIoStrategy这个接口的类,下面是一个这个接口实现的样例
@Slf4j
@Service
public class DemoSocketIoStrategyImpl implements SocketIoStrategy { /** * 存放已连接的客户端 */ private static final Map<String, SocketUtil.SocketClientInfo> CLIENT_MAP = new ConcurrentHashMap<>(); @Override public String defineNamespaceUrl() { return "/demo"; } @Override public void connected(SocketIOClient client) { SocketUtil.SocketClientInfo clientInfo = SocketUtil.formationSocketInfoWithClient(client); bizLog.info("************ 客户端:{} 已连接 ************", clientInfo.getClientId()); // 自定义事件`connected` -> 与客户端通信 (也可以使用内置事件,如:Socket.EVENT_CONNECT) client.sendEvent(Socket.EVENT_CONNECT, "成功连接"); CLIENT_MAP.put(clientInfo.getClientId(), clientInfo); } @Override public void customListener(SocketIONamespace socketIoNamespace) { socketIoNamespace.addEventListener("PUSH_DATA_EVENT", String.class, (client, data, ackSender) -> { // 客户端推送`client_info_event`事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型 String clientId = SocketUtil.getClientIdByClient(client); bizLog.info("client{} push msg:{}", clientId, data); }); } @Override public void disconnect(SocketIOClient client) { String clientId = SocketUtil.getClientIdByClient(client); bizLog.info("{} *********************** 客户端已断开连接", clientId); if (clientId != null) { CLIENT_MAP.remove(clientId); client.disconnect(); } } public void pushBroadcastMessages(String eventType, String msgContent) { CLIENT_MAP.forEach((clientId, clientInfo) -> { bizLog.info("send fence msg to {}, content:{}", clientId, msgContent); clientInfo.getClient().sendEvent(eventType, msgContent); }); }
}
这个类实现了socketIO策略,还提供了一个广播的实现方法,在服务中需要广播消息时,执行消息的类型和内容即可发送
前端逻辑
下面是一个基于react的使用逻辑
const mySocket = useRef<any>(null);const startSocket = () => {if (!mySocket) {// socketIoUrl是对应后端服务的域名,/demo是对应链接的链接路径,用于区分一个服务中的多个socket逻辑mySocket = io(`${socketIoUrl}/demo`, {reconnectionDelayMax: 10000,query: {// 一些在链接的时候需要携带的参数},});mySocket.on('connect', (ev: any) => {console.log('socket 连接成功', ev);// 在链接成功后,发送一个emitEventType类型的事件消息mySocket.emit('emitEventType',"emit data")});// 此处监听后端服务的eventType类型的事件的数据mySocket.on('eventType', (data: any) => {// 接收到数据后的逻辑});}
};useEffect(()=>{// 在加载的时候建立链接startSocket();return () => {//断开socket连接if (mySocket.current) {mySocket.current.closeSocket();}};
},[])