打卡日期(2019-07-18)
学习要点
- 利用grpc完成Client Streaming Rpc例子
步骤
1.配置grpc依赖包2.编写proto文件3.利用gradle generateProto生成java类4.编写Server/Client服务
注:应为服务端只负责启动,所以Server类复用上一章的 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子
2.编写proto文件
syntax = "proto3";
package study;
option java_package = "com.dragon.study";
option java_outer_classname = "Student";
option optimize_for = SPEED;
option java_multiple_files = true;
message UserRequest{
int32 age = 1;
}
message UserResponse{
string username = 1;
int32 age = 2;
string city = 3;
}
message UserListResponse{
// repeated返回一个UserResponse List
repeated UserResponse response = 1;
}
service StudentService{
rpc GetUserList(stream UserRequest) returns(UserListResponse){}
}
3.利用gradle generateProto生成java类
运行命令:
gradle generateProto
4.编写Server/Client服务
Server
Server端跟上一章一样 《Netty学习打卡–从小白到放弃》----- 15 - netty 之grpc Simple RPC例子
package com
.dragon
.study
.server
;
import com
.dragon
.study
.*
;
import io
.grpc
.stub
.StreamObserver
;
import java
.util
.ArrayList
;
import java
.util
.List
;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
@Override
public StreamObserver
<UserRequest> getUserList(StreamObserver
<UserListResponse> responseObserver
) {
return new StreamObserver<UserRequest>() {
@Override
public void onNext(UserRequest value
) {
System
.out
.println("年龄"+value
.getAge());
UserResponse user1
= UserResponse
.newBuilder().setAge(15).setUsername("张三").setCity("北京").build();
UserResponse user3
= UserResponse
.newBuilder().setAge(20).setUsername("李四").setCity("上海").build();
UserListResponse list
= UserListResponse
.newBuilder().addResponse(user1
).addResponse(user3
).build();
responseObserver
.onNext(list
);
}
@Override
public void onError(Throwable t
) {
}
@Override
public void onCompleted() {
responseObserver
.onCompleted();
}
};
}
}
Client
package com
.dragon
.study
.client
;
import com
.dragon
.study
.*
;
import io
.grpc
.ManagedChannel
;
import io
.grpc
.ManagedChannelBuilder
;
import io
.grpc
.stub
.StreamObserver
;
import java
.util
.Iterator
;
import java
.util
.concurrent
.TimeUnit
;
import java
.util
.logging
.Logger
;
public class StudentClient {
private static final Logger logger
= Logger
.getLogger(StudentClient
.class.getName());
private final ManagedChannel channel
;
private final StudentServiceGrpc
.StudentServiceBlockingStub blockingStub
;
private final StudentServiceGrpc
.StudentServiceStub stub
;
public StudentClient(String host
, int port
){
this(ManagedChannelBuilder
.forAddress(host
,port
).usePlaintext().build());
}
public StudentClient(ManagedChannel channel
) {
this.channel
= channel
;
blockingStub
= StudentServiceGrpc
.newBlockingStub(channel
);
stub
= StudentServiceGrpc
.newStub(channel
);
}
public void shutdown() throws InterruptedException
{
channel
.shutdown().awaitTermination(100,TimeUnit
.SECONDS
);
}
public void getUserList(){
StreamObserver
<UserListResponse> observer
= new StreamObserver<UserListResponse>() {
@Override
public void onNext(UserListResponse value
) {
for(UserResponse response
: value
.getResponseList()){
System
.out
.println(response
.getUsername() + "," + response
.getAge() + "," + response
.getCity());
}
}
@Override
public void onError(Throwable t
) {
System
.out
.println(t
.getMessage());
}
@Override
public void onCompleted() {
System
.out
.println("ok");
}
};
StreamObserver
<UserRequest> requestStreamObserver
= stub
.getUserList(observer
);
requestStreamObserver
.onNext(UserRequest
.newBuilder().setAge(10).build());
requestStreamObserver
.onNext(UserRequest
.newBuilder().setAge(20).build());
requestStreamObserver
.onNext(UserRequest
.newBuilder().setAge(14).build());
requestStreamObserver
.onCompleted();
}
public static void main(String
[] args
) throws InterruptedException
{
StudentClient client
= new StudentClient("localhost",8080);
try{
client
.getUserList();
}finally {
client
.shutdown();
}
}
}
分别启动Server 和 Client端
运行结果:
张三,15,北京
李四,20,上海
CANCELLED: HTTP/2 error code: CANCEL
Received Rst Stream