1. WebSocket 配置类:开启 WebSocket 支持
/**
 * Spring configuration that turns on WebSocket endpoint support.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Registers the exporter bean, which automatically scans for WebSocket endpoints declared
     * with {@code @ServerEndpoint} and registers them as WebSocket beans.
     *
     * <p>Note: if the project runs in an external servlet container rather than Spring Boot's
     * embedded container, do not inject {@code ServerEndpointExporter}, because it is then
     * provided and managed by the container itself.
     *
     * @return a new {@code ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
2. WebSocket 管理器
/**
 * Static registry of connected {@link WebSocketServer} endpoints, with helpers for sending a text
 * message to one endpoint or to all of them.
 *
 * <p>Each endpoint is tracked twice: in a set that is iterated for broadcasts, and in a map keyed
 * by session id for targeted delivery.
 */
@Slf4j
public class WebSocketManager {

    /** Every registered endpoint; iterated by {@link #sentToAllUser(String)}. */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketServerSet = new CopyOnWriteArraySet<>();

    /** Registered endpoints keyed by their session id. */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketServerMap = new ConcurrentHashMap<>();

    /**
     * Registers an endpoint in both the set and the session-id map.
     *
     * @param webSocketServer the endpoint to register; {@code null} is ignored
     */
    public static void addWebSocketServer(WebSocketServer webSocketServer) {
        if (webSocketServer == null) {
            return;
        }
        webSocketServerSet.add(webSocketServer);
        String sessionId = webSocketServer.getSessionId();
        // ConcurrentHashMap rejects null keys, so only index endpoints that report an id.
        if (sessionId != null) {
            webSocketServerMap.put(sessionId, webSocketServer);
        }
    }

    /**
     * Removes an endpoint from both the set and the session-id map.
     *
     * @param webSocketServer the endpoint to unregister; {@code null} is ignored
     */
    public static void removeWebSocketServer(WebSocketServer webSocketServer) {
        if (webSocketServer == null) {
            return;
        }
        webSocketServerSet.remove(webSocketServer);
        String sessionId = webSocketServer.getSessionId();
        // ConcurrentHashMap rejects null keys, so skip the map when there is no id.
        if (sessionId != null) {
            webSocketServerMap.remove(sessionId);
        }
    }

    /**
     * Sends a message to the user identified by a session id.
     *
     * <p>If the id is {@code null} or no endpoint is registered under it, an error is logged and
     * nothing is sent.
     *
     * @param sessionId id of the target session
     * @param msg       text to send
     */
    public static void sentToUser(String sessionId, String msg) {
        WebSocketServer target = sessionId == null ? null : webSocketServerMap.get(sessionId);
        // Delegate with a null session when the lookup fails so the overload below reports it.
        Session session = target == null ? null : target.getSession();
        sentToUser(session, msg);
    }

    /**
     * Sends a message to the user behind the given session.
     *
     * <p>A {@code null} session is logged as an error and a closed session as a warning; in both
     * cases nothing is sent.
     *
     * @param session target session
     * @param msg     text to send
     */
    public static void sentToUser(Session session, String msg) {
        if (session == null) {
            log.error("不存在该Session,无法发送消息");
            return;
        }
        // Skip closed sessions so one stale connection does not abort a broadcast loop.
        // The session may still close between this check and the send.
        if (!session.isOpen()) {
            log.warn("Session:{}已关闭,无法发送消息", session.getId());
            return;
        }
        session.getAsyncRemote().sendText(msg);
    }

    /**
     * Sends a message to every registered endpoint.
     *
     * @param msg text to send
     */
    public static void sentToAllUser(String msg) {
        for (WebSocketServer webSocketServer : webSocketServerSet) {
            sentToUser(webSocketServer.getSession(), msg);
        }
        log.info("向所有用户发送WebSocket消息完毕,消息:{}", msg);
    }
}
3. WebSocket 服务端(Server)
/** * web套接字服务器 * * @author yolo * @date 2024/03/15 15:03:17 */ @RestController @ServerEndpoint("/websocket") @Slf4j public class WebSocketServer { private Session session; @ApiOperation(value = "list ") @GetMapping(value = "list") public BaseResult<List<BaseCarPart>> listWithoutPage() { return BaseResult.ofSuccess( "aaaaaaa"); } @OnOpen public void onOpen(Session session) { this.session = session; log.info("WebSocket连接成功" + session.toString()); WebSocketManager.sentToUser(session, "WebSocket is connected!"); WebSocketManager.addWebSocketServer(this); log.info("与SessionId:{}建立连接", session.getId()); } @OnClose public void onClose(Session session, CloseReason closeReason) { log.info("与SessionId:{}断开连接{}", session.getId(),closeReason.getCloseCode().toString()); WebSocketManager.removeWebSocketServer(this); log.info("WebSocket连接关闭"); } @OnMessage public void onMessage(String message, Session session) { log.info("来自SessionId:{}的消息:{}", session.getId(), message); session.getAsyncRemote().sendText(message+"========"); } @OnError public void onError(Session session, Throwable error) { log.error("Session:{}的WebSocket发生错误", session.getId(), error); } public Session getSession() { log.info("获取Session:{}", session.toString()); return session; } public String getSessionId() { log.info("获取SessionId:{}", session.getId()); return session.getId(); } }