《Netty学习打卡--从小白到放弃》----- 17 - netty 之grpc Client Streaming Rpc 例子

it2022-05-05  203

打卡日期(2019-07-18)

学习要点

- 利用grpc完成Client Streaming Rpc例子

步骤

1. 配置grpc依赖包
2. 编写proto文件
3. 利用gradle generateProto生成java类
4. 编写Server/Client服务

注:因为服务端只负责启动,所以Server类复用上一章的 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子
2.编写proto文件
// Schema for the client-streaming example: the client streams UserRequest
// messages and the server answers with one UserListResponse.
syntax = "proto3";

package study;

// Generated Java sources are placed in this package.
option java_package = "com.dragon.study";
option java_outer_classname = "Student";
option optimize_for = SPEED;
// Emit one top-level Java class per message/service instead of nesting them
// all inside the outer class.
option java_multiple_files = true;

// A single element of the request stream.
message UserRequest {
    int32 age = 1;
}

// One user record.
message UserResponse {
    string username = 1;
    int32 age = 2;
    string city = 3;
}

// Wrapper holding the list of users returned by the call.
message UserListResponse {
    // "repeated" makes this field a list of UserResponse.
    repeated UserResponse response = 1;
}

service StudentService {
    // Client-streaming RPC: "stream" appears only on the request side, so the
    // client may send many UserRequest messages while the server returns a
    // single UserListResponse.
    rpc GetUserList(stream UserRequest) returns (UserListResponse) {}
}
3.利用gradle generateProto生成java类
运行命令: gradle generateProto
4.编写Server/Client服务
Server

Server端跟上一章一样 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子

package com.dragon.study.server; import com.dragon.study.*; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase { @Override public StreamObserver<UserRequest> getUserList(StreamObserver<UserListResponse> responseObserver) { return new StreamObserver<UserRequest>() { @Override public void onNext(UserRequest value) { System.out.println("年龄"+value.getAge()); //根据请求参数处理对应的逻辑 UserResponse user1 = UserResponse.newBuilder().setAge(15).setUsername("张三").setCity("北京").build(); UserResponse user3 = UserResponse.newBuilder().setAge(20).setUsername("李四").setCity("上海").build(); UserListResponse list = UserListResponse.newBuilder().addResponse(user1).addResponse(user3).build(); responseObserver.onNext(list); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { // 关闭连接 responseObserver.onCompleted(); } }; } }
Client
package com.dragon.study.client; import com.dragon.study.*; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class StudentClient { private static final Logger logger = Logger.getLogger(StudentClient.class.getName()); //channel相当于一个连接,客户端核心类 private final ManagedChannel channel; //当请求是不是stream流的时候,使用阻塞式stub就可以 private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub; //需要指出一点,当请求是stream流的时候,就需要使用异步的请求方式去请求服务 private final StudentServiceGrpc.StudentServiceStub stub; public StudentClient(String host , int port){ //ManagedChannelBuilder 管理客户端的链接,用来创建链接 this(ManagedChannelBuilder.forAddress(host,port).usePlaintext().build()); } public StudentClient(ManagedChannel channel) { this.channel = channel; blockingStub = StudentServiceGrpc.newBlockingStub(channel); stub = StudentServiceGrpc.newStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(100,TimeUnit.SECONDS); } public void getUserList(){ StreamObserver<UserListResponse> observer = new StreamObserver<UserListResponse>() { @Override public void onNext(UserListResponse value) { for(UserResponse response : value.getResponseList()){ System.out.println(response.getUsername() + "," + response.getAge() + "," + response.getCity()); } } @Override public void onError(Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted() { System.out.println("ok"); } }; StreamObserver<UserRequest> requestStreamObserver = stub.getUserList(observer); requestStreamObserver.onNext(UserRequest.newBuilder().setAge(10).build()); requestStreamObserver.onNext(UserRequest.newBuilder().setAge(20).build()); requestStreamObserver.onNext(UserRequest.newBuilder().setAge(14).build()); requestStreamObserver.onCompleted(); } public static void main(String[] args) throws InterruptedException { StudentClient 
client = new StudentClient("localhost",8080); try{ client.getUserList(); }finally { client.shutdown(); } } } 分别启动Server 和 Client端 运行结果: 张三,15,北京 李四,20,上海 CANCELLED: HTTP/2 error code: CANCEL Received Rst Stream

最新回复(0)