首先我们在user_strategy_export表加了mq_state字段,用来记录消息发送状态。整个流程是这样的:当用户抽中奖品后,系统会先在数据库生成中奖记录,此时mq_state是待发送状态(0)。然后通过KafkaProducer发送发货消息,如果发送成功就把状态改成1,失败就改成2。这里有个关键点:发送MQ和更新数据库状态这两个操作不是事务性的,所以可能出现消息发了但状态没更新成功的情况。

为此我们做了两个保障机制:第一是Kafka发送消息时的回调处理,能在发送失败时立即标记状态;第二是准备了定时任务,会定期扫描mq_state不是1的记录重新发送。消费端处理消息时也考虑了幂等性,防止重复发货。

具体到代码实现上,主要改了三个地方:

  1. 在ActivityProcessImpl的抽奖流程最后增加了发送MQ消息的逻辑,用ListenableFuture的callback来处理发送结果

  2. 新建了KafkaProducer专门处理消息发送,把对象转成JSON字符串

  3. 消费端LotteryInvoiceListener根据奖品类型调用不同的发货处理器,处理失败时会抛出异常让Kafka自动重试

问题👿:

  • 为什么选择Kafka而不是其他消息队列?

主要看中它的高吞吐和持久化能力,抽奖系统在大促时会有突发流量,Kafka的磁盘顺序读写能扛住峰值压力。消息分区是按用户ID哈希分配的,这样同一个用户的抽奖消息会进同一个分区。消费者组设计成"抽奖发货组",每个分区只有一个消费者实例,避免重复发货。

  • Kafka集群挂了怎么办?

    • 数据库的mq_state字段会标记消息状态(0待发送/1成功/2失败),定时任务每分钟扫描状态不是1的记录重试发送。

    • 消费端做了幂等处理,就算重复收到消息也不会重复发货。

  • mq_state状态流转

0→1(发送成功更新)

0→2(发送失败更新)

0→1(定时任务补偿发送成功)

2→1(定时任务重试成功)

  • ListenableFuture的回调里直接更新数据库,如果更新失败怎么办?

在回调里如果更新数据库失败,会记一条错误日志并触发告警,同时定时任务会扫描到这条既不是1也不是2的"僵尸数据"(通过modified_time判断),由补偿机制重新处理。

  • 你们的消息幂等是怎么实现的?举个例子说明

    • 消费端先查redis判断orderId是否已处理

    • 数据库user_strategy_export表有orderId唯一索引

    • 发货操作前会检查发货状态

  • 定时任务扫描补偿的频率怎么定的?会不会有重复发送风险?

频率是动态调整的:平时1分钟1次,大促时10秒1次。重复发送确实会发生,所以消费端必须做幂等。我们通过给消息加唯一traceId来追踪重复情况。

  • 如果消息发送成功了但回调没执行,系统怎么发现并修复?

通过比对Kafka的ack和数据库状态来发现,定时任务会拉取最近5分钟的消息元数据,去和数据库记录做比对,发现“显示成功(ack=true)但数据库状态还是0(未确认)”的记录就补偿状态,

  • 消费端处理消息时抛异常了,你们的重试策略是什么?

默认重试3次,间隔2秒。对于实物类奖品会重试5次,虚拟商品只重试1次。所有重试都带着原始消息,不会重新序列化。通过Kafka的retry topic机制,

  • 遇到过消息积压吗?怎么解决的?

    • 动态增加消费者实例

    • 把非核心奖品转到备用队列

    • 降级部分风控检查

  • 为什么不用事务消息方案?比如RocketMQ的事务消息

  • 如果要求消息顺序性(比如同一个用户的发货顺序),你会怎么改造?

按随机哈希分配分区,改成按用户ID哈希。然后消费端每个分区启用单线程消费+本地队列。最后在数据库层加乐观锁保证顺序。原生顺序不选择是因为生产者必须等待前一条消息的ACK后才能发下一条,而我们的方案本质上是在消费端做"分流",Kafka生产者仍然可以批量发送。

  • 你们监控了哪些MQ相关指标?报警阈值怎么设置的?

消息发送成功率,消息延迟,积压量,补偿任务执行次数。

  • 大促时消息量暴涨10倍,你的方案需要做哪些调整?

可以预先扩容Kafka分区和消费者,还可以去启用消息压缩,同时对非核心的日志去实现降级处理。

  • 现在要接入一个新的发货服务,改造点在哪里?

在DistributionGoodsFactory新增处理类型,消费端增加新的奖品类型判断

  • 有遇到过消息丢失的情况吗?怎么排查的?

先看Kafka的留存日志,然后去查看补偿任务日志,最后对比数据库和MQ的offset。

  • 你们的消息体Schema是怎么管理的?兼容性怎么处理?

所有消息体都带version字段,消费者兼容最近3个版本。

  • 如果用内存队列替代Kafka,你会怎么设计?

会用Redis Stream做持久化,然后每个消费者维护自己的pending队列,并且定时去快照恢复状态

  • 如果要求Exactly-Once语义,现有架构要怎么调整?

让消息从生产到消费的整个流程中,确保每个消息被严格处理一次,既不会丢,也不会重复。MQ发送 和 数据库更新 放在同一个事务里,保证二者同时成功或失败。依赖 Kafka 的 transactional.id 防止重复发送。用 数据库唯一键(如 order_id)或 Redis原子操作 实现幂等。将 业务处理 和 Offset提交 放在同一个事务中。然后是将Offset和业务状态存在同一数据库。