Redis 限流

Redis 限流能有限的解决 缓存击穿 问题,通过控制一段时间内的访问量来控制,进入缓存和数据库的请求次数。

使用 Pipeline

这里我用springboot项目来演示:

@RestController
public class DemoTest {

    @Autowired
    RedisTemplate redisTemplate;

    @GetMapping("/pipelineTest")
    public void PipelineTest() {
        //用户id
        String userId = "zhangsan";
        //行为key
        String actionKey = "good";
        //时间范围
        Integer period = 60;
        //次数上限
        Integer maxCount = 5;

        List<Object> list = redisTemplate.executePipelined(new RedisCallback<Long>() {

            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                connection.openPipeline();
                String key = String.format("hist:%s:%s", userId, actionKey);
                long nowTs = System.currentTimeMillis();
                //业务代码,connection中有各种方法,方法名和redis命令行一致,string,list,set,zset
                connection.zAdd(key.getBytes(), nowTs, (nowTs + "").getBytes());

                //清理无用的数据,当前时间减去时间范围就是范围外的时间,然后去除,保证connection是最新60秒内的数据
                connection.zRemRangeByScore(key.getBytes(), 0, nowTs - period * 1000);
                //设置key有效时间 对冷数据自动清理
                connection.expire(key.getBytes(), period + 1);
                //获取次数
                Long card = connection.zCard(key.getBytes());
                return card;
            }
        });

        

        System.out.println(list);
        if ((Long) list.get(list.size() - 1) >= maxCount) {
            System.out.println("已经超出最大点赞量");
        }
    }
}

调用 redisTemplate.executePipelined 方法,需要一个回调,RedisConnection 中的方法和redis命令是一样的,如保存 string 类型方法名也是 setconnection.set(key,value),zset类型就是 connection.zAdd。

除了 RedisCallback,redisTemplate 还提供了,SessionCallback 也可以操作 redis,方法和 RedisCallback 是有不一样的,具体我们以后再说。

List<Object> list = redisTemplate.executePipelined(new SessionCallback<Long>() {

    @Override
    public Long execute(RedisOperations operations) throws DataAccessException {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        ZSetOperations zSetOperations = operations.opsForZSet();

        zSetOperations.add(key,(nowTs + "").getBytes(),nowTs);
        zSetOperations.removeRangeByScore(key, 0, nowTs - period * 1000);
        redisTemplate.expire(key,nowTs - period * 1000, TimeUnit.SECONDS);
        Long card = zSetOperations.size(key);
        return card;
    }
});

Redis-Cell 令牌桶限流

如果在一个并发量很高的接口,再使用 Pipeline 的话,就会导致 zset 变的很大,这显然不太合适。此时可以用 Redis-Cell 来限流

简介

系统会维护一个容器用于存放令牌(token),并且系统以一个固定速率往容器中添加令牌(token),这个速率通常更加系统的处理能力来权衡。当客户端的请求打过来时,需要从令牌桶中获取到令牌(token)之后,这个请求才会被处理,否则直接拒绝服务。

image-20230206224144834

安装 Redis-Cell

Redis默认是没有集成Redis-Cell这个限流模块的,就好比Redis使用布隆过滤器一样,我们也需要对该模块进行安装与集成。

https://github.com/brandur/redis-cell/releases/download/v0.2.4/redis-cellv0.2.4-x86_64-unknown-linux-gnu.tar.gz

下载完成后解压,然后重启。

接下来修改 redis.conf 文件,加载额外的模块:

loadmodule /root/redis-5.0.7/redis-cell/libredis_cell.so

重启 Redis

CL.THROTTLE 指令

指令 cl.throttle 参数含义

 CL.THROTTLE  ysmc  10  6 60 1
               ▲     ▲  ▲  ▲ ▲
               |     |  |  | └───── apply 1 token (default if omitted) (本次申请一个token)
               |     |  └──┴─────── 6 tokens / 60 seconds  (60秒添加6个token到令牌桶中)
               |     └───────────── 10 max_burst    (最大的突发请求,不是令牌桶的最大容量)
               └─────────────────── key "ysmc" (限流key)

输出参数值含义

127.0.0.1:6379> cl.throttle ysmc 10 6 60 1
1) (integer) 0                    # 当前请求是否被允许,0表示允许,1表示不允许
2) (integer) 11                    # 令牌桶的最大容量,令牌桶中令牌数的最大值(max_burst + 1)
3) (integer) 10                   # 令牌桶中当前的令牌数
4) (integer) -1                    # 如果被拒绝,需要多长时间后在重试,如果当前被允许则为-1
5) (integer) 12                    # 多长时间后令牌桶中的令牌会满

注意:

最大容量个 并不会进行限流,最后一个返回值没有意义,不要看

也就是说前 11 个返回第一个参数是 都是 0,并不是 10秒返回一个,后面的正常

java代码实现Redis-Cell 限流

导入依赖

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.3.4.RELEASE</version>
</dependency>
public interface IRedisCommand extends Commands {
 
    /**
     * 定义限流方法
     *
     * @param key           限流key
     * @param maxBurst      最大的突发请求,桶容量等于maxBurst + 1
     * @param tokens        tokens 与 seconds 是组合参数,表示seconds秒内添加个tokens
     * @param seconds       tokens 与 seconds 是组合参数,表示seconds秒内添加个tokens
     * @param apply         当前申请的token数
     * @return
     */
    @Command("CL.THROTTLE ?0 ?1 ?2 ?3 ?4")
    List<Object> throttle(String key, long maxBurst, long tokens, long seconds, long apply);
 
}
/**
 * <p>
 *      Redis-Cell令牌桶限流
 * </p>
 *
 * @Author: ysmc
 * @Date: 2021/9/8 23:47
 */
public class TokenBucketRateLimiter {


    private static final String SUCCESS = "0";
    private RedisClient client;
    private StatefulRedisConnection<String, String> connection;
    private IRedisCommand command;

//    private final Logger logger = LoggerFactory.getLogger(TokenBucketRateLimiter.class);
    private static final Logger log = LoggerFactory.getLogger(TokenBucketRateLimiter.class);
    public TokenBucketRateLimiter(RedisClient client) {
        this.client = client;
        this.connection = client.connect();
        this.command = new RedisCommandFactory(connection).getCommands(IRedisCommand.class);
    }
 
 
    /**
     * 请是否被允许
     *
     * @param key
     * @param maxBurst
     * @param tokens
     * @param seconds
     * @param apply
     * @return
     */
    public boolean isActionAllowed(String key, long maxBurst, long tokens, long seconds, long apply) {
        // 执行 CL.THROTTLE
        List<Object> result = command.throttle(key, maxBurst, tokens, seconds, apply);

        if (result != null && result.size() > 0) {
            log.info("{}限流{}返回:{}", new Date(),key, result);
            return SUCCESS.equals(result.get(0).toString());
        }
        log.info("{}限流{}返回:{}",new Date(), key, result);
        return false;
    }
 
}
/**
 * <p>
 *      测试令牌桶限流
 *      测试参数 1 秒内只能访问 100 次
 * </p>
 *
 * @Author: ysmc
 * @Date: 2021/9/9 0:02
 */
public class TestTokenBucketRateLimiter {
 
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://123456@192.168.96.128:6379");
        TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(client);
        // cl.throttle liziba 10 5 60 1
        for (int i = 1; i <= 1000; i++) {
            
            // 1秒产生100个令牌,也就是1秒最多放出100个请求,一个请求获取一个令牌
            boolean success = limiter.isActionAllowed("ysmc",100,100,1,1);
            
        }
 
    }
 
}