grpc實戰——服務端流式調用

還記得很久之前Sunny有和大家聊過如何用grpc實現一個簡單的名稱解析服務,當時用的grpc簡單調用。這次我們本著從易到難的原則,對上次的更進一步,實現服務端流式調用。之後還會繼續出客戶端流式調用和雙向流式調用的文章,喜歡的朋友可以繼續關注。

這次我們的背景還是構建一個名稱解析服務,但是有所不同的是,我們這次一個名稱可能對應多個ip(這在實際生活中也有應用,比如DNS負載均衡)。

服務端

首先還是來看pom.xml文件,這次我們和上次有所不同,我直接使用grpc-all依賴一次性導入所需的依賴,具體依賴部分代碼如下:

 <dependency>
<groupid>io.grpc/<groupid>
<artifactid>grpc-all/<artifactid>
<version>${grpc.version}/<version>
/<dependency>

這裡我們版本還是和之前選擇的一樣,為1.12.0。其他部分和之前的一樣,想了解的童鞋可以看看源碼也可以看傳送門中的上一篇文章。

然後就是比較重要的proto文件了,定義了我們的服務,這裡和grpc簡單服務略有不同:

syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.nameservers";
option java_outer_classname = "NameProto";
option objc_class_prefix = "NSS";
package nameservers;
// 定義服務
service NameServers {
// 服務中的方法,用於根據Name類型的參數獲得一系列ip,以流的方式返回

rpc getIpsByName (Name) returns (stream Ip) {}
}
//定義Name消息類型,其中name為其序列為1的字段
message Name {
string name = 1;
}
//定義Ip消息類型,其中ip為其序列為1的字段
message Ip {
string ip = 1;
}

細心的童鞋可能發現了,這個基本上和上次一模一樣,除了服務的名稱還有配置項可能有所不同,其他就是在returns中,Ip前面多了一個stream。這個就是我們所說的服務端流式的服務定義方式了。接著還是利用maven插件來進行編譯得到相應的java代碼。

grpc實戰——服務端流式調用

這裡我們還是從服務端開始寫,也是分為兩個部分,一個用於開啟遠程調用服務,接收客戶端發來的調用請求,類名為NameServer;另一個則是實現真正的服務,類名為NameServersImplBaseImpl。這裡NameServer代碼變化不大,直接貼上代碼:

public class NameServer { 
private Logger logger = Logger.getLogger(NameServer.class.getName());
private static final int DEFAULT_PORT = 8088;
private int port;//服務端口號
private Server server;

public NameServer(int port) {
this(port,ServerBuilder.forPort(port));
}

public NameServer(int port, ServerBuilder> serverBuilder){
this.port = port;
//構造服務器,添加我們實際的服務
server = serverBuilder.addService(new NameServersImplBaseImpl()).build();
}

private void start() throws IOException {
server.start();
logger.info("Server has started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
NameServer.this.stop();
}
});
}

private void stop() {
if(server != null)
server.shutdown();
}

//阻塞到應用停止
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {

server.awaitTermination();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
NameServer nameServer;
if(args.length > 0){
nameServer = new NameServer(Integer.parseInt(args[0]));
}else{
nameServer = new NameServer(DEFAULT_PORT);
}
nameServer.start();
nameServer.blockUntilShutdown();
}
}

實際提供服務的代碼有所改變,首先,我們之前用map來存儲Name和Ip的映射關係,現在因為存在同Name多Ip的情況,我們改用list來存儲,如果數據實際存儲於數據庫就看具體字段的約束。然後增加了一個類DataType用於表示Name和Ip數據對。在實際的服務中對每次請求都需要遍歷一遍list,將Name符合的Ip返回給客戶端,這裡的返回就是以流的方式返回的。

public class NameServersImplBaseImpl extends NameServersGrpc.NameServersImplBase {
//記錄名稱內容的list,實際項目中應該放置在數據庫
private List<datatype> list = new ArrayList<datatype>();
//構造方法中加入一些條目
public NameServersImplBaseImpl() {
list.add(new DataType(Name.newBuilder().setName("Sunny").build(),Ip.newBuilder().setIp("125.216.242.51").build()));
list.add(new DataType(Name.newBuilder().setName("Sunny").build(),Ip.newBuilder().setIp("126.216.242.51").build()));
list.add(new DataType(Name.newBuilder().setName("David").build(),Ip.newBuilder().setIp("117.226.178.139").build()));
list.add(new DataType(Name.newBuilder().setName("David").build(),Ip.newBuilder().setIp("117.227.178.139").build()));
list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.222.336.11").build()));
list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.333.336.11").build()));
list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.222.335.11").build()));
}

@Override
public void getIpsByName(Name requestName, StreamObserver responseObserver) {
Iterator<datatype> iter = list.iterator();
while (iter.hasNext()){
DataType data = iter.next();
if(requestName.equals(data.getName())){
System.out.println("get " + data.getIp() + " from " + requestName);
responseObserver.onNext(data.getIp());
}
}
responseObserver.onCompleted();
}
}
/<datatype>
/<datatype>/<datatype>

DataType類如下:

class DataType{
private Name name;
private Ip ip;

public DataType(Name name, Ip ip) {
this.name = name;
this.ip = ip;
}
public Name getName() {
return name;
}
public void setName(Name name) {
this.name = name;
}
public Ip getIp() {
return ip;
}
public void setIp(Ip ip) {
this.ip = ip;
}
}

上面代碼中,list使用迭代器的方式遍歷,這個大家也可以使用下標的方式,大家需要關注的是,這裡使用了Name的equals方法,Sunny怎麼就敢用equals方法,確定Name類中有重寫equals方法嗎?確實重寫了,我們來看它的equals方法長什麼樣。

public boolean equals(final java.lang.Object obj) { 
if (obj == this) {
return true;
}
if (!(obj instanceof io.grpc.examples.nameservers.Name)) {
return super.equals(obj);
}
io.grpc.examples.nameservers.Name other = (io.grpc.examples.nameservers.Name) obj;
boolean result = true;
result = result && getName()
.equals(other.getName());
result = result && unknownFields.equals(other.unknownFields);
return result;
}

不出意料,首先比較是不是同一個對象,如果是那還說啥,直接true。如果傳入的這個對象不是Name類,那麼就調用父類的equals方法。緊接著進行了一次類型強轉。然後定義了一個boolean類型的變量result,實現起來也比較巧妙,利用&&的方法來確保符合所有兩個條件,第一個就是兩個消息中name字段是否equals——這裡是String類型的,實際調用了String的equals。然後還要比較unknownFields是否也equals,這裡unknownFields是Name父類中的一個UnknownFieldSet類型的成員變量。根據名字大概可以猜到這就是未知的字段,我們這裡的unknownFields中維護的fields對象應該就是null了。因此實際上判斷兩個Name類型是否equals,只需要判斷它們維護的name這個String類型的值是否equals。

到這裡,服務端基本完成了。


客戶端

首先是pom.xml,這裡我們和服務端一樣用grpc-all依賴。然後是proto生成java類,這裡我們採取和上一篇不同的方法,上一篇我們是用proto文件重新編譯,然後生成的。這裡,我們直接將服務端生成的代碼拷貝到客戶端的目錄中。

grpc實戰——服務端流式調用

客戶端NameClient中的代碼也有所不同,它需要接收一系列的服務端發過來的流消息。話不多說,我們直接上代碼:

public class NameClient { 
private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 8088;
private ManagedChannel managedChannel;
//服務存根,用於客戶端本地調用
private NameServersGrpc.NameServersBlockingStub nameServiceBlockingStub;

public NameClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host,port).usePlaintext(true).build());
}
public NameClient(ManagedChannel managedChannel) {
this.managedChannel = managedChannel;
this.nameServiceBlockingStub = NameServersGrpc.newBlockingStub(managedChannel);
}
public void shutdown() throws InterruptedException {
managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public List getIpsByName(String n){
List result = new ArrayList();
Name name = Name.newBuilder().setName(n).build();
Iterator iterator = nameServiceBlockingStub.getIpsByName(name);
while (iterator.hasNext()){
result.add(iterator.next());
} return result;
}

public static void main(String[] args) {
NameClient nameClient = new NameClient(DEFAULT_HOST,DEFAULT_PORT);
for(String arg : args){
List result = nameClient.getIpsByName(arg);
for(int i=0;i<result.size> System.out.println("get result from server: " + result.get(i) + " as param is " + arg);
}
}
}
}

/<result.size>

還是一樣,上一篇說過的我們就不再老生常談了,想了解的童鞋請移步傳送門。我們來看不一樣的地方,getIpsByName方法有所變化:

public List getIpsByName(String n){ 
List result = new ArrayList();
Name name = Name.newBuilder().setName(n).build();
Iterator iterator = nameServiceBlockingStub.getIpsByName(name);
while (iterator.hasNext()){
result.add(iterator.next());
}
return result;
}

這裡我們的返回值不是簡單的Ip了,而是一個Ip類型的迭代器,這裡我們還是選擇再得到迭代器後把結果包裝進一個list中,然後返回這個list。相應地,這樣在main函數中調用getIpsByName的方法的時候因為返回值有所不同了,所以也需要有相應的改變。


運行驗證結果

我們首先運行服務端NameServer,得到結果:

Jun 10, 2018 6:24:19 PM com.sunny.NameServer start信息: Server has started, listening on 8088

服務在8088端口上啟動了,只要有客戶端連接這個端口併發送請求即可。

然後啟動客戶端,注意在啟動前加入程序參數

Sunny David Tom
grpc實戰——服務端流式調用

啟動客戶端NameClient,得到了我們請求的結果:

get result from server: ip: "125.216.242.51" 

as param is Sunny
get result from server: ip: "126.216.242.51"
as param is Sunny
get result from server: ip: "117.226.178.139"
as param is David
get result from server: ip: "117.227.178.139"
as param is David
get result from server: ip: "111.222.336.11"
as param is Tom
get result from server: ip: "111.333.336.11"
as param is Tom
get result from server: ip: "111.222.335.11"
as param is Tom

同樣在服務端的控制檯,我們也可以看到我們打印的服務調用信息:

get ip: "125.216.242.51"
from name: "Sunny"get ip: "126.216.242.51"
from name: "Sunny"get ip: "117.226.178.139"
from name: "David"get ip: "117.227.178.139"
from name: "David"get ip: "111.222.336.11"
from name: "Tom"get ip: "111.333.336.11"
from name: "Tom"get ip: "111.222.335.11"
from name: "Tom"

至此,一個grpc服務端流式調用就做完了。


分享到:


相關文章: