SubscriptionState是KafkaConsumer用来追踪TopicPartition和offset的对应关系。
public class SubscriptionState { private enum SubscriptionType { NONE, //初始值 AUTO_TOPICS, // 按照指定Topic名字进行订阅,自动分配分区 AUTO_PATTERN, //按照指定的正则表达式匹配Topic,自动分配分区 USER_ASSIGNED //手动指定消费者消费的Topic以及分区编号 }; /* 订阅的模式 */ private SubscriptionType subscriptionType; /* 按照正则表达式匹配的pattern */ private Pattern subscribedPattern; /* 消如果使用AUTO_TOPICS和AUTO_PATTERN模式,则使用这个集合记录订阅的topics, */ private final Set<String> subscription; /* Consumer Group会选举一个Leader,Leader使用该集合记录消费者组中所有消费者订阅的topic,其他的消费者就在这个集合中保留其自身订阅的topic */ private final Set<String> groupSubscription; /* 如果使用USER_ASSIGNED模式,记录分配给消费者的TopicPartition集合 */ private final Set<TopicPartition> userAssignment; /* 无论什么订阅模式,都用这个assigment记录每个TopicPartition的消费状态 */ private final Map<TopicPartition, TopicPartitionState> assignment; /* 标志是否需要进行一次分区分配 */ private boolean needsPartitionAssignment; /* 是否需要从GourpCoordinator中获取最近提交的offset,当出现异步提交offset操作或者rebalance刚刚完成时会设置为true,成功获取最近提交offset之后会设置为false */ private boolean needsFetchCommittedOffsets; /* 默认offsetReset策略 */ private final OffsetResetStrategy defaultResetStrategy; /* 监听分区分配操作 */ private ConsumerRebalanceListener listener; private static final String SUBSCRIPTION_EXCEPTION_MESSAGE = "Subscription to topics, partitions and pattern are mutually exclusive"; }上面的assignment是Map<TopicPartition, TopicPartitionState>类型,TopicPartitionState表示的是TopicPartition的状态,他的字段有:
private static class TopicPartitionState { private Long position; // 下次要从kafka服务端中获取的消息的offset private OffsetAndMetadata committed; // 记录最后一次提交的offset private boolean paused; // 记录了当前TopicPartition是否处于暂停状态,与Consumer的pause()方法相关 private OffsetResetStrategy resetStrategy; // 重置position的策略 }下面介绍消费者的订阅subscribe()方法
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { //用户未指定ConsumerRebalanceListener时,默认使用NoOpConsumerRebalanceListener //其中所有方法的实现都是空的 if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); 选择AUTO_TOPICS订阅模式 setSubscriptionType(SubscriptionType.AUTO_TOPICS); this.listener = listener; changeSubscription(topics); } public void changeSubscription(Collection<String> topicsToSubscribe) { //订阅的topic有变化 if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) { this.subscription.clear();//清空subscription集合 this.subscription.addAll(topicsToSubscribe);//添加订阅的Topic this.groupSubscription.addAll(topicsToSubscribe); this.needsPartitionAssignment = true;//标志需要重新分配分区 // 同步assignment与subscription集合 for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) { TopicPartition tp = it.next(); if (!subscription.contains(tp.topic())) it.remove(); } } }