目录 1.启用Websocket功能 2.封装操作websocket session的工具 3.保存websocket session的接口 4.保存websocket session的类 5.定义websocket 端点 6.创建定时任务 ping websocket 客户端
1.启用Websocket功能
package com. xxx. robot. config ; import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. web. socket. config. annotation. EnableWebSocket ;
import org. springframework. web. socket. server. standard. ServerEndpointExporter ; @Configuration
@EnableWebSocket
public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpoint ( ) { return new ServerEndpointExporter ( ) ; } }
2.封装操作websocket session的工具
package com. xxx. robot. websocket. util ; import java. util. Map ; import javax. websocket. Session ; import org. apache. tomcat. websocket. Constants ;
import org. springframework. security. authentication. UsernamePasswordAuthenticationToken ; import com. xxx. framework. security. config. MyUserDetails ;
import com. xxx. framework. security. entity. LoginUser ;
import com. xxx. user. entity. User ; public final class WebSocketSessionUtils { private WebSocketSessionUtils ( ) { } public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024 ; public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024 ; public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000 ; public static User findUser ( Session session) { UsernamePasswordAuthenticationToken uToken = ( UsernamePasswordAuthenticationToken ) session. getUserPrincipal ( ) ; MyUserDetails userDetails = ( MyUserDetails ) uToken. getPrincipal ( ) ; LoginUser loginUser = ( LoginUser ) userDetails. getUserData ( ) ; return ( User ) loginUser. getAdditionalInfo ( ) ; } public static void setProperties ( Session session) { session. setMaxTextMessageBufferSize ( WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE) ; session. setMaxBinaryMessageBufferSize ( WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE) ; Map < String , Object > userProperties = session. getUserProperties ( ) ; userProperties. put ( Constants . BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT) ; }
}
3.保存websocket session的接口
package com. xxx. robot. websocket ; import java. io. IOException ;
import java. nio. ByteBuffer ;
import java. util. List ; import javax. websocket. Session ; import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ; public interface WebSocketSessionManager { Logger log = LoggerFactory . getLogger ( WebSocketSessionManager . class ) ; String PING = "ping" ; String PONG = "pong" ; Session get ( String key) ; List < String > keys ( ) ; void add ( String key, Session session) ; Session remove ( String key) ; default void pingBatch ( ) { List < String > keyList = keys ( ) ; log. info ( "WebSocket: {} 数量为:{}" , this . getClass ( ) . getSimpleName ( ) , keyList. size ( ) ) ; for ( String key : keyList) { if ( key != null ) { Session session = get ( key) ; if ( session != null ) { try { session. getBasicRemote ( ) . sendPing ( ByteBuffer . wrap ( PING. getBytes ( ) ) ) ; try { Thread . sleep ( 10 ) ; } catch ( InterruptedException e1) { } } catch ( Exception e) { log. error ( "WebSocket-ping异常" , e) ; } } } } } default void clearAllSession ( ) { List < String > keyList = keys ( ) ; int i = 0 ; for ( String key : keyList) { if ( key != null ) { Session session = get ( key) ; if ( session != null ) { try { remove ( key) ; i++ ; session. close ( ) ; } catch ( IOException e1) { log. error ( "WebSocket-移除并关闭session异常" , e1) ; } if ( i % 10 == 0 ) { try { Thread . sleep ( 0 ) ; } catch ( InterruptedException e1) { } } } } } log. info ( "WebSocket-移除并关闭session数量为:{}" , i) ; }
}
4.保存websocket session的类
package com. xxx. robot. websocket. robot. manager ; import java. io. IOException ;
import java. util. ArrayList ;
import java. util. List ;
import java. util. NavigableSet ;
import java. util. concurrent. ConcurrentNavigableMap ;
import java. util. concurrent. ConcurrentSkipListMap ; import javax. websocket. Session ; import org. apache. commons. lang3. StringUtils ;
import org. springframework. stereotype. Component ; import com. xxx. robot. websocket. WebSocketSessionManager ;
@Component
public class RobotSessionManager implements WebSocketSessionManager { private static final ConcurrentSkipListMap < String , Session > SESSION_POOL = new ConcurrentSkipListMap < > ( ) ; public static final String joinKey ( String userId, String managerId) { return userId + '-' + managerId; } public static final String joinKey ( Long userId, String managerId) { return userId. toString ( ) + '-' + managerId; } public static final String [ ] splitKey ( String key) { return StringUtils . split ( key, '-' ) ; } @Override public Session get ( String key) { return SESSION_POOL. get ( key) ; } public List < String > keysByUserId ( String userId, String excludeManagerId) { ConcurrentNavigableMap < String , Session > subMap = SESSION_POOL. subMap ( userId + '-' , userId + '.' ) ; NavigableSet < String > keySet = subMap. navigableKeySet ( ) ; List < String > list = new ArrayList < > ( ) ; if ( StringUtils . isBlank ( excludeManagerId) ) { for ( String key : keySet) { if ( key != null ) { list. add ( key) ; } } } else { for ( String key : keySet) { if ( key != null && ! key. equals ( excludeManagerId) ) { list. add ( key) ; } } } return list; } @Override public List < String > keys ( ) { NavigableSet < String > keySet = SESSION_POOL. navigableKeySet ( ) ; List < String > list = new ArrayList < > ( ) ; for ( String key : keySet) { if ( key != null ) { list. add ( key) ; } } return list; } @Override public synchronized void add ( String key, Session session) { removeAndClose ( key) ; SESSION_POOL. put ( key, session) ; } @Override public synchronized Session remove ( String key) { return SESSION_POOL. remove ( key) ; } public synchronized void remove ( String key, Session session) { SESSION_POOL. remove ( key, session) ; } private void removeAndClose ( String key) { Session session = remove ( key) ; if ( session != null ) { try { session. close ( ) ; } catch ( IOException e) { } } } }
5.定义websocket 端点
package com. xxx. robot. websocket. robot. endpoint ; import java. util. Map ; import javax. websocket. OnClose ;
import javax. websocket. OnError ;
import javax. websocket. OnMessage ;
import javax. websocket. OnOpen ;
import javax. websocket. Session ;
import javax. websocket. server. PathParam ;
import javax. websocket. server. ServerEndpoint ; import org. springframework. stereotype. Component ; import com. fasterxml. jackson. databind. JsonNode ;
import com. xxx. framework. util. SpringBeanUtils ;
import com. xxx. user. entity. User ;
import com. xxx. robot. corefunc. service. RobotCoreService ;
import com. xxx. robot. util. serial. BaseJsonUtils ;
import com. xxx. robot. websocket. WebSocketSessionManager ;
import com. xxx. robot. websocket. robot. manager. RobotSessionManager ;
import com. xxx. robot. websocket. util. WebSocketSessionUtils ; import lombok. extern. slf4j. Slf4j ;
@Slf4j
@Component
@ServerEndpoint ( value = "/robot/{id}" )
public class RobotWebSocketServer { private volatile User user; private volatile String id; private volatile Session session; private volatile Map < String , RobotCoreService > robotCoreServiceMap; @OnOpen public void onOpen ( @PathParam ( "id" ) String id, Session session) { WebSocketSessionUtils . setProperties ( session) ; this . user = WebSocketSessionUtils . findUser ( session) ; this . id = id; this . session = session; log. info ( "连接成功:{}, {}" , id, this . user. getUserCode ( ) ) ; robotCoreServiceMap = SpringBeanUtils . getApplicationContext ( ) . getBeansOfType ( RobotCoreService . class ) ; RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ; robotSessionManager. add ( RobotSessionManager . joinKey ( this . user. getId ( ) , id) , session) ; } @OnClose public void onClose ( ) { log. info ( "连接关闭:{}, {}" , this . id, this . user. getUserCode ( ) ) ; RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ; robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , this . session) ; } @OnError public void onError ( Throwable error) { log. error ( "onError:id = {}, {}, {}" , this . id, this . session. getId ( ) , this . user. getUserCode ( ) , error) ; RobotSessionManager robotSessionManager = SpringBeanUtils . getBean ( RobotSessionManager . class ) ; robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , this . session) ; } @OnMessage public void onMessage ( String message) { log. info ( "onMessage:id = {}, {}, {}" , this . id, this . user. getUserCode ( ) , message) ; if ( WebSocketSessionManager . PING. equals ( message) ) { this . session. getAsyncRemote ( ) . sendText ( WebSocketSessionManager . PONG) ; return ; } try { JsonNode root = BaseJsonUtils . readTree ( message) ; String apiType = root. at ( "/apiType" ) . asText ( ) ; robotCoreServiceMap. get ( apiType + "Service" ) . receiveFrontMessage ( this . user, RobotSessionManager . joinKey ( this . user. getId ( ) , this . id) , root) ; } catch ( Exception e) { log. error ( "处理消息错误" , e) ; } } }
6.创建定时任务 ping websocket 客户端
package com. xxx. robot. config ; import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. scheduling. annotation. EnableScheduling ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
@Configuration
@EnableScheduling
public class TaskExecutorConfig { @Bean public ThreadPoolTaskExecutor taskExecutor ( ) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ( ) ; executor. setCorePoolSize ( 5 ) ; executor. setMaxPoolSize ( 5 ) ; executor. setQueueCapacity ( 10 ) ; executor. setKeepAliveSeconds ( 60 ) ; executor. setThreadNamePrefix ( "scheduler-executor-" ) ; return executor; } }
package com. xxx. robot. websocket ; import java. util. List ; import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. scheduling. annotation. Scheduled ;
import org. springframework. stereotype. Component ; import lombok. extern. slf4j. Slf4j ;
@Slf4j
@Component
public class WebSocketSchedulerTask { @Autowired private List < WebSocketSessionManager > webSocketSessionManagers; @Scheduled ( initialDelay = 60000 , fixedDelay = 30000 ) public void clearInvalidSession ( ) { try { log. info ( "pingBatch 开始。。。" ) ; for ( WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) { webSocketSessionManager. pingBatch ( ) ; } log. info ( "pingBatch 完成。。。" ) ; } catch ( Exception e) { log. error ( "pingBatch异常" , e) ; } }
}