kafka消费者(三):ConsumerCoordinator

it2024-07-30  58

在kafka的消费者中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator集成了AbstractCoordinator抽象类。AbstractCoordinator的核心字段如下:  

public abstract class AbstractCoordinator implements Closeable { //心跳任务的辅助类,记录了两次发送心跳消息的间隔(interval),最近发送心跳的时间(lastHeartbeatSend) //最近收到心跳的时间(lastHeartbeatReceive),过期时间(timeout),心跳任务重置时间(lastSessionReset) //还提供了计算下次发送心跳的时间方法(timeToNextHeartbeat),是否过期的方法(sessionTimeoutExpired) private final Heartbeat heartbeat; //定时任务,负责发送心跳请求和心跳响应处理,会添加到前面介绍的ConsumerNetworkClient private final HeartbeatTask heartbeatTask; private final int sessionTimeoutMs; private final GroupCoordinatorMetrics sensors; //当前消费者组ID protected final String groupId; //ConsumerNetworkClient对象,负责网络通信和执行定时任务 protected final ConsumerNetworkClient client; protected final Time time; protected final long retryBackoffMs; //标志是否需要执行发送JoinGroupRequest请求前的准备操作 private boolean needsJoinPrepare = true; //是否重亲发送JoinGroupRequest请求的条件之一 private boolean rejoinNeeded = true; //记录服务端GroupCoordinator所在Node的节点 protected Node coordinator; //服务端GroupCoordinator返回的分配给消费者唯一ID protected String memberId; protected String protocol; //服务端GroupCoordinator返回的年代信息,用来区分两次rebalance操作。 //由于网络延迟等问题,在执行Rebalance操作时可能受到上次rebalance过期的请求,避免这种干扰,每次rebalance操作都会递增generation的值 protected int generation; }

下面是ConsumerCoordinator的核心字段:

public final class ConsumerCoordinator extends AbstractCoordinator { //PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息。 //GroupCoordinator会从所有消费者中支持的分配策略中选择一个,通知leader使用此分配策略进行分区分配。由partition.assignment.strategy参数配置。 private final List<PartitionAssignor> assignors; //Kafka元数据 private final Metadata metadata; private final ConsumerCoordinatorMetrics sensors; //订阅状态 private final SubscriptionState subscriptions; //提交offset任务的callback private final OffsetCommitCallback defaultOffsetCommitCallback; //是否开启自动提交offset private final boolean autoCommitEnabled; //自动提交offset的定时任务 private final AutoCommitTask autoCommitTask; //ConsumerInterceptors集合 private final ConsumerInterceptors<?, ?> interceptors; //标识是否排除内部topic private final boolean excludeInternalTopics; //用来储存Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。 //在ConsumerCoordinator的构造方法中,会为metadata添加一个监听器,当metadata更新时会做以下事情: //1. 如果是AUTO_PARTTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。 //如果是AUTO_PATTERN和AUTO_TOPICS模式,为当前metadata做一个快照,这个快照底层使用map记录每个topic中partition的个数。 //把新老快照做个对比,如果发生变化,就表示消费者订阅的topic发生分区数量的变化,就把subscriptionState的needsPartitionAssignment字段设置为true,需要重新进行分区分配。 //使用metadataSnapshot字段记录变化后的新快照 private MetadataSnapshot metadataSnapshot; //存储metadata的快照信息,不过是用来检测partition分区的过程中有没有分区数量变化。 //在leader消费者开始分配分区操作前,使用这个字段记录metadata快照;受到syncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,就要重新进行分区分配。 private MetadataSnapshot assignmentSnapshot; }

下面分析metadata的listener的具体实现:

private void addMetadataListener() { this.metadata.addListener(new Metadata.Listener() { @Override public void onMetadataUpdate(Cluster cluster) { //AUTO_PATTERN模式的处理 if (subscriptions.hasPatternSubscription()) { Set<String> unauthorizedTopics = new HashSet<String>(); //权限验证 for (String topic : cluster.unauthorizedTopics()) { if (filterTopic(topic)) unauthorizedTopics.add(topic); } if (!unauthorizedTopics.isEmpty()) throw new TopicAuthorizationException(unauthorizedTopics); final List<String> topicsToSubscribe = new ArrayList<>(); //通过pattern匹配topic for (String topic : cluster.topics()) if (filterTopic(topic)) topicsToSubscribe.add(topic); //更新subscription集合、groupSubscription集合、assignment集合 subscriptions.changeSubscription(topicsToSubscribe); //更新metadata需要记录元数据的topic集合 metadata.setTopics(subscriptions.groupSubscription()); } else if (!cluster.unauthorizedTopics().isEmpty()) { throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); } // c检测是否为ATUO_PATTERN或者AUTO_TOPICS模式 if (subscriptions.partitionsAutoAssigned()) { //创建快照 MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster); //对比快照 if (!snapshot.equals(metadataSnapshot)) { //记录快照 metadataSnapshot = snapshot; //更新needsPartitionAssignment字段为true,表示需要进行分区分配 subscriptions.needReassignment(); } } } }); }

 

最新回复(0)