《Netty学习打卡--从小白到放弃》----- 18 - netty 之grpc Bidirectional Streaming RPC 例子

it2022-05-05  110

打卡日期(2019-07-18)

学习要点

- 利用grpc完成Bidirectional Streaming RPC例子

步骤

1.配置grpc依赖包2.编写proto文件3.利用gradle generateProto生成java类4.编写Server/Client服务 注:应为服务端只负责启动,所以Server类复用上一章的 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子
2.编写proto文件
syntax = "proto3"; package study; option java_package = "com.dragon.study"; option java_outer_classname = "Student"; option optimize_for = SPEED; option java_multiple_files = true; message StreamRequest{ string message = 1; } message StreamResponse{ string username = 1; string address = 2; int32 phone = 3; } service StudentService{ rpc getAllUser(stream StreamRequest) returns (stream StreamResponse){} }
3.利用gradle generateProto生成java类
运行命令: gradle generateProto
4.编写Server/Client服务
Server

Server端跟上一章一样 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子

package com.dragon.study.server; import com.dragon.study.*; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase { @Override public StreamObserver<StreamRequest> getAllUser(StreamObserver<StreamResponse> responseObserver) { StreamObserver<StreamRequest> streamObserver = new StreamObserver<StreamRequest>() { @Override public void onNext(StreamRequest value) { System.out.println("接收到客户端请求的参数 SreamRequest.message = " + value.getMessage()); responseObserver.onNext(StreamResponse.newBuilder().setAddress("北京").setPhone(123).setUsername("王八犊子《"+value.getMessage()+"》").build()); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; return streamObserver; } }
Client
package com.dragon.study.client; import com.dragon.study.*; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class StudentClient { private static final Logger logger = Logger.getLogger(StudentClient.class.getName()); //channel相当于一个连接,客户端核心类 private final ManagedChannel channel; //当请求是不是stream流的时候,使用阻塞式stub就可以 private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub; //需要指出一点,当请求是stream流的时候,就需要使用异步的请求方式去请求服务 private final StudentServiceGrpc.StudentServiceStub stub; public StudentClient(String host , int port){ //ManagedChannelBuilder 管理客户端的链接,用来创建链接 this(ManagedChannelBuilder.forAddress(host,port).usePlaintext().build()); } public StudentClient(ManagedChannel channel) { this.channel = channel; blockingStub = StudentServiceGrpc.newBlockingStub(channel); stub = StudentServiceGrpc.newStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(100,TimeUnit.SECONDS); } public void getStreamUser(){ StreamObserver<StreamRequest> requestStreamObserver = stub.getAllUser(new StreamObserver<StreamResponse>() { //收到服务器端的返回的结果 @Override public void onNext(StreamResponse value) { System.out.println(value.getUsername() + "," + value.getAddress() + "," + value.getPhone()); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { System.out.println("conCompleted"); } }); requestStreamObserver.onNext(StreamRequest.newBuilder().setMessage("美美哒").build()); requestStreamObserver.onCompleted(); } public static void main(String[] args) throws InterruptedException { StudentClient client = new StudentClient("localhost",8080); try{ client.getStreamUser(); }finally { client.shutdown(); } } } 王八犊子《美美哒》,北京,123 conCompleted

最新回复(0)