gRPC之Java实现
这篇笔记我们介绍如何在一个基础的Java程序中使用gRPC,关于如何在SpringBoot工程中集成gRPC将在下一章节进行介绍。
官方grpc-java工程Github:https://github.com/grpc/grpc-java
引入pom依赖
根据grpc-java工程的文档,在Java工程中,要使用gRPC首先我们需要在工程中引入grpc-java相关的依赖。
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.58.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.58.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.58.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
此外为了调用protoc编译器对IDL文件进行编译,我们还需要引入对应的Maven插件。
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.58.0:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
这里基于官方文档中的配置,我们新增了两个配置:<outputDirectory>中我们将输出目录指定到了src/main/java源代码目录中,<clearOutputDirectory>false</clearOutputDirectory>指定每次重新编译时不清空原来的代码,必要时我们也可以将其指定为true,这样每次protoc编译时都是全量编译的。
工程目录结构最佳实践
实际开发中,一个推荐的Java工程目录如下。
|_rpc-api
|_rpc-client
|_rpc-service
其中rpc-api、rpc-client、rpc-service是3个Maven模块,rpc-api专门放置.proto文件、gRPC接口和请求响应消息结构代码,该模块仅依赖必要的gRPC相关包和编译IDL的Maven插件;rpc-service是服务端实现,它通过Maven依赖rpc-api模块;rpc-client是客户端实现,它也通过Maven依赖rpc-api模块,如果有其它工程也需要调用gRPC接口,那么也对应的引入rpc-api模块即可。
我们将rpc-api和rpc-service解耦的目的是其它工程引入gRPC接口时,不会引入rpc-service模块中和接口无关的依赖项,这是Java开发中的一个最佳实践。
实现客户端和服务端通信
下面我们通过一些例子讲解如何用gRPC实现RPC调用。
Unary RPC(最常用)
Unary RPC是最简单也是最常用的RPC方式,我们实际开发中也是以实现此类RPC调用为主,这里我们通过一个完整的例子学习如何实现最基础的Unary RPC。
消息定义
我们的.proto文件如下。
src/main/proto/demo_service.proto
syntax = "proto3";
option java_outer_classname = "StudentProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;
message StudentQueryReq {
string studentName = 1;
}
message StudentRsp {
int64 studentId = 1;
string studentCode = 2;
string studentName = 3;
int32 age = 4;
}
service StudentService {
rpc queryStudent(StudentQueryReq) returns (StudentRsp) {}
}
代码中包含了消息体StudentQueryReq和StudentRsp的定义,以及接口StudentService.queryStudent的定义。我们可以使用mvn compile对其进行编译,如果正确引入了gRPC插件会自动生成消息体代码类StudentProtocol和接口代码类StudentServiceGrpc。
服务端
服务端代码例子如下。
package com.gacfox.demo.service;
import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StudentService extends StudentServiceGrpc.StudentServiceImplBase {
@Override
public void queryStudent(StudentProtocol.StudentQueryReq request, StreamObserver<StudentProtocol.StudentRsp> responseObserver) {
// 读取请求
String studentName = request.getStudentName();
log.info("收到请求 studentName [{}]", studentName);
// (假设这里查询了数据库或是某些业务处理)组装返回信息
StudentProtocol.StudentRsp.Builder builder = StudentProtocol.StudentRsp.newBuilder();
builder.setStudentId(1L);
builder.setStudentCode("STU03092301");
builder.setStudentName("汤姆");
builder.setAge(18);
StudentProtocol.StudentRsp studentRsp = builder.build();
// 返回数据
responseObserver.onNext(studentRsp);
responseObserver.onCompleted();
}
}
StudentService是我们响应RPC调用的具体处理类,它继承StudentServiceGrpc.StudentServiceImplBase类,gRPC中自动生成的处理类都是以ImplBase结尾的,我们需要继承并扩展它。我们这里重写了queryStudent()方法,其中request是gRPC生成的消息体类,我们直接从其中读取数据即可;responseObserver是一个实现了类似观察者模式的类,我们调用其中的onNext()方法传入响应数据,调用onCompleted()方法表示RPC调用响应结束。
注:一般RPC框架设计中,服务端方法可能设计为Response rpcMethod(Request)的形式,但gRPC比较特别,它的返回值没有设计到方法返回值中而是放在了参数的responseObserver中,这是因为gRPC底层使用Netty实现,除了Unary RPC外gRPC还支持异步流消息,因此采用了这种有点怪异的设计。
服务端中,我们还需要启动gRPC服务,下面是代码实现。
package com.gacfox.demo.service;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(9000)
.addService(new StudentService())
.build();
server.start();
server.awaitTermination();
}
}
代码中,我们指定了gRPC服务的主机名和端口,并注册了我们的StudentService最后启动了服务。
客户端
下面例子中我们使用BlockingStub实现了gRPC的客户端。
package com.gacfox.demo.client;
import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建消息通道
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress("localhost", 9000)
.usePlaintext()
.build();
try {
// 创建Stub
StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);
// 创建请求消息体
StudentProtocol.StudentQueryReq.Builder builder = StudentProtocol.StudentQueryReq.newBuilder();
builder.setStudentName("汤姆");
StudentProtocol.StudentQueryReq studentQueryReq = builder.build();
// RPC调用并读取返回信息
StudentProtocol.StudentRsp studentRsp = blockingStub.queryStudent(studentQueryReq);
log.info("studentCode {}", studentRsp.getStudentCode());
log.info("studentName {}", studentRsp.getStudentName());
log.info("studentAge {}", studentRsp.getAge());
} finally {
// 关闭消息通道
managedChannel.shutdown();
}
}
}
客户端代码中我们首先创建了ManagedChannel类,它对应和gRPC服务端进行通信的信道。调用gRPC接口时,我们创建了Stub类,Stub是RPC框架中的通用概念,表示客户端调用接口时的“桩”,它可以看作一个代理类,我们调用“桩”,而“桩”的底层实现才会具体负责数据的序列化和网络通信。BlockingStub是gRPC中用于阻塞式RPC调用的客户端实现,Java开发中其实这种阻塞式调用还是最常用的。
得到返回值后,我们输出了返回的信息。
此外,我们也可以使用FutureStub实现非阻塞式RPC调用,下面是一个例子。
package com.gacfox.demo.client;
import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建消息通道
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress("localhost", 9000)
.usePlaintext()
.build();
try {
// 创建FutureStub
StudentServiceGrpc.StudentServiceFutureStub futureStub = StudentServiceGrpc.newFutureStub(managedChannel);
// 创建请求消息体
StudentProtocol.StudentQueryReq.Builder builder = StudentProtocol.StudentQueryReq.newBuilder();
builder.setStudentName("汤姆");
StudentProtocol.StudentQueryReq studentQueryReq = builder.build();
// 获取异步listenableFuture
ListenableFuture<StudentProtocol.StudentRsp> listenableFuture = futureStub.queryStudent(studentQueryReq);
Futures.addCallback(listenableFuture, new FutureCallback<StudentProtocol.StudentRsp>() {
@Override
public void onSuccess(StudentProtocol.StudentRsp result) {
// 调用成功,读取消息
log.info("studentCode {}", result.getStudentCode());
log.info("studentName {}", result.getStudentName());
log.info("studentAge {}", result.getAge());
}
@Override
public void onFailure(Throwable t) {
// 调用出错
log.error("出错了: ", t);
}
}, Executors.newCachedThreadPool());
// 等到3秒防止客户端主线程结束
managedChannel.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
managedChannel.shutdown();
}
}
}
FutureStub调用的返回值是ListenableFuture,我们需要使用Futures.addCallback()为其添加回调方法以及指定异步线程池。回调方法包裹在FutureCallback类中,其中包含了对RPC调用成功和异常两种情况的回调方法,线程池这里我们简单的指定了一个CachedThreadPool,实际开发中应该根据实际情况进行指定。
使用FutureStub进行RPC调用时,我们的主线程就不会阻塞等待了,因此我们这里在主线程的最后使用了managedChannel.awaitTermination(3, TimeUnit.SECONDS)让主线程等待3秒,防止程序结束而没有收到响应。
Server Stream RPC
Server Stream RPC表示服务端返回多个流消息,这是什么意思呢?举个例子,这就类似一个监控大屏,我们请求获取大屏的实时数据,但这个数据是不停更新的,服务端要不断的返回数据,直到达成指定的条件。
消息定义
IDL定义如下:
kanban_service.proto
syntax = "proto3";
import "google/protobuf/empty.proto";
option java_outer_classname = "KanbanProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;
message KanbanRsp {
string cpuLoad = 1;
string memoryLoad = 2;
string diskLoad = 3;
}
service KanbanService {
rpc getCurrentKanbanData(google.protobuf.Empty) returns (stream KanbanRsp) {}
}
注意我们在返回值中使用了stream关键字,这表示服务端返回的是消息流。此外我们在传入参数中使用了Empty消息,表示没有传入参数。
服务端
服务端和之前没有太大区别,只不过是我们返回了多个消息,而且是每隔1秒返回1个消息。
KanbanService.java
package com.gacfox.demo.service;
import com.gacfox.demo.api.KanbanProtocol;
import com.gacfox.demo.api.KanbanServiceGrpc;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import java.util.Random;
public class KanbanService extends KanbanServiceGrpc.KanbanServiceImplBase {
@Override
public void getCurrentKanbanData(Empty request, StreamObserver<KanbanProtocol.KanbanRsp> responseObserver) {
for (int i = 0; i < 10; i++) {
// 每隔1秒返回一些数据
KanbanProtocol.KanbanRsp.Builder builder = KanbanProtocol.KanbanRsp.newBuilder();
Random random = new Random();
builder.setCpuLoad(String.valueOf(random.nextInt(10000) / 100.0));
builder.setMemoryLoad(String.valueOf(random.nextInt(10000) / 100.0));
builder.setDiskLoad(String.valueOf(random.nextInt(10000) / 100.0));
KanbanProtocol.KanbanRsp kanbanRsp = builder.build();
responseObserver.onNext(kanbanRsp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 返回完成
responseObserver.onCompleted();
}
}
客户端
客户端代码如下。
package com.gacfox.demo.client;
import com.gacfox.demo.api.KanbanProtocol;
import com.gacfox.demo.api.KanbanServiceGrpc;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建消息通道
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress("localhost", 9000)
.usePlaintext()
.build();
try {
KanbanServiceGrpc.KanbanServiceStub kanbanServiceStub = KanbanServiceGrpc.newStub(managedChannel);
kanbanServiceStub.getCurrentKanbanData(Empty.newBuilder().build(), new StreamObserver<KanbanProtocol.KanbanRsp>() {
@Override
public void onNext(KanbanProtocol.KanbanRsp kanbanRsp) {
log.info("CPU Load {}%", kanbanRsp.getCpuLoad());
log.info("Memory Load {}%", kanbanRsp.getMemoryLoad());
log.info("Disk Load {}%", kanbanRsp.getDiskLoad());
}
@Override
public void onError(Throwable throwable) {
log.error("出错了: ", throwable);
}
@Override
public void onCompleted() {
log.info("响应结束");
}
});
managedChannel.awaitTermination(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
managedChannel.shutdown();
}
}
}
这里我们没有使用BlockingStub,因为处理流消息时一般都是收到一条处理一条,阻塞方式流消息就没有意义了。我们这里直接使用了Stub,它获取数据时需要RPC传入参数和StreamObserver对象,该对象中的onNext()方法、onError()方法和onCompleted()方法分别用于处理收到消息、出错和发送结束时的回调。
由于是异步处理的,和之前一样,我们这里在主线程的最后使用了managedChannel.awaitTermination(15, TimeUnit.SECONDS)让主线程等待15秒,防止程序结束而没有收到响应。
Client Stream RPC
Client Stream RPC表示客户端以流的形式将多个消息发送给服务端,服务端则只在最后返回一个响应消息。举个例子,一个传感器不停的给服务端发送采集到的数据,这种场景其实就可以使用Client Stream RPC。
消息定义
消息定义中,我们还是使用stream关键字表示流消息,不过这里我们将stream关键字指定给了传入参数。
collect_data_service.proto
syntax = "proto3";
option java_outer_classname = "CollectDataProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;
message DataReq {
string cpuLoad = 1;
string memoryLoad = 2;
string diskLoad = 3;
}
message DataRsp {
string rspCode = 1;
string rspMsg = 2;
}
service CollectDataService {
rpc pushData(stream DataReq) returns (DataRsp) {}
}
服务端
服务端代码中,我们使用一个StreamObserver不停接收请求信息。
package com.gacfox.demo.service;
import com.gacfox.demo.api.CollectDataProtocol;
import com.gacfox.demo.api.CollectDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CollectDataService extends CollectDataServiceGrpc.CollectDataServiceImplBase {
@Override
public StreamObserver<CollectDataProtocol.DataReq> pushData(StreamObserver<CollectDataProtocol.DataRsp> responseObserver) {
return new StreamObserver<CollectDataProtocol.DataReq>() {
@Override
public void onNext(CollectDataProtocol.DataReq dataReq) {
log.info("CPU Load {}%", dataReq.getCpuLoad());
log.info("Memory Load {}%", dataReq.getMemoryLoad());
log.info("Disk Load {}%", dataReq.getDiskLoad());
}
@Override
public void onError(Throwable throwable) {
log.error("出错了: ", throwable);
}
@Override
public void onCompleted() {
CollectDataProtocol.DataRsp.Builder builder = CollectDataProtocol.DataRsp.newBuilder();
builder.setRspCode("0");
builder.setRspMsg("success");
CollectDataProtocol.DataRsp dataRsp = builder.build();
responseObserver.onNext(dataRsp);
responseObserver.onCompleted();
log.info("结束");
}
};
}
}
代码中,对于请求的StreamObserver,每次收到请求时都会回调onNext()方法,请求完成时则会回调onCompleted()方法,此时我们调用responseObserver.onNext()返回消息并调用responseObserver.onCompleted()结束RPC调用。
客户端
客户端中我们依然使用Stub处理消息而不是BlockingStub,它的返回值是一个处理请求消息的StreamObserver,我们在客户端中不停调用它的onNext()方法发送消息即可,发送完后调用onCompleted()方法结束发送,服务端收到结束消息后会返回最终的DataRsp消息。
package com.gacfox.demo.client;
import com.gacfox.demo.api.CollectDataProtocol;
import com.gacfox.demo.api.CollectDataServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建消息通道
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress("localhost", 9000)
.usePlaintext()
.build();
try {
CollectDataServiceGrpc.CollectDataServiceStub collectDataServiceStub = CollectDataServiceGrpc.newStub(managedChannel);
StreamObserver<CollectDataProtocol.DataReq> streamObserver = collectDataServiceStub.pushData(new StreamObserver<CollectDataProtocol.DataRsp>() {
@Override
public void onNext(CollectDataProtocol.DataRsp dataRsp) {
log.info("RspCode {}", dataRsp.getRspCode());
log.info("RspMsg {}", dataRsp.getRspMsg());
}
@Override
public void onError(Throwable throwable) {
log.error("出错了: ", throwable);
}
@Override
public void onCompleted() {
log.info("响应结束");
}
});
for (int i = 0; i < 10; i++) {
CollectDataProtocol.DataReq.Builder builder = CollectDataProtocol.DataReq.newBuilder();
Random random = new Random();
builder.setCpuLoad(String.valueOf(random.nextInt(10000) / 100.0));
builder.setMemoryLoad(String.valueOf(random.nextInt(10000) / 100.0));
builder.setDiskLoad(String.valueOf(random.nextInt(10000) / 100.0));
CollectDataProtocol.DataReq dataReq = builder.build();
streamObserver.onNext(dataReq);
Thread.sleep(1000);
}
streamObserver.onCompleted();
managedChannel.awaitTermination(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
managedChannel.shutdown();
}
}
}
可以看到实际上这种代码可读性就很差了,整个调用流程都是“扭曲”的,当然异步代码本身就很难编写,不过也有Java语言表现力不足的原因。
Bidirectional Streaming RPC
我们学会Server Stream RPC和Client Stream RPC这两种单向的流消息后,Bidirectional Streaming RPC其实也就已经学会了,它是一种双向的流消息,这种RPC方式需要我们在protobuf的IDL中将入参和返回值都使用stream关键字标注,然后分别在客户端和服务端利用StreamObserver处理流消息即可。