在这个抽奖系统主要干了两件需要任扫描的事,
第一个是把"已审核通过"的活动变成"进行中"状态,第二个是把"已结束"的活动关闭。
主要是通过
XXL-JOB这个分布式任务框架,比Spring自带的@Scheduled更强大。然后配置的是每10秒跑一次,扫描数据库里的活动。最后需要通过分页查询(每次查10条)避免一次加载太多数据。
依赖引入
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
配置
配置中主要包括你的服务地址,应用名称、日志路径等
# xxl-job
# 官网:https://github.com/xuxueli/xxl-job/
# 地址:http://localhost:7397/xxl-job-admin 【需要先启动 xxl-job】
# 账号:admin
# 密码:123456
xxl:
job:
admin:
addresses: http://127.0.0.1:7397/xxl-job-admin
executor:
address:
appname: lottery-job
ip:
port: 9999
logpath: /Users/fuzhengwei/itstack/data/applogs/xxl-job/jobhandler
logretentiondays: 50
accessToken:
任务初始类
需要启动一个任务执行器,通过配置 @Bean 对象的方式交给 Spring 进行管理。
@Configuration
public class LotteryXxlJobConfig {
private Logger logger = LoggerFactory.getLogger(LotteryXxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
开发活动扫描任务
在任务扫描中,主要把已经审核通过的活动和已过期的活动中状态进行变更操作;
审核通过 -> 扫描为活动中
活动中已过期时间 -> 扫描为活动关闭
@Component
public class LotteryXxlJob {
private Logger logger = LoggerFactory.getLogger(LotteryXxlJob.class);
@Resource
private IActivityDeploy activityDeploy;
@Resource
private IStateHandler stateHandler;
// 这个注解告诉XXL-JOB这是一个定时任务
@XxlJob("lotteryActivityStateJobHandler")
public void lotteryActivityStateJobHandler() throws Exception {
logger.info("扫描活动状态 Begin");
List<ActivityVO> activityVOList = activityDeploy.scanToDoActivityList(0L);
if (activityVOList.isEmpty()){
logger.info("扫描活动状态 End 暂无符合需要扫描的活动列表");
return;
}
while (!activityVOList.isEmpty()) {
for (ActivityVO activityVO : activityVOList) {
Integer state = activityVO.getState();
switch (state) {
// 活动状态为审核通过,在临近活动开启时间前,审核活动为活动中。在使用活动的时候,需要依照活动状态核时间两个字段进行判断和使用。
case 4:
Result state4Result = stateHandler.doing(activityVO.getActivityId(), Constants.ActivityState.PASS);
logger.info("扫描活动状态为活动中 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state4Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
break;
// 扫描时间已过期的活动,从活动中状态变更为关闭状态
case 5:
if (activityVO.getEndDateTime().before(new Date())){
Result state5Result = stateHandler.close(activityVO.getActivityId(), Constants.ActivityState.DOING);
logger.info("扫描活动状态为关闭 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state5Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
}
break;
default:
break;
}
}
// 获取集合中最后一条记录,继续扫描后面10条记录
ActivityVO activityVO = activityVOList.get(activityVOList.size() - 1);
activityVOList = activityDeploy.scanToDoActivityList(activityVO.getId());
}
logger.info("扫描活动状态 End");
}
}
配置可视化界面任务列表
先在任务执行器里面配置,然后去任务列表里面配置LotteryXxlJob#lotteryActivityStateJobHandler
问题
为什么选择XXL-JOB而不是Spring自带的@Scheduled或Quartz?
我们选XXL-JOB主要因为它解决了Spring定时任务的三大痛点:一是没有运维界面,改个cron表达式都要发版;二是分布式环境下多个节点会重复执行;三是任务失败后没有自动告警。
任务分片和性能优化怎么做的?如果活动数据量很大(比如10万条),如何优化扫描性能?
ID分段的分片策略。比如3个执行器节点,第一个节点扫ID 0-3万,第二个扫3-6万,第三个扫6-10万。具体实现是在scanToDoActivityList方法里带分片参数:WHERE id > ? AND id <= ?。后来还加了二级缓存,把最近扫描过的活动ID存Redis,避免重复处理。
任务执行器的注册发现机制是怎样的?新增节点如何自动加入集群?
XXL-JOB的执行器启动时会主动向Admin注册,Admin那边能看到所有在线的执行器列表。新增节点时只要保证appname一致,Admin会自动把任务分给它,这个过程不需要人工干预。
你配置的
scanToDoActivityList
分页查询为什么用ID而不是PageHelper
?
一是大表翻页时PageHelper的LIMIT offset, size
性能差,比如查第10万条数据要扫描前10万条记录;二是分布式任务分片时,ID范围分片比页码分片更准。核心原因是 页码分片在数据动态变化时会导致漏扫或重复,而ID范围分片能精准控制数据边界。改成ID分片后,即使数据实时变化也能保证每个ID只被一个节点处理。
任务执行过程中如果突然宕机,如何保证状态不会错乱?
XXL-JOB本身有故障转移机制,比如一个节点挂了,其他节点会接管它的分片。我们还加了双重保障:一是在活动表加了version字段做乐观锁,更新时带版本号校验;二是关键操作都记了操作日志。最极端的情况也只会漏处理,不会出现状态回退。
@XxlJob
注解底层是怎么和XXL-JOB-admin
通信的?
本质是RPC通信的封装,执行器启动时会把自己能处理的任务列表注册到Admin。触发任务时,Admin通过HTTP调执行器的接口(代码里埋了/run
端点),把任务参数传过来。
如果某个活动状态变更失败,重试机制是怎么设计的?
重试分三个层级:第一是XXL-JOB自带的失败重试(配了3次);第二是我们在stateHandler
里加了本地重试,用Guava的Retryer实现,最多试2次,间隔2秒;第三是落地数据库失败记录,由补偿任务每小时扫一次。
如果
XXL-JOB-admin
服务挂了,执行器还能正常工作吗?
Admin挂了执行器还能继续跑已注册的任务,但新任务触发和任务调度会暂停。然后可以通过两个方面去兜底,一是执行器有本地任务缓存,关键任务会双重注册到Spring的@Scheduled;二是给Admin做了高可用部署,挂掉后5分钟内就能自动恢复。
任务执行超时了怎么办?有设置超时时间吗?
首先在XXL-JOB界面设置了全局超时30分钟,其次在代码里对单个活动处理加了@Timeout注解限制5秒。超时后会先记录异常日志,然后根据活动优先级决定是否重试——普通活动直接放弃,重要活动会进入延迟队列重试3次。
如何防止两个执行器同时处理同一条活动记录?
XXL-JOB的分片参数保证不同节点处理不同数据范围,然后还能通过数据库加SELECT FOR UPDATE锁,补偿记录带版本号,防止并发更新冲突
因为分片参数能保证数据是隔离的,每个节点只处理自己分片范围内的数据,然后因为Admin是动态分配的,如果有节点挂了,它就会把它的分片转给其他的存活节点,同时因为加了幂等的保证及时被多次执行,数据库的判断条件也能保证不会重复的更新。
为什么把状态变更抽成stateHandler而不是写在Job里?
写在Job里,每次改状态流转都要重发布整个任务服务。现在用stateHandler后,运营通过配置中心修改状态规则就能实时生效。
如果要求支持动态调整扫描频率(比如大促时改成每秒扫描),你会怎么改?
管理后台增加调速界面,调用/jobinfo/update
接口。然后通过执行器去监听配置变更。
任务日志存在哪里?如何快速定位某次执行的问题?
最近3天的日志在ELK(实时检索),30天内日志压缩后扔S3(低成本存储),关键日志额外存数据库(保证事务一致性)。
通过XXL-JOB任务ID在ELK搜索
traceId=job_123
,秒级看到完整执行链路异常日志会打上
ERROR
标签,直接筛选数据库里的操作日志表带前后状态对比,比如
状态4->5失败,原值:xxx
如果扫描时数据库CPU飙高,你会怎么应急处理?
降级处理,通过配置中心关闭非核心活动的扫描(如抽奖测试活动);限流处理,在执行器加Semaphore
控制并发查询数;错峰处理,随机延迟0-5秒执行
如何监控任务积压情况?报警阈值怎么设的?
积压量 = 待处理记录数 / 处理速度(阈值:>1万条触发微信通知,>5万条电话呼叫)
最老数据年龄(阈值:>2小时报警)
如果要新增一个"自动结算奖金"的任务,代码要怎么调整?
写个BonusSettlementJob
类,继承BaseJobHandler
在distributionGoodsFactory
里加结算处理器(注意区分实物奖金和虚拟奖金)
配置结算规则到状态机,比如"活动结束24小时后自动结算"
任务执行结果需要通知运营人员,你会怎么设计?
任务结果统一发Kafka,带消息类型标签(紧急/日常)
运营自己在管理端订阅,比如只收"结算失败"的短信
重要消息会要求二次确认,比如奖金超过10万的结算需要运营点击"复核通过"
如果领导要求改成分布式事务保证状态一致性,你会怎么实现?
可以通过配置Seata去实现。
评论