SpringBoot(23) 集成socket.io服務端和客戶端實現通信

一、前言


websocket和socket.io區別?


websocket


  1. 一種讓客戶端和服務器之間能進行雙向實時通信的技術
  2. 使用時,雖然主流瀏覽器都已經支持,但仍然可能有不兼容的情況
  3. 適合用於client和基於node搭建的服務端使用

socket.io


  1. 將WebSocket、AJAX和其它的通信方式全部封裝成了統一的通信接口
  2. 使用時,不用擔心兼容問題,底層會自動選用最佳的通信方式
  3. 適合進行服務端和客戶端雙向數據通信

w3cschool上對socket.io的描述如下:

SpringBoot(23) 集成socket.io服務端和客戶端實現通信

本文將實現


  1. 基於springboot2.1.8.RELEASE集成netty-socketio : 仿node.js實現的socket.io服務端
  2. 集成socket.io-client: socket.io客戶端
  3. 實現服務端與客戶端之間的通信

二、Java集成socket.io服務端


1、pom.xml中引入所需依賴


溫馨小提示:這裡為了方便將後面需要的客戶端socket.io-client依賴一起直接引入了哦~

<code>
<dependency>
<groupid>com.corundumstudio.socketio/<groupid>
<artifactid>netty-socketio/<artifactid>
<version>1.7.7/<version>
/<dependency>

<dependency>
<groupid>io.socket/<groupid>
<artifactid>socket.io-client/<artifactid>
<version>1.0.0/<version>
/<dependency>/<code>


2、application.yml中配置socket.io服務端


<code># netty-socketio 配置
socketio:
host: 127.0.0.1
port: 8888
# 設置最大每幀處理數據的長度,防止他人利用大數據來攻擊服務器
maxFramePayloadLength: 1048576
# 設置http交互最大內容長度
maxHttpContentLength: 1048576
# socket連接數大小(如只監聽一個端口boss線程組為1即可)
bossCount: 1
workCount: 100
allowCustomRequests: true
# 協議升級超時時間(毫秒),默認10秒。HTTP握手升級為ws協議超時時間
upgradeTimeout: 1000000
# Ping消息超時時間(毫秒),默認60秒,這個時間間隔內沒有接收到心跳消息就會發送超時事件
pingTimeout: 6000000
# Ping消息間隔(毫秒),默認25秒。客戶端向服務器發送一條心跳消息間隔
pingInterval: 25000/<code>


3、socket.io服務端配置類


<code>@Configuration
public class SocketIOConfig {

@Value("${socketio.host}")
private String host;

@Value("${socketio.port}")
private Integer port;

@Value("${socketio.bossCount}")
private int bossCount;

@Value("${socketio.workCount}")
private int workCount;

@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;

@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;

@Value("${socketio.pingTimeout}")
private int pingTimeout;

@Value("${socketio.pingInterval}")
private int pingInterval;

@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setSocketConfig(socketConfig);
config.setHostname(host);
config.setPort(port);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setAllowCustomRequests(allowCustomRequests);
config.setUpgradeTimeout(upgradeTimeout);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
return new SocketIOServer(config);
}

}/<code>


4、socket.io服務端服務層

服務類

<code>public interface ISocketIOService {
/**
* 啟動服務
*/
void start();

/**
* 停止服務
*/
void stop();

/**
* 推送信息給指定客戶端
*
* @param userId: 客戶端唯一標識
* @param msgContent: 消息內容
*/
void pushMessageToUser(String userId, String msgContent);
}/<code>

服務實現類:

<code>@Slf4j
@Service(value = "socketIOService")
public class SocketIOServiceImpl implements ISocketIOService {

/**
* 存放已連接的客戶端
*/
private static Map<string> clientMap = new ConcurrentHashMap<>();

/**
* 自定義事件`push_data_event`,用於服務端與客戶端通信
*/
private static final String PUSH_DATA_EVENT = "push_data_event";

@Autowired
private SocketIOServer socketIOServer;

/**
* Spring IoC容器創建之後,在加載SocketIOServiceImpl Bean之後啟動
*/
@PostConstruct
private void autoStartup() {
start();
}

/**
* Spring IoC容器在銷燬SocketIOServiceImpl Bean之前關閉,避免重啟項目服務端口占用問題
*/
@PreDestroy
private void autoStop() {
stop();
}

@Override
public void start() {
// 監聽客戶端連接
socketIOServer.addConnectListener(client -> {
log.debug("************ 客戶端: " + getIpByClient(client) + " 已連接 ************");
// 自定義事件`connected` -> 與客戶端通信 (也可以使用內置事件,如:Socket.EVENT_CONNECT)
client.sendEvent("connected", "你成功連接上了哦...");
String userId = getParamsByClient(client);
if (userId != null) {
clientMap.put(userId, client);
}
});

// 監聽客戶端斷開連接
socketIOServer.addDisconnectListener(client -> {
String clientIp = getIpByClient(client);
log.debug(clientIp + " *********************** " + "客戶端已斷開連接");
String userId = getParamsByClient(client);
if (userId != null) {
clientMap.remove(userId);
client.disconnect();

}
});

// 自定義事件`client_info_event` -> 監聽客戶端消息
socketIOServer.addEventListener(PUSH_DATA_EVENT, String.class, (client, data, ackSender) -> {
// 客戶端推送`client_info_event`事件時,onData接受數據,這裡是string類型的json數據,還可以為Byte[],object其他類型
String clientIp = getIpByClient(client);
log.debug(clientIp + " ************ 客戶端:" + data);
});

// 啟動服務
socketIOServer.start();

// broadcast: 默認是向所有的socket連接進行廣播,但是不包括髮送者自身,如果自己也打算接收消息的話,需要給自己單獨發送。
new Thread(() -> {
int i = 0;
while (true) {
try {
// 每3秒發送一次廣播消息
Thread.sleep(3000);
socketIOServer.getBroadcastOperations().sendEvent("myBroadcast", "廣播消息 " + DateUtil.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

@Override
public void stop() {
if (socketIOServer != null) {
socketIOServer.stop();
socketIOServer = null;
}
}

@Override
public void pushMessageToUser(String userId, String msgContent) {
SocketIOClient client = clientMap.get(userId);

if (client != null) {
client.sendEvent(PUSH_DATA_EVENT, msgContent);
}
}

/**
* 獲取客戶端url中的userId參數(這裡根據個人需求和客戶端對應修改即可)
*
* @param client: 客戶端
* @return: java.lang.String
*/
private String getParamsByClient(SocketIOClient client) {
// 獲取客戶端url參數(這裡的userId是唯一標識)
Map<string>> params = client.getHandshakeData().getUrlParams();
List<string> userIdList = params.get("userId");
if (!CollectionUtils.isEmpty(userIdList)) {
return userIdList.get(0);
}
return null;
}

/**
* 獲取連接的客戶端ip地址
*
* @param client: 客戶端
* @return: java.lang.String
*/
private String getIpByClient(SocketIOClient client) {
String sa = client.getRemoteAddress().toString();
String clientIp = sa.substring(1, sa.indexOf(":"));
return clientIp;
}

}/<string>/<string>/<string>/<code>


三、Java開發socket.io客戶端


  1. socket.emit:發送數據到服務端事件
  2. socket.on: 監聽服務端事件
<code>@Slf4j
public class SocketIOClientLaunch {

public static void main(String[] args) {
// 服務端socket.io連接通信地址
String url = "http://127.0.0.1:8888";
try {
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
// 失敗重連的時間間隔
options.reconnectionDelay = 1000;
// 連接超時時間(ms)
options.timeout = 500;
// userId: 唯一標識 傳給服務端存儲
final Socket socket = IO.socket(url + "?userId=1", options);

socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

// 自定義事件`connected` -> 接收服務端成功連接消息
socket.on("connected", objects -> log.debug("服務端:" + objects[0].toString()));

// 自定義事件`push_data_event` -> 接收服務端消息
socket.on("push_data_event", objects -> log.debug("服務端:" + objects[0].toString()));

// 自定義事件`myBroadcast` -> 接收服務端廣播消息
socket.on("myBroadcast", objects -> log.debug("服務端:" + objects[0].toString()));

socket.connect();

while (true) {
Thread.sleep(3000);
// 自定義事件`push_data_event` -> 向服務端發送消息
socket.emit("push_data_event", "發送數據 " + DateUtil.now());

}
} catch (Exception e) {
e.printStackTrace();
}
}

}/<code>


四、運行測試

當客戶端上線後,會通過自定義事件push_data_event每隔3秒向服務端發送消息,日誌如下

SpringBoot(23) 集成socket.io服務端和客戶端實現通信

而服務端中跑了一個廣播消息(自定義事件myBroadcast) 每隔3秒也會返回給客戶端

廣播事件: 向所有的socket連接進行廣播發送消息數據

<code>socketIOServer.getBroadcastOperations().sendEvent("myBroadcast", "廣播消息 " + DateUtil.now());/<code>

日誌如下:

SpringBoot(23) 集成socket.io服務端和客戶端實現通信

編寫服務端主動發送消息給客戶端接口

<code>@RestController 

@RequestMapping("/api/socket.io")
@Api(tags = "SocketIO測試-接口")
public class SocketIOController {

@Autowired
private ISocketIOService socketIOService;

@PostMapping(value = "/pushMessageToUser", produces = Constants.CONTENT_TYPE)
@ApiOperation(value = "推送信息給指定客戶端", httpMethod = "POST", response = ApiResult.class)
public ApiResult pushMessageToUser(@RequestParam String userId, @RequestParam String msgContent) {
socketIOService.pushMessageToUser(userId, msgContent);
return ApiResult.ok();
}

}/<code>

調用接口測試發送helloworld...

SpringBoot(23) 集成socket.io服務端和客戶端實現通信

五、總結


socket.io通信,服務端:

1.socketIOServer.addConnectListener:監聽客戶端連接

  1. socketIOServer.addDisconnectListener:監聽客戶端斷開連接
  2. socketIOServer.addEventListener:監聽客戶端傳輸的消息
  3. client.sendEvent("自定義事件名稱", "消息內容"):服務端向指定的clien客戶端發送消息
  4. socketIOServer.getBroadcastOperations().sendEvent("自定義事件名稱", "消息內容"):服務端發送廣播消息給所有客戶端

socket.io通信,客戶端:


  1. IO.socket(url):與指定的socket.io服務端建立連接
  2. socket.emit:發送數據到服務端事件
  3. socket.on: 監聽服務端事件

本文案例demo源碼

https://gitee.com/zhengqingya/java-workspace


分享到:


相關文章: