程序員:RPC遠程調用原理淺析

RPC

基本概念

RPC(Remote Procedure Call)遠程過程調用,簡單的理解是一個節點請求另一個節點提供的服務

本地過程調用:如果需要將本地student對象的age+1,可以實現一個addAge()方法,將student對象傳入,對年齡進行更新之後返回即可,本地方法調用的函數體通過函數指針來指定。

遠程過程調用:上述操作的過程中,如果addAge()這個方法在服務端,執行函數的函數體在遠程機器上,如何告訴機器需要調用這個方法呢?

  1. 首先客戶端需要告訴服務器,需要調用的函數,這裡函數和進程ID存在一個映射,客戶端遠程調用時,需要查一下函數,找到對應的ID,然後執行函數的代碼。
  2. 客戶端需要把本地參數傳給遠程函數,本地調用的過程中,直接壓棧即可,但是在遠程調用過程中不再同一個內存裡,無法直接傳遞函數的參數,因此需要客戶端把參數轉換成字節流,傳給服務端,然後服務端將字節流轉換成自身能讀取的格式,是一個序列化和反序列化的過程。
  3. 備好了之後,如何進行傳輸?網絡傳輸層需要把調用的ID和序列化後的參數傳給服務端,然後把計算好的結果序列化傳給客戶端,因此TCP層即可完成上述過程,gRPC中採用的是HTTP2協議。

總結一下上述過程:

// Client端

// Student student = Call(ServerAddr, addAge, student)

1. 將這個調用映射為Call ID。

2. 將Call ID,student(params)序列化,以二進制形式打包

3. 把2中得到的數據包發送給ServerAddr,這需要使用網絡傳輸層

4. 等待服務器返回結果

5. 如果服務器調用成功,那麼就將結果反序列化,並賦給student,年齡更新

// Server端

1. 在本地維護一個Call ID到函數指針的映射call_id_map,可以用Map<string> callIdMap/<string>

2. 等待服務端請求

3. 得到一個請求後,將其數據包反序列化,得到Call ID

4. 通過在callIdMap中查找,得到相應的函數指針

5. 將student(params)反序列化後,在本地調用addAge()函數,得到結果

6. 將student結果序列化後通過網絡返回給Client

程序員:RPC遠程調用原理淺析
  • 在微服務的設計中,一個服務A如果訪問另一個Module下的服務B,可以採用HTTP REST傳輸數據,並在兩個服務之間進行序列化和反序列化操作,服務B把執行結果返回過來。
  • 由於HTTP在應用層中完成,整個通信的代價較高,遠程過程調用中直接基於TCP進行遠程調用,數據傳輸在傳輸層TCP層完成,更適合對效率要求比較高的場景,RPC主要依賴於客戶端和服務端之間建立Socket鏈接進行,底層實現比REST更復雜。

創建三個maven項目

服務者

消費者

API

讓服務者和消費者都依賴API

程序員:RPC遠程調用原理淺析


程序員:RPC遠程調用原理淺析

在消費者創建ConsumerApp類

使用代理對象

具體代碼在ProxyUtils中

public class ConsumerApp {

public static void main(String[] args) {

//while死循環是為了測試調用提供者是否為隨機

while (true) {

try {

Thread.sleep(2000);

// 獲得代理對象

AddService addService = ProxyUtils.getProxy(AddService.class);

// 只要調用方法就會進入代理對象invoke方法

int result = addService.add(15, 684);

System.out.println(result);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

API中創建Request,AddService,ProxyUtils,ZkUtils

創建Request(該類為傳輸對象,必須實現序列化)

public class Request implements Serializable{

/**

*

*/

private static final long serialVersionUID = 1L;

private String interfaceName;

private String methodName;

private Object[] args;

public String getInterfaceName() {

return interfaceName;

}

public void setInterfaceName(String interfaceName) {

this.interfaceName = interfaceName;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Object[] getArgs() {

return args;

}

public void setArgs(Object[] args) {

this.args = args;

}

@Override

public String toString() {

return "Request [interfaceName=" + interfaceName + ", methodName=" + methodName + ", args="

+ Arrays.toString(args) + "]";

}

}

創建AddService

package com.chenlei.service;

public interface AddService {

public int add(Integer a, Integer b);

}

創建ProxyUtils(重點)

public class ProxyUtils {

private static Random RDM = new Random();

@SuppressWarnings("unchecked")

public static T getProxy(Class interfaces) {

T proxy = (T) Proxy.newProxyInstance(ProxyUtils.class.getClassLoader(), new Class>[] { interfaces },

new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

String methodName = method.getName();

if ("toString".equals(methodName)) {

return interfaces.getClass().getName() + "$Proxy";

}

if ("hashCode".equals(methodName)) {

return Object.class.hashCode();

}

if ("equals".equals(methodName)) {

return Object.class.equals(this);

}

// 消費者發送過去

Request request = new Request();

request.setInterfaceName(interfaces.getName());

request.setMethodName(methodName);

request.setArgs(args);

// 找到interfaces下的所有節點

List<string> serverList = ZkUtils.discover(interfaces.getName());/<string>

String one = randomOne(serverList);// 拿到的結果為ip:port 如127.0.0.1:8888

String[] split = one.split(":");

String address = split[0];

Integer port = Integer.valueOf(split[1]);

Socket socket = null;

// 打開書出管道,發送請求

Object result = null;

OutputStream outputStream = null;

ObjectOutputStream objectOutputStream = null;

InputStream inputStream = null;

ObjectInputStream objectInputStream = null;

try {

socket = new Socket(address, port);

outputStream = socket.getOutputStream();

objectOutputStream = new ObjectOutputStream(outputStream);

objectOutputStream.writeObject(request);

inputStream = socket.getInputStream();

objectInputStream = new ObjectInputStream(inputStream);

result = objectInputStream.readObject();

System.out.println("本次調用的是======" + port);

} catch (Exception e) {

e.printStackTrace();

} finally {

closeResources(objectInputStream, inputStream, objectOutputStream, outputStream, socket);

}

return result;

}

});

return proxy;

}

/**

* 從節點中隨機找出一個

*

* @param serverList

* @return

*/

private static String randomOne(List<string> serverList) {/<string>

if (null == serverList || 0 == serverList.size()) {

return null;

}

int index = RDM.nextInt(serverList.size());

return serverList.get(index);

}

/**

* 關閉資源的方法

*/

public static void closeResources(Closeable... resources) {

for (Closeable resource : resources) {

if (null != resource) {

try {

resource.close();

} catch (IOException e) {

e.printStackTrace();

} finally {

resource = null;

}

}

}

}

}

創建ZkUtils(zookeeper註冊和發現,另加緩存解決髒讀)

在API項目中導入zkclient的依賴

public class ZkUtils {

private static final String ZK_URL = "自己的域名:2181";

private static ZkClient zkClient = null;

//創建zookeeper緩存

private static Map<string>> cache = new HashMap<string>>();/<string>/<string>

static {

zkClient = new ZkClient(ZK_URL, 10000, 10000);

}

/**

* 服務節點向zookeeper的註冊

*

* @param serverName

* @param serverPort

*/

public static void register(String serverName, String serverPort) {

if (null == serverName || "".equals(serverName)) {

throw new RuntimeException("服務名不能為空");

}

if (null == serverPort || "".equals(serverPort)) {

throw new RuntimeException("服務ip和端口不能為空");

}

if (!zkClient.exists("/" + serverName)) {

zkClient.createPersistent("/" + serverName);

}

if (!zkClient.exists("/" + serverName + "/" + serverPort)) {

zkClient.createEphemeral("/" + serverName + "/" + serverPort);

}

System.out.println("註冊一個服務節點為" + "/" + serverName + "/" + serverPort);

}

/**

* 向zookeeper發現服務節點

*

* @param serverName

* @return

*/

public static List<string> discover(String serverName) {/<string>

if (null == serverName || "".equals(serverName)) {

throw new RuntimeException("服務名不能為空");

}

// 先從緩存裡找

if (cache.containsKey(serverName)) {

System.out.println("在緩存中找到" + serverName + "節點");

}

// 如果該節點在zookeeper中不存在,直接返回空

if (!zkClient.exists("/" + serverName)) {

return null;

}

zkClient.subscribeChildChanges("/" + serverName, new IZkChildListener() {

@Override

public void handleChildChange(String parentPath, List<string> currentChilds) throws Exception {/<string>

// 一旦進入此方法,證明有節點改變

cache.put(serverName, currentChilds);

System.out.println(serverName + "節點有變化-----" + "緩存完成更新");

}

});

return zkClient.getChildren("/" + serverName);

}

}

寫提供者代碼

創建AddServiceImpl

注意類名最好是AddService+Impl,並且類全路徑也要對應com.chenlei.service.impl.AddServiceImpl,否則代碼需要調整

package com.chenlei.service.impl;

import com.chenlei.service.AddService;

public class AddServiceImpl implements AddService {

@Override

public int add(Integer a, Integer b) {

return a + b;

}

}

創建ProviderApp(重點)

public class ProviderApp {

public static void main(String[] args) {

Integer port = 7777;

ServerSocket serverSocket = bind(port);

// 向zookeeper註冊

ZkUtils.register(AddService.class.getName(), "127.0.0.1" + ":" + port);

// 監聽+處理請求

listener(serverSocket);

}

/**

* 監聽和處理請求

*

* @param serverSocket

*/

private static void listener(ServerSocket serverSocket) {

//此處死循環是為了讓次提供者一直處於工作狀態

while (true) {

Socket socket = null;

InputStream inputStream = null;

ObjectInputStream objectInputStream = null;

OutputStream outputStream = null;

ObjectOutputStream objectOutputStream = null;

try {

socket = serverSocket.accept();

inputStream = socket.getInputStream();

objectInputStream = new ObjectInputStream(inputStream);

Request request = (Request) objectInputStream.readObject();

Object answer = invoker(request);

outputStream = socket.getOutputStream();

objectOutputStream = new ObjectOutputStream(outputStream);

objectOutputStream.writeObject(answer);

} catch (Exception e) {

e.printStackTrace();

} finally {

ProxyUtils.closeResources(objectOutputStream, outputStream, objectInputStream, inputStream, socket);

}

}

}

/**

* 處理請求返回結果

*

* @param request

* @return

*/

private static Object invoker(Request request) {

// 獲得從消費者傳過來的信息

String interfaceName = request.getInterfaceName();

String methodName = request.getMethodName();

Object[] args = request.getArgs();

// 獲得對應實現類全名

String className = getClassNameByInterfaceName(interfaceName);

Object answer = null;

try {

// 找到該類

Class> clazz = Class.forName(className);

// 創建一個對象

Object object = clazz.newInstance();

Class>[] argsType = new Class>[args.length];

if (null != args || 0 != args.length) {

for (int i = 0; i < args.length; i++) {

argsType[i] = args[i].getClass();

}

}

Method method = clazz.getMethod(methodName, argsType);

answer = method.invoke(object, args);

} catch (Exception e) {

e.printStackTrace();

}

return answer;

}

/**

* 通過請求者傳來的類信息,獲得對應實現類的所有信息,並返回實現類的全名

*

* @param interfaceName

* @return

*/

private static String getClassNameByInterfaceName(String interfaceName) {

// 傳過來的接口名為com.chenlei.service.AddService

int index = interfaceName.lastIndexOf(".");

StringBuilder sb = new StringBuilder();

// com.chenlei.service

sb.append(interfaceName.subSequence(0, index));

// com.chenlei.service.impl.

sb.append(".impl.");

// com.chenlei.service.impl.AddService

sb.append(interfaceName.substring(index + 1)).append("Impl");

return sb.toString();

}

/**

* 綁定一個端口

*

* @param port

* @return

*/

private static ServerSocket bind(Integer port) {

ServerSocket serverSocket = null;

try {

serverSocket = new ServerSocket(port);

} catch (IOException e) {

e.printStackTrace();

}

return serverSocket;

}

/**

* 測試代碼

*/

//public static void main(String[] args) {

// String interfaceName = "com.chenlei.service.AddService";

// int index = interfaceName.lastIndexOf(".");

// StringBuilder sb = new StringBuilder();

// // com.chenlei.service

// sb.append(interfaceName.subSequence(0, index));

// // com.chenlei.service.impl.

// sb.append(".impl.");

// // com.chenlei.service.impl.AddService

// sb.append(interfaceName.substring(index + 1)).append("Impl");

// System.out.println(sb.toString());

//}

}

更改提供者端口,分別啟動三個提供者


程序員:RPC遠程調用原理淺析

再啟動消費者,查看結果


程序員:RPC遠程調用原理淺析

其他錯誤和注意事項


程序員:RPC遠程調用原理淺析

寫代碼思路

程序員:RPC遠程調用原理淺析


程序員:RPC遠程調用原理淺析



分享到:


相關文章: