1.请求队列简单介绍:
InFlightRequest是client的请求队列。max.in.flight.requests.per.connection配置请求队列大小,默认5,请求队列中存放的是在发送途中的请求,包括:正在发送的请求和已经发送的但还没有接收到response的请求;请求队列满了,发送消息将会发生阻塞。也就是发往同一个node的最大未响应请求数。
具体实现是:
sender线程在发送消息之前会调用NetworkClient的ready检查连接是否准备好了,这个时候会检查请求队列是否可用,详细查看
【网络核心层篇】NetworkClient—检查连接 如果请求队列已经满了,那么会认为连接不可用,将不会发送消息。就会发生阻塞。
如果可用则调用NetworkClient的send方法,这个时候会将请求入队。
在NetworkClient的poll中handleCompletedSends(responses, updatedNow);处理上次完成的请求,这个时候会从请求队列中移除已经收到响应的请求。
图解:
2.主要的成员
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection
;
private final Map
<String
, Deque
<NetworkClient.InFlightRequest>> requests
= new HashMap<>();
private final AtomicInteger inFlightRequestCount
= new AtomicInteger(0);
3.核心API:
3.1请求入队
public void add(NetworkClient
.InFlightRequest request
) {
String destination
= request
.destination
;
Deque
<NetworkClient.InFlightRequest> reqs
= this.requests
.get(destination
);
if (reqs
== null
) {
reqs
= new ArrayDeque<>();
this.requests
.put(destination
, reqs
);
}
reqs
.addFirst(request
);
inFlightRequestCount
.incrementAndGet();
}
3.2请求出队
public NetworkClient
.InFlightRequest
completeNext(String node
) {
NetworkClient
.InFlightRequest inFlightRequest
= requestQueue(node
).pollLast();
inFlightRequestCount
.decrementAndGet();
return inFlightRequest
;
}
private Deque
<NetworkClient.InFlightRequest> requestQueue(String node
) {
Deque
<NetworkClient.InFlightRequest> reqs
= requests
.get(node
);
if (reqs
== null
|| reqs
.isEmpty())
throw new IllegalStateException("There are no in-flight requests for node " + node
);
return reqs
;
}
3.3请求队列校验
public boolean canSendMore(String node
) {
Deque
<NetworkClient.InFlightRequest> queue
= requests
.get(node
);
return queue
== null
|| queue
.isEmpty() ||
(queue
.peekFirst().send
.completed() && queue
.size() < this.maxInFlightRequestsPerConnection
);
}
关于InFlightRequest的更详细源码细节,可点击下载源码注释版进行查看githup记得给star