Kafka Consumer (6): Rebalance

2024-08-02

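Conditions that trigger a rebalance:
1. A new consumer joins the consumer group.
2. A consumer crashes or goes offline, for example when a long GC pause or network latency keeps it from sending heartbeat requests to the GroupCoordinator for too long.
3. A consumer leaves the consumer group.
4. The partition count of any topic the group subscribes to changes.
5. A consumer calls unsubscribe() to cancel its subscription.

A rebalance proceeds in three phases, described in turn below.

Phase 1
1. Find the GroupCoordinator. In this phase the consumer sends a GroupCoordinatorRequest to a broker in the Kafka cluster and handles the GroupCoordinatorResponse that comes back. It first checks whether the GroupCoordinator needs to be looked up again, mainly by testing whether the coordinator field is null and whether the connection to the GroupCoordinator is healthy.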

public abstract class AbstractCoordinator implements Closeable {

    public boolean coordinatorUnknown() {
        // Check whether the coordinator field is null
        if (coordinator == null)
            return true;

        // Check whether the network connection to the GroupCoordinator is healthy
        if (client.connectionFailed(coordinator)) {
            // Fail the matching requests in the unsent collection and set coordinator to null
            coordinatorDead();
            return true;
        }
        return false;
    }

    protected void coordinatorDead() {
        if (this.coordinator != null) {
            log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
            this.coordinator = null;
        }
    }
}

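2. Find the least-loaded node in the cluster, create a GroupCoordinatorRequest, and call client.send() to place the request in unsent, where it waits to be sent. The call returns a RequestFuture<Void> that has been adapted through compose(); the mechanism is the same as in HeartbeatCompletionHandler.
3. Call ConsumerNetworkClient.poll(future) to send the request. The send is blocking: the call returns only once a GroupCoordinatorResponse has been received or the future has completed with an exception.
4. Check the state of the RequestFuture<Void>. If it failed with a RetriableException, call ConsumerNetworkClient.awaitMetadataUpdate() to block until the cluster metadata held in Metadata has been refreshed, then return to step 1.
5. If the GroupCoordinator node was found but the network connection to it failed, clear its requests from unsent, set coordinator to null, and go back to step 1 to look up the GroupCoordinator again.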

public abstract class AbstractCoordinator implements Closeable {

    /**
     * Block until the coordinator for this group is known and is ready to receive requests.
     */
    public void ensureCoordinatorReady() {
        // 1. Check the GroupCoordinator state
        while (coordinatorUnknown()) {
            // 2. Create and buffer the request
            RequestFuture<Void> future = sendGroupCoordinatorRequest();
            // 3. Send the request in blocking mode and handle the response
            client.poll(future);

            if (future.failed()) {
                // 4. Error handling: block until the cluster metadata held in Metadata is refreshed
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // 5. The GroupCoordinator was found but cannot be reached; back off and retry
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    private RequestFuture<Void> sendGroupCoordinatorRequest() {
        // initiate the group metadata request
        // Pick the node with the fewest unacknowledged requests in InFlightRequests;
        // that node is treated as the least loaded.
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            // No usable node was found.
            return RequestFuture.noBrokersAvailable();
        } else {
            // create a group metadata request
            log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
            // Create the GroupCoordinatorRequest and send it
            GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
            return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                        @Override
                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                            // Handle the GroupCoordinatorResponse
                            handleGroupMetadataResponse(response, future);
                        }
                    });
        }
    }
}
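The control flow of the lookup loop can be tried out without a broker. The following is a minimal sketch, not Kafka code: the class CoordinatorLookupSketch, the Outcome enum, and the scripted queue of outcomes are all hypothetical stand-ins for the real request and response handling. It only models the rule "retry on a retriable failure, throw on any other failure, stop once the coordinator is known".

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, Kafka-free model of the coordinator discovery loop. */
final class CoordinatorLookupSketch {

    /** Outcome of one simulated lookup attempt (illustrative names, not Kafka types). */
    enum Outcome { FOUND, RETRIABLE_ERROR, FATAL_ERROR }

    /**
     * Consumes scripted outcomes until the coordinator is found.
     * Returns the number of attempts made; throws on a non-retriable failure.
     */
    static int ensureCoordinatorReady(Deque<Outcome> scripted) {
        int attempts = 0;
        boolean coordinatorKnown = false;
        while (!coordinatorKnown) {
            if (scripted.isEmpty())
                throw new IllegalStateException("script exhausted before a coordinator was found");
            Outcome outcome = scripted.poll();
            attempts++;
            switch (outcome) {
                case FOUND:
                    coordinatorKnown = true;
                    break;
                case RETRIABLE_ERROR:
                    // The real client blocks in awaitMetadataUpdate() at this point.
                    break;
                default:
                    throw new IllegalStateException("non-retriable lookup failure");
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<Outcome> script = new ArrayDeque<>();
        script.add(Outcome.RETRIABLE_ERROR);
        script.add(Outcome.RETRIABLE_ERROR);
        script.add(Outcome.FOUND);
        System.out.println("attempts = " + ensureCoordinatorReady(script)); // prints "attempts = 3"
    }
}
```

The sketch leaves out the backoff sleep of step 5, since it affects timing only and not the order of decisions.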

handleGroupMetadataResponse handles the response to the GroupCoordinatorRequest:

private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
    log.debug("Received group coordinator response {}", resp);

    // Call coordinatorUnknown() to check whether the GroupCoordinator has already been found
    // and connected. If so, ignore this GroupCoordinatorResponse: nothing prevents a
    // GroupCoordinatorRequest from being sent more than once, so several responses may arrive.
    if (!coordinatorUnknown()) {
        // We already found the coordinator, so ignore the request
        future.complete(null);
    } else {
        // Parse the GroupCoordinatorResponse
        GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
        // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
        // for the coordinator in the underlying network client layer
        // TODO: this needs to be better handled in KAFKA-1935
        Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
        if (error == Errors.NONE) {
            // Create the Node object that represents the GroupCoordinator
            this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                    groupCoordinatorResponse.node().host(),
                    groupCoordinatorResponse.node().port());

            log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
            // Try to open a connection to the GroupCoordinator
            client.tryConnect(coordinator);

            // Start the heartbeat task, but only if we already have a valid generation
            if (generation > 0)
                heartbeatTask.reset();
            // Call complete() to propagate the received response
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            // For any other error code, propagate the exception; it is eventually
            // handled by step 4 in ensureCoordinatorReady().
            future.raise(error);
        }
    }
}
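The Integer.MAX_VALUE - node.id() expression gives the coordinator a node id that differs from the broker's ordinary id, so the network layer treats it as a separate connection. A small sketch of that arithmetic, with the hypothetical helper names coordinatorNodeId and brokerId (they do not exist in Kafka):

```java
/** Hypothetical helper illustrating the coordinator id mapping used above. */
final class CoordinatorIdSketch {

    /** Maps a broker id to the id used for the coordinator connection. */
    static int coordinatorNodeId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    /** Inverse mapping: recovers the broker id from a coordinator node id. */
    static int brokerId(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    public static void main(String[] args) {
        System.out.println(coordinatorNodeId(0)); // prints 2147483647
        System.out.println(coordinatorNodeId(3)); // prints 2147483644
        System.out.println(brokerId(coordinatorNodeId(3))); // prints 3
    }
}
```

Because broker ids are small non-negative integers in practice, the mapped ids sit at the top of the int range and do not collide with real broker ids.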

Phase 2
Once the GroupCoordinator has been found, the consumer enters the Join Group phase: it sends a JoinGroupRequest to the GroupCoordinator and handles the response. The entry point is ensurePartitionAssignment:

public final class ConsumerCoordinator extends AbstractCoordinator {

    // Invoked below as client.ensureFreshMetadata(); listed here for reference.
    public void ensureFreshMetadata() {
        // Refresh Metadata if it has not been updated for a long time or an update has been requested
        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
            awaitMetadataUpdate(); // blocks
    }

    public void ensurePartitionAssignment() {
        if (subscriptions.partitionsAutoAssigned()) {
            // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
            //
            // The ConsumerCoordinator constructor registers a listener on Metadata. Whenever Metadata is updated,
            // the listener filters topics with the pattern held in SubscriptionState and updates SubscriptionState.
            // Refreshing here avoids rebalancing with stale Metadata, which would cause further rebalances.
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();
        }
    }

    public void ensureActiveGroup() {
        // Check whether a JoinGroupRequest needs to be sent
        /*
        // True when AUTO_TOPICS or AUTO_PATTERN mode is in use and either
        // rejoinNeeded or needsPartitionAssignment is set.
        return subscriptions.partitionsAutoAssigned() &&
                (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
        */
        if (!needRejoin())
            return;

        if (needsJoinPrepare) {
            // Preparation before sending the JoinGroupRequest
            onJoinPrepare(generation, memberId);
            needsJoinPrepare = false;
        }

        while (needRejoin()) {
            // Check the GroupCoordinator state
            ensureCoordinatorReady();

            // If requests to the GroupCoordinator's node are still pending, block until they finish.
            // This avoids sending duplicate JoinGroupRequests.
            if (client.pendingRequestCount(this.coordinator) > 0) {
                client.awaitPendingRequests(this.coordinator);
                continue;
            }

            // Create the request and buffer it in unsent
            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
            // Register a listener
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });
            // Block until the JoinGroupRequest completes
            client.poll(future);

            if (future.failed()) {
                // Error handling: rejoin at once, rethrow, or back off and retry
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs);
            }
        }
    }

    // Preparation before sending the JoinGroupRequest
    protected void onJoinPrepare(int generation, String memberId) {
        // If auto-commit is enabled, commit offsets once synchronously (blocking).
        maybeAutoCommitOffsetsSync();

        // Invoke the callback on the ConsumerRebalanceListener registered in SubscriptionState
        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
            listener.onPartitionsRevoked(revoked);
        } catch (WakeupException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} for group {} failed on partition revocation",
                    listener.getClass().getName(), groupId, e);
        }

        assignmentSnapshot = null;
        // Set needsPartitionAssignment to true.
        subscriptions.needReassignment();
    }
}
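The failure branch at the end of the join loop distinguishes three reactions. The sketch below restates that decision as a pure function so it can be checked in isolation; JoinRetrySketch, JoinError, and Action are hypothetical names introduced for illustration and are not part of the Kafka client.

```java
/** Hypothetical restatement of the failure handling in the join loop. */
final class JoinRetrySketch {

    /** Simplified error categories (illustrative, not Kafka's exception classes). */
    enum JoinError { UNKNOWN_MEMBER_ID, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, OTHER }

    /** What the loop does next. */
    enum Action { REJOIN_IMMEDIATELY, BACK_OFF_THEN_RETRY, THROW }

    static Action classify(JoinError error, boolean retriable) {
        switch (error) {
            case UNKNOWN_MEMBER_ID:
            case REBALANCE_IN_PROGRESS:
            case ILLEGAL_GENERATION:
                // These three skip the backoff: the loop continues right away.
                return Action.REJOIN_IMMEDIATELY;
            default:
                // Any other failure either sleeps for the backoff or is rethrown.
                return retriable ? Action.BACK_OFF_THEN_RETRY : Action.THROW;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(JoinError.REBALANCE_IN_PROGRESS, false)); // prints REJOIN_IMMEDIATELY
        System.out.println(classify(JoinError.OTHER, true));                  // prints BACK_OFF_THEN_RETRY
        System.out.println(classify(JoinError.OTHER, false));                 // prints THROW
    }
}
```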

After the JoinGroupRequest has been sent, the response is handled as follows:

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {

    @Override
    public JoinGroupResponse parse(ClientResponse response) {
        return new JoinGroupResponse(response.responseBody());
    }

    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(joinResponse.errorCode());
        if (error == Errors.NONE) {
            // Parse the JoinGroupResponse and store its contents locally
            log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
            AbstractCoordinator.this.memberId = joinResponse.memberId();
            AbstractCoordinator.this.generation = joinResponse.generationId();
            // Clear the rejoin flag
            AbstractCoordinator.this.rejoinNeeded = false;
            AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
            sensors.joinLatency.record(response.requestLatencyMs());
            // The consumer uses the leader id to decide whether it is the leader. The leader goes
            // through onJoinLeader(); every other member goes through onJoinFollower().
            // onJoinFollower() does a subset of what onJoinLeader() does.
            if (joinResponse.isLeader()) {
                onJoinLeader(joinResponse).chain(future);
            } else {
                onJoinFollower().chain(future);
            }
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                    coordinator);
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // reset the member id and retry immediately
            AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
            log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            // re-discover the coordinator and retry with backoff
            coordinatorDead();
            log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID) {
            // log the error and re-throw the exception
            log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            // unexpected error, throw the exception
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }

    // Implementation of onJoinLeader
    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            // Perform the partition assignment; the result is sent to the GroupCoordinator.
            // This is the only step that onJoinFollower() does not have.
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());
            // Create and send the SyncGroupRequest. Followers and the leader both do this,
            // and each then handles its own response.
            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
            return sendSyncGroupRequest(request);
        } catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        // Look up the PartitionAssignor to use for the assignment
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        // Deserialize each member's subscription
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // the leader will begin watching for changes to any of the topics the group is interested in,
        // which ensures that all metadata changes will eventually be seen
        // The leader has to track every topic subscribed to by the group;
        // a follower only cares about the topics it subscribes to itself.
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());

        // update metadata (if needed) and keep track of the metadata used for assignment so that
        // we can check after rebalance completion whether anything has changed
        // New topics may have appeared during the steps above, so refresh the metadata
        client.ensureFreshMetadata();
        // Record the snapshot
        assignmentSnapshot = metadataSnapshot;

        log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
                groupId, assignor.name(), subscriptions);
        // Perform the partition assignment
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

        log.debug("Finished assignment for group {}: {}", groupId, assignment);
        // Serialize the result into a map and return it: the key is the consumer's memberId,
        // the value is the serialized assignment as a ByteBuffer
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }
}
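What assignor.assign() returns depends on the configured strategy, which the code above treats as a black box. To make the shape of the result concrete (a map from memberId to that member's partitions), here is a simplified range-style split for a single topic. It is an illustrative sketch under that assumption, not the source of any Kafka assignor; the class RangeAssignmentSketch and its assign method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical range-style assignment of one topic's partitions to group members. */
final class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one per member.
     * Members are sorted by id; the first (numPartitions % members) members get one extra.
     */
    static Map<String, List<Integer>> assign(int numPartitions, Iterable<String> memberIds) {
        TreeSet<String> sorted = new TreeSet<>();
        for (String memberId : memberIds)
            sorted.add(memberId);

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty())
            return result;

        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int nextPartition = 0;
        int index = 0;
        for (String memberId : sorted) {
            int count = perMember + (index < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int i = 0; i < count; i++)
                partitions.add(nextPartition++);
            result.put(memberId, partitions);
            index++;
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {consumer-a=[0, 1, 2], consumer-b=[3, 4]}
        System.out.println(assign(5, Arrays.asList("consumer-b", "consumer-a")));
    }
}
```

Sorting the member ids first makes the outcome independent of the order in which subscriptions arrive, so repeated runs over the same group yield the same assignment.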

Phase 3
Once the partition assignment is done, the consumer enters the Synchronizing Group State phase. The main logic is to send a SyncGroupRequest to the GroupCoordinator and handle the SyncGroupResponse.

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

    @Override
    public SyncGroupResponse parse(ClientResponse response) {
        return new SyncGroupResponse(response.responseBody());
    }

    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(syncResponse.errorCode());
        if (error == Errors.NONE) {
            log.info("Successfully joined group {} with generation {}", groupId, generation);
            sensors.syncLatency.record(response.requestLatencyMs());
            // Call future.complete() to propagate the partition assignment
            future.complete(syncResponse.memberAssignment());
        } else {
            // An error occurred: set rejoinNeeded = true
            AbstractCoordinator.this.rejoinNeeded = true;
            if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
                // Propagate the exception
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID
                    || error == Errors.ILLEGAL_GENERATION) {
                log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                future.raise(error);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                coordinatorDead();
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
            }
        }
    }
}

The partition assignment carried in the SyncGroupResponse is finally processed by ConsumerCoordinator.onJoinComplete(). That method is invoked by the RequestFutureListener registered in ensureActiveGroup() during phase 2.

protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // if we were the assignor, then we need to make sure that there have been no metadata updates
    // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
    // Before assigning partitions, the leader stored a snapshot of Metadata in assignmentSnapshot.
    // Here the leader compares that snapshot with the latest metadata snapshot. A mismatch means
    // the topics changed during the assignment, so needReassignment() is called to rebalance again.
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
        subscriptions.needReassignment();
        return;
    }

    // Look up the assignment strategy in use
    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    // Deserialize the partitions assigned to this consumer. They are added to the
    // SubscriptionState.assignment collection below, the consumer then fetches from exactly
    // those partitions, and needsPartitionAssignment is set to false.
    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // Flag that the last committed offsets must be fetched from the server
    subscriptions.needRefreshCommits();

    // Populate the assignment collection
    subscriptions.assignFromSubscribed(assignment.partitions());

    // onAssignment callback; empty by default, users can provide their own.
    assignor.onAssignment(assignment);

    // Restart the scheduled auto-commit task
    if (autoCommitEnabled)
        autoCommitTask.reschedule();

    // Invoke the ConsumerRebalanceListener callback
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
    // Prepare for the next rebalance.
    needsJoinPrepare = true;
    // Restart the scheduled heartbeat task
    heartbeatTask.reset();
}
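The snapshot check at the top of onJoinComplete() can be shown on its own. In this sketch the metadata snapshot is modelled as a map from topic name to partition count, which is an assumption made for illustration; the class MetadataSnapshotSketch and the method needsReassignment are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the leader's metadata snapshot comparison. */
final class MetadataSnapshotSketch {

    /**
     * Returns true when a snapshot was taken (leader only) and it no longer
     * matches the current topic -> partition-count view.
     */
    static boolean needsReassignment(Map<String, Integer> assignmentSnapshot,
                                     Map<String, Integer> currentSnapshot) {
        return assignmentSnapshot != null && !assignmentSnapshot.equals(currentSnapshot);
    }

    public static void main(String[] args) {
        Map<String, Integer> taken = new HashMap<>();
        taken.put("orders", 3);

        Map<String, Integer> unchanged = new HashMap<>(taken);
        Map<String, Integer> grown = new HashMap<>(taken);
        grown.put("orders", 4); // partition count changed while the rebalance was running

        System.out.println(needsReassignment(taken, unchanged)); // prints false
        System.out.println(needsReassignment(taken, grown));     // prints true
        System.out.println(needsReassignment(null, grown));      // prints false (follower: no snapshot)
    }
}
```

The null case matters because onJoinPrepare() resets assignmentSnapshot to null and only the leader sets it again in performAssignment(), so followers always skip the check.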

 
