在消费者正常消费的过程中以及Rebalance操作之前,都会提交一次offset,以记录Consumer当前的消费位置。提交offset的功能是由ConsumerCoordinator完成的。 在SubscriptionState中使用TopicPartitionState记录每个TopicPartition的消费状况,其中TopicPartitionState.position字段记录了消费者下次要从服务端获取消息的offset。当没有明确指定待提交的offset值时,会把TopicPartitionState.position作为待提交的offset,并将这些offset组织成集合,作为ConsumerCoordinator.commitOffsets*()方法的第一个参数。
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { //把needsFetchCommittedOffsets设置为true this.subscriptions.needRefreshCommits(); //创建并缓存OffsetCommitRequest请求,逻辑与之前发送JoinGroupRequest和SyncGroupRequest类似 //唯一的区别是使用OffsetCommitResponseHandler处理OffsetCommitResponse RequestFuture<Void> future = sendOffsetCommitRequest(offsets); //选择回调函数 final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { if (interceptors != null) interceptors.onCommit(offsets); //调用回调 cb.onComplete(offsets, null); } @Override public void onFailure(RuntimeException e) { //异常处理 if (e instanceof RetriableException) { cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e)); } else { cb.onComplete(offsets, e); } } }); // ensure the commit has a chance to be transmitted (without blocking on its completion). // Note that commits are treated as heartbeats by the coordinator, so there is no need to // explicitly allow heartbeats through delayed task execution. //将OffsetCommitRequest发送出去 client.pollNoWakeup(); }同步commit,commitOffsetsSync()方法与commitOffsetsAsync()方法的实现类似,commitOffsetsSync发送时使用了ConsumerCoordinator.poll(future)阻塞等待OffsetCommitResponse处理完成。同时commitOffsetsSync方法在检测到RetriableException异常时会进行重试。 OffsetCommitResponseHandler.handle()方法是处理OffsetCommitResponse的入口。
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> { private final Map<TopicPartition, OffsetAndMetadata> offsets; @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { sensors.commitLatency.record(response.requestLatencyMs()); Set<String> unauthorizedTopics = new HashSet<>(); //遍历待提交的所有offset的信息 for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); long offset = offsetAndMetadata.offset(); //获取错误码 Errors error = Errors.forCode(entry.getValue()); if (error == Errors.NONE) { log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp); if (subscriptions.isAssigned(tp)) // update the local cache only if the partition is still assigned // 更新SubscriptionState中对应TopicPartitionState的committed字段 subscriptions.committed(tp, offsetAndMetadata); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { log.error("Not authorized to commit offsets for group {}", groupId); future.raise(new GroupAuthorizationException(groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { unauthorizedTopics.add(tp.topic()); } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { // raise the error to the user log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry log.debug("Offset commit for group {} failed: {}", groupId, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) { log.debug("Offset commit for group {} failed: {}", groupId, error.message()); coordinatorDead(); future.raise(error); return; } else if (error == 
Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) { // need to re-join group log.debug("Offset commit for group {} failed: {}", groupId, error.message()); subscriptions.needReassignment(); future.raise(new CommitFailedException("Commit cannot be completed since the group has already " + "rebalanced and assigned the partitions to another member. This means that the time " + "between subsequent calls to poll() was longer than the configured session.timeout.ms, " + "which typically implies that the poll loop is spending too much time message processing. " + "You can address this either by increasing the session timeout or by reducing the maximum " + "size of batches returned in poll() with max.poll.records.")); return; } else { log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); future.raise(new KafkaException("Unexpected error in commit: " + error.message())); return; } } if (!unauthorizedTopics.isEmpty()) { log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId); future.raise(new TopicAuthorizationException(unauthorizedTopics)); } else { // 传播成功提交offset事件,最终调用上面添加的Callback future.complete(null); } } }fetch offset 在rebalance操作结束之后,每个消费者都确定其需要消费的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。假设之前已经将最后的消费者位置提交到了GroupCoordinator,GroupCoordinator将其保存到了kafka内部的__consumer_offsets中,此时消费者可以通过offsetFetchRequest请求上次提交offset并从此处继续消费。 refreshCommittedOffsetsIfNeeded主要是发送OffsetFetchRequest请求,从服务端拉取最近提交的offset集合,并更新到Subscriptions集合中。
/** * Refresh the committed offsets for provided partitions. */ public void refreshCommittedOffsetsIfNeeded() { //检测needsFetchCommittedOffsets,发送OffsetFetchRequest并处理OffsetFetchResponse响应,返回值是最近提交的offset集合。 if (subscriptions.refreshCommitsNeeded()) { Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions()); //处理offsets集合,更新对应的TopicPartitionState的commited字段中 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { TopicPartition tp = entry.getKey(); // verify assignment is still active if (subscriptions.isAssigned(tp)) this.subscriptions.committed(tp, entry.getValue()); } //needsFetchCommittedOffsets设置false,OffsetFetchRequest处理结束。 this.subscriptions.commitsRefreshed(); } } public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) { while (true) { //检测GroupCoordinator的状态 ensureCoordinatorReady(); // 创建并缓存OffsetFetchRequest请求 RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions); // 阻塞发送OffsetFetchRequest请求 client.poll(future); //返回从服务端得到的offset if (future.succeeded()) return future.value(); // 如果是RetriableException异常,则退避一段时间重试 if (!future.isRetriable()) throw future.exception(); time.sleep(retryBackoffMs); } }处理OffsetFetchResponse的入口是OffsetFetchResponseHandler.handle()方法,
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { // 记录从服务端获取的offset集合 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size()); for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) { // 处理OffsetFetchResponse中的每个分区 TopicPartition tp = entry.getKey(); OffsetFetchResponse.PartitionData data = entry.getValue(); if (data.hasError()) { Errors error = Errors.forCode(data.errorCode); log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message()); if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry future.raise(error); } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) { // re-discover the coordinator and retry coordinatorDead(); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { // need to re-join group subscriptions.needReassignment(); future.raise(error); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } return; } else if (data.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) // 记录正常的offset offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata)); } else { log.debug("Group {} has no committed offset for partition {}", groupId, tp); } } // 传播offsets集合,最终通过fetchCommittedOffsets()方法返回 future.complete(offsets); }