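KafkaConsumer relies on SubscriptionState to manage the set of subscribed topics and the consumption state of each partition. Through ConsumerCoordinator it interacts with the broker-side GroupCoordinator to complete rebalances and to commit the most recent offsets. Fetcher pulls messages from Kafka and parses them. It also takes part in resetting positions and can fetch the cluster metadata for specified topics. All of these requests are buffered and sent by ConsumerNetworkClient, which additionally maintains a queue of scheduled tasks used to run the HeartbeatTask and the offset auto-commit task.

KafkaConsumer does not support multi-threaded use, so it provides a mechanism that detects concurrent access from multiple threads. The mechanism is implemented by the acquire() and release() methods: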
```java
/**
 * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
 * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
 * supported).
 * @throws IllegalStateException if the consumer has been closed
 * @throws ConcurrentModificationException if another thread already has the lock
 */
private void acquire() {
    ensureNotClosed();
    // ID of the calling thread (a thread ID, not a process ID)
    long threadId = Thread.currentThread().getId();
    // Another thread already owns the consumer: concurrent access detected, fail fast
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    // Count re-entrant acquisitions by the owning thread
    refcount.incrementAndGet();
}

/**
 * Release the light lock protecting the consumer from multi-threaded access.
 */
private void release() {
    // The outermost release() clears the owner thread ID
    if (refcount.decrementAndGet() == 0)
        currentThread.set(NO_CURRENT_THREAD);
}
```

The following walks through the complete flow of polling for messages.
```java
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            // Were any records returned?
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
                //
                // For efficiency, send the next FetchRequest before the caller processes this batch,
                // so that processing the records overlaps with the request and its response travelling
                // over the network and being handled by the broker.
                fetcher.sendFetches();
                // Send the FetchRequests out without blocking
                client.pollNoWakeup();

                // Apply the ConsumerInterceptors, if any are configured
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            // Compute the remaining time budget
            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

/**
 * Do one round of polling. In addition to checking for new data, this does any needed
 * heart-beating, auto-commits, and offset updates.
 * @param timeout The maximum time to block in the underlying poll
 * @return The fetched records (may be empty)
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
    // Locate the GroupCoordinator with a GroupCoordinatorRequest; block until it is found
    coordinator.ensureCoordinatorReady();

    // ensure we have partitions assigned if we expect to
    // (complete a rebalance if one is needed)
    if (subscriptions.partitionsAutoAssigned())
        coordinator.ensurePartitionAssignment();

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    // (restores the TopicPartitionState entries in SubscriptionState,
    // mainly the committed offset and the position field)
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    long now = time.milliseconds();

    // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
    client.executeDelayedTasks(now);

    // init any new fetches (won't resend pending fetches)
    // Parse records out of the completedFetches buffer
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

    // if data is available already, e.g. from a previous network client poll() call to commit,
    // then just return it immediately
    if (!records.isEmpty())
        return records;

    // Create and buffer FetchRequests
    fetcher.sendFetches();
    // Send the requests, blocking for at most `timeout`
    client.poll(timeout, now);
    // Parse records out of the completedFetches buffer again
    return fetcher.fetchedRecords();
}
```
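The timeout handling in poll() does not depend on Kafka's networking, so it can be pulled out and tested on its own. The sketch below is a hypothetical PollLoopSketch (the class and its parameters are illustrative, not Kafka API): pollOnce and sendFetches are passed in as functions, the clock is injectable, and interceptors are left out. It follows the same do/while shape: each round receives the remaining budget, a non-empty result triggers the next fetch before returning (the pipelining step), and an empty result leads to another round until the budget is spent.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical model of KafkaConsumer.poll(long): pollOnce and sendFetches are injected functions.
final class PollLoopSketch {
    static <T> List<T> poll(long timeoutMs, LongSupplier clock,
                            LongFunction<List<T>> pollOnce, Runnable sendFetches) {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        long start = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            // One round with whatever budget is left
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty()) {
                // Pipelining: start the next fetch before handing records to the caller
                sendFetches.run();
                return records;
            }
            // Shrink the budget by the clock time spent so far
            remaining = timeoutMs - (clock.getAsLong() - start);
        } while (remaining > 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        long[] now = {0};
        int[] rounds = {0};
        List<String> records = PollLoopSketch.<String>poll(100, () -> now[0],
                budget -> {
                    now[0] += 30; // pretend each round takes 30 ms
                    return ++rounds[0] < 3 ? Collections.<String>emptyList() : List.of("record-1");
                },
                () -> System.out.println("next fetch sent"));
        System.out.println(records + " after " + rounds[0] + " rounds");
    }
}
```

Because the loop is a do/while, pollOnce runs at least once even when the timeout is 0, just as in poll() above. With a 100 ms timeout and rounds that return nothing and take 30 ms each, the rounds see budgets of 100, 70, 40 and 10 ms before the loop gives up.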
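pollOnce() calls client.executeDelayedTasks(now) before fetching, which runs the heartbeat and auto-commit tasks held in ConsumerNetworkClient's scheduled-task queue. The sketch below models such a queue under stated assumptions: DelayedTaskQueueSketch, PeriodicTask, schedule() and nextTimeout() are hypothetical names chosen for illustration, not Kafka's classes; only the executeDelayedTasks(now) call shape comes from the code above. Tasks sit in a PriorityQueue ordered by due time, and each periodic task (a stand-in for HeartbeatTask or the auto-commit task) reschedules itself after it runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of a delayed-task queue (not Kafka's real classes).
class DelayedTaskQueueSketch {
    interface DelayedTask {
        void run(long now);
    }

    // A queued task together with the time (ms) at which it becomes due.
    private static final class Entry implements Comparable<Entry> {
        final DelayedTask task;
        final long at;

        Entry(DelayedTask task, long at) {
            this.task = task;
            this.at = at;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(at, other.at);
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    void schedule(DelayedTask task, long at) {
        queue.add(new Entry(task, at));
    }

    // Milliseconds until the next task is due (Long.MAX_VALUE if nothing is queued).
    long nextTimeout(long now) {
        if (queue.isEmpty()) return Long.MAX_VALUE;
        return Math.max(0L, queue.peek().at - now);
    }

    // Run every due task. Due tasks are collected first, so a task that
    // reschedules itself cannot run twice within the same call.
    void executeDelayedTasks(long now) {
        List<Entry> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().at <= now) {
            due.add(queue.poll());
        }
        for (Entry e : due) {
            e.task.run(now);
        }
    }

    // Hypothetical stand-in for HeartbeatTask / auto-commit: logs a run, then reschedules itself.
    static final class PeriodicTask implements DelayedTask {
        private final String name;
        private final long intervalMs;
        private final DelayedTaskQueueSketch scheduler;
        private final List<String> log;

        PeriodicTask(String name, long intervalMs, DelayedTaskQueueSketch scheduler, List<String> log) {
            this.name = name;
            this.intervalMs = intervalMs;
            this.scheduler = scheduler;
            this.log = log;
        }

        @Override
        public void run(long now) {
            log.add(name + "@" + now);
            scheduler.schedule(this, now + intervalMs);
        }
    }

    public static void main(String[] args) {
        DelayedTaskQueueSketch q = new DelayedTaskQueueSketch();
        List<String> log = new ArrayList<>();
        q.schedule(new PeriodicTask("heartbeat", 3000, q, log), 3000);
        q.schedule(new PeriodicTask("auto-commit", 5000, q, log), 5000);
        for (long now = 0; now <= 10000; now += 1000) {
            q.executeDelayedTasks(now); // like pollOnce(), run due tasks before fetching
        }
        System.out.println(log);
    }
}
```

nextTimeout() shows one way such a queue could cap how long a blocking network poll waits, so that a heartbeat is not pushed past its due time; this is an assumption about the design rather than a description of Kafka's code.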
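poll() wraps its whole body in acquire() and a finally-block release(), so a second thread calling into the consumer fails fast instead of waiting. The sketch below is a hypothetical standalone LightLock that copies the compare-and-set logic of acquire() and release(); ensureNotClosed() is omitted, and the class name, depth() and otherThreadCanAcquire() are illustrative helpers, not Kafka API. The owning thread may re-enter, while any other thread gets a ConcurrentModificationException.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone copy of the acquire()/release() logic (ensureNotClosed() omitted).
class LightLock {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); // owner thread ID
    private final AtomicInteger refcount = new AtomicInteger(0);                // re-entry depth

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // The owner may re-enter; any other thread must claim a free lock via CAS or fail fast.
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        refcount.incrementAndGet();
    }

    void release() {
        // Only the outermost release() gives up ownership.
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    int depth() {
        return refcount.get();
    }

    // Tries acquire()/release() on a fresh thread; returns true if that thread got the lock.
    static boolean otherThreadCanAcquire(LightLock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            try {
                lock.acquire();
                acquired.set(true);
                lock.release();
            } catch (ConcurrentModificationException e) {
                acquired.set(false);
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        LightLock lock = new LightLock();
        lock.acquire();
        lock.acquire(); // re-entrant acquire by the same thread succeeds
        System.out.println("depth=" + lock.depth() + ", other thread acquired: " + otherThreadCanAcquire(lock));
        lock.release();
        lock.release();
        System.out.println("after release, other thread acquired: " + otherThreadCanAcquire(lock));
    }
}
```

The design trades blocking for early failure: a lock that waited would hide the misuse, whereas throwing surfaces it at the first concurrent call.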