Leader消费者在受到JoinGroupResponse之后,会按照其中指定的分区分配策略进行分区分配,每个分区分配策略即使一个PartitionAssignor接口的实现。 PartitionAssignor定义了Assignment和Subscription两个内部类。进行分区分配需要两方面的数据,metadata中记录的集群元数据和每个member的订阅信息。为了用户增强对分配结果的控制,就将用户订阅信息和一些影响分配的用户自定义信息封装成Subscription。例如,“用户自定义数据”可以是每个消费者的权重,在分配时可以根据此分配分区。其中,topics集合表示某个Member订阅的topic集合,userData表示用户自定义的数据。PartitionAssignor接口提供了subscription()方法,用户添加用户自定义数据,在创建JoinGroupRequest的时候回调用subscription()方法。
Assginment中保存了分配的结果,partitions表示分配给某消费者的topicPartition集合,userData是用户自定义的数据。 再来看看PartitionAssignor的方法,assign()是子类实现的完成Partition分配的抽象方法。onAssignment()方法时在每个消费者受到Leader分配结果时的回调函数。
public abstract class AbstractPartitionAssignor implements PartitionAssignor { public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { Set<String> allSubscribedTopics = new HashSet<>(); Map<String, List<String>> topicSubscriptions = new HashMap<>(); //解析subscriptions集合,去除userData信息 for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { List<String> topics = subscriptionEntry.getValue().topics(); allSubscribedTopics.addAll(topics); topicSubscriptions.put(subscriptionEntry.getKey(), topics); } //统计每个Topic分区个数 Map<String, Integer> partitionsPerTopic = new HashMap<>(); for (String topic : allSubscribedTopics) { Integer numPartitions = metadata.partitionCountForTopic(topic); if (numPartitions != null && numPartitions > 0) partitionsPerTopic.put(topic, numPartitions); else log.debug("Skipping assignment for topic {} since no metadata is available", topic); } //把分配分区具体逻辑委托给了assign()重载,由子类实现 Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions); // 整理分区结果 Map<String, Assignment> assignments = new HashMap<>(); for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet()) assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue())); return assignments; } }如果需要在自定义PartitionAssingor时需要使用userData控制分区分配结果,就不能直接继承AbstractPartitionAssignor,而需要直接实现PartitionAssignor接口。 RangeAssignor和RoundRobinAssignor都是Kafka提供的默认实现,前者根据顺序分配取模确定,后者partition进行字典排序,顺序轮训消费者。