Spring Springboot实现websocket通讯-1

特别说明:1. 本文基于Springboot spring-boot-starter-parent 1.5.1.RELEASE编码,在不同的版本中部分方法有区别。

第一篇地址:Spring Springboot实现websocket通讯-1

第二篇地址:Spring Springboot实现websocket通讯-2

在spring和spring boot中配置websocket的代码基本一样的,只是pom引入的包不一样,需要注意的是不同的tomcat版本对websocket的支持可能有区别,造成了代码的区别,这里本文没有特别深究,有兴趣的朋友可以去看一下。

spring 中需要引入的包

<code>         
        
            org.springframework
            spring-websocket
            ${spring.version}
        
        
            org.springframework
            spring-messaging
            ${spring.version}
           /<code>

spring boot中引入的包

<code>         
        
            org.springframework.boot
            spring-boot-starter-websocket
        /<code>


配置websocket的方式

  • 配置websocket首先是需要运行的容器支持,这个是前提,我们常用的容器,tomcat,jetty,undertow都支持websocket,spring boot 对内嵌的tomcat(7,8),jetty9,undertow提供了支持,源码在spring-websocket-4.3.6.RELEASE.jar包中。
  • websocket是通过一个socket来实现双向异步通讯的,websocket属于(sockJs:websocket协议的模拟,用作浏览器使用,增加了当浏览器不支持websocket的时候的兼容性支持,也属于底层协议)。
  • 底层协议配置有两种方式,使用javax.websocket包中的配置,属于JavaEE 7中出了JSR-356:Java API for WebSocket规范。还有一种是使用spring websocket api中提供的底层协议使用@EnableWebSocket注解,实现org.springframework.web.socket.config.annotation.WebSocketConfigurer;。
  • 使用底层协议比较繁琐,需要自己写大量的代码进行支持,不过更加灵活,当然我们也可以websocket的子协议STOMP来,它是一个更高级的协议,STOMP是基于帧(frame)格式来定义消息,与http的request和response类似(具有类似@RequestMapping的@MessageMapping),使用@EnableWebSocketMessageBroker 源码也在org.springframework.web.socket下。

  • 配置websocket

    <code>package com.wzh.demo.domain;
    
    import javax.websocket.Session;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-08 18:49
     * @see [相关类/方法] (可选)
     **/
    public class WebSocketBean {
    
        /**
         * 连接session对象
         */
        private Session session;
    
        /**
         * 连接错误次数
         */
        private AtomicInteger erroerLinkCount = new AtomicInteger(0);
    
        public int getErroerLinkCount() {
            // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
            return erroerLinkCount.getAndIncrement();
        }
    
        public void cleanErrorNum()
        {
            // 清空计数
            erroerLinkCount = new AtomicInteger(0);
        }
    
        //...... 省略get set toSting方法
    }
    /<code>

    1.javax.websocket 扩展协议配置

    基于Spring搭建,一个公用的websocket配置,使用@ServerEndpoint创立websocket endpoint

    <code>@Configuration
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
     
    }/<code>

    因为websocket的session和我们常用的httpsession不一样,所有我们要转换一下,部分场景会用到httpsession

    <code>package com.wzh.config.utils;
    
    import javax.servlet.http.HttpSession;
    import javax.websocket.HandshakeResponse;
    import javax.websocket.server.HandshakeRequest;
    import javax.websocket.server.ServerEndpointConfig;
    import javax.websocket.server.ServerEndpointConfig.Configurator;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-10 01:02
     * @see [相关类/方法] (可选)
     **/
    public class GetHttpSessionConfigurator extends Configurator{
    
        @Override
        public void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {
    
            HttpSession httpSession=(HttpSession) request.getHttpSession();
            sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
        }
    }
    /<code>

    websocket业务接口,抽一些共用的方法出来

    <code>package com.wzh.demo.websocket.service;
    
    import javax.websocket.EndpointConfig;
    import javax.websocket.Session;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-08 17:11
     * @see [相关类/方法] (可选)
     **/
    public interface WebSocketServer {
    
        /**
         * 连接建立成功调用的方法
         * @param session session 对象
         */
        public void onOpen(Session session,EndpointConfig config);
    
        /**
         * 断开连接方法
         */
        public void onClose(Session session);
    
        /**
         * 收到客户端消息后调用的方法
         * @param session session 对象
         * @param message 返回客户端的消息
         */
        public void onMessage(Session session, String message);
    
        /**
         * 发生异常时触发的方法
         * @param session session 对象
         * @param throwable 抛出的异常
         */
        public void onError(Session session,Throwable throwable);
    
        /**
         * 向单个客户端发送消息
         * @param session session 对象
         * @param message 发送给客户端的消息
         */
        public void sendMessage(Session session, String message);
    
        /**
         * 向所有在线用户群发消息
         * @param message 发送给客户端的消息
         */
        public void batchSendMessage(String message);
    }
    /<code>

    方法的实现类

    <code>package com.wzh.demo.websocket.service.impl;
    
    import com.wzh.config.utils.GetHttpSessionConfigurator;
    import com.wzh.demo.domain.WebSocketBean;
    import com.wzh.demo.websocket.service.WebSocketServer;
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Component;
    
    import javax.servlet.http.HttpSession;
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-08 17:11
     * @see [相关类/方法] (可选)
     **/
    @ServerEndpoint(value = "/javax/websocket",configurator=GetHttpSessionConfigurator.class)
    @Component("webSocketService")
    public class WebSocketServiceImpl implements WebSocketServer{
    
        private Logger log = Logger.getLogger(WebSocketServiceImpl.class);
    
        /**
         * 错误最大重试次数
         */
        private static final int MAX_ERROR_NUM = 10;
    
        /**
         * 用来存放每个客户端对应的webSocket对象。
         */
        private static Map webSocketInfo;
    
        static
        {
            // concurrent包的线程安全map
            webSocketInfo = new ConcurrentHashMap();
        }
    
        @OnOpen
        @Override
        public void onOpen(Session session,EndpointConfig config) {
    
            // 如果是session没有激活的情况,就是没有请求获取或session,这里可能会取出空,需要实际业务处理
            HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
            if(httpSession != null)
            {
                log.info("获取到httpsession" + httpSession.getId());
            }else {
                log.error("未获取到httpsession");
            }
    
            // 连接成功当前对象放入websocket对象集合
            WebSocketBean bean = new WebSocketBean();
            bean.setSession(session);
            webSocketInfo.put(session.getId(),bean);
    
            log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());
        }
    
        @OnClose
        @Override
        public void onClose(Session session) {
    
            // 客户端断开连接移除websocket对象
            webSocketInfo.remove(session.getId());
            log.info("客户端断开连接,当前连接数:" + webSocketInfo.size());
    
        }
    
        @OnMessage
        @Override
        public void onMessage(Session session, String message) {
    
            log.info("客户端 session id: "+session.getId()+",消息:" + message);
    
            // 此方法为客户端给服务器发送消息后进行的处理,可以根据业务自己处理,这里返回页面
            sendMessage(session, "服务端返回" + message);
    
        }
    
        @OnError
        @Override
        public void onError(Session session, Throwable throwable) {
    
            log.error("发生错误"+ throwable.getMessage(),throwable);
        }
    
        @Override
        public void sendMessage(Session session, String message) {
    
            try
            {
                // 发送消息
                session.getBasicRemote().sendText(message);
    
                // 清空错误计数
                webSocketInfo.get(session.getId()).cleanErrorNum();
            }
            catch (Exception e)
            {
                log.error("发送消息失败"+ e.getMessage(),e);
                int errorNum = webSocketInfo.get(session.getId()).getErroerLinkCount();
    
                // 小于最大重试次数重发
                if(errorNum <= MAX_ERROR_NUM)
                {
                    sendMessage(session, message);
                }
                else{
                    log.error("发送消息失败超过最大次数");
                    // 清空错误计数
                    webSocketInfo.get(session.getId()).cleanErrorNum();
                }
            }
        }
    
        @Override
        public void batchSendMessage(String message) {
            Set> set = webSocketInfo.entrySet();
            for (Map.Entry map : set)
            {
                sendMessage(map.getValue().getSession(),message);
            }
        }
    }
    /<code>

    触发websocket通讯推送的方式很多,这里做个最简单的按钮,写个简单的controller和一个html

    <code>package com.wzh.demo.controller;
    
    
    
    import org.springframework.boot.autoconfigure.web.ServerProperties;
    import org.springframework.stereotype.Controller;
    import org.springframework.ui.Model;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import javax.servlet.http.HttpSession;
    
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-09 22:53
     * @see [相关类/方法] (可选)
     **/
    @Controller
    @RequestMapping("/websocket")
    public class WebSocketController {
    
        @RequestMapping(value = "socket.do",method = RequestMethod.GET)
        public String toWebSocket(HttpSession session, Model model)
        {
            model.addAttribute("address","/javax/websocket");
            return "/test/webSocket";
        }
    }
    /<code>

    html,主要是socketjs触发

    <code>
    
    
    
        Title
         
         
    
    
    
    
    
    /<code>

    测试一下,浏览器访问controller
    浏览器控制台输出

    Spring Springboot实现websocket通讯-1

    IDEA控制台

    Spring Springboot实现websocket通讯-1

    页面点击按钮

    Spring Springboot实现websocket通讯-1

    Spring Springboot实现websocket通讯-1

    通过测试可以看到我们使用底层协议创建的websocket通讯就完成了,当然这只是最简单的通讯,实际开发中还要保证心跳等其他因素。

    上面的例子是通过JAVA的扩展JAR实现的,既然是Spring项目,可定也能用框架提供的方法进行websocket通讯。

    Spring websocket api 地址:

    https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/socket/package-summary.html


    @EnableWebSocket配置

    在 spring 中 使用较低层级的 API 来处理消息,可以通过以下几个步骤

    • 一个HandshakeInterceptor拦截器,实现org.springframework.web.socket.server.HandshakeInterceptor,在次拦截器中可以做以下握手前后的处理,此步骤可以省略,此拦截器可以在springMVC中的websocket配置类中注册使用,做一下前置或者后置操作
    <code>@Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry.addHandler(new WebSocketHander(),"/xxxx").addInterceptors(new HandshakeInterceptor());
        }
    }/<code>
    • WebSocketHandler 一个消息处理中心,用于处理websocket的通讯具体服务,可以继承AbstractWebSocketHandler,也可以实现WebSocketHandler
    <code>public interface WebSocketHandler {
    void afterConnectionEstablished(WebSocketSession session) throws Exception;
    void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception;
    void handleTransportError(WebSocketSession session,
     Throwable exception) throws Exception; 
    void afterConnectionClosed(WebSocketSession session,
     CloseStatus closeStatus) throws Exception; 
    boolean supportsPartialMessages();
    }/<code>
    <code>public class ChatTextHandler extends AbstractWebSocketHandler {
     
     @Override
     protected void handleTextMessage(WebSocketSession session,
       TextMessage message) throws Exception {
      session.sendMessage(new TextMessage("xxxx"));
     }
    }/<code>
    • 一个SpringMVC的配置,其中registerWebSocketHandlers注册消息处理器,此方法可以完成websocket路径的注册,消息处理器的注册,拦截器的注册
    <code>@Configuration
    @EnableWebSocket//开启websocket
    public class WebSocketConfig implements WebSocketConfigurer {
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // WebSocketHander 为消息处理器,HandshakeInterceptor为拦截器
            registry.addHandler(new WebSocketHander(),"/xxx").addInterceptors(new HandshakeInterceptor());
        }
    }/<code>

    下面我们来开始正式配置基于Spring底层API的websocket通讯

    WebSocketHandler 拦截器

    在这一步可以做一些初始化操作,例如获取httpSession,此步骤不是开启websocket的必要步骤,根据自身的业务逻辑决定是否添加拦截器。拦截器我们可以直接使用HttpSessionHandshakeInterceptor这个Spring提供的拦截器,也可以实现HandshakeInterceptor 这个接口进行自定义。
    拦截器HttpSessionHandshakeInterceptor将HttpSession中的值保存到了一个Map里面,在后期的WebSocketHandler消息处理类中可以获取存入httpsession中的信息,通过WebSocketSession的getAttributes()下提供get方法获取。

    HttpSessionHandshakeInterceptor源码

    <code>//
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.springframework.web.socket.server.support;
    
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Enumeration;
    import java.util.Map;
    import javax.servlet.http.HttpSession;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.http.server.ServletServerHttpRequest;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.HandshakeInterceptor;
    
    public class HttpSessionHandshakeInterceptor implements HandshakeInterceptor {
        public static final String HTTP_SESSION_ID_ATTR_NAME = "HTTP.SESSION.ID";
        private final Collection attributeNames;
        private boolean copyAllAttributes;
        private boolean copyHttpSessionId = true;
        private boolean createSession;
    
        public HttpSessionHandshakeInterceptor() {
            this.attributeNames = Collections.emptyList();
            this.copyAllAttributes = true;
        }
    
        public HttpSessionHandshakeInterceptor(Collection attributeNames) {
            this.attributeNames = Collections.unmodifiableCollection(attributeNames);
            this.copyAllAttributes = false;
        }
    
        public Collection getAttributeNames() {
            return this.attributeNames;
        }
    
        public void setCopyAllAttributes(boolean copyAllAttributes) {
            this.copyAllAttributes = copyAllAttributes;
        }
    
        public boolean isCopyAllAttributes() {
            return this.copyAllAttributes;
        }
    
        public void setCopyHttpSessionId(boolean copyHttpSessionId) {
            this.copyHttpSessionId = copyHttpSessionId;
        }
    
        public boolean isCopyHttpSessionId() {
            return this.copyHttpSessionId;
        }
    
        public void setCreateSession(boolean createSession) {
            this.createSession = createSession;
        }
    
        public boolean isCreateSession() {
            return this.createSession;
        }
    
        // 在握手完成前(连接建立阶段)
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception {
            HttpSession session = this.getSession(request);
            if (session != null) {
                if (this.isCopyHttpSessionId()) {
                    // 保存 sessionid
                    attributes.put("HTTP.SESSION.ID", session.getId());
                }
    
                Enumeration names = session.getAttributeNames();
    
                while(true) {
                    String name;
                    do {
                        if (!names.hasMoreElements()) {
                            return true;
                        }
    
                        name = (String)names.nextElement();
                    } while(!this.isCopyAllAttributes() && !this.getAttributeNames().contains(name));
                    // 保存HttpSession中的信息
                    attributes.put(name, session.getAttribute(name));
                }
            } else {
                return true;
            }
        }
    
        // 获取HttpSession
        private HttpSession getSession(ServerHttpRequest request) {
            if (request instanceof ServletServerHttpRequest) {
                ServletServerHttpRequest serverRequest = (ServletServerHttpRequest)request;
                return serverRequest.getServletRequest().getSession(this.isCreateSession());
            } else {
                return null;
            }
        }
    
        // 完成握手后业务
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        }
    }
    /<code>

    spring 框架提供的拦截器在org.springframework.web.socket.server.support下,如果不能满足业务需求,我们也可以直接去实现接口

    Spring Springboot实现websocket通讯-1

    实现HandshakeInterceptor接口,这里因为是操作httpsession,就演示继承 HttpSessionHandshakeInterceptor 并重写beforeHandshake 方法

    <code>package com.wzh.demo.websocket.interceptor;
    
    import org.apache.log4j.Logger;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.http.server.ServletServerHttpRequest;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
    
    import javax.servlet.http.HttpSession;
    import java.util.Map;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-21 20:05
     * @see [相关类/方法] (可选)
     */
    public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor
    {
        
        private Logger log = Logger.getLogger(WebSocketHandshakeInterceptor.class);
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler webSocketHandler, Map map)
            throws Exception
        {
            // websocket握手建立前调用,获取httpsession
            if(request instanceof ServletServerHttpRequest)
            {
                ServletServerHttpRequest servletRequset = (ServletServerHttpRequest) request;
    
                // 这里从request中获取session,获取不到不创建,可以根据业务处理此段
                HttpSession httpSession = servletRequset.getServletRequest().getSession(false);
                if (httpSession != null)
                {
                    // 这里打印一下session id 方便等下对比和springMVC获取到httpsession是不是同一个
                    log.info("httpSession key:" + httpSession.getId());
    
                    // 获取到httpsession后,可以根据自身业务,操作其中的信息,这里只是单纯的和websocket进行关联
                    map.put("HTTP_SESSION",httpSession);
    
                }
                else
                {
                    log.warn("httpSession is null");
                }
            }
    
            // 调用父类方法
            return super.beforeHandshake(request,response,webSocketHandler,map);
        }
        
        @Override
        public void afterHandshake(ServerHttpRequest serverHttpRequest,
            ServerHttpResponse serverHttpResponse,
            WebSocketHandler webSocketHandler, Exception e)
        {
            // websocket握手建立后调用
            log.info("websocket连接握手成功");
        }
    }
    /<code>

    WebSocketHandler 消息处理中心

    建立一个websocket消息处理中心,我们可以编写一个类实现WebSocketHandler接口,此接口提供5个方法,用于处理websocket的消息。

    Spring Springboot实现websocket通讯-1

    <code>//
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.springframework.web.socket;
    
    public interface WebSocketHandler {
        // 在WebSocket协商成功并且WebSocket连接打开并准备好使用后调用。
        void afterConnectionEstablished(WebSocketSession var1) throws Exception;
        
        // 在新的WebSocket消息到达时调用,也就是接受客户端信息并发发送
        void handleMessage(WebSocketSession var1, WebSocketMessage> var2) throws Exception;
        
        // 处理底层WebSocket消息传输中的错误,连接出现异常时触发
        void handleTransportError(WebSocketSession var1, Throwable var2) throws Exception;
    
        // 在任何一方关闭WebSocket连接之后或在发生传输错误之后调用。
        void afterConnectionClosed(WebSocketSession var1, CloseStatus var2) throws Exception;
        
        // WebSocketHandler是否处理部分消息,API文档描述说是拆分消息,多次处理,没有实际使用过
        boolean supportsPartialMessages();
    }
    /<code>

    API路径:

    https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/socket/WebSocketHandler.html

    除了实现接口外,我们也可以继承Spring已经给我提供的实现类,简化操作,因为有的时候我们只需要此接口中的一个或几个方法,并不需要全部关注,spring提供的handler都在org.springframework.web.socket.handler这个路径下

    Spring Springboot实现websocket通讯-1

    这里我们继承一个抽象类AbstractWebSocketHandler,重写我们关注的方法,并扩展我们自己的业务方法

    一个公用的websocket类,存一些连接用到的基本信息,可以根据业务添加删除属性

    <code>package com.wzh.demo.domain;
    
    import org.springframework.web.socket.WebSocketSession;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-29 18:24
     * @see [相关类/方法] (可选)
     **/
    public class WebSocketBeanSpring
    {
        
        private WebSocketSession session;
        
        /**
         * 连接错误次数
         */
        private AtomicInteger erroerLinkCount = new AtomicInteger(0);
        
        public int getErroerLinkCount()
        {
            // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
            return erroerLinkCount.getAndIncrement();
        }
        
        public void cleanErrorNum()
        {
            // 清空计数
            erroerLinkCount = new AtomicInteger(0);
        }
        
       // 省略get set 方法
    }
    /<code>

    一个简单的消息处理中心,继承AbstractWebSocketHandler

    <code>package com.wzh.demo.websocket.handler;
    
    import com.wzh.demo.domain.WebSocketBeanSpring;
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.*;
    import org.springframework.web.socket.handler.AbstractWebSocketHandler;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-24 23:11
     * @see [相关类/方法] (可选)
     **/
    @Component("webSocketHander")
    public class WebSocketHander extends AbstractWebSocketHandler{
    
        private Logger log = Logger.getLogger(WebSocketHander.class);
    
        /**
         * 用来存放每个客户端对应的webSocket对象。
         */
        private static Map webSocketInfo;
    
        static
        {
            // concurrent包的线程安全map
            webSocketInfo = new ConcurrentHashMap();
        }
    
        // 服务器与客户端初次websocket连接成功执行
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    
            log.debug("websocket 连接成功......");
    
            // 连接成功当前对象放入websocket对象集合
            WebSocketBeanSpring bean = new WebSocketBeanSpring();
            bean.setSession(session);
    
            webSocketInfo.put(session.getId(),bean);
    
            log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());
    
        }
    
        // 接受消息处理消息
        @Override
        public void handleMessage(WebSocketSession webSocketSession,
            WebSocketMessage> webSocketMessage)
            throws Exception
        {
            /*
            获取客户端发送的消息,这里使用文件消息,也就是字符串进行接收
            消息可以通过字符串,或者字节流进行接收
            TextMessage String/byte[]接收均可以
            BinaryMessage byte[]接收
            */
            log.info("客户端发送消息" + webSocketMessage.getPayload().toString());
            TextMessage message = new TextMessage(webSocketMessage.getPayload().toString());
            /*
            这里直接是字符串,做群发,如果要指定发送,可以在前台平均ID,后台拆分后获取需要发送的人。
            也可以做一个单独的controller,前台把ID传递过来,调用方法发送,在登录的时候把所有好友的标识符传递到前台,
            然后通过标识符发送私信消息
            */
            this.batchSendMessage(message);
    
        }
    
        // 连接错误时触发
        @Override
        public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
            if(webSocketSession.isOpen()){
                webSocketSession.close();
            }
    
            log.debug("链接出错,关闭链接......");
            webSocketInfo.remove(webSocketSession.getId());
        }
    
        // 关闭websocket时触发
        @Override
        public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
    
            log.debug("链接关闭......" + closeStatus.toString());
            webSocketInfo.remove(webSocketSession.getId());
        }
    
        /**
         * 给所有在线用户发送消息(这里用的文本消息)
         * @param message
         */
        public void batchSendMessage(TextMessage message)
        {
            
            Set> setInfo =
                webSocketInfo.entrySet();
            for (Map.Entry entry : setInfo)
            {
                WebSocketBeanSpring bean = entry.getValue();
                try
                {
                    bean.getSession().sendMessage(message);
                }
                catch (IOException e)
                {
                    log.error(e.getMessage(),e);
                }
            }
        }
    
        /**
         * 给指定用户发送消息
         * @param userId
         * @param message
         */
        public void sendMessage(String userId, TextMessage message)
        {
            WebSocketBeanSpring bean = webSocketInfo.get(userId);
            try
            {
                bean.getSession().sendMessage(message);
            }
            catch (IOException e)
            {
                log.error(e.getMessage(), e);
            }
        }
    
    }
    /<code> 

    WebSocketConfig 配置

    此步骤是在SpringMVC中注册消息处理中心,因为基于SpringBoot搭建,这里使用@Configuration注解配置,当然也可以xml配置,这个根据自身项目风格进行配置,这里我们实现WebSocketConfigurer接口

    Spring Springboot实现websocket通讯-1

    <code>package com.wzh.demo.websocket.config;
    
    import com.wzh.demo.websocket.handler.WebSocketHander;
    import com.wzh.demo.websocket.interceptor.WebSocketHandshakeInterceptor;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-08-05 22:59
     * @see [相关类/方法] (可选)
     **/
    @Configuration //标记为spring 配置类
    @EnableWebSocket //开启websocket支持
    public class WebSocketConfig implements WebSocketConfigurer{
    
        // 注册消息处理器,并映射连接地址
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry)
        {
            // 注册消息处理器,并添加自定义拦截器,支持websocket的连接访问
            registry.addHandler(new WebSocketHander(), "/spring/websocket")
                .addInterceptors(new WebSocketHandshakeInterceptor());
    
            /*
            注册消息处理器,并添加自定义拦截器,添加不支持websocket的连接访问
            SockJs是一个WebSocket的通信js库,Spring对这个js库进行了后台的自动支持,
            也就是说,如果使用SockJs,那么我们就不需要对后台进行更多的配置,只需要加上withSockJS()这一句就可以了
             */
            registry.addHandler(new WebSocketHander(), "/spring/sockjs/websocket")
                    .addInterceptors(new WebSocketHandshakeInterceptor()).withSockJS();
        }
    }
    /<code> 

    代码都准备好了,下面进行测试,这里测试就做两个简单的方法,一个通过消息处理中心公告,一个通过controller进行私信

    一个简单的controller 用于跳转到html页面

    <code>// 跳转websocket界面
        @RequestMapping(value = "/spring/socket.do",method = RequestMethod.GET)
        public String toSpringWebSocket(HttpSession session, Model model)
        {
            model.addAttribute("address","/spring/websocket");
            System.out.println("进入websocket");
            return "/test/springWebSocket";
        }/<code>

    html页面就用上面写的界面,按钮用于触发websocket的公告方法

    Spring Springboot实现websocket通讯-1

    登录可以看见后台控制台打印

    Spring Springboot实现websocket通讯-1

    点击页面按钮,浏览器控制台输出

    Spring Springboot实现websocket通讯-1

    在写个测试私信的controller,这里为了简单就不写页面了,直接get请求访问控制器,携带websession id

    <code>package com.wzh.demo.controller;
    
    import com.wzh.demo.websocket.handler.WebSocketHander;
    import org.springframework.boot.autoconfigure.web.ServerProperties;
    import org.springframework.stereotype.Controller;
    import org.springframework.ui.Model;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.socket.TextMessage;
    
    import javax.annotation.Resource;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpSession;
    
    
    /**
     * 
     * 
     * @author wzh
     * @version 2018-07-09 22:53
     * @see [相关类/方法] (可选)
     **/
    @Controller
    @RequestMapping("/websocket")
    public class WebSocketController {
    
        @Resource
        private WebSocketHander webSocketHander;
    
        // 测试私信发送
        @RequestMapping(value = "/spring/socketById.do",method = RequestMethod.GET)
        public void toSpringWebSocketByid(HttpSession session, HttpServletRequest request, Model model)
        {
            String id = request.getParameter("id");
            webSocketHander.sendMessage(id,new TextMessage("测试指定人员发送"));
    
        }
    }
    /<code>

    浏览器直接访问http://localhost:8080/SpringBootDemo/websocket/spring/socketById.do?id=1
    查看第一个session id 的界面成功收到消息,其他界面没有消息

    Spring Springboot实现websocket通讯-1


    分享到:


    相關文章: