Kafka Consumer (8): Fetcher


Last time we covered how offsets are managed; this article looks at how the consumer fetches messages from the broker. The Fetcher class is responsible for sending FetchRequests, retrieving the requested message sets, processing FetchResponses, and updating the consumed position.
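Before walking through the fields, it helps to see where Fetcher sits in the consumer. The sketch below is a simplified outline of one KafkaConsumer poll iteration in this version of the client (coordinator and position-initialization steps omitted; fetcher and client stand for the consumer's Fetcher and ConsumerNetworkClient fields):

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // 1. Return records already buffered from earlier fetches, if any.
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // 2. Queue a FetchRequest (onto the unsent queue) for every fetchable partition.
    fetcher.sendFetches();
    // 3. Drive the network: send queued requests and poll for responses; the response
    //    listener converts each FetchResponse into CompletedFetch objects.
    client.poll(timeout);
    // 4. Parse whatever arrived and hand it back to the caller.
    return fetcher.fetchedRecords();
}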

public class Fetcher<K, V> {
    // client handles the network communication
    private final ConsumerNetworkClient client;
    private final Time time;
    // The broker does not answer a FetchRequest immediately; it waits until at least
    // minBytes of data can be returned, so each FetchResponse carries multiple
    // messages, which improves efficiency.
    private final int minBytes;
    // Maximum time to wait for a FetchResponse
    private final int maxWaitMs;
    // Maximum number of bytes per fetch operation
    private final int fetchSize;
    // Backoff between retries
    private final long retryBackoffMs;
    // Maximum number of records returned per fetch
    private final int maxPollRecords;
    private final boolean checkCrcs;
    // Cluster metadata
    private final Metadata metadata;
    private final FetchManagerMetrics sensors;
    // Tracks the consumption state of each TopicPartition
    private final SubscriptionState subscriptions;
    // Each FetchResponse is first converted into CompletedFetch objects and queued
    // here; at this point the messages are still unparsed.
    private final List<CompletedFetch> completedFetches;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // PartitionRecords holds the result of parsing a CompletedFetch. Its three main fields:
    //   private long fetchOffset;                    // offset of the first message in records
    //   private TopicPartition partition;            // the corresponding TopicPartition
    //   private List<ConsumerRecord<K, V>> records;  // the parsed messages
    private PartitionRecords<K, V> nextInLineRecords = null;
}
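Most of these fields are populated from standard consumer configuration. As a quick reference, the corresponding config keys can be set as follows (the class name, broker address, and values here are placeholders for illustration):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // minBytes: broker holds the response until at least this many bytes accumulate
        props.put("fetch.min.bytes", "1024");
        // maxWaitMs: longest the broker may hold a FetchResponse
        props.put("fetch.max.wait.ms", "500");
        // fetchSize: max bytes fetched per partition per request
        props.put("max.partition.fetch.bytes", "1048576");
        // maxPollRecords: cap on records returned by one poll()
        props.put("max.poll.records", "500");
        // retryBackoffMs: wait between retries
        props.put("retry.backoff.ms", "100");
        // checkCrcs: verify record checksums on the client
        props.put("check.crcs", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}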

The FetchRequest objects are created as follows:

private Map<Node, FetchRequest> createFetchRequests() {
    // Fetch the cluster metadata
    Cluster cluster = metadata.fetch();
    Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
    // fetchablePartitions() filters partitions by the following conditions:
    // 1. The partition is assigned to this consumer, i.e. it has an entry in the
    //    SubscriptionState assignment.
    // 2. The partition is not paused, and its TopicPartitionState.position is non-null.
    // 3. nextInLineRecords holds no messages from this partition.
    // 4. completedFetches holds no messages from this partition.
    for (TopicPartition partition : fetchablePartitions()) {
        // Find the node hosting the partition's leader replica
        Node node = cluster.leaderFor(partition);
        if (node == null) {
            // No known leader: request a metadata update
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) {
            // If the node's unsent queue or its InFlightRequests queue is non-empty,
            // no FetchRequest is sent to it; otherwise build up its request
            Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new HashMap<>();
                fetchable.put(node, fetch);
            }
            // Look up the partition's position in SubscriptionState and wrap it
            // in a PartitionData object
            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        }
    }

    // Convert the fetchable map built above: the position info of all TopicPartitions
    // headed for the same Node is packed into a single FetchRequest object.
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
        requests.put(node, fetch);
    }
    return requests;
}
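The fetchablePartitions() helper referenced above applies exactly the four filters listed in the comments. Its logic is roughly the following sketch (conditions 1 and 2 are handled inside SubscriptionState.fetchablePartitions()):

private Set<TopicPartition> fetchablePartitions() {
    // Conditions 1 and 2: assigned, not paused, and position is known
    Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
    // Condition 3: skip the partition whose records are currently being drained
    if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
        fetchable.remove(nextInLineRecords.partition);
    // Condition 4: skip partitions that already have unparsed data queued
    for (CompletedFetch completedFetch : completedFetches)
        fetchable.remove(completedFetch.partition);
    return fetchable;
}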

Once the requests are built, they are sent:

public void sendFetches() {
    for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
        final FetchRequest request = fetchEntry.getValue();
        // Queue the FetchRequest for each Node on the unsent queue
        client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                // Add a listener; this is the entry point for handling the FetchResponse
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = new FetchResponse(resp.responseBody());
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            // Walk the data in the response
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            // Note: this is a FetchResponse.PartitionData
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            // Wrap it in a CompletedFetch and queue it on completedFetches
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch failed", e);
                    }
                });
    }
}
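The CompletedFetch queued by the listener is just a thin holder for the raw, unparsed response data. Judging from the constructor call above and the fields read later in parseFetchedData(), its shape is roughly:

private static class CompletedFetch {
    private final TopicPartition partition;                       // partition the data belongs to
    private final long fetchedOffset;                             // offset the request asked for
    private final FetchResponse.PartitionData partitionData;      // raw record bytes plus error code
    private final FetchResponseMetricAggregator metricAggregator; // records metrics once consumed

    public CompletedFetch(TopicPartition partition,
                          long fetchedOffset,
                          FetchResponse.PartitionData partitionData,
                          FetchResponseMetricAggregator metricAggregator) {
        this.partition = partition;
        this.fetchedOffset = fetchedOffset;
        this.partitionData = partitionData;
        this.metricAggregator = metricAggregator;
    }
}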

The messages sitting in the completedFetches queue are still unparsed FetchResponse.PartitionData objects. The fetchedRecords() method parses them, returns the resulting record collections, and updates the corresponding TopicPartitionState.position so the next fetch starts at the right place.

/**
 * Return the fetched records, empty the record buffer and update the consumed position.
 *
 * NOTE: returning empty records guarantees the consumed position is NOT updated.
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    if (this.subscriptions.partitionAssignmentNeeded()) {
        // A rebalance is needed, so return an empty map
        return Collections.emptyMap();
    } else {
        // Results grouped by TopicPartition
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        // At most maxPollRecords messages are returned per call
        int recordsRemaining = maxPollRecords;
        // Iterator over the completedFetches queue
        Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
        while (recordsRemaining > 0) { // walk the completedFetches queue
            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
                if (!completedFetchesIterator.hasNext())
                    break;
                CompletedFetch completion = completedFetchesIterator.next();
                completedFetchesIterator.remove();
                // Parse one CompletedFetch into a PartitionRecords object
                nextInLineRecords = parseFetchedData(completion);
            } else {
                // Move messages from nextInLineRecords into drained
                recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
            }
        }
        // Return the accumulated results
        return drained;
    }
}

/**
 * The callback for fetch completion: parses a CompletedFetch
 */
private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    int bytes = 0;
    int recordsCount = 0;
    PartitionRecords<K, V> parsedRecords = null;

    try {
        if (!subscriptions.isFetchable(tp)) {
            // this can happen when a rebalance happened or a partition consumption paused
            // while fetch is still in-flight
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (partition.errorCode == Errors.NONE.code()) {
            // we are interested in this fetch only if the beginning offset matches the
            // current consumed position
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {
                // The fetched data must match the offset we asked for
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                        "the expected offset {}", tp, fetchOffset, position);
                return null;
            }

            ByteBuffer buffer = partition.recordSet;
            // Build MemoryRecords over the ByteBuffer taken from the FetchResponse
            MemoryRecords records = MemoryRecords.readableRecords(buffer);
            List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
            boolean skippedRecords = false;
            // Walk the MemoryRecords to extract the Record set
            for (LogEntry logEntry : records) {
                // Skip the messages earlier than the current position
                if (logEntry.offset() >= position) {
                    parsed.add(parseRecord(tp, logEntry));
                    bytes += logEntry.size();
                } else {
                    skippedRecords = true;
                }
            }

            recordsCount = parsed.size();
            this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);

            if (!parsed.isEmpty()) {
                log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
                // Wrap the parsed Record set in a PartitionRecords object
                parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
            } else if (buffer.limit() > 0 && !skippedRecords) {
                // we did not read a single message from a non-empty buffer
                // because that message's size is larger than fetch size, in this case
                // record this exception
                Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
                        + recordTooLargePartitions
                        + " whose size is larger than the fetch size "
                        + this.fetchSize
                        + " and hence cannot be ever returned."
                        + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
                        + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
                        recordTooLargePartitions);
            }
        } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
            this.metadata.requestUpdate();
        } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                subscriptions.needOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
            log.warn("Not authorized to read from topic {}.", tp.topic());
            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (partition.errorCode == Errors.UNKNOWN.code()) {
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
            throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
        }
    } finally {
        completedFetch.metricAggregator.record(tp, bytes, recordsCount);
    }

    return parsedRecords;
}
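parseRecord(), called for every LogEntry above, verifies the record CRC when check.crcs is enabled and runs the configured Deserializers over the key and value bytes. A simplified sketch (the real method also carries timestamp, checksum, and serialized-size metadata into the ConsumerRecord):

private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
    Record record = logEntry.record();
    // Optionally verify the record checksum on the client side
    if (this.checkCrcs && !record.isValid())
        throw new KafkaException("Record for partition " + partition + " at offset "
                + logEntry.offset() + " is invalid");
    // Deserialize the key and value with the configured Deserializers
    ByteBuffer keyBytes = record.key();
    K key = keyBytes == null ? null
            : keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
    ByteBuffer valueBytes = record.value();
    V value = valueBytes == null ? null
            : valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
    return new ConsumerRecord<>(partition.topic(), partition.partition(),
            logEntry.offset(), key, value);
}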

When fetchedRecords() adds messages to the drained map, it also updates the TopicPartitionState.position field. This happens in the Fetcher.append() method:

private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
                   PartitionRecords<K, V> partitionRecords,
                   int maxRecords) {
    if (partitionRecords.isEmpty())
        return 0;

    if (!subscriptions.isAssigned(partitionRecords.partition)) {
        // this can happen when a rebalance happened before fetched records
        // are returned to the consumer's poll call
        log.debug("Not returning fetched records for partition {} since it is no longer assigned",
                partitionRecords.partition);
    } else {
        // note that the consumed position should always be available
        // as long as the partition is still assigned
        long position = subscriptions.position(partitionRecords.partition);
        if (!subscriptions.isFetchable(partitionRecords.partition)) {
            // this can happen when a partition is paused before fetched records
            // are returned to the consumer's poll call
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                    partitionRecords.partition);
        } else if (partitionRecords.fetchOffset == position) {
            // we are ensured to have at least one record since we already checked for emptiness
            // Take at most maxRecords messages
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
            // Offset right after the last message taken
            long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

            log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                    "position to {}", position, partitionRecords.partition, nextOffset);

            List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
            if (records == null) {
                records = partRecords;
                drained.put(partitionRecords.partition, records);
            } else {
                records.addAll(partRecords);
            }

            // Update the position field of the corresponding TopicPartitionState
            // in SubscriptionState
            subscriptions.position(partitionRecords.partition, nextOffset);
            return partRecords.size();
        } else {
            // these records aren't next in line based on the last consumed position,
            // ignore them; they must be from an obsolete request
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                    partitionRecords.partition, partitionRecords.fetchOffset, position);
        }
    }

    partitionRecords.discard();
    return 0;
}
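PartitionRecords.take(n) removes and returns at most n records from the buffer and advances fetchOffset, while discard() drops whatever is left. A minimal sketch based on how the class is used above:

private static class PartitionRecords<K, V> {
    private long fetchOffset;                     // offset of the first buffered record
    private TopicPartition partition;
    private List<ConsumerRecord<K, V>> records;

    private boolean isEmpty() {
        return records == null || records.isEmpty();
    }

    // Remove and return up to n records; advance fetchOffset to the next record in line
    private List<ConsumerRecord<K, V>> take(int n) {
        if (records == null)
            return new ArrayList<>();
        if (n >= records.size()) {
            // Everything fits: hand over the whole buffer
            List<ConsumerRecord<K, V>> res = this.records;
            this.records = null;
            return res;
        }
        // Split off the first n records and keep the rest buffered
        List<ConsumerRecord<K, V>> res = new ArrayList<>(records.subList(0, n));
        records.subList(0, n).clear();
        fetchOffset = records.get(0).offset();
        return res;
    }

    private void discard() {
        this.records = null;
    }
}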

Updating the position. updateFetchPositions() is where the TopicPartitionState.position field gets initialized or reset:

/**
 * Update the fetch positions for the provided partitions.
 * @param partitions the partitions to update positions for
 * @throws NoOffsetForPartitionException If no offset is stored for a given partition
 *         and no reset policy is available
 */
public void updateFetchPositions(Set<TopicPartition> partitions) {
    // reset the fetch position to the committed position
    for (TopicPartition tp : partitions) {
        // If the position is already set, the partition is fetchable and
        // no reset is needed
        if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
            continue;

        // TODO: If there are several offsets to reset, we could submit offset requests in parallel
        if (subscriptions.isOffsetResetNeeded(tp)) {
            // Update the position according to the configured reset strategy
            resetOffset(tp);
        } else if (subscriptions.committed(tp) == null) {
            // there's no committed position, so we need to reset with the default strategy
            subscriptions.needOffsetReset(tp);
            resetOffset(tp);
        } else {
            // The committed field is non-null, so set position to the committed offset
            long committed = subscriptions.committed(tp).offset();
            log.debug("Resetting offset for partition {} to the committed offset {}", tp, committed);
            subscriptions.seek(tp, committed);
        }
    }
}

private void resetOffset(TopicPartition partition) {
    // Choose the timestamp according to the configured reset strategy
    OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
    final long timestamp;
    if (strategy == OffsetResetStrategy.EARLIEST)
        timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; // -2
    else if (strategy == OffsetResetStrategy.LATEST)
        timestamp = ListOffsetRequest.LATEST_TIMESTAMP;   // -1
    else
        throw new NoOffsetForPartitionException(partition);

    log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
    // Send an OffsetRequest to the Node hosting the partition's leader replica and
    // block until the response arrives; listOffset() handles sending the request
    // and processing the OffsetResponse.
    long offset = listOffset(partition, timestamp);

    // we might lose the assignment while fetching the offset, so check it is still active
    // before updating the position
    if (subscriptions.isAssigned(partition))
        this.subscriptions.seek(partition, offset);
}
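listOffset() itself is essentially a blocking retry loop: it sends the offset request to the partition leader and polls until a usable response arrives. Its shape in this version of the client is roughly the following sketch (sendListOffsetRequest() builds the request and returns a RequestFuture):

private long listOffset(TopicPartition partition, long timestamp) {
    while (true) {
        // Build and queue the offset request for the partition leader
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        // Block until the future completes
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        // Stale metadata: wait for a metadata update; otherwise back off and retry
        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}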

 
