上图流程具体展开如下: 1. 加载并解析命令行参数,唯一的必要参数(Required)是zookeeper 2. 如果没有传入group.id,ConsoleConsumer将生成自己的group.id,即console-consumer-[10万以内的一个随机数] 3. 创建ConsumerConfig用于封装consumer的各种配置 4. 创建默认的消息格式化类,其定义的writeTo方法会默认将消息输出到控制台 5. 创建ZookeeperConsumerConnector。Kafka使用它来创建KafkaStream消费流 5.1 创建本地缓存, 保存topic下每个分区的信息,包括该分区底层的阻塞队列,已消费的位移、已获取到的最新位移以及获取大小等 5.2 创建本地缓存,保存每个topic分区当前在zookeeper中保存的位移值 5.3 创建本地缓存,保存topic的每个读取线程底层对应的阻塞队列,主要用于关闭Connector时可以批量关闭底层的阻塞队列 5.4 生成consumer id,规则为[group.id]_[主机名]_[时间戳]_[随机产生的一个UUID的前8位]。其中主机名就是运行ConsoleConsumer所在broker节点的主机名 5.5 创建获取线程管理器(ConsumerFetcherManager) 5.6 启动一个特定线程,用于定时地(默认是1分钟)向Zookeeper提交更改过的位移 6. 增加JVM关闭钩子,确保JVM关闭后资源也能够被释放 7. 创建KafkaStream并通过迭代器不断遍历该stream, KafkaStream的迭代器的底层实现包含一个阻塞队列,如果没有新的消息到来,该迭代器会一直阻塞,除非你显式设置了consumer.timeout.ms参数(默认是-1表示consumer会一直等待新消息的带来) 8. 每接收到一条新的消息,默认的消息格式化类会将其输出到控制台上。然后再次等待迭代器传过来的下一条消息
本质上来说,console consumer启动时会创建一个KafkaStream(可以简单翻译成Kafak流),该stream会不停地等待可消费的新消息——具体做法就是通过LinkedBlockingQueue阻塞队列来实现,后续会有详细描述。针对上面启动的顺序列表,我们在ConsoleConsumer.scala中逐一进行代码走读:
1. 加载必要参数 zookeeper ConsoleConsumer.scala类定义了main方法,说明这是个可执行的类。类的前100多行几乎都在处理命令行参数的解析。其中真正必要的参数只有zookeeper.connect一个,如下面代码所示: 1 // REQUIRED表示这是一个必须要指定的参数 2 val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 3 "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String]) 2. 生成group.id 乍一看和官网上要求的配置不匹配,因为官网中说过consumer真正必要的参数实际上有两个:zookeeper.connect和group.id。由此可以推断console consumer应该会生成group.id的值,且它本质上也是一个consumer,必然属于一个消费组,因此也必然定义了consumer id。下面的代码中即展示了console consumer如何生成自己的group id: (consumer id是如何生成的后面再说) 1 // 如果没有显式指定group.id,那么代码就自己合成一个 2 // 具体格式: console-consumer-[10万以内的一个随机数] 3 // 10万是一个很大的数,因此只有非常低的几率会碰到多个console consumer的group id相同的情况 4 if(!consumerProps.containsKey("group.id")) { 5 consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) 6 groupIdPassed=false 7 } 3. 创建ConsumerConfig对象封装配置确定了consumer的group.id之后console consumer需要把传入参数封装进ConsumerConfig类中并把后者传给Consumer的create方法以构造一个ConsumerConnector——即初始化consumer了,具体逻辑见下面的代码:
1 val config = new ConsumerConfig(consumerProps) // 封装ConsumerConfig配置类 2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false 4. 创建默认的消息格式化类,其定义的writeTo方法会默认将消息输出到控制台 1 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) // 创建消息格式类,用于最后的输出显示 2 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) 3 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 5. 创建ZookeeperConsumerConnector ZookeeperConsumerConnector非常重要,它实现了ConsumerConnector接口(该接口定义了创建KafkaStream和提交位移的操作,如createMessageStreams、commitOffsets等)。Kakfa官网把这个接口称为high level的consumer API。对于大多数consumer来说,这个high level的consumer API提供的功能已经足够了。不过很多用户可能需要对位移有更大的控制,这个时候Kafka推荐用户使用被称为low level的consumer API—— SimpleConsumer。大家参考这篇文章来深入学习high level API的用法。目前为止,我们只需要知道Kafka通过下面的语句构建了ConsumerConnector这个consumer的核心接口: 1 val connector = Consumer.create(config) // 创建ConsumerConnector,Consumer核心接口 6. 构建JVM关闭钩子线程 这部分非常简单,就是在线程中关闭上一步创建的connector,并根据传入的参数决定是否删除zookeeper下/consumers/[group.id]节点 7. 创建KafkaStream,通过迭代器等待消息到来 由于console consumer支持同时消费多个topic的消息,因此它提供了类似于过滤器这样的实现,这也是为什么connector调用createMessageStreamsByFilter来创建KafkaStream的原因,如下面的代码所示。 1 val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) 2 val iter = if(maxMessages >= 0) 3 stream.slice(0, maxMessages) 4 else 5 stream createMessageStreamsByFilter方法返回的是一组KafkaStream,但console consumer默认只是创建了1个stream,所以这里直接调用get(0)取到这个stream就可以了。 8. 通过迭代器以阻塞等待的方式消费消息 创建好KafkaStream之后,console consumer通过迭代器遍历KafkaStream。这里值得注意的是,该迭代器底层实现依赖一个阻塞队列。如果没有显式配置过consumer.timeout.ms参数(默认是-1表示consumer会一直等待新消息),那么迭代器会一直处于阻塞状态等待可供消费的消息——具体的实现细节参见下一篇。迭代器每收到一条消息后,它就会使用默认的消息格式化类DefaultMessageFormatter将消息输出到控制台,这也是console consumer名字的由来,如下面的代码所示: 1 for(messageAndTopic <- iter) { 2 try { 3 formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) // 输出到控制台 4 numMessages += 1 5 } catch { ... } 6 ... 7 }好了,至此我们按照启动顺序概述了console consumer启动时的各个阶段。不过,ZookeeperConsumerConnector和创建和迭代器的实现我们并未详细展开,这部分内容将作为后面续篇的内容呈现给大家。敬请期待!
转载于:https://www.cnblogs.com/huxi2b/p/4671925.html
相关资源:数据结构—成绩单生成器