03.03 配置中心是怎麼推送的?動手實現一個 Long Polling 長輪詢

介紹

眾所周知,數據交互有兩種模式:Push(推模式)、Pull(拉模式)。

推模式指的是客戶端與服務端建立好網絡長連接,服務方有相關數據,直接通過長連接通道推送到客戶端。其優點是及時,一旦有數據變更,客戶端立馬能感知到;另外對客戶端來說邏輯簡單,不需要關心有無數據這些邏輯處理。缺點是不知道客戶端的數據消費能力,可能導致數據積壓在客戶端,來不及處理。

拉模式指的是客戶端主動向服務端發出請求,拉取相關數據。其優點是此過程由客戶端發起請求,故不存在推模式中數據積壓的問題。缺點是可能不夠及時,對客戶端來說需要考慮數據拉取相關邏輯,何時去拉,拉的頻率怎麼控制等等。

詳解

說到Long Polling(長輪詢),必然少不了提起Polling(輪詢),這都是拉模式的兩種方式。

Polling是指不管服務端數據有無更新,客戶端每隔定長時間請求拉取一次數據,可能有更新數據返回,也可能什麼都沒有。

Long Polling原理也很簡單,相比Polling,客戶端發起Long Polling,此時如果服務端沒有相關數據,會hold住請求,直到服務端有相關數據,或者等待一定時間超時才會返回。返回後,客戶端又會立即再次發起下一次Long Polling。這種方式也是對拉模式的一個優化,解決了拉模式數據通知不及時,以及減少了大量的無效輪詢次數。

所謂的hold住請求指的服務端暫時不回覆結果,保存相關請求,不關閉請求鏈接,等相關數據準備好,寫會客戶端。

前面提到Long Polling如果當時服務端沒有需要的相關數據,此時請求會hold住,直到服務端把相關數據準備好,或者等待一定時間直到此次請求超時,這裡大家是否有疑問,為什麼不是一直等待到服務端數據準備好再返回,這樣也不需要再次發起下一次的Long Polling,節省資源?

主要原因是網絡傳輸層主要走的是tcp協議,tcp協議是可靠面向連接的協議,通過三次握手建立連接。但是所建立的連接是虛擬的,可能存在某段時間網絡不通,或者服務端程序非正常關閉,亦或服務端機器非正常關機,面對這些情況客戶端根本不知道服務端此時已經不能互通,還在傻傻的等服務端發數據過來,而這一等一般都是很長時間。

當然tcp協議棧在實現上有保活計時器來保證的,但是等到保活計時器發現連接已經斷開需要很長時間,如果沒有專門配置過相關的tcp參數,一般需要2個小時,而且這些參數是機器操作系統層面,所以,以此方式來保活不太靠譜,故Long Polling的實現上一般是需要設置超時時間的。

實現

Long Polling的實現很簡單,可分為四個過程:

  • 發起Polling,發起Polling很簡單,只需向服務器發起請求,此時服務端還未應答,所以客戶端與服務端之間一直處於連接狀態。
  • 數據推送,如果服務器端有相關數據,此時服務端會將數據通過此前建立的通道發回客戶端。
  • Polling終止,Polling終止情況有三種:若服務端返回相關數據,此時客戶端收到數據後,關閉請求連接,結束此次Polling過程。若客戶端等待設定的超時時間後,服務端依然沒有返回數據,此時客戶端需要主動終止此次Polling請求。若客戶端收到網絡故障或異常,此時客戶端自然也是需要主動終止此次Polling請求。
  • 重新Polling,終止上次Polling後,客戶端需要立即再次發起Polling請求。這樣才能保證拉取數據的及時性。

代碼實現起來也很簡單,Http Call按照上述過程就很方便實現LongPolling。下面Code只是簡單展示過程,在具體場景下,根據具體的業務邏輯進行調整。

客戶端Code

package com.andy.example.longpolling.client;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.HttpURLConnection;

import java.net.URL;

/**

* Created by andy on 17/7/6.

*/

public class ClientBootstrap {

public static final String URL = "http://localhost:8080/long-polling";

public static void main(String[] args) {

int i = 0;

while (true) {

System.out.println("第" + (++i) + "次 longpolling");

HttpURLConnection connection = null;

try {

URL getUrl = new URL(URL);

connection = (HttpURLConnection) getUrl.openConnection();

connection.setReadTimeout(50000);//這就是等待時間,設置為50s

connection.setConnectTimeout(3000);

connection.setRequestMethod("GET");

connection.setRequestProperty("Accept-Charset", "utf-8");

connection.setRequestProperty("Content-Type", "application/json");

connection.setRequestProperty("Charset", "UTF-8");

if (200 == connection.getResponseCode()) {

BufferedReader reader = null;

try {

reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));

StringBuilder result = new StringBuilder(256);

String line = null;

while ((line = reader.readLine()) != null) {

result.append(line);

}

System.out.println("結果 " + result);

} finally {

if (reader != null) {

reader.close();

}

}

}

} catch (IOException e) {

} finally {

if (connection != null) {

connection.disconnect();

}

}

}

}

}

服務端Code

package com.andy.example.longpolling.server;

import javax.servlet.ServletException;

import javax.servlet.http.HttpServlet;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import java.io.IOException;

import java.io.PrintWriter;

import java.util.Random;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

/**

* Created by andy on 17/7/6.

*/

public class LongPollingServlet extends HttpServlet {

private Random random = new Random();

private AtomicLong sequenceId = new AtomicLong();

private AtomicLong count = new AtomicLong();

@Override

protected void doGet(HttpServletRequest request, HttpServletResponse response)

throws ServletException, IOException {

System.out.println("第" + (count.incrementAndGet()) + "次 longpolling");

int sleepSecends = random.nextInt(100);

//隨機獲取等待時間,來通過sleep模擬服務端是否準備好數據

System.out.println("wait " + sleepSecends + " second");

try {

TimeUnit.SECONDS.sleep(sleepSecends);//sleep

} catch (InterruptedException e) {

}

PrintWriter out = response.getWriter();

long value = sequenceId.getAndIncrement();

out.write(Long.toString(value));

}

}

結果

配置中心是怎麼推送的?動手實現一個 Long Polling 長輪詢

配置中心是怎麼推送的?動手實現一個 Long Polling 長輪詢

應用

WebQQ、Comet都用到長輪詢技術,另外一些使用Pull模式消費的消息系統,都會使用Long Polling技術進行優化。


分享到:


相關文章: