gRPC之Java实现

这篇笔记我们介绍如何在一个基础的Java程序中使用gRPC,关于如何在SpringBoot工程中集成gRPC将在下一章节进行介绍。

官方grpc-java工程Github:https://github.com/grpc/grpc-java

引入pom依赖

根据grpc-java工程的文档,在Java工程中,要使用gRPC首先我们需要在工程中引入grpc-java相关的依赖。

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.58.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.58.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.58.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
    <groupId>org.apache.tomcat</groupId>
    <artifactId>annotations-api</artifactId>
    <version>6.0.53</version>
    <scope>provided</scope>
</dependency>

此外为了调用protoc编译器对IDL文件进行编译,我们还需要引入对应的Maven插件。

<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.58.0:exe:${os.detected.classifier}</pluginArtifact>
                <outputDirectory>${basedir}/src/main/java</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

这里基于官方文档中的配置,我们新增了两个配置:<outputDirectory>中我们将输出目录指定到了src/main/java源代码目录中,<clearOutputDirectory>false</clearOutputDirectory>指定每次重新编译时不清空原来的代码,必要时我们也可以将其指定为true,这样每次protoc编译时都是全量编译的。

工程目录结构最佳实践

实际开发中,一个推荐的Java工程目录如下。

|_rpc-api
|_rpc-client
|_rpc-service

其中rpc-apirpc-clientrpc-service是3个Maven模块,rpc-api专门放置.proto文件、gRPC接口和请求响应消息结构代码,该模块仅依赖必要的gRPC相关包和编译IDL的Maven插件;rpc-service是服务端实现,它通过Maven依赖rpc-api模块;rpc-client是客户端实现,它也通过Maven依赖rpc-api模块,如果有其它工程也需要调用gRPC接口,那么也对应的引入rpc-api模块即可。

我们将rpc-apirpc-service解耦的目的是其它工程引入gRPC接口时,不会引入rpc-service模块中和接口无关的依赖项,这是Java开发中的一个最佳实践。

实现客户端和服务端通信

下面我们通过一些例子讲解如何用gRPC实现RPC调用。

Unary RPC(最常用)

Unary RPC是最简单也是最常用的RPC方式,我们实际开发中也是以实现此类RPC调用为主,这里我们通过一个完整的例子学习如何实现最基础的Unary RPC

消息定义

我们的.proto文件如下。

src/main/proto/demo_service.proto

syntax = "proto3";

option java_outer_classname = "StudentProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;

message StudentQueryReq {
  string studentName = 1;
}

message StudentRsp {
  int64 studentId = 1;
  string studentCode = 2;
  string studentName = 3;
  int32 age = 4;
}

service StudentService {
  rpc queryStudent(StudentQueryReq) returns (StudentRsp) {}
}

代码中包含了消息体StudentQueryReqStudentRsp的定义,以及接口StudentService.queryStudent的定义。我们可以使用mvn compile对其进行编译,如果正确引入了gRPC插件会自动生成消息体代码类StudentProtocol和接口代码类StudentServiceGrpc

服务端

服务端代码例子如下。

package com.gacfox.demo.service;

import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StudentService extends StudentServiceGrpc.StudentServiceImplBase {
    @Override
    public void queryStudent(StudentProtocol.StudentQueryReq request, StreamObserver<StudentProtocol.StudentRsp> responseObserver) {
        // 读取请求
        String studentName = request.getStudentName();
        log.info("收到请求 studentName [{}]", studentName);

        // (假设这里查询了数据库或是某些业务处理)组装返回信息
        StudentProtocol.StudentRsp.Builder builder = StudentProtocol.StudentRsp.newBuilder();
        builder.setStudentId(1L);
        builder.setStudentCode("STU03092301");
        builder.setStudentName("汤姆");
        builder.setAge(18);
        StudentProtocol.StudentRsp studentRsp = builder.build();

        // 返回数据
        responseObserver.onNext(studentRsp);
        responseObserver.onCompleted();
    }
}

StudentService是我们响应RPC调用的具体处理类,它继承StudentServiceGrpc.StudentServiceImplBase类,gRPC中自动生成的处理类都是以ImplBase结尾的,我们需要继承并扩展它。我们这里重写了queryStudent()方法,其中request是gRPC生成的消息体类,我们直接从其中读取数据即可;responseObserver是一个实现了类似观察者模式的类,我们调用其中的onNext()方法传入响应数据,调用onCompleted()方法表示RPC调用响应结束。

注:一般RPC框架设计中,服务端方法可能设计为Response rpcMethod(Request)的形式,但gRPC比较特别,它的返回值没有设计到方法返回值中而是放在了参数的responseObserver中,这是因为gRPC底层使用Netty实现,除了Unary RPC外gRPC还支持异步流消息,因此采用了这种有点怪异的设计。

服务端中,我们还需要启动gRPC服务,下面是代码实现。

package com.gacfox.demo.service;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(9000)
                .addService(new StudentService())
                .build();
        server.start();
        server.awaitTermination();
    }
}

代码中,我们指定了gRPC服务的主机名和端口,并注册了我们的StudentService最后启动了服务。

客户端

下面例子中我们使用BlockingStub实现了gRPC的客户端。

package com.gacfox.demo.client;

import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Main {
    public static void main(String[] args) {
        // 创建消息通道
        ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();
        try {
            // 创建Stub
            StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);

            // 创建请求消息体
            StudentProtocol.StudentQueryReq.Builder builder = StudentProtocol.StudentQueryReq.newBuilder();
            builder.setStudentName("汤姆");
            StudentProtocol.StudentQueryReq studentQueryReq = builder.build();

            // RPC调用并读取返回信息
            StudentProtocol.StudentRsp studentRsp = blockingStub.queryStudent(studentQueryReq);
            log.info("studentCode {}", studentRsp.getStudentCode());
            log.info("studentName {}", studentRsp.getStudentName());
            log.info("studentAge {}", studentRsp.getAge());
        } finally {
            // 关闭消息通道
            managedChannel.shutdown();
        }
    }
}

客户端代码中我们首先创建了ManagedChannel类,它对应和gRPC服务端进行通信的信道。调用gRPC接口时,我们创建了Stub类,Stub是RPC框架中的通用概念,表示客户端调用接口时的“桩”,它可以看作一个代理类,我们调用“桩”,而“桩”的底层实现才会具体负责数据的序列化和网络通信。BlockingStub是gRPC中用于阻塞式RPC调用的客户端实现,Java开发中其实这种阻塞式调用还是最常用的。

得到返回值后,我们输出了返回的信息。

此外,我们也可以使用FutureStub实现非阻塞式RPC调用,下面是一个例子。

package com.gacfox.demo.client;

import com.gacfox.demo.api.StudentProtocol;
import com.gacfox.demo.api.StudentServiceGrpc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Main {
    public static void main(String[] args) {
        // 创建消息通道
        ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();
        try {
            // 创建FutureStub
            StudentServiceGrpc.StudentServiceFutureStub futureStub = StudentServiceGrpc.newFutureStub(managedChannel);

            // 创建请求消息体
            StudentProtocol.StudentQueryReq.Builder builder = StudentProtocol.StudentQueryReq.newBuilder();
            builder.setStudentName("汤姆");
            StudentProtocol.StudentQueryReq studentQueryReq = builder.build();

            // 获取异步listenableFuture
            ListenableFuture<StudentProtocol.StudentRsp> listenableFuture = futureStub.queryStudent(studentQueryReq);
            Futures.addCallback(listenableFuture, new FutureCallback<StudentProtocol.StudentRsp>() {
                @Override
                public void onSuccess(StudentProtocol.StudentRsp result) {
                    // 调用成功,读取消息
                    log.info("studentCode {}", result.getStudentCode());
                    log.info("studentName {}", result.getStudentName());
                    log.info("studentAge {}", result.getAge());
                }

                @Override
                public void onFailure(Throwable t) {
                    // 调用出错
                    log.error("出错了: ", t);
                }
            }, Executors.newCachedThreadPool());

            // 等到3秒防止客户端主线程结束
            managedChannel.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

FutureStub调用的返回值是ListenableFuture,我们需要使用Futures.addCallback()为其添加回调方法以及指定异步线程池。回调方法包裹在FutureCallback类中,其中包含了对RPC调用成功和异常两种情况的回调方法,线程池这里我们简单的指定了一个CachedThreadPool,实际开发中应该根据实际情况进行指定。

使用FutureStub进行RPC调用时,我们的主线程就不会阻塞等待了,因此我们这里在主线程的最后使用了managedChannel.awaitTermination(3, TimeUnit.SECONDS)让主线程等待3秒,防止程序结束而没有收到响应。

Server Stream RPC

Server Stream RPC表示服务端返回多个流消息,这是什么意思呢?举个例子,这就类似一个监控大屏,我们请求获取大屏的实时数据,但这个数据是不停更新的,服务端要不断的返回数据,直到达成指定的条件。

消息定义

IDL定义如下:

kanban_service.proto

syntax = "proto3";

import "google/protobuf/empty.proto";

option java_outer_classname = "KanbanProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;

message KanbanRsp {
  string cpuLoad = 1;
  string memoryLoad = 2;
  string diskLoad = 3;
}

service KanbanService {
  rpc getCurrentKanbanData(google.protobuf.Empty) returns (stream KanbanRsp) {}
}

注意我们在返回值中使用了stream关键字,这表示服务端返回的是消息流。此外我们在传入参数中使用了Empty消息,表示没有传入参数。

服务端

服务端和之前没有太大区别,只不过是我们返回了多个消息,而且是每隔1秒返回1个消息。

KanbanService.java

package com.gacfox.demo.service;

import com.gacfox.demo.api.KanbanProtocol;
import com.gacfox.demo.api.KanbanServiceGrpc;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;

import java.util.Random;

public class KanbanService extends KanbanServiceGrpc.KanbanServiceImplBase {
    @Override
    public void getCurrentKanbanData(Empty request, StreamObserver<KanbanProtocol.KanbanRsp> responseObserver) {
        for (int i = 0; i < 10; i++) {
            // 每隔1秒返回一些数据
            KanbanProtocol.KanbanRsp.Builder builder = KanbanProtocol.KanbanRsp.newBuilder();
            Random random = new Random();
            builder.setCpuLoad(String.valueOf(random.nextInt(10000) / 100.0));
            builder.setMemoryLoad(String.valueOf(random.nextInt(10000) / 100.0));
            builder.setDiskLoad(String.valueOf(random.nextInt(10000) / 100.0));
            KanbanProtocol.KanbanRsp kanbanRsp = builder.build();
            responseObserver.onNext(kanbanRsp);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        // 返回完成
        responseObserver.onCompleted();
    }
}

客户端

客户端代码如下。

package com.gacfox.demo.client;

import com.gacfox.demo.api.KanbanProtocol;
import com.gacfox.demo.api.KanbanServiceGrpc;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class Main {
    public static void main(String[] args) {
        // 创建消息通道
        ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();
        try {
            KanbanServiceGrpc.KanbanServiceStub kanbanServiceStub = KanbanServiceGrpc.newStub(managedChannel);
            kanbanServiceStub.getCurrentKanbanData(Empty.newBuilder().build(), new StreamObserver<KanbanProtocol.KanbanRsp>() {
                @Override
                public void onNext(KanbanProtocol.KanbanRsp kanbanRsp) {
                    log.info("CPU Load {}%", kanbanRsp.getCpuLoad());
                    log.info("Memory Load {}%", kanbanRsp.getMemoryLoad());
                    log.info("Disk Load {}%", kanbanRsp.getDiskLoad());
                }

                @Override
                public void onError(Throwable throwable) {
                    log.error("出错了: ", throwable);
                }

                @Override
                public void onCompleted() {
                    log.info("响应结束");
                }
            });
            managedChannel.awaitTermination(15, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

这里我们没有使用BlockingStub,因为处理流消息时一般都是收到一条处理一条,阻塞方式流消息就没有意义了。我们这里直接使用了Stub,它获取数据时需要RPC传入参数和StreamObserver对象,该对象中的onNext()方法、onError()方法和onCompleted()方法分别用于处理收到消息、出错和发送结束时的回调。

由于是异步处理的,和之前一样,我们这里在主线程的最后使用了managedChannel.awaitTermination(15, TimeUnit.SECONDS)让主线程等待15秒,防止程序结束而没有收到响应。

Client Stream RPC

Client Stream RPC表示客户端以流的形式将多个消息发送给服务端,服务端则只在最后返回一个响应消息。举个例子,一个传感器不停的给服务端发送采集到的数据,这种场景其实就可以使用Client Stream RPC

消息定义

消息定义中,我们还是使用stream关键字表示流消息,不过这里我们将stream关键字指定给了传入参数。

collect_data_service.proto

syntax = "proto3";

option java_outer_classname = "CollectDataProtocol";
option java_package = "com.gacfox.demo.api";
option java_multiple_files = false;

message DataReq {
  string cpuLoad = 1;
  string memoryLoad = 2;
  string diskLoad = 3;
}

message DataRsp {
  string rspCode = 1;
  string rspMsg = 2;
}

service CollectDataService {
  rpc pushData(stream DataReq) returns (DataRsp) {}
}

服务端

服务端代码中,我们使用一个StreamObserver不停接收请求信息。

package com.gacfox.demo.service;

import com.gacfox.demo.api.CollectDataProtocol;
import com.gacfox.demo.api.CollectDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CollectDataService extends CollectDataServiceGrpc.CollectDataServiceImplBase {
    @Override
    public StreamObserver<CollectDataProtocol.DataReq> pushData(StreamObserver<CollectDataProtocol.DataRsp> responseObserver) {
        return new StreamObserver<CollectDataProtocol.DataReq>() {
            @Override
            public void onNext(CollectDataProtocol.DataReq dataReq) {
                log.info("CPU Load {}%", dataReq.getCpuLoad());
                log.info("Memory Load {}%", dataReq.getMemoryLoad());
                log.info("Disk Load {}%", dataReq.getDiskLoad());
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("出错了: ", throwable);
            }

            @Override
            public void onCompleted() {
                CollectDataProtocol.DataRsp.Builder builder = CollectDataProtocol.DataRsp.newBuilder();
                builder.setRspCode("0");
                builder.setRspMsg("success");
                CollectDataProtocol.DataRsp dataRsp = builder.build();
                responseObserver.onNext(dataRsp);

                responseObserver.onCompleted();
                log.info("结束");
            }
        };
    }
}

代码中,对于请求的StreamObserver,每次收到请求时都会回调onNext()方法,请求完成时则会回调onCompleted()方法,此时我们调用responseObserver.onNext()返回消息并调用responseObserver.onCompleted()结束RPC调用。

客户端

客户端中我们依然使用Stub处理消息而不是BlockingStub,它的返回值是一个处理请求消息的StreamObserver,我们在客户端中不停调用它的onNext()方法发送消息即可,发送完后调用onCompleted()方法结束发送,服务端收到结束消息后会返回最终的DataRsp消息。

package com.gacfox.demo.client;

import com.gacfox.demo.api.CollectDataProtocol;
import com.gacfox.demo.api.CollectDataServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Main {
    public static void main(String[] args) {
        // 创建消息通道
        ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();
        try {
            CollectDataServiceGrpc.CollectDataServiceStub collectDataServiceStub = CollectDataServiceGrpc.newStub(managedChannel);
            StreamObserver<CollectDataProtocol.DataReq> streamObserver = collectDataServiceStub.pushData(new StreamObserver<CollectDataProtocol.DataRsp>() {
                @Override
                public void onNext(CollectDataProtocol.DataRsp dataRsp) {
                    log.info("RspCode {}", dataRsp.getRspCode());
                    log.info("RspMsg {}", dataRsp.getRspMsg());
                }

                @Override
                public void onError(Throwable throwable) {
                    log.error("出错了: ", throwable);
                }

                @Override
                public void onCompleted() {
                    log.info("响应结束");
                }
            });
            for (int i = 0; i < 10; i++) {
                CollectDataProtocol.DataReq.Builder builder = CollectDataProtocol.DataReq.newBuilder();
                Random random = new Random();
                builder.setCpuLoad(String.valueOf(random.nextInt(10000) / 100.0));
                builder.setMemoryLoad(String.valueOf(random.nextInt(10000) / 100.0));
                builder.setDiskLoad(String.valueOf(random.nextInt(10000) / 100.0));
                CollectDataProtocol.DataReq dataReq = builder.build();
                streamObserver.onNext(dataReq);
                Thread.sleep(1000);
            }
            streamObserver.onCompleted();

            managedChannel.awaitTermination(15, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

可以看到实际上这种代码可读性就很差了,整个调用流程都是“扭曲”的,当然异步代码本身就很难编写,不过也有Java语言表现力不足的原因。

Bidirectional Streaming RPC

我们学会Server Stream RPCClient Stream RPC这两种单向的流消息后,Bidirectional Streaming RPC其实也就已经学会了,它是一种双向的流消息,这种RPC方式需要我们在protobuf的IDL中将入参和返回值都使用stream关键字标注,然后分别在客户端和服务端利用StreamObserver处理流消息即可。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。