自己手動實現dubbo

用,現在我們就來動手自己編寫一個RPC框架,通過這篇文章的學習,你將學習到

  • 分佈式系統的概念
  • RPC遠程方法調用的應用
  • Dubbo的原理深入理解

當然,如果要完全自己編寫一個RPC框架,我們需要掌握以下知識點

  • 網絡編程(網絡通信) 本文將使用netty4網絡通信框架
  • 多線程相關知識
  • 反射相關知識
  • jdk的動態代理
  • Spring框架的相關知識

如果對於上述的知識點有一部分不是很理解,也不會影響你閱讀本文和對Dubbo的RPC調用原理的理解

好了,我們先來簡單的描述一下整個RPC調用的業務流程圖

自己手動實現dubbo

為了可以實現上面的RPC調用,我們創建的RPC框架的模塊之間的關係圖如下:

自己手動實現dubbo

對於上面的每個模塊的具體作用,使用一個表格簡單的進行描述

模塊名稱主要功能rpc-register主要完成可註冊中心Zookeeper的交互
RPC服務端使用該模塊往註冊中心註冊地址和端口
RPC客戶端通過該模塊獲取實時已近註冊的服務地址和端口rpc-common定義RPC通信的請求消息和響應消息的規則,以及消息的序列化和反序列化的幫助類rpc-serverRPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並保存
接受RPC客戶端的消息並且通過反射調用具體的方法
響應RPC客戶端,把方法執行結果返回到RPC客戶端rpc-clientRPC客戶端,通過網絡通信往RPC服務端發送請求調用消息
接受服務端的響應消息
配置動態代理類,所有的方法調用都通過網絡調用發送到RPC服務端app-common具體的應用中的接口和JavaBean對象,類似於service模塊和bean模塊app-server通過Spring的配置啟動SpringContext,並且配置RpcServer和RpcRegistry Bean對象的創建
實現app-common中的接口,並且在接口上添加註解@RpcService(IProductService.class)可以讓RPCServer識別到該服務
啟動服務app-client通過Spring的配置創建RpcDiscover對象和RpcProxy對象,其中RpcDiscover用於從註冊中心獲取到服務的地址信息,RpcProxy用於創建類的動態代理對象

接下來我們來看一下具體的實現代碼

  1. rpc-register
  2. 這個模塊用戶和註冊中心進行交互,主要包括三個類
  • Constant常量定義,設置連接ZKServer的相關參數
  • RpcRegistry:往註冊中心ZKServer設置地址信息,RPC-Server需要使用
  • RpcDiscover: 從註冊中心ZKServer獲取服務端的網絡地址信息 RPC-client需要使用
  1. 具體的實現代碼
package cn.wolfcode.rpc.register;
public interface Constant {
 //定義客戶端連接session會話超時時間,單位為毫秒,該值的設置和zkServer設置的心跳時間有關係
 int SESSION_TIMEOUT=4000;
 // 定義用於保存rpc通信服務端的地址信息的目錄
 String REGISTRY_PATH="/rpc";
 // 定義數據存放的具體目錄
 String DATA_PATH=REGISTRY_PATH+"/data";
}
package cn.wolfcode.rpc.register;
 
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
@Setter@Getter
@AllArgsConstructor()
@NoArgsConstructor
public class RpcRegistry {
 
 public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class);
 //zkServer的地址信息
 private String registryAddress;
 //zk客戶端程序
 private ZooKeeper zooKeeper;
 
 public void createNode(String data) throws Exception{
 //創建一個客戶端程序, 對於註冊可以不用監聽事件
 zooKeeper= new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() {
 @Override
 public void process(WatchedEvent event) {
 }
 });
 if(zooKeeper!=null){
 try{
 //判斷註冊的目錄是否存在
 Stat stat = zooKeeper.exists(Constant.REGISTRY_PATH, false);
 if(stat==null){
 //如果不存在, 創建一個持久的節點目錄
 zooKeeper.create(Constant.REGISTRY_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
 }
 //創建一個臨時的序列節點,並且保存數據信息
 zooKeeper.create(Constant.DATA_PATH,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
 }catch (Exception e){
 LOGGER.error("",e);
 e.printStackTrace();
 }
 }else{
 LOGGER.debug("zooKeeper connect is null");
 }
 }
 //測試程序
 public static void main(String[] args) throws Exception {
 RpcRegistry rpcRegistry = new RpcRegistry();
 rpcRegistry.setRegistryAddress("192.168.158.151:2181");
 rpcRegistry.createNode("testdata");
 //讓程序等待輸入,程序一直處於運行狀態
 System.in.read();
 }
}
package cn.wolfcode.rpc.register;
import lombok.Getter;
import lombok.Setter;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@Setter
@Getter
//地址發現,用於實時的獲取最新的RPC服務信息
public class RpcDiscover {
 public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class);
 //服務端地址 zkServer的地址
 private String registryAddress;
 //獲取到的所有提供服務的服務器列表
 private volatile List dataList=new ArrayList 
<>(); private ZooKeeper zooKeeper=null; //初始化zkClient客戶端 public RpcDiscover(String registryAddress) throws Exception { this.registryAddress = registryAddress; zooKeeper=new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getType()==Event.EventType.NodeChildrenChanged){ //監聽zkServer的服務器列表變化 watchNode(); } } }); //獲取節點相關數據 watchNode(); } // 從dataList列表隨機獲取一個可用的服務端的地址信息給rpc-client public String discover(){ int size=dataList.size(); if(size>0){ int index= new Random().nextInt(size); return dataList.get(index); } throw new RuntimeException("沒有找到對應的服務器"); } //監聽服務端的列表信息 private void watchNode(){ try{ //獲取子節點信息 List nodeList = zooKeeper.getChildren(Constant.REGISTRY_PATH, true); List dataList=new ArrayList<>(); for (String node : nodeList) { byte[] bytes = zooKeeper.getData(Constant.REGISTRY_PATH + "/" + node, false, null); dataList.add(new String(bytes)); } this.dataList=dataList; }catch (Exception e){ LOGGER.error("",e); e.printStackTrace(); } } //測試程序 public static void main(String[] args) throws Exception { //打印獲取到的連接地址信息 System.out.println(new RpcDiscover("192.168.158.151:2181").discover()); System.in.read(); } }
  1. rpc-common
  2. 定義RPC通信的請求消息和響應消息的規則,以及消息的序列化和反序列化的幫助類,主要包括
  • RpcRequest 請求消息封裝對象
  • RpcResponse 響應消息封裝對象
  • SerializationUtil 消息的序列化,煩序列化幫助類
  • RpcEncoder 把消息對象轉換為字節數組進行通信
  • RpcDecoder 把獲取到的字節數組轉換為對應的消息對象
  1. 具體代碼如下
package cn.wolfcode.rpc.common;
 
import lombok.*;
 
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
//RPC通信的數據請求規則
public class RpcRequest {
 // 請求消息的消息Id
 private String requestId;
 // 請求的具體的類名(接口名稱)
 private String className;
 // 請求的具體的方法名稱
 private String methodName;
 // 請求的方法參數類型列表
 private Class>[] parameterTypes;
 // 請求的方法參數列表
 private Object[] parameters;
}
package cn.wolfcode.rpc.common;
 
import lombok.*;
 
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
//RPC通信消息的響應數據規則
public class RpcResponse {
 //響應的消息id
 private String responseId;
 //請求的消息id
 private String requestId;
 // 響應的消息是否成功
 private boolean success;
 // 響應的數據結果
 private Object result;
 // 如果有異常信息,在該對象中記錄異常信息
 private Throwable throwable;
}
package cn.wolfcode.rpc.common;
 
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 序列化工具類(基於 Protostuff 實現) 用於把對象序列化字節數組, 把字節數組反序列化對象
 */
public class SerializationUtil {
 
 private static Map, Schema>> cachedSchema = new ConcurrentHashMap, Schema>>();
 
 private static Objenesis objenesis = new ObjenesisStd(true);
 
 private SerializationUtil() {
 }
 /**
 * 獲取類的schema
 * @param cls
 * @return
 */
 @SuppressWarnings("unchecked")
 private static  Schema getSchema(Class cls) {
 Schema schema = (Schema) cachedSchema.get(cls);
 if (schema == null) {
 schema = RuntimeSchema.createFrom(cls);
 if (schema != null) {
 cachedSchema.put(cls, schema);
 }
 }
 return schema;
 }
 
 /**
 * 序列化(對象 -> 字節數組)
 */
 @SuppressWarnings("unchecked")
 public static  byte[] serialize(T obj) {
 Class cls = (Class) obj.getClass();
 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
 try {
 Schema schema = getSchema(cls);
 return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列化
 } catch (Exception e) {
 throw new IllegalStateException(e.getMessage(), e);
 } finally {
 buffer.clear();
 }
 }
 
 /**
 * 反序列化(字節數組 -> 對象)
 */
 public static  T deserialize(byte[] data, Class cls) {
 try {
 /*
 * 如果一個類沒有參數為空的構造方法時候,那麼你直接調用newInstance方法試圖得到一個實例對象的時候是會拋出異常的
 * 通過ObjenesisStd可以完美的避開這個問題
 * */
 T message = (T) objenesis.newInstance(cls);//實例化
 Schema schema = getSchema(cls);//獲取類的schema
 ProtostuffIOUtil.mergeFrom(data, message, schema);
 return message;
 } catch (Exception e) {
 throw new IllegalStateException(e.getMessage(), e);
 }
 }
}
package cn.wolfcode.rpc.common;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
 
//對傳遞的消息進行編碼, 因為是請求/響應對象的傳遞,先編碼為字節數組在發送到服務器解碼
public class RpcEncoder extends MessageToByteEncoder {
 // 傳遞的數據的對象類型
 private Class genericClass;
 
 public RpcEncoder(Class genericClass) {
 this.genericClass = genericClass;
 }
 
 @Override
 protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
 if(genericClass.isInstance(msg)){
 //序列化請求消息為字節數組
 byte[] bytes = SerializationUtil.serialize(msg);
 // 把數據寫入到下一個通道(channel)或者是發往服務端
 out.writeBytes(bytes);
 }
 }
}
package cn.wolfcode.rpc.common;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
 
import java.util.List;
//對傳遞的消息進行解碼, 接受到的數據是字節數組,需要把數組轉換為對應的請求/響應消息對象
public class RpcDecoder extends ByteToMessageDecoder {
 
 private Class> genericClass;
 
 public RpcDecoder(Class> genericClass) {
 this.genericClass = genericClass;
 }
 
 @Override
 //解碼方法,把字節數組轉換為消息對象
 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
 //消息的長度
 int size=in.readableBytes();
 if(size 
<4){//保證所有的消息都完全接受完成 return; } byte[] bytes =new byte[size]; //把傳遞的字節數組讀取到bytes中 in.readBytes(bytes); // 反序列化為對象(RPCRequest/RPCResponse對象) Object object = SerializationUtil.deserialize(bytes, genericClass); //輸出對象 out.add(object); //刷新緩存 ctx.flush(); } }
  1. rpc-server
  2. ​ RPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並保存,接受RPC客戶端的消息並且通過反射調用具體的方法,響應RPC客戶端,把方法執行結果返回到RPC客戶端
  3. 主要包括:
  • RpcService 定義一個註解,用於標記服務程序的提供者,通過Spring掃描出所有的服務並且保存
  • RpcServerHandler 處理RPC客戶端請求,調用服務提供者的具體方法,響應執行結果
  • RpcServer 掃描所有的服務(標記了@RPCService的類),啟動RPC服務
package cn.wolfcode.rpc.server;
 
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
/**
 * 這個註解用於貼在每個提供服務的實現類,
 * 在Spring容器啟動的時候,自動掃描到貼了該註解的所有的服務
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcService {
 public Class> value();
}
package cn.wolfcode.rpc.server;
 
import cn.wolfcode.rpc.common.RpcDecoder;
import cn.wolfcode.rpc.common.RpcEncoder;
import cn.wolfcode.rpc.common.RpcRequest;
import cn.wolfcode.rpc.common.RpcResponse;
import cn.wolfcode.rpc.register.RpcRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
 
import java.util.HashMap;
import java.util.Map;
 
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
//RPC服務端啟動,實現Spring的感知接口
public class RpcServer implements ApplicationContextAware,InitializingBean {
 //用於保存所有提供服務的方法, 其中key為類的全路徑名, value是所有的實現類
 private final Map serviceBeanMap=new HashMap<>();
 //rpcRegistry 用於註冊相關的地址信息
 private RpcRegistry rpcRegistry;
 //提供服務的地址信息 格式為 192.168.158.151:9000 類似
 private String serverAddress;
 //在Spring容器啟動完成後會執行該方法
 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
 //獲取到所有貼了RpcService註解的Bean對象
 Map serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
 if(MapUtils.isNotEmpty(serviceBeanMap)){
 for (Object object : serviceBeanMap.values()) {
 //獲取到類的路徑名稱
 String serviceName = object.getClass().getAnnotation(RpcService.class).value().getName();
 //把獲取到的信息保存到serviceBeanMap中
 this.serviceBeanMap.put(serviceName,object);
 }
 }
 System.out.println("服務器: "+serverAddress +" 提供的服務列表: "+ serviceBeanMap );
 }
 // 初始化完成後執行
 @Override
 public void afterPropertiesSet() throws Exception {
 //創建服務端的通信對象
 ServerBootstrap server = new ServerBootstrap();
 // 創建異步通信的事件組 用於建立TCP連接的
 NioEventLoopGroup bossGroup = new NioEventLoopGroup();
 // 創建異步通信的事件組 用於處理Channel(通道)的I/O事件
 NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 try{
 //開始設置server的相關參數
 server.group(bossGroup,workerGroup)
 //啟動異步ServerSocket
 .channel(NioServerSocketChannel.class)
 //初始化通道信息
 .childHandler(new ChannelInitializer() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new RpcDecoder(RpcRequest.class))//1 解碼請求參數
 .addLast(new RpcEncoder(RpcResponse.class))//2 編碼響應信息
 .addLast(new RpcServerHandler(serviceBeanMap));//3 請求處理
 }
 }).option(ChannelOption.SO_BACKLOG, 128)
 .childOption(ChannelOption.SO_KEEPALIVE, true);;
 String host=serverAddress.split(":")[0] ;//獲取到主機地址
 int port=Integer.valueOf(serverAddress.split(":")[1]);//端口
 ChannelFuture future = server.bind(host, port).sync();//開啟異步通信服務
 System.out.println("服務器啟動成功:"+future.channel().localAddress());
 rpcRegistry.createNode(serverAddress);
 System.out.println("向zkServer註冊服務地址信息");
 future.channel().closeFuture().sync();//等待通信完成
 }catch (Exception e){
 e.printStackTrace();
 }finally {
 //優雅的關閉socket
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
}
 package cn.wolfcode.rpc.server;
 
 
 import cn.wolfcode.rpc.common.RpcRequest;
 import cn.wolfcode.rpc.common.RpcResponse;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.UUID;
 
 @Setter
 @Getter
 @NoArgsConstructor
 public class RpcServerHandler extends ChannelInboundHandlerAdapter{
 
 private Map serviceBeanMap;
 
 public RpcServerHandler(Map serviceBeanMap) {
 this.serviceBeanMap = serviceBeanMap;
 }
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 System.out.println("RpcServerHandler.channelRead");
 System.out.println(msg);
 RpcRequest rpcRequest= (RpcRequest) msg;
 RpcResponse rpcResponse=handler(rpcRequest);
 //告訴客戶端,關閉socket連接
 ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
 }
 
 private RpcResponse handler(RpcRequest rpcRequest) {
 //創建一個響應消息對象
 RpcResponse rpcResponse =new RpcResponse();
 //設置響應消息ID
 rpcResponse.setResponseId(UUID.randomUUID().toString());
 //請求消息ID
 rpcResponse.setRequestId(rpcRequest.getRequestId());
 try{
 //獲取到類名(接口名稱)
 String className = rpcRequest.getClassName();
 //獲取到方法名
 String methodName = rpcRequest.getMethodName();
 //獲取到參數類型列表
 Class>[] parameterTypes = rpcRequest.getParameterTypes();
 //獲取到參數列表
 Object[] parameters = rpcRequest.getParameters();
 //獲取到具字節碼對象
 Class> clz = Class.forName(className);
 //獲取到實現類
 Object serviceBean = serviceBeanMap.get(className);
 if(serviceBean==null){
 throw new RuntimeException(className+"沒有找到對應的serviceBean:"+className+":beanMap:"+serviceBeanMap);
 }
 //反射調用方法
 Method method = clz.getMethod(methodName, parameterTypes);
 if(method==null)
 throw new RuntimeException("沒有找到對應的方法");
 Object result = method.invoke(serviceBean, parameters);
 rpcResponse.setSuccess(true);
 //設置方法調用的結果
 rpcResponse.setResult(result);
 }catch (Exception e){
 rpcResponse.setSuccess(false);
 rpcResponse.setThrowable(e);
 e.printStackTrace();
 }
 return rpcResponse;
 }
 }
 
  1. rpc-client
  2. ​ RPC客戶端,通過網絡通信往RPC服務端發送請求調用消息,接受服務端的響應消息,配置動態代理類,所有的方法調用都通過網絡調用發送到RPC服務端
  3. 其中包括的主要代碼:
  • RpcProxy 對於每一個類都創建一個動態代理對象,並且在invoke方法創建rpc客戶端並且發送網絡通信請求
  • RpcClient RPC通信客戶端,啟動RPC通信服務,創建TCP連接,發送請求,接受響應
  1. 具體實現代碼:
 package cn.wolfcode.rpc.client;
 
 
 import cn.wolfcode.rpc.common.RpcDecoder;
 import cn.wolfcode.rpc.common.RpcEncoder;
 import cn.wolfcode.rpc.common.RpcRequest;
 import cn.wolfcode.rpc.common.RpcResponse;
 import cn.wolfcode.rpc.register.RpcDiscover;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 //RPC通信客戶端,往服務端發送請求,並且接受服務端的響應
 public class RpcClient extends SimpleChannelInboundHandler {
 //消息響應對象
 private RpcResponse rpcResponse;
 //消息請求對象
 private RpcRequest rpcRequest;
 // 同步鎖 資源對象
 private Object object=new Object();
 // 用於獲取服務地址列表信息
 private RpcDiscover rpcDiscover;
 //構造函數
 public RpcClient(RpcRequest rpcRequest,RpcDiscover rpcDiscover) {
 this.rpcDiscover = rpcDiscover;
 this.rpcRequest=rpcRequest;
 }
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
 this.rpcResponse=msg;//響應消息
 synchronized (object){
 ctx.flush();//刷新緩存
 object.notifyAll();//喚醒等待
 }
 }
 //發送消息
 public RpcResponse send() throws Exception {
 //創建一個socket通信對象
 Bootstrap client = new Bootstrap();
 //創建一個通信組,負責Channel(通道)的I/O事件的處理
 NioEventLoopGroup loopGroup = new NioEventLoopGroup();
 try{
 client.group(loopGroup)//設置參數
 .channel(NioSocketChannel.class)//使用異步socket通信
 .handler(new ChannelInitializer() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new RpcEncoder(RpcRequest.class))//編碼請求對象
 .addLast(new RpcDecoder(RpcResponse.class))//解碼響應對象
 .addLast(RpcClient.this);//發送請求對象
 }
 }).option(ChannelOption.SO_KEEPALIVE, true);;
 String serverAddress = rpcDiscover.discover();//獲取一個服務器地址
 String host=serverAddress.split(":")[0];
 int port=Integer.valueOf(serverAddress.split(":")[1]);
 ChannelFuture future = client.connect(host,port).sync();
 System.out.println("客戶端準備發送數據:"+rpcRequest);
 future.channel().writeAndFlush(rpcRequest).sync();
 synchronized (object){
 object.wait();//線程等待,等待客戶端響應
 }
 if (rpcResponse != null) {
 future.channel().closeFuture().sync();//等待服務端關閉socket
 }
 return rpcResponse;
 }finally {
 loopGroup.shutdownGracefully();//優雅關閉socket
 }
 }
 
 /**
 * 異常處理
 */
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
 throws Exception {
 ctx.close();
 }
 }
 package cn.wolfcode.rpc.client;
 
 
 import cn.wolfcode.rpc.common.RpcRequest;
 import cn.wolfcode.rpc.common.RpcResponse;
 import cn.wolfcode.rpc.register.RpcDiscover;
 import lombok.Getter;
 import lombok.Setter;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.UUID;
 
 @Setter
 @Getter
 //動態代理類,用於獲取到每個類的代理對象
 //對於被代理對象的所有的方法調用都會執行invoke方法
 public class RpcProxy {
 //用於獲取到RPC-Server的地址信息
 private RpcDiscover rpcDiscover;
 
 @SuppressWarnings("all")
 public  T getInstance(Class interfaceClass){
 T instance = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class>[]{interfaceClass}, new InvocationHandler() {
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 //創建請求對象
 RpcRequest rpcRequest = new RpcRequest();
 //獲取到被調用的類名 和RPC-Server中的serviceMap中的key進行匹配
 String className=method.getDeclaringClass().getName();
 //獲取到方法的參數列表
 Class>[] parameterTypes = method.getParameterTypes();
 //生成一個請求的id
 rpcRequest.setRequestId(UUID.randomUUID().toString());
 rpcRequest.setClassName(className);//類名
 rpcRequest.setParameterTypes(parameterTypes);//參數類型列表
 rpcRequest.setParameters(args);//參數列表
 rpcRequest.setMethodName(method.getName());//調用的放方法名稱
 RpcResponse rpcResponse = new RpcClient(rpcRequest, rpcDiscover).send();//創建一個RPCclient對象,並且發送消息到服務端
 //返回調用結果
 return rpcResponse.getResult();
 }
 });
 //返回一個代理對象
 return instance;
 }
 }
 
  1. app-common
  2. 這是具體應用的通用模塊,和具體的項目結構有關係,這裡主要包括接口定義和JavaBean對象的定義
  3. 具體代碼為:
package cn.wolfcode.app.common;
 
public interface IProductService {
 /**
 * 保存產品
 * @param product
 */
 void save(Product product);
 
 /**
 * 根據產品id刪除產品
 * @param productId
 */
 void deleteById(Long productId);
 
 /**
 * 修改產品信息
 * @param product
 */
 void update(Product product);
 
 /**
 * 根據產品id獲取到產品信息
 * @param productId
 * @return
 */
 Product get(Long productId);
}
package cn.wolfcode.app.common;
 
import lombok.*;
 
import java.math.BigDecimal;
 
/**
 * 產品信息
 */
@Setter
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product {
 private Long id;//id
 private String sn;//產品編號
 private String name;//產品名稱
 private BigDecimal price;//產品價格
}
  1. app-server
  2. 這個模塊主要是定義服務的具體實現和啟動Spring容器,在啟動Spring容器的時候需要創建RpcRegistry,RpcServer對象
  3. 具體代碼實現:
package cn.wolfcode.app.server;
 
import cn.wolfcode.app.common.IProductService;
import cn.wolfcode.app.common.Product;
import cn.wolfcode.rpc.server.RpcService;
import org.springframework.stereotype.Component;
 
import java.math.BigDecimal;
 
@Component
@RpcService(IProductService.class)
public class ProductServiceImpl implements IProductService {
 @Override
 public void save(Product product) {
 System.out.println("產品保存成功: "+product);
 }
 
 @Override
 public void deleteById(Long productId) {
 System.out.println("產品刪除成功: "+ productId);
 }
 
 @Override
 public void update(Product product) {
 System.out.println("產品修改成功: "+ product);
 }
 
 @Override
 public Product get(Long productId) {
 System.out.println("產品獲取成功");
 return new Product(1L,"001","筆記本電腦",BigDecimal.TEN);
 }
}
package cn.wolfcode.app.server;
 
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class BootAppServer {
 public static void main(String[] args) {
 //啟動Spring容器
 new ClassPathXmlApplicationContext("classpath:application.xml");
 }
}
  1. 其中配置文件:
  • application.xml Spring的配置文件
  • log4j.properties 日誌配置文件
  • rpc.properties 服務提供者的地址和端口 以及zkServer的連接地址和端口

 
 
 
 
 
 
 
 
 
 
 
 

log4j.rootLogger=ERROR,console
 
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%m%n
 
log4j.logger.cn.wolfcode.rpc=DEBUG
# zookeeper server
registry.address=192.168.158.151:2181
 
# rpc server
server.address=192.168.158.1:9090
  1. app-client
  2. 通過Spring的配置創建RpcDiscover對象和RpcProxy對象,其中RpcDiscover用於從註冊中心獲取到服務的地址信息,RpcProxy用於創建類的動態代理對象
  3. 測試類:使用Spring的Junit進行測試
 package cn.wolfcode.app.client;
 
 import cn.wolfcode.app.common.IProductService;
 import cn.wolfcode.app.common.Product;
 import cn.wolfcode.rpc.client.RpcProxy;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import java.math.BigDecimal;
 
 //模擬客戶端啟動
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations="classpath:application.xml")
 public class APP {
 @Autowired
 private RpcProxy rpcProxy;
 
 private IProductService productService;
 
 @Before
 public void init() {
 productService = rpcProxy.getInstance(IProductService.class);
 }
 
 
 @Test
 public void testSave() throws Exception {
 productService.save(new Product(2L,"002","內衣",BigDecimal.TEN));
 }
 
 @Test
 public void testDelete() throws Exception {
 productService.deleteById(2L);
 }
 
 @Test
 public void testUpdate() throws Exception {
 productService.update(new Product(2L,"002","內衣",BigDecimal.ONE));
 }
 
 @Test
 public void testGet() throws Exception {
 Product product = productService.get(1L);
 System.out.println("獲取到的產品信息為:"+product);
 }
 }

配置文件信息

application.xml


 
 
 
 
 
 
 
 
 
 
 
 
 

log4j.properties

log4j.rootLogger=ERROR,console
 
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%m%n
 
log4j.logger.cn.wolfcode.rpc=DEBUG

rpc.properties

# zookeeper server
registry.address=192.168.158.151:2181

對於本文的完整代碼下載地址為 https://gitee.com/heshengjun/rpcdemo.git


分享到:


相關文章: