在这个抽奖系统主要干了两件需要任扫描的事,

第一个是把"已审核通过"的活动变成"进行中"状态,第二个是把"已结束"的活动关闭。

主要是通过

XXL-JOB这个分布式任务框架,比Spring自带的@Scheduled更强大。然后配置的是每10秒跑一次,扫描数据库里的活动。最后需要通过分页查询(每次查10条)避免一次加载太多数据。

依赖引入

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>

配置

配置中主要包括你的服务地址,应用名称、日志路径等

# xxl-job
# 官网:https://github.com/xuxueli/xxl-job/
# 地址:http://localhost:7397/xxl-job-admin 【需要先启动 xxl-job】
# 账号:admin
# 密码:123456
xxl:
  job:
    admin:
      addresses: http://127.0.0.1:7397/xxl-job-admin
    executor:
      address:
      appname: lottery-job
      ip:
      port: 9999
      logpath: /Users/fuzhengwei/itstack/data/applogs/xxl-job/jobhandler
      logretentiondays: 50
    accessToken:

任务初始类

需要启动一个任务执行器,通过配置 @Bean 对象的方式交给 Spring 进行管理。

@Configuration
public class LotteryXxlJobConfig {

    private Logger logger = LoggerFactory.getLogger(LotteryXxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

开发活动扫描任务

在任务扫描中,主要把已经审核通过的活动和已过期的活动中状态进行变更操作;

  • 审核通过 -> 扫描为活动中

  • 活动中已过期时间 -> 扫描为活动关闭

@Component
public class LotteryXxlJob {

    private Logger logger = LoggerFactory.getLogger(LotteryXxlJob.class);

    @Resource
    private IActivityDeploy activityDeploy;

    @Resource
    private IStateHandler stateHandler;

    // 这个注解告诉XXL-JOB这是一个定时任务
    @XxlJob("lotteryActivityStateJobHandler")
    public void lotteryActivityStateJobHandler() throws Exception {
        logger.info("扫描活动状态 Begin");

        List<ActivityVO> activityVOList = activityDeploy.scanToDoActivityList(0L);
        if (activityVOList.isEmpty()){
            logger.info("扫描活动状态 End 暂无符合需要扫描的活动列表");
            return;
        }

        while (!activityVOList.isEmpty()) {
            for (ActivityVO activityVO : activityVOList) {
                Integer state = activityVO.getState();
                switch (state) {
                    // 活动状态为审核通过,在临近活动开启时间前,审核活动为活动中。在使用活动的时候,需要依照活动状态核时间两个字段进行判断和使用。
                    case 4:
                        Result state4Result = stateHandler.doing(activityVO.getActivityId(), Constants.ActivityState.PASS);
                        logger.info("扫描活动状态为活动中 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state4Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
                        break;
                    // 扫描时间已过期的活动,从活动中状态变更为关闭状态
                    case 5:
                        if (activityVO.getEndDateTime().before(new Date())){
                            Result state5Result = stateHandler.close(activityVO.getActivityId(), Constants.ActivityState.DOING);
                            logger.info("扫描活动状态为关闭 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state5Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
                        }
                        break;
                    default:
                        break;
                }
            }

            // 获取集合中最后一条记录,继续扫描后面10条记录
            ActivityVO activityVO = activityVOList.get(activityVOList.size() - 1);
            activityVOList = activityDeploy.scanToDoActivityList(activityVO.getId());
        }

        logger.info("扫描活动状态 End");

    }

}

配置可视化界面任务列表

先在任务执行器里面配置,然后去任务列表里面配置LotteryXxlJob#lotteryActivityStateJobHandler

问题

  • 为什么选择XXL-JOB而不是Spring自带的@Scheduled或Quartz?

我们选XXL-JOB主要因为它解决了Spring定时任务的三大痛点:一是没有运维界面,改个cron表达式都要发版;二是分布式环境下多个节点会重复执行;三是任务失败后没有自动告警

  • 任务分片和性能优化怎么做的?如果活动数据量很大(比如10万条),如何优化扫描性能?

ID分段的分片策略。比如3个执行器节点,第一个节点扫ID 0-3万,第二个扫3-6万,第三个扫6-10万。具体实现是在scanToDoActivityList方法里带分片参数:WHERE id > ? AND id <= ?。后来还加了二级缓存,把最近扫描过的活动ID存Redis,避免重复处理。

  • 任务执行器的注册发现机制是怎样的?新增节点如何自动加入集群?

XXL-JOB的执行器启动时会主动向Admin注册,Admin那边能看到所有在线的执行器列表。新增节点时只要保证appname一致,Admin会自动把任务分给它,这个过程不需要人工干预。

  • 你配置的scanToDoActivityList分页查询为什么用ID而不是PageHelper

一是大表翻页时PageHelper的LIMIT offset, size性能差,比如查第10万条数据要扫描前10万条记录;二是分布式任务分片时,ID范围分片比页码分片更准。核心原因是 页码分片在数据动态变化时会导致漏扫或重复,而ID范围分片能精准控制数据边界。改成ID分片后,即使数据实时变化也能保证每个ID只被一个节点处理

  • 任务执行过程中如果突然宕机,如何保证状态不会错乱?

XXL-JOB本身有故障转移机制,比如一个节点挂了,其他节点会接管它的分片。我们还加了双重保障:一是在活动表加了version字段做乐观锁,更新时带版本号校验;二是关键操作都记了操作日志。最极端的情况也只会漏处理,不会出现状态回退。

  • @XxlJob注解底层是怎么和XXL-JOB-admin通信的?

本质是RPC通信的封装,执行器启动时会把自己能处理的任务列表注册到Admin。触发任务时,Admin通过HTTP调执行器的接口(代码里埋了/run端点),把任务参数传过来。

  • 如果某个活动状态变更失败,重试机制是怎么设计的?

重试分三个层级:第一是XXL-JOB自带的失败重试(配了3次);第二是我们stateHandler里加了本地重试,用Guava的Retryer实现,最多试2次,间隔2秒;第三是落地数据库失败记录,由补偿任务每小时扫一次。

  • 如果XXL-JOB-admin服务挂了,执行器还能正常工作吗?

Admin挂了执行器还能继续跑已注册的任务,但新任务触发和任务调度会暂停。然后可以通过两个方面去兜底,一是执行器有本地任务缓存,关键任务会双重注册到Spring的@Scheduled;二是给Admin做了高可用部署,挂掉后5分钟内就能自动恢复。

  • 任务执行超时了怎么办?有设置超时时间吗?

首先在XXL-JOB界面设置了全局超时30分钟,其次在代码里对单个活动处理加了@Timeout注解限制5秒。超时后会先记录异常日志,然后根据活动优先级决定是否重试——普通活动直接放弃,重要活动会进入延迟队列重试3次。

  • 如何防止两个执行器同时处理同一条活动记录?

XXL-JOB的分片参数保证不同节点处理不同数据范围,然后还能通过数据库加SELECT FOR UPDATE锁,补偿记录带版本号,防止并发更新冲突

因为分片参数能保证数据是隔离的,每个节点只处理自己分片范围内的数据,然后因为Admin是动态分配的,如果有节点挂了,它就会把它的分片转给其他的存活节点,同时因为加了幂等的保证及时被多次执行,数据库的判断条件也能保证不会重复的更新。

  • 为什么把状态变更抽成stateHandler而不是写在Job里?

写在Job里,每次改状态流转都要重发布整个任务服务。现在用stateHandler后,运营通过配置中心修改状态规则就能实时生效。

  • 如果要求支持动态调整扫描频率(比如大促时改成每秒扫描),你会怎么改?

管理后台增加调速界面,调用/jobinfo/update接口。然后通过执行器去监听配置变更。

  • 任务日志存在哪里?如何快速定位某次执行的问题?

最近3天的日志在ELK(实时检索),30天内日志压缩后扔S3(低成本存储),关键日志额外存数据库(保证事务一致性)。

  1. 通过XXL-JOB任务ID在ELK搜索traceId=job_123,秒级看到完整执行链路

  2. 异常日志会打上ERROR标签,直接筛选

  3. 数据库里的操作日志表带前后状态对比,比如状态4->5失败,原值:xxx

  • 如果扫描时数据库CPU飙高,你会怎么应急处理?

降级处理,通过配置中心关闭非核心活动的扫描(如抽奖测试活动);限流处理,在执行器加Semaphore控制并发查询数;错峰处理,随机延迟0-5秒执行

  • 如何监控任务积压情况?报警阈值怎么设的?

积压量 = 待处理记录数 / 处理速度(阈值:>1万条触发微信通知,>5万条电话呼叫)

最老数据年龄(阈值:>2小时报警)

  • 如果要新增一个"自动结算奖金"的任务,代码要怎么调整?

写个BonusSettlementJob类,继承BaseJobHandler

distributionGoodsFactory里加结算处理器(注意区分实物奖金和虚拟奖金)

配置结算规则到状态机,比如"活动结束24小时后自动结算"

  • 任务执行结果需要通知运营人员,你会怎么设计?

任务结果统一发Kafka,带消息类型标签(紧急/日常)

运营自己在管理端订阅,比如只收"结算失败"的短信

重要消息会要求二次确认,比如奖金超过10万的结算需要运营点击"复核通过"

  • 如果领导要求改成分布式事务保证状态一致性,你会怎么实现?

可以通过配置Seata去实现。