【数据结构篇】请求队列InFlightRequest

it2022-05-05  156

1.请求队列简单介绍:

InFlightRequest是client的请求队列。max.in.flight.requests.per.connection配置请求队列大小,默认5,请求队列中存放的是在发送途中的请求,包括:正在发送的请求和已经发送的但还没有接收到response的请求;请求队列满了,发送消息将会发生阻塞。也就是发往同一个node的最大未响应请求数。

具体实现是:

sender线程在发送消息之前会调用NetworkClient的ready检查连接是否准备好了,这个时候会检查请求队列是否可用,详细查看

【网络核心层篇】NetworkClient—检查连接 如果请求队列已经满了,那么会认为连接不可用,将不会发送消息。就会发生阻塞。

如果可用则调用NetworkClient的send方法,这个时候会将请求入队。

在NetworkClient的poll中handleCompletedSends(responses, updatedNow);处理上次完成的请求,这个时候会从请求队列中移除已经收到响应的请求。

图解:

2.主要的成员
/** * The set of requests which have been sent or are being sent but haven't yet received a responseq * 请求队列 * 1. 正在发送的请求 * 2. 已经发送的但还没有接收到response的请求; */ final class InFlightRequests { /** * 每个连接最大执行中请求数-->max.in.flight.requests.per.connection */ private final int maxInFlightRequestsPerConnection; /** * node->Deque<ClientRequest> * 双端队列Deque,其中的元素请求ClientRequest * 新请求从队列头部插入。发送从队列尾部取出 */ private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>(); /** * Thread safe total number of in flight requests. */ private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
3.核心API:
3.1请求入队
/** * 请求入队 */ public void add(NetworkClient.InFlightRequest request) { // 目的地 String destination = request.destination; // 获取目的地请求队列,没有则新建一个ArrayDeque Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination); if (reqs == null) { reqs = new ArrayDeque<>(); this.requests.put(destination, reqs); } // 向头部添加请求 reqs.addFirst(request); // 请求消息个数+1, inFlightRequestCount.incrementAndGet(); }
3.2请求出队
/** * 请求出队 */ public NetworkClient.InFlightRequest completeNext(String node) { // 从指定的请求队列的尾部取出一个请求 NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollLast(); // 请求数量-1 inFlightRequestCount.decrementAndGet(); return inFlightRequest; } /** * 获取指定noded的请求队列 */ private Deque<NetworkClient.InFlightRequest> requestQueue(String node) { Deque<NetworkClient.InFlightRequest> reqs = requests.get(node); if (reqs == null || reqs.isEmpty()) throw new IllegalStateException("There are no in-flight requests for node " + node); return reqs; }
3.3请求队列校验
/** * Can we send more requests to this node? * * 校验请求队列是否满了 * * @param node Node in question * @return true iff we have no requests still being sent to the given node * * 重点在于queue.peekFirst().request().completed, 即如果发给这个节点的最早的请求还没有发送完成,是不能再往这个节点发送请求的。 * * 从canSendMore方法中也可以看出, 只要没有超过maxInFlightRequestsPerConnection,一个node可以有多个in-flight request的 * * maxInFlightRequestsPerConnection max.in.flight.requests.per.connection */ public boolean canSendMore(String node) { Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); }

关于InFlightRequest的更详细源码细节,可点击下载源码注释版进行查看githup记得给star


最新回复(0)