前言:
最近生产环境系统发现一个疑难杂症,看了很久的问题但是始终无法定位到问题并处理,然后查阅了相关资料也是定位不到问题,不过资料查阅却给了个新的思路,以此为跳板最终解决了问题。
一、问题描述
功能介绍: “主计划拆分子计划”是APS系统很常见的功能,功能大概意思是用户可选多个主计划一次性进行“展开子计划”生成子计划,因单个主计划生成子计划的逻辑相对复杂,所以单个计划耗时不能算低,故这里的批量操作使用了异步进行,这里使用了Kafka进行生产及消费消息。
问题起因: 功能完成之后上生产系统,然而偶尔会收到客户提出少量单据卡在中间状态,导致“展开”不了的问题,前前后后查了好久也没能找到具体问题并解决。
二、问题分析
分析数据: 通过查看用户提供的单据,发现这些数据都是卡在了某一个中间状态,这个状态是作为中转状态使用的,一开始计划的状态为“未展开”,点击执行“展开子计划”的功能之后,将计划标识为“展开中”之后再推送到Kafka消费处理,Kafka消费者接收到生产者的消息之后,将计划进行处理,处理完成之后再将状态标识为“展开完成”。
所以从数据上分析,问题点应该出现在消息生产到消费过程这段期间,但是纵观代码,发现已经对业务逻辑做了异常处理,如果是消息消费过程发生异常,都会将错误过程记录下来,所以再次定位到问题出现在Kafka的生产消息及消费消息这个过程。
查看日志: 根据以上分析结合日志监控的方式,确定问题数据实际上并未进行消费,所以猜想有两种情况:
- Kafka根本没有生产消息成功;
- 生产消息成功,但是Kafka未Poll到需要消费的消息。
三、进展
- 加addCallback回调方法
- 生产端的kafkaTemplate对象中,封装发送消息的方法,将send()方法封装为通用方法,增加addCallback()回调方法,用于消息生产成功之后回调记录日志。以此确认生产消息是否成功.
2. 参考相关资料:
查问题过程中,看到大佬写的文章,文章里描述了造成消费不成功的问题是因为“Kafka 内存饱和”造成的,但是实际上内存饱和造成的问题是Kafka消费服务Poll消息时候超时,相应的错误信息在我们系统日志中搜不到,最终也确认不是因为改原因造成的问题。(文章下文参考)
- 【AGP网关】Kafka 异常排查实(内网文章)
-
记一次kafka消费者不消费,消费组被踢出问题(外网文章)
- 研究kafka消费原理
当前确认问题点应该是出现在消费端消费不了消息导致的,那么重新研究一下Kafka消费端的实现原理。
消费者是通过KafkaConsumer对象的poll方法从Kafka队列中将消息拉取出来进行消费,这个poll方法可传入poll超时时间,超过设置的时间则会报拉取超时的异常“due to consumer poll timeout has expired.”,上文中大佬出现的报错就是提示这种拉取超时的报错,超时时间可通过配置节点【max.poll.interval.ms】进行配置;
KafkaConsumer对象poll到数据之后取到ConsumerRecords对象,然后就可以对数据进行消费,直到取到的ConsumerRecords对象是空的(isEmpty()为true)才停止消费。
这里发现有一个隐患的地方,当ConsumerRecords对象取到空数据才停止消费,那么这个ConsumerRecords对象是否会取到多个数据进行消费,是如何进行消费的?!
查阅相关资料,发现Kafka的消费原理是:KafkaConsumer 对象是实时拉取消息的,但不是实时消费消息的。KafkaConsumer 在 poll() 方法中从 Kafka 集群中批量拉取数据,将多个消息封装在 ConsumerRecords 对象中返回。这些消息可以在消费者应用程序的时间间隔内处理,但poll() 方法返回的消息不是立即消费的。只有在 ConsumerRecords 中的所有消息都被处理后,才会发送下一个拉取请求。如果在处理消息时发生错误,可以根据实际需要重新处理这些消息或跳过这些消息。
反观项目代码,发现在KafkaListener监听器拉取到数据之后,项目中仅仅只是取第一条数据进行消费,这里是不是有问题呢?(参考下面代码块)
/** * 计划发布SAP-KTWKZ * * @param records * @param consumer */ @KafkaListener(topics = {KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC}, id = KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC, containerFactory = "batchFactory") public void dayPlanReleaseToSapKTWKZTopic(ConsumerRecords<String, String> records, Consumer<String, String> consumer) { String data = ApsKafkaUtils.getFirstRecordValues(); log.info("mq消费-KT万颗子计划发布SAP数据:\n{}", data); try { SystemPostDTO systemPostDTO = JsonUtils.fromJson(data, SystemPostDTO.class); String postContent = systemPostDTO.getData(); if (StrUtil.isEmpty(postContent)) { log.warn("mq消费-KT万颗子计划发布SAP数据异常警告,systemPostDTO.data数据不能为空"); return; } pushRecordService.consumePushProductionOrderKtWKZ(postContent); } catch (Exception e) { log.error("mq消费-KT万颗子计划发布SAP数据异常:", e); } }
四、问题重现及
重现:
从第三点的分析中,暂时确定可能是ConsumerRecords对象接收到多条消息,但是消费端仅仅消费了第一条消息导致的问题,那么通过写demo来测试批量生产消息是否会导致ConsumerRecords一次性拉取到多条消息。
生产:
@ApiOperation("测试Kafka poll消息机制") @PostMapping("/v99/schedule/kafka/test") public ResponseEntity<String> testKafkaPollMessage( @RequestParam("testData") String testData) { String topic = "my-test-topic"; for (int i = 0; i < 100; i++) { Thread.sleep(0);//参数有0,10,100 SubassemblyOpenPlanListInfoRespDTO dto = kafkaDemo.sendMessage(testData); } return Results.success("Success"); }
消费:
@KafkaListener(topics = {"my-test-topic"}, id = "my-test-topic", containerFactory = "batchFactory") public void myTestKafka(ConsumerRecords<String, String> records, Consumer<String, String> consumer) { List<String> recordValues = ApsKafkaUtils.getRecordValues(); try { log.info("接收到的数据为【{}】{}", recordValues.size(), JsonUtils.toJson(recordValues)); } catch (Exception e) { log.error("接收到的数据为异常:", e); } }
以上接口,通过调用发现确实出现ConsumerRecords对象poll到多条消息的情况:
其中,for循环中执行等待时间越长,出现一个ConsumerRecords对象拉取到多条数据的情况越少:
那么分析为什么在实际使用过程中,【主计划拆分子计划】这个功能是偶然出现消费失败的问题,而不是稳定出现呢?
再次通过代码ReView的方式去回顾一下这个功能,发现当前代码中,是使用了for循环将一批次计划单循环推送给MQ进行消费,单个循环里执行了一次读库一次写库的操作,一次循环耗时大概几十毫秒,与上述demo的Thread.sleep(10)场景类似,所以基本确定偶发这种问题的原因出现在这里。
解决:
其实解决该问题很简单,只需要在消费端获取到ConsumerRecords对象之后,将拉取到的所有消息列表循环消费而不是只消费单条消息即可,之前的仅消费单条消息的场景经过沟通确认只存在某些特殊场景才需要使用,暂时不再保证该种场景。
五、总结
本案例中,通过日志、业务场景、写Demo使用并发工具等方式来分析及重现问题,将一个生产上的疑难杂症处理掉,其中也通过参考大佬的文章,虽然问题描述和大佬描述的基本一致,也和网络上的Blog描述一致,但是产生的问题却并不一样。
总的来说,其实解决问题不难,重要的是要了解问题,了解原理以及了解到解决问题的步骤,建议从多个方面一起查看问题。从其他参考文章描述中,可以从业务、日志、内存环境等查看问题,我这里补充一点,也可以多多结合业务来适当写demo去测试问题,可能也会有意外收获。
其他
大家有没有遇到其他的生产上的疑难杂症呢,大家都是怎么遇到问题,最后怎么解决问题呢,这里大家不妨进行讨论,也可以列出多多的跟进方案或者工具,大家一起学习进步。
转自:https://www.cnblogs.com/evlinjun/archive/2023/05/20/17417352.html