這可能是講分佈式系統最到位的一篇文章

說到分佈式系統,不得不說集中式系統。傳統集中式系統中整個項目所有的東西都在一個應用裡面。



這可能是講分佈式系統最到位的一篇文章


一個網站就是一個應用,當系統壓力較大時,只能橫向擴展,增加多個服務器或者多個容器去做負載均衡,避免單點故障而影響到整個系統。

集中式最明顯的優點就是開發,測試,運維會比較方便,不用考慮複雜的分佈式環境。

弊端也很明顯,系統大而複雜、不易擴展、難於維護,每次更新都必須更新所有的應用。


這可能是講分佈式系統最到位的一篇文章


集中式系統拓撲圖


鑑於集中式系統的種種弊端,促成了分佈式系統的形成,分佈式系統背後是由一系列的計算機組成,但用戶感知不到背後的邏輯,就像訪問單個計算機一樣,天然的避免了單機故障的問題。

應用可以按業務類型拆分成多個應用或服務,再按結構分成接口層、服務層。

我們也可以按訪問入口分,如移動端、PC 端等定義不同的接口應用。數據庫可以按業務類型拆分成多個實例,還可以對單表進行分庫分表。同時增加分佈式緩存、消息隊列、非關係型數據庫、搜索等中間件。

分佈式系統雖好,但是增加了系統的複雜性,如分佈式事務、分佈式鎖、分佈式 Session、數據一致性等都是現在分佈式系統中需要解決的難題。

分佈式系統也增加了開發測試運維的成本,工作量增加,其管理不好反而會變成一種負擔。


這可能是講分佈式系統最到位的一篇文章


分佈式系統拓撲圖


分佈式系統最為核心的要屬分佈式服務框架,有了分佈式服務框架,我們只需關注各自的業務,而無需去關注那些複雜的服務之間調用的過程。

分佈式服務框架


目前業界比較流行的分佈式服務框架有:阿里的 Dubbo、Spring Cloud。

這裡不對這些分佈式服務框架做對比,簡單的說說他們都做了些什麼,能使我們用遠程服務就像調用本地服務那麼簡單高效。

服務


服務是對使用用戶有功能輸出的模塊,以技術框架作為基礎,能實現用戶的需求。

比如日誌記錄服務、權限管理服務、後臺服務、配置服務、緩存服務、存儲服務、消息服務等,這些服務可以靈活的組合在一起,也可以獨立運行。

服務需要有接口,與系統進行對接。面向服務的開發,應該是把服務拆分開發,把服務組合運行。

更加直接的例子如:歷史詳情、留言板、評論、評級服務等。他們之間能獨立運行,也要能組合在一起作為一個整體。

註冊中心


註冊中心對整個分佈式系統起著最為核心的整合作用,支持對等集群,需要提供 CRUD 接口,支持訂閱發佈機制且可靠性要求非常之高,一般拿 Zookeeper 集群來做為註冊中心。

分佈式環境中服務提供方的服務會在多臺服務器上部署,每臺服務器會向註冊中心提供服務方標識、服務列表、地址、對應端口、序列化協議等信息。

註冊中心記錄下服務和服務地址的映射關係,一般一個服務會對應多個地址,這個過程我們稱之為服務發佈或服務註冊。

服務調用方會根據服務方標識、服務列表從註冊中心獲取所需服務的信息(地址端口信息、序列化協議等),這些信息會緩存至本地。

當服務需要調用其他服務時,直接在這裡找到服務的地址,進行調用,這個過程我們稱之為服務發現。


這可能是講分佈式系統最到位的一篇文章


註冊中心


下面是以 Zookeeper 作為註冊中心的簡單實現:

/**
* 創建node節點
* @param node
* @param data
*/

public boolean createNode(String node, String data) {
try {
byte[] bytes = data.getBytes();
//同步創建臨時順序節點
String path = zk.create(ZkConstant.ZK_RPC_DATA_PATH+"/"+node+"-", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("create zookeeper node ({} => {})", path, data);
}catch (KeeperException e) {
log.error("", e);
return false;
}catch (InterruptedException ex){
log.error("", ex);
return false;
}
return true;
}



這可能是講分佈式系統最到位的一篇文章


子節點 1

這可能是講分佈式系統最到位的一篇文章


子節點 2


如下面 Zookeeper 中寫入的臨時順序節點信息:

  • com.black.blackrpc.test.HelloWord:發佈服務時對外的名稱。
  • 00000000010,00000000011:ZK 順序節點 ID。
  • 127.0.0.1:8888,127.0.0.1:8889:服務地址端口。
  • Protostuff:序列化方式。
  • 1.0:權值,負載均衡策略使用。


這裡使用的是 Zookeeper 的臨時順序節點,為什麼使用臨時順序節點,主要是考慮以下兩點:

  • 當服務提供者異常下線時,與 Zookeeper 的連接會中斷,Zookeeper 服務器會主動刪除臨時節點,同步給服務消費者。
  • 這樣就能避免服務消費者去請求異常的服務器。校稿注: 一般消費方也會在實際發起請求前,對當前獲取到的服務提供方節點進行心跳,避免請求連接有問題的節點。
  • Zookeeper 下面是不允許創建 2 個名稱相同的 ZK 子節點的,通過順序節點就能避免創建相同的名稱。
  • 當然也可以不用順序節點的方式,直接以 com.black.blackrpc.test.HelloWord 創建節點,在該節點下創建數據節點。


下面是 ZK 的數據同步過程:

/**
* 同步節點 (通知模式)
* syncNodes會通過級聯方式,在每次watcher被觸發後,就會再掛上新的watcher。完成了類似鏈式觸發的功能
*/
public boolean syncNodes() {
try {
List<string> nodeList = zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
syncNodes();
}
}
});
Map<string>> map =new HashMap<string>>();
for (String node : nodeList) {
byte[] bytes = zk.getData(ZkConstant.ZK_RPC_DATA_PATH + "/" + node, false, null);
String key =node.substring(0, node.lastIndexOf(ZkConstant.DELIMITED_MARKER));
String value=new String(bytes);
Object object =map.get(key);
if(object!=null){
((List<string>)object).add(value);
}else {
List<string> dataList = new ArrayList<string>();
dataList.add(value);
map.put(key,dataList);
}

log.info("node: [{}] data: [{}]",node,new String(bytes));
}
/**修改連接的地址緩存*/
if(MapUtil.isNotEmpty(map)){
log.debug("invoking service cache updateing....");
InvokingServiceCache.updataInvokingServiceMap(map);
}
return true;
} catch (KeeperException | InterruptedException e) {
log.error(e.toString());
return false;
}
}
/<string>/<string>/<string>/<string>/<string>/<string>


當數據同步到本地時,一般會寫入到本地文件中,防止因 Zookeeper 集群異常下線而無法獲取服務提供者信息。

通訊與協議


服務消費者無論是與註冊中心還是與服務提供者,都需要存在網絡連接傳輸數據,而這就涉及到通訊。

筆者之前也做過這方面的工作,當時使用的是 java BIO 簡單的寫了一個通訊包,使用場景沒有多大的併發,阻塞式的 BIO 也未暴露太多問題。

java BIO 因其建立連接之後會阻塞線程等待數據,這種方式必須以一連接一線程的方式,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理。當連接數過大時,會建立相當多的線程,性能直線下降。

Java NIO:同步非阻塞,服務器實現模式為一個請求一個線程,即客戶端發送的連接請求都會註冊到多路複用器上,多路複用器輪詢到連接有 I/O 請求時才啟動一個線程進行處理。

Java AIO:異步非阻塞,服務器實現模式為一個有效請求一個線程,客戶端的 I/O 請求都是由 OS 先完成了再通知服務器應用去啟動線程進行處理。

BIO、NIO、AIO 適用場景分析:

  • BIO:用於連接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,但程序直觀簡單易理解。
  • NIO:適用於連接數目多且連接比較短(輕操作)的架構,比如聊天服務器,併發侷限於應用中,編程比較複雜。
  • 目前主流的通訊框架 Netty、Apache Mina、Grizzl、NIO Framework 都是基於其實現的。
  • AIO:
    用於連接數目多且連接比較長(重操作)的架構,比如圖片服務器,文件傳輸等,充分調用 OS 參與併發操作,編程比較複雜。


作為基石的通訊,其實要考慮很多東西。如:丟包粘包的情況,心跳機制,斷連重連,消息緩存重發,資源的優雅釋放,長連接還是短連接等。

下面是 Netty 建立服務端,客戶端的簡單實現:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* netty tcp 服務端
* @author v_wangshiyu
*
*/
public class NettyTcpService {
private static final Logger log = LoggerFactory.getLogger(NettyTcpService.class);
private String host;
private int port;
public NettyTcpService(String address) throws Exception{
String str[] = address.split(":");
this.host=str[0];
this.port=Integer.valueOf(str[1]);
}

public NettyTcpService(String host,int port) throws Exception{
this.host=host;
this.port=port;
}
/**用於分配處理業務線程的線程組個數 */
private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默認
/** 業務出現線程大小*/
private static final int BIZTHREADSIZE = 4;
/*
* NioEventLoopGroup實際上就是個線程,
* NioEventLoopGroup在後臺啟動了n個NioEventLoop來處理Channel事件,
* 每一個NioEventLoop負責處理m個Channel,
* NioEventLoopGroup從NioEventLoop數組裡挨個取出NioEventLoop來處理Channel
*/
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
public void start() throws Exception {
log.info("Netty Tcp Service Run...");
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<socketchannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
// pipeline.addLast(new Encoder());
// pipeline.addLast(new Decoder());
pipeline.addLast(new TcpServerHandler());
}
});
b.bind(host, port).sync();
log.info("Netty Tcp Service Success!");
}
/**
* 停止服務並釋放資源
*/
public void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 服務端處理器
*/
public class TcpServerHandler extends SimpleChannelInboundHandler<object>{
private static final Logger log = LoggerFactory.getLogger(TcpServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
byte[] data=(byte[])msg;
}
}
/<object>/<socketchannel>


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;
/**
* netty tcp 客戶端
* @author v_wangshiyu
*
*/
public class NettyTcpClient {
private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
private String host;
private int port;
private Bootstrap bootstrap;
private Channel channel;
private EventLoopGroup group;
public NettyTcpClient(String host,int port){
bootstrap=getBootstrap();

channel= getChannel(host,port);
this.host=host;
this.port=port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
/**
* 初始化Bootstrap
* @return
*/
public final Bootstrap getBootstrap(){
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new Encoder());
// pipeline.addLast(new Decoder());
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
pipeline.addLast("handler", new TcpClientHandler());
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
return b;
}
/**
* 連接,獲取Channel
* @param host
* @param port
* @return
*/
public final Channel getChannel(String host,int port){
Channel channel = null;
try {
channel = bootstrap.connect(host, port).sync().channel();
return channel;
} catch (Exception e) {
log.info(String.format("connect Server(IP[%s],PORT[%s]) fail!", host,port));
return null;
}
}

/**
* 發送消息
* @param msg
* @throws Exception
*/
public boolean sendMsg(Object msg) throws Exception {
if(channel!=null){
channel.writeAndFlush(msg).sync();
log.debug("msg flush success");
return true;
}else{
log.debug("msg flush fail,connect is null");
return false;
}
}
/**
* 連接斷開
* 並且釋放資源
* @return
*/
public boolean disconnectConnect(){
//channel.close().awaitUninterruptibly();
Future> future =group.shutdownGracefully();//shutdownGracefully釋放所有資源,並且關閉所有當前正在使用的channel
future.syncUninterruptibly();
return true;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 客戶端處理器
*/
public class TcpClientHandler extends SimpleChannelInboundHandler<object>{
private static final Logger log = LoggerFactory.getLogger(TcpClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
byte[] data=(byte[])msg;
}
}
/<object>/<channel>


說到通訊就不能不說協議,通信時所遵守的規則,訪問什麼,傳輸的格式等都屬於協議。

作為一個開發人員,應該都瞭解 TCP/IP 協議,它是一個網絡通信模型,以及一整套網絡傳輸協議家族,是互聯網的基礎通信架構。

也都應該用過 Http(超文本傳輸協議),Web 服務器傳輸超文本到本地瀏覽器的傳送協議,該協議建立在 TCP/IP 協議之上。分佈式服務框架服務間的調用也會規定協議。

為了支持不同場景,分佈式服務框架會存在多種協議,如 Dubbo 就支持 7 種協議:Dubbo 協議(默認),RMI 協議,Hessian協議,HTTP 協議,WebService 協議,Thrift 協議,Memcached 協議,Redis 協議每種協議應對的場景不盡相同,具體場景具體對待。

服務路由


分佈式服務上線時都是集群組網部署,集群中會存在某個服務的多實例,消費者如何從服務列表中選擇合適的服務提供者進行調用,這就涉及到服務路由。分佈式服務框架需要能夠滿足用戶靈活的路由需求。

透明化路由

很多開源的 RPC 框架調用者需要配置服務提供者的地址信息,儘管可以通過讀取數據庫的服務地址列表等方式避免硬編碼地址信息,但是消費者依然要感知服務提供者的地址信息,這違反了透明化路由原則。

基於服務註冊中心的服務訂閱發佈,消費者通過主動查詢和被動通知的方式獲取服務提供者的地址信息,而不再需要通過硬編碼方式得到提供者的地址信息。

只需要知道當前系統發佈了那些服務,而不需要知道服務具體存在於什麼位置,這就是透明化路由。

負載均衡

負載均衡策略是服務的重要屬性,分佈式服務框架通常會提供多種負載均衡策略,同時支持用戶擴展負載均衡策略。

隨機

通常在對等集群組網中,採用隨機算法進行負載均衡,隨機路由算法消息分發還是比較均勻的,採用 JDK 提供的 java.util.Random 或者 java.security.SecureRandom 在指定服務提供者列表中生成隨機地址。

消費者基於隨機生成的服務提供者地址進行遠程調用:

/**
* 隨機
*/
public class RandomStrategy implements ClusterStrategy {
@Override
public RemoteServiceBase select(List<remoteservicebase> list) {
int MAX_LEN = list.size();
int index = RandomUtil.nextInt(MAX_LEN);
return list.get(index);
}
}
/<remoteservicebase>


隨機還是存在缺點的,可能出現部分節點的碰撞的概率較高,另外硬件配置差異較大時,會導致各節點負載不均勻。

為避免這些問題,需要對服務列表加權,性能好的機器接收的請求的概率應該高於一般機器:

/**
* 加權隨機
*/
public class WeightingRandomStrategy implements ClusterStrategy {
@Override
public RemoteServiceBase select(List<remoteservicebase> list) {
//存放加權後的服務提供者列表
List<remoteservicebase> weightingList = new ArrayList<remoteservicebase>();
for (RemoteServiceBase remoteServiceBase : list) {
//擴大10倍
int weight = (int) (remoteServiceBase.getWeight()*10);
for (int i = 0; i < weight; i++) {
weightingList.add(remoteServiceBase);
}

}
int MAX_LEN = weightingList.size();
int index = RandomUtil.nextInt(MAX_LEN);
return weightingList.get(index);
}
}
/<remoteservicebase>/<remoteservicebase>/<remoteservicebase>


輪詢

逐個請求服務地址,到達邊界之後,繼續繞接。主要缺點:慢的提供者會累積請求。

例如第二臺機器很慢,但沒掛。當請求第二臺機器時被卡在那。久而久之,所有請求都卡在第二臺機器上。

輪詢策略實現非常簡單,順序循環遍歷服務提供者列表,達到邊界之後重新歸零開始,繼續順序循環:

/**
* 輪詢
*/
public class PollingStrategy implements ClusterStrategy {
//計數器
private int index = 0;
private Lock lock = new ReentrantLock();
@Override
public RemoteServiceBase select(List<remoteservicebase> list) {
RemoteServiceBase service = null;
try {
lock.tryLock(10, TimeUnit.MILLISECONDS);
//若計數大於服務提供者個數,將計數器歸0
if (index >= list.size()) {

index = 0;
}
service = list.get(index);
index++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
//兜底,保證程序健壯性,若未取到服務,則直接取第一個
if (service == null) {
service = list.get(0);
}
return service;
}
}
/<remoteservicebase>


加權輪詢的話,需要給服務地址添加權重:

/**
* 加權輪詢
*/
public class WeightingPollingStrategy implements ClusterStrategy {
//計數器
private int index = 0;
//計數器鎖
private Lock lock = new ReentrantLock();
@Override
public RemoteServiceBase select(List<remoteservicebase> list) {
RemoteServiceBase service = null;
try {
lock.tryLock(10, TimeUnit.MILLISECONDS);
//存放加權後的服務提供者列表
List<remoteservicebase> weightingList = new ArrayList<remoteservicebase>();
for (RemoteServiceBase remoteServiceBase : list) {
//擴大10倍
int weight = (int) (remoteServiceBase.getWeight()*10);
for (int i = 0; i < weight; i++) {
weightingList.add(remoteServiceBase);
}

}
//若計數大於服務提供者個數,將計數器歸0
if (index >= weightingList.size()) {
index = 0;
}
service = weightingList.get(index);
index++;
return service;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
//兜底,保證程序健壯性,若未取到服務,則直接取第一個
return list.get(0);
}
}
/<remoteservicebase>/<remoteservicebase>/<remoteservicebase>


服務調用時延

消費者緩存所有服務提供者的調用時延,週期性的計算服務調用平均時延。

然後計算每個服務提供者服務調用時延與平均時延的差值,根據差值大小動態調整權重,保證服務時延大的服務提供者接收更少的消息,防止消息堆積。

該策略的特點:保證處理能力強的服務接受更多的消息,通過動態的權重分配消除服務調用時延的震盪範圍,使所有服務的調用時延接近平均值,實現負載均衡。

一致性哈希

相同參數的請求總是發送到統一服務提供者,當某一臺服務提供者宕機時,原本發往根提供者的請求,基於虛擬節點,平攤到其他提供者,不會引起劇烈變動,平臺提供默認的虛擬節點數,可以通過配置文件修改虛擬節點個數。

一致性 Hash 環工作原理如下圖所示:


這可能是講分佈式系統最到位的一篇文章


一致性哈希


路由規則

負載均衡只能保證服務提供者壓力的平衡,但是在一些業務場景中需要設置一些過濾規則,比較常用的是基本表達式的條件路由。

通過 IP 條件表達式配置黑白名單訪問控制:consumerIP != 192.168.1.1。

只暴露部分服務提供者,防止這個集群服務都被沖垮,導致其他服務也不可用。

例如providerIP = 192.168.3*。 
讀寫分離:method=find*,list*,get*,query*=>providerIP=192.168.1.。
前後臺分離:app=web=>providerIP=192.168.1.,app=java=>providerIP=192.168.2.。
灰度升級:將WEB前臺應用理由到新的服務版本上:app=web=>provicerIP=192.168.1.*。


序列化與反序列化


把對象轉換為字節序列的過程稱為序列化,把字節序列恢復為對象的過程稱為反序列化。

運程調用的時候,我們需要先將 Java 對象進行序列化,然後通過網絡,IO 進行傳輸,當到達目的地之後,再進行反序列化獲取到我們想要的結果對象。

分佈式系統中,傳輸的對象會很多,這就要求序列化速度快,產生字節序列小的序列化技術。

序列化技術:Serializable,XML,Jackson,MessagePack,FastJson,Protocol Buffer,Thrift,Gson,Avro,Hessian 等。

Serializable 是 Java 自帶的序列化技術,無法跨平臺,序列化和反序列化的速度相對較慢。

XML 技術多平臺支持好,常用於與銀行交互的報文,但是其字節序列產生較大,不太適合用作分佈式通訊框架。

FastJson 是 Java 語言編寫的高性能的 JSON 處理器,由阿里巴巴公司開發,字節序列為 json 串,可讀性好,序列化也速度非常的快。

Protocol Buffer 序列化速度非常快,字節序列較小,但是可讀性較差。

一般分佈式服務框架會內置多種序列化協議可供選擇,如 Dubbo 支持的 7 種協議用到的序列化技術就不完全相同。

服務調用


本地環境下,使用某個接口很簡單,直接調用就行。分佈式環境下就不是那麼簡單了,消費者方只會存在接口的定義,沒有具體的實現。

想要像本地環境下直接調用遠程接口那就得耗費一些功夫了,需要用到遠程代理。

下面是我盜的圖:


這可能是講分佈式系統最到位的一篇文章


遠程代理

通信時序如下:


這可能是講分佈式系統最到位的一篇文章


通信時序


消費者端沒有具體的實現,需要調用接口時動態的去創建一個代理類。與 Spirng 集成的情況,那直接在 Bean 構建的時候注入代理類。

下面是構建代理類:

import java.lang.reflect.Proxy;
public class JdkProxy {
public static Object getInstance(Class> cls){
JdkMethodProxy invocationHandler = new JdkMethodProxy();
Object newProxyInstance = Proxy.newProxyInstance(
cls.getClassLoader(),
new Class[] { cls },
invocationHandler);
return (Object)newProxyInstance;
}
}


import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class JdkMethodProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {
//如果傳進來是一個已實現的具體類
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, parameters);
} catch (Throwable t) {
t.printStackTrace();
}
//如果傳進來的是一個接口
} else {
//實現接口的核心方法
//return RemoteInvoking.invoking(serviceName, serializationType, //timeOut,loadBalanceStrategy,method, parameters);

}
return null;
}
}


代理會做很多事情,對請求服務的名稱及參數信息的序列化、通過路由選擇最為合適服務提供者、建立通訊連接發送請求信息(或者直接發起 Http 請求)、最後返回獲取到的結果。

當然這裡面需要考慮很多問題,如調用超時,請求異常,通訊連接的緩存,同步服務調用還是異步服務調用等等。

同步服務調用:客戶端發起遠程服務調用請求,用戶線程完成消息序列化之後,將消息投遞到通信框架,然後同步阻塞,等待通信線程發送請求並接收到應答之後,喚醒同步等待的用戶線程,用戶線程獲取到應答之後返回。

異步服務調用:基於 Java 的 Future 機制,客戶端發起遠程服務調用請求,該請求會被標上 RequestId,同時建立一個與 RequestId 對應的 Future,客戶端通過 Future 的 Get 方法獲取結果時會被阻塞。

服務端收到請求應達會回傳 RequestId,通過 RequestId 去解除對應 Future 的阻塞,同時 Set 對應結果,最後客戶端獲取到結果。

構建 Future,以 RequestId 為 Key,Put 到線程安全的 Map 中。Get 結果時需要寫入 Time Out 超時時間,防止由於結果的未返回而導致的長時間的阻塞。

SyncFuture<rpcresponse> syncFuture =new SyncFuture<rpcresponse>();
SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
try {
RpcResponse rpcResponse= syncFuture.get(timeOut,TimeUnit.MILLISECONDS); return rpcResponse.getResult();
}catch (Exception e){
throw e;
}finally {
SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
}
/<rpcresponse>/<rpcresponse>


結果返回時通過回傳的 RequestId 獲取對應 Future 寫入 Response,Future 線程解除阻塞:

log.debug("Tcp Client receive head:"+headAnalysis+"Tcp Client receive data:" +rpcResponse);
SyncFuture<rpcresponse> syncFuture= SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());
if(syncFuture!=null){
syncFuture.setResponse(rpcResponse);
}
/<rpcresponse>


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SyncFuture implements Future {
// 因為請求和響應是一一對應的,因此初始化CountDownLatch值為1。
private CountDownLatch latch = new CountDownLatch(1);
// 需要響應線程設置的響應結果
private T response;
// Futrue的請求時間,用於計算Future是否超時

private long beginTime = System.currentTimeMillis();
public SyncFuture() {
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
if (response != null) {
return true;
}
return false;
}
// 獲取響應結果,直到有結果才返回。
@Override
public T get() throws InterruptedException {
latch.await();
return this.response;
}
// 獲取響應結果,直到有結果或者超過指定時間就返回。
@Override
public T get(long timeOut, TimeUnit unit) throws InterruptedException {
if (latch.await(timeOut, unit)) {
return this.response;
}
return null;
}
// 用於設置響應結果,並且做countDown操作,通知請求線程
public void setResponse(T response) {
this.response = response;
latch.countDown();
}
public long getBeginTime() {
return beginTime;
}
}


SyncFuture<rpcresponse> syncFuture =new SyncFuture<rpcresponse>();
SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
RpcResponse rpcResponse= syncFuture.get(timeOut,TimeUnit.MILLISECONDS);
SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
/<rpcresponse>/<rpcresponse>


除了同步服務調用,異步服務調用,還有並行服務調用,泛化調用等調用形式。

高可用


簡單的介紹了下分佈式服務框架,下面來說下分佈式系統的高可用。一個系統設計開發出來,三天兩晚就出個大問題,導致無法使用,那這個系統也不是什麼好系統。

業界流傳一句話:"我們系統支持 X 個 9 的可靠性"。這個 X 是代表一個數字,X 個 9 表示在系統 1 年時間的使用過程中,系統可以正常使用時間與總時間(1 年)之比。

3 個 9:(1-99.9%)*365*24=8.76 小時,表示該系統在連續運行 1 年時間裡最多可能的業務中斷時間是 8.76 小時,4 個 9 即 52.6 分鐘,5 個 9 即 5.26 分鐘。要做到如此高的可靠性,是非常大的挑戰。

一個大型分佈式項目可能是由幾十上百個項目構成,涉及到的服務成千上萬,主鏈上的一個流程就需要流轉多個團隊維護的項目。

拿 4 個 9 的可靠性來說,平攤到每個團隊的時間可能不到 10 分鐘。這 10 分鐘內需要頂住壓力,以最快的時間找到並解決問題,恢復系統的可用。

下面說說為了提高系統的可靠性都有哪些方案:

服務檢測:某臺服務器與註冊中心的連接中斷,其提供的服務也無響應時,系統應該能主動去重啟該服務,使其能正常對外提供。

故障隔離:集群環境下,某臺服務器能對外提供服務,但是因為其他原因,請求結果始終異常。

這時就需要主動將該節點從集群環境中剔除,避免繼續對後面的請求造成影響,非高峰時期再嘗試修復該問題。至於機房故障的情況,只能去屏蔽整個機房了。

目前餓了麼做的是異地多活,即便單邊機房掛了,流量也可以全量切換至另外一邊機房,保證系統的可用。

監控:包含業務監控、服務異常監控、DB 中間件性能的監控等,系統出現異常的時候能及時的通知到開發人員。等到線下報上來的時候,可能影響已經很大了。

壓測:產線主鏈路的壓測是必不可少的,單靠集成測試,有些高併發的場景是無法覆蓋到的,壓測能暴露平常情況無法出現的問題,也能直觀的提現系統的吞吐能力。當業務激增時,可以考慮直接做系統擴容。

SOP 方案與演練:產線上隨時都可能會發生問題,抱著出現問題時再想辦法解決的態度是肯定不行的,時間根本來不及。

提前做好對應問題的 SOP 方案,能節省大量時間,儘快的恢復系統的正常。當然平常的演練也是不可少的,一旦產線故障可以做到從容不迫的去應對和處理。

除了上述方案外,還可以考慮服務策略的使用:

降級策略:業務高峰期,為了保證核心服務,需要停掉一些不太重要的業務。

如雙十一期間不允許發起退款、只允許查看 3 個月之內的歷史訂單等業務的降級,調用服務接口時,直接返回的空結果或異常等服務的降級,都屬於分佈式系統的降級策略。

服務降級是可逆操作,當系統壓力恢復到一定值不需要降級服務時,需要去除降級,將服務狀態恢復正常。

服務降級主要包括屏蔽降級和容錯降級:

  • 屏蔽降級:分佈式服務框架直接屏蔽對遠程接口的請求,不發起對遠程服務的調用,直接返回空結果、拋出指定異常、執行本地模擬接口實現等方式。
  • 容錯降級:非核心服務不可調用時,可以對故障服務做業務放通,保證主流程不受影響。如請求超時、消息解碼異常、系統擁塞保護異常, 服務提供方系統異常等情況。


筆者之前就碰到過因雙方沒有做容錯降級導致的系統故障的情況。午高峰時期,對方調用我們的一個非核心查詢接口,我們系統因為 Bug 問題一直異常,導致對方調用這個接口的頁面異常而無法跳轉到主流程頁面,影響了產線的生產。當時對方緊急發版才使系統恢復正常。

限流策略:說到限流,最先想到的就是秒殺活動了,一場秒殺活動的流量可能是正常流量的幾百至幾千倍,如此高的流量系統根本無法處理,只能通過限流來避免系統的崩潰。

服務的限流本質和秒殺活動的限流是一樣的,都是限制請求的流入,防止服務提供方因大量的請求而崩潰。

限流算法:令牌桶、漏桶、計數器算法。上述算法適合單機的限流,但涉及到整個集群的限流時,得考慮使用緩存中間件了。

例如:某個服務 1 分鐘內只允許請求 2 次,或者一天只允許使用 1000 次。

由於負載均衡存在,可能集群內每臺機器都會收到請求,這種時候就需要緩存來記錄調用方某段時間內的請求次數,再做限流處理。Redis 就很適合做此事。

熔斷策略:熔斷本質上是一種過載保護機制,這一概念來源於電子工程中的斷路器,當電流過大時,保險絲會熔斷,從而保護整個電路。

同樣在分佈式系統中,當被調用的遠程服務無法使用時,如果沒有過載保護,就會導致請求的資源阻塞在遠程服務器上耗盡資源。

很多時候,剛開始可能只是出現了局部小規模的故障,然而由於種種原因,故障影響範圍越來越大,最終導致全局性的後果。

當下遊服務因訪問壓力過大而響應變慢或失敗,上游服務為了保護自己以及系統整體的可用性,可以暫時切斷對下游服務的調用。

熔斷器的設計思路:

  • Closed:初始狀態,熔斷器關閉,正常提供服務。
  • Open: 失敗次數,失敗百分比達到一定的閾值之後,熔斷器打開,停止訪問服務。
  • Half-Open:熔斷一定時間之後,小流量嘗試調用服務,如果成功則恢復,熔斷器變為 Closed 狀態。


數據一致性


一個系統設計開發出來,必須保證其運行的數據準確和一致性。拿支付系統來說:用戶銀行卡已經扣款成功,系統裡卻顯示失敗,沒有給用戶的虛擬帳戶充值上,這會引起客訴。

說的再嚴重點,用戶發起提現,資金已經轉到其銀行賬戶,系統卻沒扣除對應虛擬帳號的餘額,直接導致資金損失了。如果這時候用戶一直髮起提現,那就酸爽了。

CAP 原則


說到數據一致性,就不得不說到 CAP 原則。CAP 原則中指出任何一個分佈式系統中,Consistency(一致性 C)、 Availability(可用性 A)、Partition tolerance(分區容錯性 P),三者不可兼得。

傳統單機數據庫基於 ACID 特性(原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)) ,放棄了分區容錯性,能做到可用性和一致性。

對於一個分佈式系統而言,分區容錯性是一個最基本的要求。既然是一個分佈式系統,那麼分佈式系統中的組件必然需要被部署到不同的節點,會出現節點與節點之間的網絡通訊。

而網絡問題又是一定會出現的異常情況,分區容錯性也就成為了一個分佈式系統必然需要面對和解決的問題。

系統架構師往往需要把精力花在如何根據業務特點在一致性和可用性之間尋求平衡。

集中式系統,通過數據庫事務的控制,能做到數據的強一致性。但是分佈式系統中,涉及多服務間的調用,通過分佈式事務的方案:

  • 兩階段提交(2PC)
  • 三階段提交(3PC)
  • 補償事務(TCC)
  • ...


雖然能實現數據的強一致,但是都是通過犧牲可用性來實現。

BASE 理論


BASE 理論是對 CAP 原則中一致性和可用性權衡的結果:Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent(最終一致性)。

BASE 理論,其來源於對大規模互聯網系統分佈式實踐的總結,是基於 CAP 原則逐步演化而來的。

其最核心思想是:即使無法做到強一致性,但每個應用都可以根據自身業務特點,採用適當的方式來使系統達到最終一致性。

基本可用:是指分佈式系統在出現不可預知故障的時候,允許損失部分可用性,這不等價於系統不可用。

軟狀態:指允許系統中的數據存在中間狀態,並認為該中間狀態的存在不會影響系統的整體可用性,即允許系統在不同節點的數據副本之間進行數據同步的過程存在延時。

最終一致性:強調的是所有的數據副本,在經過一段時間的同步之後,最終都能夠達到一致的狀態。

因此,最終一致性的本質是需要系統保證最終數據能夠達到一致,而不需要實時保證系統數據的強一致性。

總的來說,BASE 理論面向的是大型高可用可擴展的分佈式系統,和傳統的事物 ACID 特性是相反的。

它完全不同於 ACID 的強一致性模型,而是通過犧牲強一致性來獲得可用性,並允許數據在一段時間內是不一致的,但最終達到一致狀態。

同時,在實際的分佈式場景中,不同業務單元和組件對數據一致性的要求是不同的,因此在具體的分佈式系統架構設計過程中,ACID 特性和 BASE 理論往往又會結合在一起。

結語


分佈式系統涉及到的東西還有很多,如:分佈式鎖、定時調度、數據分片、性能問題、各種中間件的使用等,筆者分享只是瞭解到的那一小部分的知識而已。

之前本著學習的目的也寫過一個非常簡單的分佈式服務框架 blackRpc,通過它瞭解了分佈式服務框架內部的一些活動。

歡迎工作一到五年的Java工程師朋友們加入Java程序員開發: 721575865

群內提供免費的Java架構學習資料(裡面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!



分享到:


相關文章: