springboot集成websocket點對點推送、廣播推送

一、什麼都不用說,導入個依賴先


<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-websocket/<artifactid>
/<dependency>

二、推送到前端的消息實體類


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NotifyBean implements Serializable {

private static final long serialVersionUID = 1L;

private int type;
private String message;
private T data;

}

三、因為要實現點對點的推送,所以需要創建一個監聽器來獲取到websocket的session,如下:

如果想學習Java工程化、高性能及分佈式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.web.socket.messaging.SessionConnectEvent;



public class STOMPConnectEventListener implements ApplicationListener<sessionconnectevent> {

@Autowired
private RedisHelper redisHelper;

@Override
public void onApplicationEvent(SessionConnectEvent event) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
//login get from browser
if(sha.getNativeHeader("userid")==null){
return;
}
String userid = sha.getNativeHeader("userid").get(0);
String sessionId = sha.getSessionId();
redisHelper.redisTemplate.opsForValue().set("websocket:"+userid,sessionId);
}
}
/<sessionconnectevent>

四、最重要的配置類


import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.*;


@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

//STOMP監聽類
@Bean
public STOMPConnectEventListener applicationStartListener(){
return new STOMPConnectEventListener();
}


@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
//建立連接端點,註冊一個STOMP的協議節點,並指定使用SockJS協議
stompEndpointRegistry.addEndpoint("/nmpSocketWeb")
.setAllowedOrigins("*")
.withSockJS();
}


@Override
public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) {
//配置消息代理(MessageBroker)。
messageBrokerRegistry.enableSimpleBroker("/topic");// 推送消息前綴
messageBrokerRegistry.setApplicationDestinationPrefixes("/app");// 應用請求前綴,前端發過來的消息將會帶有“/app”前綴。
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
//token認證
registration.setInterceptors(new ChannelInterceptorAdapter() {
@Override

public Message> preSend(Message> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand()) || StompCommand.SEND.equals(accessor.getCommand())) {
String token = accessor.getFirstNativeHeader("token");
try {
tokenValidate(token);
} catch (Exception e) {
log.error(e.toString());
return null;
}
}
return message;
}
});
}


public boolean tokenValidate(String token) throws Exception {
if (token == null || token.isEmpty()) {
throw new Exception("webSocket:token為空!");
}
if (JwtUtil.validateToken(token)==null) {
throw new Exception("webSoc:token無效!");
}
return true;
}

}

代碼中有詳細的解釋,認真看可以看明白的。

五、controller


import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;



@Api(tags="WebSocket控制器",description="WebSocket控制器")
@Controller
@RequestMapping(value = "/webSocket")
public class WebSocketController extends BaseController {

@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

@Autowired
private RedisHelper redisHelper;


@ApiOperation(value = "測試主動發送消息", notes = "測試主動發送消息", httpMethod = "GET")
@RequestMapping(value = "/sendMsg")
@ResponseBody
public void sendMsg(){
System.out.println("測試主動發送消息");
NotifyBean notifyBean = NotifyBean.builder().message("服務器給你發消息啦!").build();
simpMessagingTemplate.convertAndSend(WebConstant.WEB_SC_TOPIC_NOTIFY,notifyBean);
}


@MessageMapping("/test") //當瀏覽器向服務端發送請求時,通過@MessageMapping映射/welcome這個地址,類似於@ResponseMapping
@SendTo(WebConstant.WEB_SC_TOPIC_NOTIFY)//當服務器有消息時,會對訂閱了@SendTo中的路徑的瀏覽器發送消息
public NotifyBean test(UserVo userVo) {
try {
//睡眠1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
NotifyBean notifyBean = NotifyBean.builder().message("welcome!"+ userVo.getName()).build();
return notifyBean;
}

/**
* 點對點發送消息demo
* 根據用戶key發送消息
* @param userVo
* @return
* @throws Exception
*/
@MessageMapping("/test/toOne")
public void toOne(UserVo userVo) throws Exception {
String sessionId=(String)redisHelper.redisTemplate.opsForValue().get("websocket:"+userVo.getId());
NotifyBean notifyBean = NotifyBean.builder().message("welcome!"+ userVo.getName()).build();
//convertAndSendToUser該方法會在訂閱路徑前拼接"/user",所以前端訂閱的路徑全路徑是"/user/topic/notify"
simpMessagingTemplate.convertAndSendToUser(sessionId, WebConstant.WEB_SC_TOPIC_NOTIFY,notifyBean,createHeaders(sessionId));
}

private MessageHeaders createHeaders(String sessionId) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
return headerAccessor.getMessageHeaders();
}

}

六、前端頁面

如果想學習Java工程化、高性能及分佈式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。



<link>
<link>






/user/topic-message












<table>
<thead>

Greetings

/<thead>
<tbody>
/<tbody>
/<table>








最好,來試試點對點推送。

第一個頁面:

springboot集成websocket點對點推送、廣播推送

第二個頁面:

springboot集成websocket點對點推送、廣播推送

可以看到,後臺推送的消息只有一個頁面接收到,完事!

如果想學習Java工程化、高性能及分佈式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。


分享到:


相關文章: