A question to think about:
With Redis-based distributed rate limiting, what should we do with throttled requests when they cannot be dropped? And under heavy traffic, how do we keep the interface stable? Most articles on distributed rate limiting use Redis + Lua, drop throttled requests outright, and show the user a friendly message on the front end.
That approach is perfectly fine, but what about business scenarios where requests must not be dropped because each one carries business-critical information? That is where Kafka + AOP comes in.
Project background:
We need to sync third-party data into our system, day by day, covering 90 days of data. The third party throttles its API to at most 100 requests per second.
Implementation plan:
First, put a rate-limiting annotation on the service methods that call the third-party API, so we never exceed the quota and get blacklisted. Second, implement distributed rate limiting with Redis + Lua. Finally, use AOP to intercept every call to the service; if the current request count already exceeds the limit, push the request onto a Kafka queue for a later retry.
Code implementation:
Define a custom rate-limit annotation and put it on the methods that must be throttled.

```java
/**
 * @Author: guandezhi
 * @Date: 2019/6/21 0:03
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DerbyRateLimiter {
    int limit() default 100;       // max number of calls per window
    int timeout() default 1;       // window length (key TTL) in seconds
    int totalNum() default 0;
    String bizRemark() default "";
}
```

Distributed rate limiting with Redis + Lua:
```java
/**
 * @Author: guandezhi
 * @Date: 2019/6/21 0:03
 */
public class CountRateLimiter {

    /**
     * @param jedis   Redis connection
     * @param limit   max calls allowed per window
     * @param seconds window length in seconds
     * @param name    Redis key for the counter
     * @return true if the call is allowed, false if it is throttled
     */
    public static boolean acquire(Jedis jedis, int limit, int seconds, String name) {
        String luaScript =
                "local key = KEYS[1] " +
                "local limit = tonumber(ARGV[1]) " +
                "local timeout = tonumber(ARGV[2]) " +
                "local current = tonumber(redis.call('INCR', key)) " +
                "if current == 1 then " +      // first call in this window: start the TTL
                "    redis.call('expire', key, timeout) " +
                "end " +
                "if current > limit then " +
                "    return 0 " +
                "else " +
                "    return 1 " +
                "end";
        return (Long) jedis.eval(luaScript,
                Lists.newArrayList(name),
                Lists.newArrayList(String.valueOf(limit), String.valueOf(seconds))) == 1;
    }
}
```

Intercepting requests with AOP:
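The Lua script above implements a fixed-window counter. Its semantics can be sketched in plain Java with an in-memory stand-in (the class and names here are hypothetical and for illustration only; the real implementation must live in Redis so that all application instances share one counter):

```java
// In-memory sketch of the fixed-window counter the Lua script implements.
// Not the distributed version: it only limits calls within one JVM.
public class FixedWindowLimiter {
    private final int limit;          // max calls per window, like ARGV[1]
    private final long windowMillis;  // window length, like the key TTL
    private long windowStart = System.currentTimeMillis();
    private long count = 0;

    public FixedWindowLimiter(int limit, int windowSeconds) {
        this.limit = limit;
        this.windowMillis = windowSeconds * 1000L;
    }

    public synchronized boolean acquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            // window expired: the Redis key would have hit its TTL and vanished
            windowStart = now;
            count = 0;
        }
        // INCR, then compare with the limit, mirroring the Lua script
        count++;
        return count <= limit;
    }
}
```

Like the Lua version, this allows the first `limit` calls in each window and rejects the rest until the window rolls over.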
```java
/**
 * @Author: guandezhi
 * @Date: 2019/6/21 0:03
 */
@Slf4j
@Aspect
@Component
public class DerbyDynamicLimitAop {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private EmailService emailService;
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Around("@annotation(rateLimiter)")
    public Object execute(ProceedingJoinPoint joinPoint, DerbyRateLimiter rateLimiter) {
        String methodName = joinPoint.getSignature().getName();
        int limit = rateLimiter.limit();
        int timeout = rateLimiter.timeout();
        String name = "travel:hotel:data:ratelimit:" + methodName + ":";
        Jedis jedis = JedisUtil.getJedis();
        boolean success = false;   // treat a Redis failure as "throttled" so the request is queued, not lost
        String current = null;
        long ttl = 0L;
        try {
            success = CountRateLimiter.acquire(jedis, limit, timeout, name);
            current = jedis.get(name);       // read these before closing the connection
            long remaining = jedis.ttl(name);
            ttl = remaining > 0 ? remaining : 0;
        } catch (Exception e) {
            log.error("DynamicLimitAop Redis distributed rate limiting failed: {}", e.getMessage());
        } finally {
            jedis.close();                   // always return the connection to the pool
        }
        try {
            if (!success) {
                log.warn("Pull-data API over limit: current count: {}, limit: {}, retry in {} seconds",
                        current, limit, ttl);
                // push the throttled request onto a Kafka queue for later processing
                kafkaTemplate.send(KafkaQueueTopic.KAFKA_QUEUE_DERBYDYNAMICLIMIT_LIST,
                        getRequestParamsJson(joinPoint));
            } else {
                return joinPoint.proceed();
            }
        } catch (Throwable e) {
            log.error("DynamicLimitAop error:", e);
        }
        return new ResultVo<>();
    }
}
```

Note:
1. The Redis connection here must be closed promptly (in the `finally` block above), otherwise the connection pool can run out of connections.
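Since Jedis implements `Closeable`, try-with-resources is an even safer way to honor this note: the connection is closed automatically even when the body throws. A minimal sketch of the pattern with a stand-in resource class (the `Resource` class here is hypothetical, standing in for a pooled Jedis connection):

```java
// Demonstrates that try-with-resources always calls close(), even when the
// body throws. The same pattern applies to a Jedis connection borrowed
// from the pool: try (Jedis jedis = JedisUtil.getJedis()) { ... }
public class CloseDemo {
    static class Resource implements AutoCloseable {
        boolean closed = false;
        @Override
        public void close() { closed = true; }
    }

    public static Resource useAndFail() {
        Resource r = new Resource();
        try (Resource res = r) {
            throw new IllegalStateException("simulated Redis error");
        } catch (IllegalStateException e) {
            // by the time we reach this catch, close() has already run
        }
        return r;
    }
}
```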
Re-invoking the throttled requests from the queue:
```java
/**
 * Listens for throttled requests.
 *
 * @Author: guandezhi
 * @Date: 2019/5/14 15:06
 */
@Slf4j
@Component
public class DerbyLimitAopKafkaListener {

    @Autowired
    @Qualifier(value = "derbyRoomService")
    private RoomService derbyRoomService;
    @Resource
    private ThreadPoolTaskExecutor taskExecutor;

    @KafkaListener(topics = KafkaQueueTopic.KAFKA_QUEUE_DERBYDYNAMICLIMIT_LIST,
            containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<String, String>> records) {
        try {
            if (CollectionUtils.isNotEmpty(records)) {
                for (ConsumerRecord<String, String> record : records) {
                    RequestParam requestParam =
                            JSONObject.parseObject(record.value(), RequestParam.class);
                    if (requestParam == null) {
                        continue;
                    }
                    log.info("Reprocessing throttled request: {}", JSONObject.toJSONString(requestParam));
                    taskExecutor.execute(() -> {
                        try {
                            derbyRoomService.loadRoom(requestParam.getRequest(), requestParam.getData());
                        } catch (Exception e) {
                            log.error("DerbyLimitAopKafkaListener, requestParam {}, error {}",
                                    JSONObject.toJSONString(requestParam), e.toString());
                        }
                    });
                }
            }
        } catch (Exception e) {
            log.error("DerbyLimitAopKafkaListener batch consume failed, records: {}, error: {}",
                    JSONObject.toJSONString(records), e.getMessage());
        }
    }
}
```

With this in place, throttled requests re-enter the call loop: they may be delayed by the rate limit, but none are dropped.
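The listener above references a container factory named `batchFactory`, which the post does not show. A typical Spring Kafka definition looks like the sketch below; the broker address, group id, and batch size are assumptions for illustration, not values from the original project:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaBatchConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        Map<String, Object> props = new HashMap<>();
        // broker address and group id are placeholder assumptions
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "derby-limit-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // cap the batch size per poll

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.setBatchListener(true); // deliver List<ConsumerRecord> to the @KafkaListener
        return factory;
    }
}
```

Setting `setBatchListener(true)` is what allows the listener method to receive a `List<ConsumerRecord<String, String>>` instead of a single record.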