Redis – 基于 Redis 的消息队列实现

简介

Redis 中提供了3种具有消息队列的功能的方式,分别是 List,PUBSUB,Stream,本文将讲解这三种消息队列的使用以及优缺点。

 

基于List实现消息队列

Redis 中提供了 List 的存储类型,我们可以利用 List 的存储特性实现消息的队列

 

List 的使用方法

 

命令 命令说明 命令示例
LPOP 向左移出并获取列表的第一个元素 LPOP keyName
RPOP 向右移除列表的最后一个元素,返回值为移除的元素。 RPOP keyName
RPUSH 在列表中添加一个或多个值到列表尾部 RPUSH keyName value1 [value2]
LPUSH 将一个或多个值插入到列表头部 LPUSH keyName value1 [value2]
BLPOP 向左移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 BLPOP key1 [key2 ] timeout
BRPOP 向右移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 BRPOP key1 [key2 ] timeout

通过上面的基本使用命令,我们可以通过 LPUSH 向左插入一个消息,然后使用 RPOP 或 BRPOP 向右获取一个消息,获取后,列表中的元素将被删除。

 

基于List的消息队列有哪些优缺点? 优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

 

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 

PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

PubSub 的使用方法

命令 命令说明 命令示例
SUBSCRIBE 订阅某个频道 SUBSCRIBE channel [channel]
PSUBSCRIBE 使用通配规则订阅频道,* 表示匹配所有,? 表示匹配单个字符,[ab] 表示可匹配a/b两个字符 PSUBSCRIBE pattern[pattern]
PUBLISH 发布消息出去,使多个订阅者都收到消息 PUBLISH channel message

1.通过启动两个以上的Redis客户端,一个客户端作为消息发布,其余的客户端作为消息订阅者。

2.当消息发布时,所有消息订阅者都将收到消息。

基于PubSub的消息队列有哪些优缺点? 优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

 

基于Stream的消息队列

Stream 是 Redis 在 5.0 之后新推出的一种相对比较完善的消息队列功能,它有效的解决了 List 和 PUBSUB 两种消息队列的缺点。本文主要讲解的是使用 Redis 的 Stream 消息队列处理系统的消息。

发送消息的命令:

 

例如:

 

读取消息的方式之一:XREAD

 

例如,使用XREAD读取第一个消息:

 

XREAD阻塞方式,读取最新的消息:

 

消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

消息分流:

当消费者组中存在多个消费者时,消息可以被组内的任意消费者进行消费

消息标示:

消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费

消息确认:

消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

即消费者在读取消息后,消息不会马上被删除,而是被标记为“已读”,当其它消费者希望重新读取时依然可以被读取,直至该消息使用 XACK 命令确认为,才会正式被删除,有效的解决了消费者读取后报异常后实际未处理消息的问题。

 

消费者组使用方法

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

 

类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

 

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

lua脚本代码如下:

---
--- 秒杀订单异步分配lua
--- 针对秒杀订单,在每一次秒杀时都会出现串行查询数据库的操作,性能比较低
--- 所以把非数据库操作的秒杀步骤都提取出来,先在Redis中处理
--- 如查询优惠券数量,秒杀成功用户等信息,生成订单号
--- 在Redis中先保存,再后续由数据库写入计单数据
---

-- 获取秒杀优惠券id,用于查询该优惠券还有多少份
local voucherKey = ARGV[1]
local stockKey = 'seckill:stock:' .. voucherKey

-- 获取登陆用户id,用于保存和查询该用户是否已下单
local userKey = ARGV[2]
local orderKey = 'seckill:order:' .. voucherKey

-- 查询当前还有多少张优惠券,如果优惠券少于0张,则说明没有库存
if (tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
end

-- 查询当前用户是否已下过单,如果已下单,返回2
if (redis.call('SISMEMBER', orderKey, userKey) == 1) then
    return 2
end

-- 否则,用户就可以直接下单,下单时,减扣优惠券数量
redis.call('decrby', stockKey, 1)
-- 完成后,把该用户加入到秒杀成功集合中
redis.call('SADD', orderKey, userKey)
return 0

 

秒杀实现代码:

/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author Unsoft
 * @since 2021-12-22
 */
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {


    private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT;

    static {
        DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
        DEFAULT_REDIS_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        DEFAULT_REDIS_SCRIPT.setResultType(Long.class);
    }


    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisIDWorker redisIDWorker;

    // 创建队列
    private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    // 创建多线程
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    // 获取代理对象,以获得事务处理
    private IVoucherOrderService proxy;

    @PostConstruct
    private void init() {
        // 在对象创建完成时,就开始监听秒杀订单,并自动处理
        SECKILL_ORDER_EXECUTOR.submit(() -> {
            while (true){
                try {
                    // 阻塞处理,如果暂时没有收到订单,会阻塞
                    VoucherOrder order = orderTasks.take();
                    proxy.createOrder(order);
                } catch (Exception e) {
                    log.error("订单处理异常", e);
                }
            }
        });
    }

    /**
     * 秒杀优惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    @Transactional
    public Result seckill(Long voucherId) {

        Long userId = UserHolder.getUser().getId();

        Long result = stringRedisTemplate.execute(
                DEFAULT_REDIS_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
        int i = result.intValue();
        if (i != 0) {
            return Result.fail(i == 1 ? "库存不足" : "用户不能重复下单");
        }

        // 获取当前线程的代理对象,不能在子线程中获取,会取不到
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        // 创建秒杀订单对象,把对象存到队列中等待处理
        long orderId = redisIDWorker.nextId("order");
        VoucherOrder order = new VoucherOrder();
        order.setId(orderId);
        order.setUserId(userId);
        order.setVoucherId(voucherId);
        orderTasks.add(order);

        return Result.ok(0);
    }

    /**
     * 创建一人一单的秒杀订单
     *
     * @param order
     * @return
     */
    @Transactional
    public void createOrder(VoucherOrder order) {
        // 一人一单,针对单个人只能秒杀一次的锁定,通过在购买前查询是否已存在购买订单
        // 但是这里有可能会出现线程安全问题,因此需要使用悲观锁进行线程控制
        Long userid = UserHolder.getUser().getId();
        Integer count = query().eq("user_id", userid).eq("voucher_id", order.getVoucherId()).count();
        if (count > 0) {
            log.error("只能购买一件");
            return;
        }

        // 5.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", order.getVoucherId())
                .ge("stock", 0)
                .update();
        if (!success) {
            log.error("库存不足");
            return;
        }

        // 6.创建订单
        save(order);

    }
}

 

 

如果您喜欢本站,点击这儿不花一分钱捐赠本站

这些信息可能会帮助到你: 下载帮助 | 报毒说明 | 进站必看

修改版本安卓软件,加群提示为修改者自留,非本站信息,注意鉴别

THE END
分享
二维码
打赏
海报
Redis – 基于 Redis 的消息队列实现
简介 Redis 中提供了3种具有消息队列的功能的方式,分别是 List,PUBSUB,Stream,本文将讲解这三种消息队列的使用以及优缺点。   基于List实现消息队列 Redis 中提供了 List 的存储类型……
<<上一篇
下一篇>>