Spring Boot 环境配置 WebSocket

it2022-05-05  128

第一种方法:

后台只需两个文件即可:

package com.xy.admin.websocket;

import com.google.gson.Gson;
import com.xy.admin.entity.SateliteState;
import com.xy.admin.entity.SateliteStateValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket server endpoint exposed at {@code /websocket}.
 *
 * <p>{@code @ServerEndpoint} is a class-level annotation that declares this class as a
 * WebSocket server endpoint; its value is the URL that clients connect to.
 * Every received binary message triggers a broadcast of a fixed text payload to all
 * endpoint instances currently held in {@link #webSocketSet}.
 */
@ServerEndpoint("/websocket")
@Component
public class WebSocketTest {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketTest.class);

    /** Number of currently open connections; atomic because it is shared by all endpoint instances. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * Thread-safe set holding the endpoint instance of every connected client.
     * To talk to a single client, a Map keyed by a user identifier could be used instead.
     */
    private static final CopyOnWriteArraySet<WebSocketTest> webSocketSet =
            new CopyOnWriteArraySet<WebSocketTest>();

    /** Session of the client bound to this instance; used to send data to that client. */
    private Session session;

    /**
     * Called when a connection has been established.
     *
     * @param session the session of the connecting client
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
        addOnlineCount();
        logger.info("有新连接加入!当前在线人数为{}", getOnlineCount());
    }

    /**
     * Called when the connection is closed; removes this instance from the broadcast set.
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        subOnlineCount();
        logger.info("有一连接关闭!当前在线人数为{}", getOnlineCount());
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 text.
     *
     * <p>A read-only view of the buffer is decoded so that the caller's buffer position is
     * left untouched and the same buffer can be decoded again.
     *
     * @param buffer the bytes to decode
     * @return the decoded text, or the literal string {@code "error"} when the buffer is
     *         {@code null} or does not contain valid UTF-8
     */
    public static String getString(ByteBuffer buffer) {
        if (buffer == null) {
            logger.error("Cannot decode a null buffer");
            return "error";
        }
        try {
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (CharacterCodingException ex) {
            logger.error("Failed to decode buffer as UTF-8", ex);
            return "error";
        }
    }

    // Text message handler, currently disabled:
    // @OnMessage
    // public void onMessage(String message, Session session) {
    //     System.out.println("来自客户端的消息:" + message);
    // }

    /**
     * Receives a binary message from a client and broadcasts a fixed reply to every client.
     *
     * @param session    the session the message arrived on
     * @param byteBuffer the message sent by the client
     */
    @OnMessage
    public void onMessage(Session session, ByteBuffer byteBuffer) {
        logger.info("接收到来自客户端的消息");
        String satelistOrStationState = getString(byteBuffer);
        logger.debug("来自客户端的消息:{}", satelistOrStationState);
        sendMes("123456");
    }

    /**
     * Sends {@code message} to every connected client. A failure on one client is logged
     * and does not stop delivery to the remaining clients.
     *
     * @param message the text to broadcast
     */
    public void sendMes(String message) {
        for (WebSocketTest item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                logger.error("Failed to send message to a client", e);
            }
        }
    }

    /**
     * Called when an error occurs on the connection.
     *
     * @param session the session on which the error occurred
     * @param error   the error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("发生错误", error);
    }

    /**
     * Sends a text message to the client bound to this instance. Unlike the methods above
     * this one carries no WebSocket annotation; it is a plain helper.
     *
     * @param message the text to send
     * @throws IOException if sending fails
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
        // this.session.getAsyncRemote().sendText(message);
    }

    /** @return the current number of open connections */
    public static int getOnlineCount() {
        return onlineCount.get();
    }

    /** Increments the number of open connections by one. */
    public static void addOnlineCount() {
        onlineCount.incrementAndGet();
    }

    /** Decrements the number of open connections by one. */
    public static void subOnlineCount() {
        onlineCount.decrementAndGet();
    }

    // @Configuration
    // public class WebSocketConfig {
    //     @Bean
    //     public ServerEndpointExporter serverEndpointExporter(){
    //         return new ServerEndpointExporter();
    //     }
    // }
}

还需要一个config

package com.xy.admin.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Configuration for the annotated-endpoint approach: it only contributes a
 * {@link ServerEndpointExporter} bean to the application context.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

前台页面调用时:

var websocket = null;

// Check whether the current browser supports WebSocket.
if ('WebSocket' in window) {
    websocket = new WebSocket("ws://172.16.100.14:8080/websocket");
} else {
    alert('当前浏览器 Not support websocket')
}

// Only wire up the callbacks when a socket was actually created; otherwise
// `websocket` is still null and assigning handlers to it would throw.
if (websocket !== null) {
    // Called when a connection error occurs.
    websocket.onerror = function () {
        setMessageInnerHTML("WebSocket连接发生错误");
    };

    // Called when the connection has been established.
    websocket.onopen = function () {
        setMessageInnerHTML("WebSocket连接成功");
    };

    // Called when a message is received.
    websocket.onmessage = function (event) {
        setMessageInnerHTML(event.data);
    };

    // Called when the connection is closed.
    websocket.onclose = function () {
        setMessageInnerHTML("WebSocket连接关闭");
    };
}

// Close the WebSocket explicitly when the window is closing, so the window does not
// go away while the connection is still open (the server would throw otherwise).
window.onbeforeunload = function () {
    closeWebSocket();
};

// Appends a message to the element with id "message".
// NOTE(review): the value is inserted as HTML; if messages can contain untrusted
// content, escape it or use textContent instead.
function setMessageInnerHTML(innerHTML) {
    document.getElementById('message').innerHTML += innerHTML + '<br/>';
}

// Closes the WebSocket connection, if one was created.
function closeWebSocket() {
    if (websocket !== null) {
        websocket.close();
    }
}

// Sends the content of the element with id "text" to the server.
function send() {
    if (websocket === null) {
        return;
    }
    var message = document.getElementById('text').value;
    websocket.send(message);
}

第二种方法

config:

package com.xy.admin.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(chatMessageHandler(),"/websocket").addInterceptors(new ChatHandshakeInterceptor()).setAllowedOrigins("*"); registry.addHandler(chatMessageHandler(), "/sockjs/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()).withSockJS(); } @Bean public TextWebSocketHandler chatMessageHandler(){ return new ChatMessageHandler(); } }

handler:

package com.xy.admin.websocket;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.log4j.Logger;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import javax.annotation.Resource;

/**
 * Text WebSocket handler that tracks every open session and forwards incoming text
 * payloads to the {@code /topic/satelliteStatus} destination.
 */
public class ChatMessageHandler extends TextWebSocketHandler {

    /**
     * All currently tracked sessions. A copy-on-write list is used because sessions are
     * added, removed and iterated from different connection callbacks.
     * A Map keyed by user id would scale better for targeted sends.
     */
    private static final List<WebSocketSession> users = new CopyOnWriteArrayList<WebSocketSession>();

    private static Logger logger = Logger.getLogger(ChatMessageHandler.class);

    @Resource
    private SimpMessagingTemplate temPlate;

    /**
     * Called once a connection is established (this is when the client's {@code onopen}
     * fires); starts tracking the session.
     *
     * @param session the newly opened session
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("connect to the websocket success......");
        users.add(session);
        // Business logic can go here, e.g. pushing offline messages once a user logs in:
        // TextMessage returnMessage = new TextMessage("你将收到的离线");
        // session.sendMessage(returnMessage);
    }

    /**
     * Called when the client invokes {@code websocket.send()}; forwards the payload to
     * {@code /topic/satelliteStatus}.
     *
     * @param session the session the message arrived on
     * @param message the received text message
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        System.out.println(message.getPayload() + "123456789-----------------------");
        // sendMessageToUsers(message);
        this.temPlate.convertAndSend("/topic/satelliteStatus", message.getPayload());
        // super.handleTextMessage(session, message);
    }

    /**
     * Sends a message to the first tracked session whose {@code userName} attribute
     * equals {@code userName}. The message is only sent if that session is open.
     *
     * @param userName the user to address
     * @param message  the message to send
     */
    public void sendMessageToUser(String userName, TextMessage message) {
        for (WebSocketSession user : users) {
            // Objects.equals tolerates sessions that carry no "userName" attribute.
            if (Objects.equals(user.getAttributes().get("userName"), userName)) {
                try {
                    if (user.isOpen()) {
                        user.sendMessage(message);
                    }
                } catch (IOException e) {
                    logger.error("Failed to send message to user " + userName, e);
                }
                break;
            }
        }
    }

    /**
     * Sends a message to every tracked session that is open. A failure on one session is
     * logged and does not stop delivery to the others.
     *
     * @param message the message to send
     */
    public void sendMessageToUsers(TextMessage message) {
        for (WebSocketSession user : users) {
            try {
                if (user.isOpen()) {
                    user.sendMessage(message);
                }
            } catch (IOException e) {
                logger.error("Failed to send message to session " + user.getId(), e);
            }
        }
    }

    /**
     * Closes the session if it is still open and stops tracking it.
     *
     * @param session   the session on which the transport error occurred
     * @param exception the transport error
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        logger.debug("websocket connection closed......");
        users.remove(session);
    }

    /**
     * Stops tracking a session after it has been closed.
     *
     * @param session     the closed session
     * @param closeStatus the reason the session was closed
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        logger.debug("websocket connection closed......");
        users.remove(session);
    }

    /** @return {@code false}; partial messages are not supported */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

ChatHandshakeInterceptor:

package com.xy.admin.websocket;

import java.util.Map;

import org.apache.shiro.SecurityUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

/**
 * Handshake interceptor that copies the current user's name into the WebSocket session
 * attributes under the key {@code "userName"} so messages can be addressed to a user.
 */
public class ChatHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    /**
     * Stores the {@code userName} read from the Shiro session (or
     * {@code "default-system"} when none is set) in {@code attributes}, then delegates
     * to the superclass.
     *
     * @param request    the handshake request
     * @param response   the handshake response
     * @param wsHandler  the target handler
     * @param attributes the attributes that will be attached to the WebSocket session
     * @return the result of the superclass handshake check
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes)
            throws Exception {
        System.out.println("Before Handshake");

        /*
         * Alternative based on the servlet HttpSession:
         *
         * if (request instanceof ServletServerHttpRequest) {
         *     ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
         *     HttpSession session = servletRequest.getServletRequest().getSession(false);
         *     if (session != null) {
         *         // use userName to tell WebSocketHandlers apart for targeted sends
         *         String userName = (String) session.getAttribute(Constants.SESSION_USERNAME);
         *         if (userName==null) { userName="default-system"; }
         *         attributes.put(Constants.WEBSOCKET_USERNAME,userName);
         *     }
         * }
         */

        // Use userName to tell sessions apart for targeted sends; it is read from the
        // Shiro session here (the servlet-session variant above is the alternative).
        Object stored = SecurityUtils.getSubject().getSession().getAttribute("userName");
        String userName = (String) stored;
        attributes.put("userName", userName != null ? userName : "default-system");

        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    /**
     * Logs that the handshake finished and delegates to the superclass.
     *
     * @param request   the handshake request
     * @param response  the handshake response
     * @param wsHandler the target handler
     * @param ex        the exception raised during the handshake, if any
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception ex) {
        System.out.println("After Handshake");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

前台页面调用时和前面一样;

第三种方法

config:

package com.xy.admin.websocket;

import okhttp3.WebSocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import javax.websocket.OnMessage;
import java.io.IOException;

/**
 * STOMP-over-WebSocket configuration: registers the SockJS endpoint clients connect to
 * and enables the simple in-memory broker.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Registers the STOMP endpoint.
     *
     * <p>{@code addEndpoint} adds the HTTP URL that WebSocket or SockJS clients connect
     * to; {@code withSockJS} makes the endpoint use the SockJS protocol.
     *
     * @param registry the registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Cross-origin access is allowed from any origin.
        registry.addEndpoint("/websocket-simple").setAllowedOrigins("*").withSockJS();
        // registry.addEndpoint("/websocket").setAllowedOrigins("*");
    }

    /**
     * Configures the message broker.
     *
     * <p>Enables the simple broker; only messages whose destination starts with one of
     * the configured prefixes are handed to it. {@code /topic} is meant for broadcast,
     * {@code /queue} for point-to-point delivery to a specific user.
     *
     * @param registry the broker registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Both prefixes carry a leading slash so they match destinations such as
        // "/queue/..." in the same way "/topic/..." is matched.
        registry.enableSimpleBroker("/topic", "/queue");
    }

    /**
     * Keeps the default inbound channel configuration.
     *
     * @param registration the inbound channel registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        super.configureClientInboundChannel(registration);
    }
}

controller:

package com.xy.admin.controller.xy_task;

import com.alibaba.fastjson.JSONObject;
import com.xy.admin.annotation.SysLog;
import com.xy.admin.entity.TaskBusinessException;
import com.xy.admin.entity.VO.TaskSchemeVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Controller holding the STOMP message mappings and the page mapping for the
 * broadcast demo view.
 */
@Controller
public class WebSocketController {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketController.class);

    /** Static reference to the initialized controller instance, set in {@link #init()}. */
    private static WebSocketController webSocketController;

    @Resource
    private SimpMessagingTemplate temPlate;

    /** Counter for received messages. */
    private AtomicInteger count = new AtomicInteger(0);

    /**
     * Publishes this instance through the static reference after dependency injection.
     */
    @PostConstruct
    public void init() {
        webSocketController = this;
    }

    /**
     * Echoes a task scheme received on {@code /receiveTaskScheme} to
     * {@code /topic/getTaskScheme}. {@code @MessageMapping} plays a role similar to
     * {@code @RequestMapping}; the {@code /topic} prefix is configured in
     * WebSocketMessageBrokerConfigurer.
     *
     * @param taskSchemeVO the received task scheme
     * @return the same object, unchanged
     */
    @MessageMapping("/receiveTaskScheme")
    @SendTo("/topic/getTaskScheme")
    @ResponseBody
    // @SysLog("发送生成成功计划到页面")
    public TaskSchemeVO receiveTaskScheme(@RequestBody TaskSchemeVO taskSchemeVO) {
        return taskSchemeVO;
    }

    /**
     * Echoes a string received on {@code /changeToUploadQueue} to
     * {@code /topic/getChangeToUploadQueue}.
     *
     * @param change the received payload
     * @return the same string, unchanged
     */
    @MessageMapping("/changeToUploadQueue")
    @SendTo("/topic/getChangeToUploadQueue")
    @ResponseBody
    // @SysLog("发送生成成功计划到页面")
    public String changeToUploadQueue(@RequestBody String change) {
        return change;
    }

    /**
     * Logs the caller's remote host and returns the view name of the broadcast page.
     *
     * @param request the current HTTP request
     * @return the view name {@code websocket/simple/ws-broadcast}
     */
    @RequestMapping(value = "/broadcast/index")
    public String broadcastIndex(HttpServletRequest request) {
        String remoteHost = request.getRemoteHost();
        LOGGER.info(remoteHost);
        return "websocket/simple/ws-broadcast";
    }

    // Disabled mapping:
    // @MessageMapping("/createException")
    // @SendTo("/topic/getTaskBusinessException")
    // @ResponseBody @SysLog("发送生成成功计划到页面")
    // public TaskBusinessException createException(@RequestBody TaskBusinessException taskBusinessException){
    //     taskBusinessException.insert();
    //     return taskBusinessException;
    // }
}

service及实现类:

// ---- WebSocketService.java ----
package com.xy.admin.service;

import com.xy.admin.entity.Log;
import com.xy.admin.entity.TaskBusinessException;
import com.xy.admin.entity.TaskBusinessLog;
import com.xy.admin.entity.VO.TaskRequirementInjectVO;
import com.xy.admin.util.RestResponse;

import java.util.Map;

/**
 * Server-side operations that push messages to WebSocket subscribers.
 */
public interface WebSocketService {

    /** Pushes the given response to the to-upload-queue subscribers. */
    void receiveToUploadQueue(RestResponse restResponse);

    /** Pushes an error log entry. */
    void receiveErrorMessage(Log sysLog);

    /** Pushes the current module details. */
    void receiveModuleDetail();

    /** Pushes a web log entry. */
    void receiveWebLogMessage(Log sysLog);

    /** Pushes the inject requirement identified by the given name. */
    void receiveTaskRequirementInjectVO(String taskRequirementInjectVOName);

    /** Pushes a task business log entry. */
    void receiveTaskBusinessLog(TaskBusinessLog taskBusinessLog);

    /** Notifies subscribers that the upload queue changed. */
    void receiveChangeToUploadQueue();

    /** Notifies subscribers that the requirements should be reloaded. */
    void receiveReloadRequirement();

    /** Pushes a task business exception. */
    void receiveTaskBusinessException(TaskBusinessException taskBusinessException);
}

// ---- WebSocketServiceImpl.java ----
package com.xy.admin.service.impl;

import com.google.common.collect.Maps;
import com.xy.admin.annotation.SysLog;
import com.xy.admin.entity.Log;
import com.xy.admin.entity.TaskBusinessException;
import com.xy.admin.entity.TaskBusinessLog;
import com.xy.admin.entity.VO.ModuleDetailData;
import com.xy.admin.entity.VO.TaskRequirementInjectVO;
import com.xy.admin.entity.VO.TestObject;
import com.xy.admin.service.ModuleService;
import com.xy.admin.service.TaskRequirementInjectService;
import com.xy.admin.service.WebSocketService;
import com.xy.admin.util.RestResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Implementation that sends WebSocket messages from the server side through a
 * {@link SimpMessagingTemplate}.
 */
@Service
public class WebSocketServiceImpl implements WebSocketService {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServiceImpl.class);

    @Resource
    private SimpMessagingTemplate temPlate;

    @Autowired
    private TaskRequirementInjectService taskRequirementInjectService;

    @Autowired
    protected ModuleService moduleService;

    /** Sends {@code payload} to {@code destination} through the messaging template. */
    private void publish(String destination, Object payload) {
        this.temPlate.convertAndSend(destination, payload);
    }

    /** Sends the response to {@code /topic/getToUploadQueue}. */
    @Override
    public void receiveToUploadQueue(RestResponse restResponse) {
        publish("/topic/getToUploadQueue", restResponse);
    }

    /** Stamps the entry with the current date and sends it to {@code /topic/getErrorMessage}. */
    @Override
    // @SysLog("websocket发送logError日志到websocket队列")
    public void receiveErrorMessage(Log sysLog) {
        Date now = new Date();
        sysLog.setCreateDate(now);
        publish("/topic/getErrorMessage", sysLog);
    }

    /**
     * Loads the module details, sums their {@code y} values and sends the total together
     * with the detail list to {@code /topic/getModuleDetail}.
     */
    @Override
    // @SysLog("模组有状态变更后发送模组详情到websocket队列")
    public void receiveModuleDetail() {
        List<ModuleDetailData> details = moduleService.selectModuleDetail();

        int total = 0;
        for (ModuleDetailData detail : details) {
            total += detail.getY();
        }

        Map<String, Object> payload = Maps.newHashMap();
        payload.put("moduleAll", total);
        payload.put("colorByPoint", true);
        payload.put("data", details);
        publish("/topic/getModuleDetail", payload);
    }

    /** Stamps the entry with the current date and sends it to {@code /topic/getWebLogMessage}. */
    @Override
    // @SysLog("发送log日志到websocket队列")
    public void receiveWebLogMessage(Log sysLog) {
        Date now = new Date();
        sysLog.setCreateDate(now);
        publish("/topic/getWebLogMessage", sysLog);
    }

    /** Looks up the inject requirement by name and sends it to {@code /topic/getZhddTRInject}. */
    @Override
    // @SysLog("发送注入需求到websocket队列")
    public void receiveTaskRequirementInjectVO(String taskRequirementInjectVOName) {
        TaskRequirementInjectVO injectVO =
                taskRequirementInjectService.selectInjectVOByName(taskRequirementInjectVOName);
        publish("/topic/getZhddTRInject", injectVO);
    }

    /** Sends the business log entry to {@code /topic/getTaskBusinessLog}. */
    @Override
    // @SysLog("发送业务日志到websocket队列")
    public void receiveTaskBusinessLog(TaskBusinessLog taskBusinessLog) {
        publish("/topic/getTaskBusinessLog", taskBusinessLog);
    }

    /** Sends an empty string to {@code /topic/getChangeToUploadQueue} to trigger a front-end refresh. */
    @Override
    // @SysLog("上注队列有更改时触发前台刷新")
    public void receiveChangeToUploadQueue() {
        publish("/topic/getChangeToUploadQueue", "");
    }

    /** Sends an empty string to {@code /topic/getReloadRequirement} to trigger a front-end refresh. */
    @Override
    // @SysLog("用户需求有变动时触发前台刷新")
    public void receiveReloadRequirement() {
        publish("/topic/getReloadRequirement", "");
    }

    /**
     * Wraps the business exception in a {@link TestObject} of kind {@code 1} and sends it
     * to {@code /topic/getTaskBusinessException}.
     */
    @Override
    public void receiveTaskBusinessException(TaskBusinessException taskBusinessException) {
        TestObject wrapper = new TestObject();
        wrapper.setKind(1);
        wrapper.setKindObject(taskBusinessException);
        publish("/topic/getTaskBusinessException", wrapper);
    }
}

前台页面调用时:

var stompClient = null;
var socket = new SockJS('/websocket-simple');
stompClient = Stomp.over(socket);

// Connect and, once connected, subscribe to the topic.
stompClient.connect({}, function (frame) {
    // Receive messages.
    stompClient.subscribe('/topic/satelliteStatus', function (data) {
        console.log(data + "-------------------------");
    });
});

// Send a message. Call this after the connection is established; sending before the
// connect callback has run would fail because the connection is not ready yet.
function sendTaskScheme(item) {
    if (stompClient !== null && stompClient.connected) {
        stompClient.send("/receiveTaskScheme", {}, JSON.stringify(item));
    }
}

// Disconnect. Call this when the page no longer needs the subscription.
function disconnect() {
    if (stompClient != null) {
        stompClient.disconnect();
    }
}

最新回复(0)