Redis – 基于 Redis 的消息队列实现
简介
Redis 中提供了3种具有消息队列的功能的方式,分别是 List,PUBSUB,Stream,本文将讲解这三种消息队列的使用以及优缺点。
基于List实现消息队列
Redis 中提供了 List 的存储类型,我们可以利用 List 的存储特性实现消息的队列
List 的使用方法
命令 | 命令说明 | 命令示例 |
LPOP | 向左移出并获取列表的第一个元素 | LPOP keyName |
RPOP | 向右移除列表的最后一个元素,返回值为移除的元素。 | RPOP keyName |
RPUSH | 在列表中添加一个或多个值到列表尾部 | RPUSH keyName value1 [value2] |
LPUSH | 将一个或多个值插入到列表头部 | LPUSH keyName value1 [value2] |
BLPOP | 向左移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 | BLPOP key1 [key2 ] timeout |
BRPOP | 向右移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 | BRPOP key1 [key2 ] timeout |
通过上面的基本使用命令,我们可以通过 LPUSH 向左插入一个消息,然后使用 RPOP 或 BRPOP 向右获取一个消息,获取后,列表中的元素将被删除。
基于List的消息队列有哪些优缺点? 优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
PubSub 的使用方法
命令 | 命令说明 | 命令示例 |
SUBSCRIBE | 订阅某个频道 | SUBSCRIBE channel [channel] |
PSUBSCRIBE | 使用通配规则订阅频道,* 表示匹配所有,? 表示匹配单个字符,[ab] 表示可匹配a/b两个字符 | PSUBSCRIBE pattern[pattern] |
PUBLISH | 发布消息出去,使多个订阅者都收到消息 | PUBLISH channel message |
1.通过启动两个以上的Redis客户端,一个客户端作为消息发布,其余的客户端作为消息订阅者。
2.当消息发布时,所有消息订阅者都将收到消息。
基于PubSub的消息队列有哪些优缺点? 优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream 是 Redis 在 5.0 之后新推出的一种相对比较完善的消息队列功能,它有效的解决了 List 和 PUBSUB 两种消息队列的缺点。本文主要讲解的是使用 Redis 的 Stream 消息队列处理系统的消息。
发送消息的命令:
例如:
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流:
当消费者组中存在多个消费者时,消息可以被组内的任意消费者进行消费
消息标示:
消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费
消息确认:
消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
即消费者在读取消息后,消息不会马上被删除,而是被标记为“已读”,当其它消费者希望重新读取时依然可以被读取,直至该消息使用 XACK 命令确认为,才会正式被删除,有效的解决了消费者读取后报异常后实际未处理消息的问题。
消费者组使用方法
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
lua脚本代码如下:
---
--- 秒杀订单异步分配lua
--- 针对秒杀订单,在每一次秒杀时都会出现串行查询数据库的操作,性能比较低
--- 所以把非数据库操作的秒杀步骤都提取出来,先在Redis中处理
--- 如查询优惠券数量,秒杀成功用户等信息,生成订单号
--- 在Redis中先保存,再后续由数据库写入计单数据
---
-- 获取秒杀优惠券id,用于查询该优惠券还有多少份
local voucherKey = ARGV[1]
local stockKey = 'seckill:stock:' .. voucherKey
-- 获取登陆用户id,用于保存和查询该用户是否已下单
local userKey = ARGV[2]
local orderKey = 'seckill:order:' .. voucherKey
-- 查询当前还有多少张优惠券,如果优惠券少于0张,则说明没有库存
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 查询当前用户是否已下过单,如果已下单,返回2
if (redis.call('SISMEMBER', orderKey, userKey) == 1) then
return 2
end
-- 否则,用户就可以直接下单,下单时,减扣优惠券数量
redis.call('decrby', stockKey, 1)
-- 完成后,把该用户加入到秒杀成功集合中
redis.call('SADD', orderKey, userKey)
return 0
秒杀实现代码:
/**
* <p>
* 服务实现类
* </p>
*
* @author Unsoft
* @since 2021-12-22
*/
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT;
static {
DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
DEFAULT_REDIS_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
DEFAULT_REDIS_SCRIPT.setResultType(Long.class);
}
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedisIDWorker redisIDWorker;
// 创建队列
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
// 创建多线程
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 获取代理对象,以获得事务处理
private IVoucherOrderService proxy;
@PostConstruct
private void init() {
// 在对象创建完成时,就开始监听秒杀订单,并自动处理
SECKILL_ORDER_EXECUTOR.submit(() -> {
while (true){
try {
// 阻塞处理,如果暂时没有收到订单,会阻塞
VoucherOrder order = orderTasks.take();
proxy.createOrder(order);
} catch (Exception e) {
log.error("订单处理异常", e);
}
}
});
}
/**
* 秒杀优惠券
*
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckill(Long voucherId) {
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
DEFAULT_REDIS_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int i = result.intValue();
if (i != 0) {
return Result.fail(i == 1 ? "库存不足" : "用户不能重复下单");
}
// 获取当前线程的代理对象,不能在子线程中获取,会取不到
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 创建秒杀订单对象,把对象存到队列中等待处理
long orderId = redisIDWorker.nextId("order");
VoucherOrder order = new VoucherOrder();
order.setId(orderId);
order.setUserId(userId);
order.setVoucherId(voucherId);
orderTasks.add(order);
return Result.ok(0);
}
/**
* 创建一人一单的秒杀订单
*
* @param order
* @return
*/
@Transactional
public void createOrder(VoucherOrder order) {
// 一人一单,针对单个人只能秒杀一次的锁定,通过在购买前查询是否已存在购买订单
// 但是这里有可能会出现线程安全问题,因此需要使用悲观锁进行线程控制
Long userid = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userid).eq("voucher_id", order.getVoucherId()).count();
if (count > 0) {
log.error("只能购买一件");
return;
}
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", order.getVoucherId())
.ge("stock", 0)
.update();
if (!success) {
log.error("库存不足");
return;
}
// 6.创建订单
save(order);
}
}
共有 0 条评论