业务上线后,发现Kafka的消费者一直在重复拉取同一批数据。被消费的topic配置了10个分区,只有每个分区的第一批数据能够出队,并且无限循环。
因测试环境数据量比较小,一直无法复现问题。只能查生产环境的日志排查。
初步猜测数据被消费之后,没有正常commit到Kafka服务端,导致Topic分区offset再消费完毕后未进行更新,下次取数据时还是从老offset开始取数据。
检查客户端配置 自动提交已开启(enable.auto.commit的默认配置为true), 自动提交时间为5s(offsets.commit.timeout.ms的默认配置为5000)。 既然默认已开启自动提交,按道理offset应该会被更新吖。然而并没有,Why?添加手动提交代码 每批数据处理完毕后,执行 consumer.commitSync(); 然并卵,why? 只能添加日志埋点。 发现消费者程序每批取出了6000多条数据,每批处理时间长达5分钟。 另外一条关键日志,info级别,在每批数据处理完毕后打印出来:o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483646 dead. 可以看出,等到每批数据处理完毕时,消费者已经被标记为dead。可以推断出处理超时了!查看客户端超时配置 会话超时时间为5s(session.timeout.ms的默认值为5000),而消费者程序处理一批数据竟然要5分钟! 先尝试修改会话超时时间为30分钟,结果提示其余几个超时时间也必须同步修改为合理值。 仔细一想,Kafka作为流式处理系统,目的就是快速响应,把会话时间改为30分钟明显是不合理的。 优化消费者程序的性能 尝试优化程序性能,每条数据逐条处理改为成批处理。速度明显提升,但是要6000+条数据在30秒内处理完毕,臣妾还是办不到啊!!修改每次拉取的字节数 查看文档,发现消费者每次拉取数据的最大字节数(max.partition.fetch.bytes)为 1048576 Byte,即:1 MB。 1MB可取出6000+条数据,那我改成100KB,岂不是只取出600+条数据? consumerConfig.put("max.partition.fetch.bytes", 100 * 1024); //100kb 果然奏效!每次只取出600+条数据,加上原先的性能优化,每批数据都控制在10秒内处理完毕。 至此,不再出现日志 Marking the coordinator 2147483646 dead. 数据也不再死循环了。问题至此解决,可以安心下班了。(*^_^*)转载于:https://www.cnblogs.com/sea-horse/p/7615811.html
